From d9fb2bbf2751a6f924afbc32dd7371a28c82d262 Mon Sep 17 00:00:00 2001 From: markus583 Date: Tue, 16 Jul 2024 13:29:59 +0200 Subject: [PATCH 01/14] add script --- v2d_to_transcript.py | 128 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 v2d_to_transcript.py diff --git a/v2d_to_transcript.py b/v2d_to_transcript.py new file mode 100644 index 0000000..f894c22 --- /dev/null +++ b/v2d_to_transcript.py @@ -0,0 +1,128 @@ +import argparse +import json +import os +import shutil +import tarfile +import tempfile +from datetime import timedelta + + +def timestamp_to_frames(timestamp, fps): + """Converts a timestamp in the format 'HH:MM:SS.mss' into a frame count.""" + # TODO: check correctness of this conversion + hrs, mins, secs = map(float, timestamp.split(":")) + total_seconds = timedelta(hours=hrs, minutes=mins, seconds=secs).total_seconds() + # TODO: is round the right way of doing this? Most transcripts are assigned to only 1-2 frames... + return round(total_seconds * fps) + + +def process_tar_files(source_directory, target_directory, skip_existing=True): + """Extract, process, and re-package JSON files in TAR archives.""" + # source_directory = os.path.join(source_directory, "video_rgb") + target_directory = os.path.join(target_directory, "transcripts") + + os.makedirs(target_directory, exist_ok=True) + + for tar_path in os.listdir(source_directory): + if tar_path.endswith(".tar"): + shard_name = os.path.splitext(tar_path)[0] + ".tar" + target_tar_path = os.path.join(target_directory, shard_name) + print(target_tar_path) + + if skip_existing and os.path.exists(target_tar_path): + print(f"Skipping already processed file: {target_tar_path}") + continue + + source_tar_path = os.path.join(source_directory, tar_path) + with tarfile.open(source_tar_path, "r") as tar: + temp_dir = tempfile.mkdtemp() + try: + tar.extractall(path=temp_dir) + + # Process all JSON files extracted + for root, dirs, files in os.walk(temp_dir): + for file in files: + if file.endswith(".json"): + process_json_file(os.path.join(root, file), temp_dir) + + # Repackage into new TAR file + with tarfile.open(target_tar_path, "w") as out_tar: + for root, dirs, files in os.walk(temp_dir): + for file in files: + if file.endswith(".jsonl"): + out_tar.add(os.path.join(root, file), arcname=file) + finally: + shutil.rmtree(temp_dir) + + +def process_json_file(json_file_path, output_dir): + """Reads and processes a single JSON file to convert it to the required format.""" + with open(json_file_path, "r", encoding="utf-8") as file: + data = json.load(file) + video_key = os.path.splitext(os.path.basename(json_file_path))[0] + + if data["status"] != "success": + # TODO: what to do with videos that errored when downloading? + return + elif "subtitles" not in data["yt_meta_dict"]: + # TODO: what to do with videos that have no subtitles? When can this occur? + return + if data["yt_meta_dict"]["subtitles"].keys() != {"en"}: + # TODO: what to do with non-English videos? + # TODO: what to do with videos w/o a transcript? When does this occur? 
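# --- Editor's aside (illustrative, not part of the patch) --------------------
# Minimal shape of the per-video JSON this function reads, reconstructed from
# the fields accessed in this diff; the real video2dataset output contains
# additional keys, and the timestamp strings shown are an assumption based on
# the 'HH:MM:SS.mss' docstring above.
#
# example_record = {
#     "status": "success",
#     "yt_meta_dict": {
#         "info": {"fps": 30},
#         "subtitles": {
#             "en": [
#                 {"start": "00:00:01.200",
#                  "end": "00:00:03.400",
#                  "lines": ["hello world"]},
#             ]
#         },
#     },
# }
# ------------------------------------------------------------------------------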
+ print(data["yt_meta_dict"]["subtitles"].keys()) + return + subtitles = data["yt_meta_dict"]["subtitles"]["en"] + fps = data["yt_meta_dict"]["info"]["fps"] + + json_content = [] + for subtitle in subtitles: + start_frame = timestamp_to_frames(subtitle["start"], fps) + end_frame = timestamp_to_frames(subtitle["end"], fps) + json_content.append( + { + "transcript": " ".join(subtitle["lines"]), + "start_frame_index": start_frame, + "end_frame_index": end_frame, + } + ) + + jsonl_filename = f"{video_key}.jsonl" + with open(os.path.join(output_dir, jsonl_filename), "w") as outfile: + json.dump(json_content, outfile, indent=4) + + +def main(args): + for folder in os.listdir(args.data_root): + if folder in ["train", "val", "test"]: + print(f"Processing {folder}.") + process_tar_files( + source_directory=os.path.join( + args.data_root, + folder, + ), + target_directory=os.path.join(args.data_root, folder), + skip_existing=args.skip_existing, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Process TAR files with JSONs and convert to structured JSONL format." + ) + + parser.add_argument( + "--data_root", + type=str, + # FIXME: default dir + default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", + help="Directory containing the JSON files to process.", + ) + parser.add_argument( + "--skip_existing", + default=False, # FIXME + help="Skip processing TAR files that have already been processed and exist in the target directory.", + ) + + args = parser.parse_args() + main(args) From 02218468dd312dda4203055b9e5ee7cf42973b39 Mon Sep 17 00:00:00 2001 From: markus583 Date: Tue, 16 Jul 2024 13:40:19 +0200 Subject: [PATCH 02/14] minor cleanup --- v2d_to_transcript.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/v2d_to_transcript.py b/v2d_to_transcript.py index f894c22..07b3240 100644 --- a/v2d_to_transcript.py +++ b/v2d_to_transcript.py @@ -18,6 +18,7 @@ def timestamp_to_frames(timestamp, fps): def process_tar_files(source_directory, target_directory, skip_existing=True): """Extract, process, and re-package JSON files in TAR archives.""" + # TODO: this path # source_directory = os.path.join(source_directory, "video_rgb") target_directory = os.path.join(target_directory, "transcripts") @@ -39,13 +40,12 @@ def process_tar_files(source_directory, target_directory, skip_existing=True): try: tar.extractall(path=temp_dir) - # Process all JSON files extracted + # process json files for root, dirs, files in os.walk(temp_dir): for file in files: if file.endswith(".json"): process_json_file(os.path.join(root, file), temp_dir) - # Repackage into new TAR file with tarfile.open(target_tar_path, "w") as out_tar: for root, dirs, files in os.walk(temp_dir): for file in files: @@ -108,7 +108,7 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Process TAR files with JSONs and convert to structured JSONL format." + description="Process tarfiles contati JSONs and convert to structured JSONL format." 
) parser.add_argument( @@ -116,12 +116,12 @@ def main(args): type=str, # FIXME: default dir default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", - help="Directory containing the JSON files to process.", + help="Dir containing the JSON files to process.", ) parser.add_argument( "--skip_existing", default=False, # FIXME - help="Skip processing TAR files that have already been processed and exist in the target directory.", + help="Skip tarfiles already processed (exist in the target directory).", ) args = parser.parse_args() From fd9666b4c1fa35aec5796c2a193a6fc8ca40c5a8 Mon Sep 17 00:00:00 2001 From: markus583 Date: Wed, 17 Jul 2024 11:18:45 +0200 Subject: [PATCH 03/14] update comments (euler + TODO) --- v2d_to_transcript.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/v2d_to_transcript.py b/v2d_to_transcript.py index 07b3240..a414510 100644 --- a/v2d_to_transcript.py +++ b/v2d_to_transcript.py @@ -62,15 +62,14 @@ def process_json_file(json_file_path, output_dir): video_key = os.path.splitext(os.path.basename(json_file_path))[0] if data["status"] != "success": - # TODO: what to do with videos that errored when downloading? + # errored while downloading return elif "subtitles" not in data["yt_meta_dict"]: + print(data) # TODO: what to do with videos that have no subtitles? When can this occur? return if data["yt_meta_dict"]["subtitles"].keys() != {"en"}: - # TODO: what to do with non-English videos? - # TODO: what to do with videos w/o a transcript? When does this occur? - print(data["yt_meta_dict"]["subtitles"].keys()) + # XXX: for now, we decided to only exclude non-English videos return subtitles = data["yt_meta_dict"]["subtitles"]["en"] fps = data["yt_meta_dict"]["info"]["fps"] @@ -115,7 +114,8 @@ def main(args): "--data_root", type=str, # FIXME: default dir - default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", + # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", + default="/cluster/work/cotterell/mfrohmann/ml-4m/data/DEBUG/1000_hd_vila_shuffled", help="Dir containing the JSON files to process.", ) parser.add_argument( From 26dc42230f00af03f346307728da2bcd708d49d3 Mon Sep 17 00:00:00 2001 From: markus583 Date: Thu, 18 Jul 2024 11:08:52 +0200 Subject: [PATCH 04/14] add metadata; "shard-" --- v2d_to_metadata.py | 126 +++++++++++++++++++++++++++++++++++++++++++ v2d_to_transcript.py | 4 +- 2 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 v2d_to_metadata.py diff --git a/v2d_to_metadata.py b/v2d_to_metadata.py new file mode 100644 index 0000000..af1248c --- /dev/null +++ b/v2d_to_metadata.py @@ -0,0 +1,126 @@ +import argparse +import json +import os +import shutil +import tarfile +import tempfile +from datetime import timedelta + +# FIXME: may need adaptation +METADATA_MAPPING = { + "webpage_url": "url", + "title": "title", + "duration": "duration", + "channel": "channel", + "fps": "fps", + "tags": "tags", + "resolution": "resolution", + "aspect_ratio": "aspect_ratio", +} + + +def process_tar_files(source_directory, target_directory, skip_existing=True): + """Extract, process, and re-package JSON files in TAR archives.""" + # TODO: this path + # source_directory = os.path.join(source_directory, "video_rgb") + target_directory = os.path.join(target_directory, "metadata") + + os.makedirs(target_directory, exist_ok=True) + + for tar_path in os.listdir(source_directory): + if tar_path.endswith(".tar"): + shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" + target_tar_path = 
os.path.join(target_directory, shard_name) + print(target_tar_path) + + if skip_existing and os.path.exists(target_tar_path): + print(f"Skipping already processed file: {target_tar_path}") + continue + + source_tar_path = os.path.join(source_directory, tar_path) + with tarfile.open(source_tar_path, "r") as tar: + temp_dir = tempfile.mkdtemp() + try: + tar.extractall(path=temp_dir) + + # process json files + for root, dirs, files in os.walk(temp_dir): + for file in files: + if file.endswith(".json"): + process_json_file(os.path.join(root, file), temp_dir) + + with tarfile.open(target_tar_path, "w") as out_tar: + for root, dirs, files in os.walk(temp_dir): + for file in files: + if file.endswith(".json"): + out_tar.add(os.path.join(root, file), arcname=file) + finally: + shutil.rmtree(temp_dir) + + +def process_json_file(json_file_path, output_dir): + """Reads and processes a single JSON file to convert it to the required format.""" + with open(json_file_path, "r", encoding="utf-8") as file: + data = json.load(file) + # remove filepath of json + os.remove(json_file_path) + video_key = os.path.splitext(os.path.basename(json_file_path))[0] + + json_content = {} + + if data["status"] != "success": + # errored while downloading + return + elif "subtitles" not in data["yt_meta_dict"]: + print(data) + # XXX: always ensure to only write metadata where we have everything we need + # (transcript, video, ...) + return + if data["yt_meta_dict"]["subtitles"].keys() != {"en"}: + # XXX: for now, we decided to only exclude non-English videos + return + for key, value in METADATA_MAPPING.items(): + if value in data["yt_meta_dict"]["info"]: + json_content[key] = data["yt_meta_dict"]["info"][value] + + json_filename = f"{video_key}.json" + with open(os.path.join(output_dir, json_filename), "w") as outfile: + print(outfile) + json.dump(json_content, outfile, indent=4) + + +def main(args): + for folder in os.listdir(args.data_root): + if folder in ["train", "val", "test"]: + print(f"Processing {folder}.") + process_tar_files( + source_directory=os.path.join( + args.data_root, + folder, + ), + target_directory=os.path.join(args.data_root, folder), + skip_existing=args.skip_existing, + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Process tarfiles contati JSONs and convert to structured JSONL format." 
+ ) + + parser.add_argument( + "--data_root", + type=str, + # FIXME: default dir + # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", + default="/cluster/work/cotterell/mfrohmann/data/v2d/howto100m", + help="Dir containing the JSON files to process.", + ) + parser.add_argument( + "--skip_existing", + default=False, # FIXME + help="Skip tarfiles already processed (exist in the target directory).", + ) + + args = parser.parse_args() + main(args) diff --git a/v2d_to_transcript.py b/v2d_to_transcript.py index a414510..77569b3 100644 --- a/v2d_to_transcript.py +++ b/v2d_to_transcript.py @@ -26,7 +26,7 @@ def process_tar_files(source_directory, target_directory, skip_existing=True): for tar_path in os.listdir(source_directory): if tar_path.endswith(".tar"): - shard_name = os.path.splitext(tar_path)[0] + ".tar" + shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" target_tar_path = os.path.join(target_directory, shard_name) print(target_tar_path) @@ -115,7 +115,7 @@ def main(args): type=str, # FIXME: default dir # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", - default="/cluster/work/cotterell/mfrohmann/ml-4m/data/DEBUG/1000_hd_vila_shuffled", + default="/cluster/work/cotterell/mfrohmann/data/v2d/howto100m", help="Dir containing the JSON files to process.", ) parser.add_argument( From 266b7398656bdea3c27b76d1ad11472fcef04e23 Mon Sep 17 00:00:00 2001 From: markus583 Date: Thu, 18 Jul 2024 15:26:59 +0200 Subject: [PATCH 05/14] add dataset option --- v2d_to_metadata.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/v2d_to_metadata.py b/v2d_to_metadata.py index af1248c..8769d0a 100644 --- a/v2d_to_metadata.py +++ b/v2d_to_metadata.py @@ -19,7 +19,7 @@ } -def process_tar_files(source_directory, target_directory, skip_existing=True): +def process_tar_files(source_directory, target_directory, dataset, skip_existing=True): """Extract, process, and re-package JSON files in TAR archives.""" # TODO: this path # source_directory = os.path.join(source_directory, "video_rgb") @@ -47,7 +47,7 @@ def process_tar_files(source_directory, target_directory, skip_existing=True): for root, dirs, files in os.walk(temp_dir): for file in files: if file.endswith(".json"): - process_json_file(os.path.join(root, file), temp_dir) + process_json_file(os.path.join(root, file), temp_dir, dataset) with tarfile.open(target_tar_path, "w") as out_tar: for root, dirs, files in os.walk(temp_dir): @@ -58,7 +58,7 @@ def process_tar_files(source_directory, target_directory, skip_existing=True): shutil.rmtree(temp_dir) -def process_json_file(json_file_path, output_dir): +def process_json_file(json_file_path, output_dir, dataset): """Reads and processes a single JSON file to convert it to the required format.""" with open(json_file_path, "r", encoding="utf-8") as file: data = json.load(file) @@ -83,9 +83,9 @@ def process_json_file(json_file_path, output_dir): if value in data["yt_meta_dict"]["info"]: json_content[key] = data["yt_meta_dict"]["info"][value] + json_content["dataset"] = dataset json_filename = f"{video_key}.json" with open(os.path.join(output_dir, json_filename), "w") as outfile: - print(outfile) json.dump(json_content, outfile, indent=4) @@ -99,6 +99,7 @@ def main(args): folder, ), target_directory=os.path.join(args.data_root, folder), + dataset=args.dataset, skip_existing=args.skip_existing, ) @@ -121,6 +122,13 @@ def main(args): default=False, # FIXME help="Skip tarfiles already processed (exist in the target directory).", ) + # TODO: is this 
also in filestructure or do we have to provide it like this? + parser.add_argument( + "--dataset", + type=str, + required=True, + help="Which dataset tar is coming from (HDVILA/HowTo100M)" + ) args = parser.parse_args() main(args) From 856d24569ae66171711ef17cef391087c5c6eb65 Mon Sep 17 00:00:00 2001 From: markus583 Date: Tue, 23 Jul 2024 10:18:05 +0200 Subject: [PATCH 06/14] adapt to whisper; adapt dirs --- v2d_to_metadata.py | 7 +++--- v2d_to_transcript.py | 51 +++++++++++++++++++++++++++++--------------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/v2d_to_metadata.py b/v2d_to_metadata.py index 8769d0a..fb083ab 100644 --- a/v2d_to_metadata.py +++ b/v2d_to_metadata.py @@ -23,11 +23,12 @@ def process_tar_files(source_directory, target_directory, dataset, skip_existing """Extract, process, and re-package JSON files in TAR archives.""" # TODO: this path # source_directory = os.path.join(source_directory, "video_rgb") - target_directory = os.path.join(target_directory, "metadata") + target_directory = os.path.join(target_directory, "video_metadata") os.makedirs(target_directory, exist_ok=True) for tar_path in os.listdir(source_directory): + print(source_directory) if tar_path.endswith(".tar"): shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" target_tar_path = os.path.join(target_directory, shard_name) @@ -106,7 +107,7 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Process tarfiles contati JSONs and convert to structured JSONL format." + description="Process tarfiles containing JSONs and convert to structured JSONL format." ) parser.add_argument( @@ -114,7 +115,7 @@ def main(args): type=str, # FIXME: default dir # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", - default="/cluster/work/cotterell/mfrohmann/data/v2d/howto100m", + default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", help="Dir containing the JSON files to process.", ) parser.add_argument( diff --git a/v2d_to_transcript.py b/v2d_to_transcript.py index 77569b3..aba290e 100644 --- a/v2d_to_transcript.py +++ b/v2d_to_transcript.py @@ -8,11 +8,10 @@ def timestamp_to_frames(timestamp, fps): - """Converts a timestamp in the format 'HH:MM:SS.mss' into a frame count.""" - # TODO: check correctness of this conversion - hrs, mins, secs = map(float, timestamp.split(":")) - total_seconds = timedelta(hours=hrs, minutes=mins, seconds=secs).total_seconds() - # TODO: is round the right way of doing this? Most transcripts are assigned to only 1-2 frames... + """Converts a timestamp in the format 'min.ms' into a frame count.""" + total_seconds = float(timestamp) + print(total_seconds) + # TODO: right-exlusive, left-inclusive. 
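# --- Editor's aside (illustrative, not part of the patch) --------------------
# float(timestamp) above treats the value as plain seconds, so the conversion
# below is seconds * fps: e.g. at fps = 30, a segment spanning 1.2s..2.4s maps
# to round(1.2 * 30) = 36 .. round(2.4 * 30) = 72. If the end index should be
# right-exclusive (as the TODO suggests), one option would be
# math.floor(start * fps) for the start and math.ceil(end * fps) for the end;
# that is a suggestion only, not what this patch implements.
# ------------------------------------------------------------------------------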
return round(total_seconds * fps) @@ -20,7 +19,7 @@ def process_tar_files(source_directory, target_directory, skip_existing=True): """Extract, process, and re-package JSON files in TAR archives.""" # TODO: this path # source_directory = os.path.join(source_directory, "video_rgb") - target_directory = os.path.join(target_directory, "transcripts") + target_directory = os.path.join(target_directory, "video_transcript") os.makedirs(target_directory, exist_ok=True) @@ -71,18 +70,32 @@ def process_json_file(json_file_path, output_dir): if data["yt_meta_dict"]["subtitles"].keys() != {"en"}: # XXX: for now, we decided to only exclude non-English videos return - subtitles = data["yt_meta_dict"]["subtitles"]["en"] + subtitles = data["whisper_alignment"]["segments"] fps = data["yt_meta_dict"]["info"]["fps"] json_content = [] for subtitle in subtitles: start_frame = timestamp_to_frames(subtitle["start"], fps) end_frame = timestamp_to_frames(subtitle["end"], fps) + sentence = subtitle["text"] + word_timestamps = [] + for word in subtitle["words"]: + word_timestamps.append( + { + "word": word["word"], + "start": timestamp_to_frames(word["start"], fps) + if "start" in word.keys() else None, + "end": timestamp_to_frames(word["end"], fps) + if "end" in word.keys() else None, + } + ) + json_content.append( { - "transcript": " ".join(subtitle["lines"]), - "start_frame_index": start_frame, - "end_frame_index": end_frame, + "sentence": sentence, + "start": start_frame, + "end": end_frame, + "words": word_timestamps, } ) @@ -94,12 +107,10 @@ def process_json_file(json_file_path, output_dir): def main(args): for folder in os.listdir(args.data_root): if folder in ["train", "val", "test"]: - print(f"Processing {folder}.") + current_folder = os.path.join(args.data_root, folder, args.whisper_dir) + print(f"Processing {current_folder}.") process_tar_files( - source_directory=os.path.join( - args.data_root, - folder, - ), + source_directory=current_folder, target_directory=os.path.join(args.data_root, folder), skip_existing=args.skip_existing, ) @@ -107,7 +118,7 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Process tarfiles contati JSONs and convert to structured JSONL format." + description="Process tarfiles containing JSONs and convert to structured JSONL format." 
) parser.add_argument( @@ -115,9 +126,15 @@ def main(args): type=str, # FIXME: default dir # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", - default="/cluster/work/cotterell/mfrohmann/data/v2d/howto100m", + default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", help="Dir containing the JSON files to process.", ) + parser.add_argument( + "--whisper_dir", + type=str, + default="whisperx", + help="Dir containing the WhisperX transcripts.", + ) parser.add_argument( "--skip_existing", default=False, # FIXME From 1511020add33f4adb44611ede9f358d850e3574a Mon Sep 17 00:00:00 2001 From: markus583 Date: Tue, 30 Jul 2024 08:42:14 +0200 Subject: [PATCH 07/14] mv into folder --- save_vq_tokens.py => pseudolabeling/save_vq_tokens.py | 0 v2d_to_metadata.py => pseudolabeling/v2d_to_metadata.py | 0 v2d_to_transcript.py => pseudolabeling/v2d_to_transcript.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename save_vq_tokens.py => pseudolabeling/save_vq_tokens.py (100%) rename v2d_to_metadata.py => pseudolabeling/v2d_to_metadata.py (100%) rename v2d_to_transcript.py => pseudolabeling/v2d_to_transcript.py (100%) diff --git a/save_vq_tokens.py b/pseudolabeling/save_vq_tokens.py similarity index 100% rename from save_vq_tokens.py rename to pseudolabeling/save_vq_tokens.py diff --git a/v2d_to_metadata.py b/pseudolabeling/v2d_to_metadata.py similarity index 100% rename from v2d_to_metadata.py rename to pseudolabeling/v2d_to_metadata.py diff --git a/v2d_to_transcript.py b/pseudolabeling/v2d_to_transcript.py similarity index 100% rename from v2d_to_transcript.py rename to pseudolabeling/v2d_to_transcript.py From 83af75d873988104037eab2caed99205262ce0aa Mon Sep 17 00:00:00 2001 From: markus583 Date: Tue, 30 Jul 2024 09:21:52 +0200 Subject: [PATCH 08/14] add splitter --- pseudolabeling/merge_data.py | 92 +++++++ pseudolabeling/save_vq_tokens.py | 369 +++++++++++++++++----------- pseudolabeling/v2d_to_metadata.py | 10 +- pseudolabeling/v2d_to_transcript.py | 6 +- 4 files changed, 333 insertions(+), 144 deletions(-) create mode 100644 pseudolabeling/merge_data.py diff --git a/pseudolabeling/merge_data.py b/pseudolabeling/merge_data.py new file mode 100644 index 0000000..a434c47 --- /dev/null +++ b/pseudolabeling/merge_data.py @@ -0,0 +1,92 @@ +import argparse +import os +import shutil +from sklearn.model_selection import train_test_split + + +def main(args): + """Main function to partition datasets into train, val, and test splits.""" + + # Get all class directories from the source directory + dset_dirs = [ + d + for d in os.listdir(args.source_dir) + if os.path.isdir(os.path.join(args.source_dir, d)) + ] + + for dset_dir in dset_dirs: + dset_path = os.path.join(args.source_dir, dset_dir) + print(dset_path) + all_files = os.listdir(dset_path) + if len(all_files) == 0: + print(f"Skipping dataset {dset_dir} as it has no files.") + continue + + # filter out files not ending with .tar + all_files = sorted([f for f in all_files if f.endswith(".tar")]) + # Split shards into train/temp + train_files, temp_files = train_test_split( + all_files, + train_size=args.train_ratio, + random_state=42, + shuffle=args.shuffle, + ) + + # Split temp into val/test + val_files, test_files = train_test_split( + temp_files, + test_size=args.test_ratio / (1 - args.train_ratio), + random_state=42, + shuffle=args.shuffle, + ) + + # move files to respective splits + for dataset, files in zip( + ["train", "val", "test"], [train_files, val_files, test_files] + ): + split_path = os.path.join(args.output_dir, 
dset_dir, dataset) + print(f"Move {dset_path} -----------> {split_path}") + os.makedirs(split_path, exist_ok=True) # Create class directory in split + for file in files: + shutil.move( + os.path.join(dset_path, file), os.path.join(split_path, file) + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Partition datasets into train, val, and test splits." + ) + parser.add_argument( + "--source_dir", + type=str, + required=True, + help="Path to the source directory containing dataset folders.", + ) + parser.add_argument( + "--output_dir", + type=str, + required=True, + help="Path to the output directory to store the splits. (--output_dir/dataset/split)", + ) + parser.add_argument( + "--train_ratio", + type=float, + default=0.7, + help="Ratio of data for the training set.", + ) + parser.add_argument( + "--test_ratio", + type=float, + default=0.2, + help="Ratio of data for the test set (remaining will be validation).", + ) + parser.add_argument( + "--shuffle", + type=bool, + default=False, + help="Whether to shuffle shards befores splitting. Otherwise, train is 0, 1, 2, etc.", + ) + args = parser.parse_args() + + main(args) diff --git a/pseudolabeling/save_vq_tokens.py b/pseudolabeling/save_vq_tokens.py index 9374399..d7dcb40 100755 --- a/pseudolabeling/save_vq_tokens.py +++ b/pseudolabeling/save_vq_tokens.py @@ -35,9 +35,22 @@ import fourm.utils.clip as clip -FEATURE_TASKS = ['CLIP-B16'] -IMG_EXTENSIONS = (".jpg", ".jpeg", ".png", ".ppm", ".bmp", ".pgm", ".tif", ".tiff", ".webp", ".jpx", ".gif") - +FEATURE_TASKS = ["CLIP-B16"] +IMG_EXTENSIONS = ( + ".jpg", + ".jpeg", + ".png", + ".ppm", + ".bmp", + ".pgm", + ".tif", + ".tiff", + ".webp", + ".jpx", + ".gif", +) + + def find_image_extension(root_dir): for root, dirs, files in os.walk(root_dir): for file in files: @@ -45,23 +58,26 @@ def find_image_extension(root_dir): return os.path.splitext(file)[1] return None + class SaveVQDataset(Dataset): - def __init__(self, - root: str, - tokens_dir: str, - crop_settings_dir: str, - task: str, - n_crops: int = 10, - min_crop_scale: float = 0.2, - input_size: int = 224, - mask_value: Optional[float] = None, - task_transforms: dict = MODALITY_TRANSFORMS_DIVAE, - resample_mode: str = 'bilinear', - corrupt_samples_log: Optional[str] = None, - dryrun: bool = False, - force_load_crop: bool = False): + def __init__( + self, + root: str, + tokens_dir: str, + crop_settings_dir: str, + task: str, + n_crops: int = 10, + min_crop_scale: float = 0.2, + input_size: int = 224, + mask_value: Optional[float] = None, + task_transforms: dict = MODALITY_TRANSFORMS_DIVAE, + resample_mode: str = "bilinear", + corrupt_samples_log: Optional[str] = None, + dryrun: bool = False, + force_load_crop: bool = False, + ): super().__init__() - + self.data_root = root self.tokens_root = os.path.join(root, tokens_dir) self.crop_settings_root = os.path.join(root, crop_settings_dir) @@ -76,65 +92,77 @@ def __init__(self, self.dryrun = dryrun self.force_load_crop = force_load_crop - + self.loader = lambda path: Image.open(path) - + self.classes, self.class_to_idx = find_classes(os.path.join(root, task)) if corrupt_samples_log is not None: task_ext = find_image_extension(os.path.join(root, task)) self.samples = self.get_corrupt_samples(corrupt_samples_log, task_ext) else: - self.samples = make_dataset(os.path.join(root, task), self.class_to_idx, IMG_EXTENSIONS, None) - + self.samples = make_dataset( + os.path.join(root, task), self.class_to_idx, IMG_EXTENSIONS, None + ) + self.center_crop_augmenter = 
CenterCropImageAugmenter( target_size=self.input_size, hflip=0.0, main_domain=task ) self.random_crop_augmenter = RandomCropImageAugmenter( - target_size=self.input_size, hflip=0.5, + target_size=self.input_size, + hflip=0.5, crop_scale=(min_crop_scale, 1.0), crop_ratio=(0.75, 1.3333), - main_domain=task + main_domain=task, ) def get_corrupt_samples(self, corrupt_samples_log, task_ext): # Load the log file from find_corrupted_pseudolabels.py - with open(corrupt_samples_log, 'r') as f: + with open(corrupt_samples_log, "r") as f: corrupt_samples = f.readlines() - + # Remove the error message that was thrown and empty characters - corrupt_samples = [sample.split(':')[-1].strip() for sample in corrupt_samples] - + corrupt_samples = [sample.split(":")[-1].strip() for sample in corrupt_samples] + # Extract the folder and file names - corrupt_samples = [sample.split('/')[-2:] for sample in corrupt_samples] - + corrupt_samples = [sample.split("/")[-2:] for sample in corrupt_samples] + # Construct path corrupt_samples = [ - (os.path.join(self.data_root, self.task, s[0], s[1].replace('.npy', task_ext)), self.class_to_idx[s[0]]) + ( + os.path.join( + self.data_root, self.task, s[0], s[1].replace(".npy", task_ext) + ), + self.class_to_idx[s[0]], + ) for s in corrupt_samples ] - + return corrupt_samples - + def __len__(self): return len(self.samples) - def __getitem__(self, index): + def __getitem__(self, index): path, _ = self.samples[index] img = self.loader(path) - img = img.convert("RGB") if self.task in ['rgb', 'normal'] else img - - class_id, file_id = path.split('/')[-2:] - file_id = file_id.split('.')[0] + img = img.convert("RGB") if self.task in ["rgb", "normal"] else img + + class_id, file_id = path.split("/")[-2:] + file_id = file_id.split(".")[0] if self.mask_value is not None: - mask_path = os.path.join(self.data_root, 'mask_valid', class_id, f'{file_id}.png') + mask_path = os.path.join( + self.data_root, "mask_valid", class_id, f"{file_id}.png" + ) mask = Image.open(mask_path) - tokens_path = os.path.join(self.tokens_root, class_id, f'{file_id}.npy') + tokens_path = os.path.join(self.tokens_root, class_id, f"{file_id}.npy") if not self.dryrun: os.makedirs(os.path.dirname(tokens_path), exist_ok=True) - crop_settings_path = os.path.join(self.crop_settings_root, class_id, f'{file_id}.npy') + crop_settings_path = os.path.join( + self.crop_settings_root, class_id, f"{file_id}.npy" + ) # Create or load crop settings if os.path.exists(crop_settings_path) or self.force_load_crop: @@ -151,7 +179,9 @@ def __getitem__(self, index): # Subsequent crops are random for _ in range(1, self.n_crops): - crop_coords, h_flip, _, _, _ = self.random_crop_augmenter({self.task: img}, None) + crop_coords, h_flip, _, _, _ = self.random_crop_augmenter( + {self.task: img}, None + ) settings.append((*crop_coords, 1 if h_flip else 0)) settings = np.array(settings) @@ -162,38 +192,55 @@ def __getitem__(self, index): # Perform augmentations and optionally mask images imgs = [] for i, j, h, w, h_flip in settings: - img_mod = self.task_transforms[self.task].preprocess(img.copy()) img_mod = self.task_transforms[self.task].image_augment( - img_mod, (i,j,h,w), h_flip, None, - (self.input_size, self.input_size), None, self.resample_mode + img_mod, + (i, j, h, w), + h_flip, + None, + (self.input_size, self.input_size), + None, + self.resample_mode, ) img_mod = self.task_transforms[self.task].postprocess(img_mod) if self.mask_value is not None: - mask_valid = self.task_transforms['mask_valid'].preprocess(mask.copy()) - 
mask_valid = self.task_transforms['mask_valid'].image_augment( - mask_valid, (i,j,h,w), h_flip, None, - (self.input_size, self.input_size), None, None + mask_valid = self.task_transforms["mask_valid"].preprocess(mask.copy()) + mask_valid = self.task_transforms["mask_valid"].image_augment( + mask_valid, + (i, j, h, w), + h_flip, + None, + (self.input_size, self.input_size), + None, + None, ) - mask_valid = self.task_transforms['mask_valid'].postprocess(mask_valid) - img_mod[~repeat(mask_valid, '1 h w -> c h w', c=img_mod.shape[0])] = self.mask_value - mask_valid = mask_valid.float() * 2 - 1 # Valid regions -> 1, Masked-out regions -> -1 - img_mod = torch.cat([img_mod, mask_valid], dim=0) # Concat image with mask - + mask_valid = self.task_transforms["mask_valid"].postprocess(mask_valid) + img_mod[~repeat(mask_valid, "1 h w -> c h w", c=img_mod.shape[0])] = ( + self.mask_value + ) + mask_valid = ( + mask_valid.float() * 2 - 1 + ) # Valid regions -> 1, Masked-out regions -> -1 + img_mod = torch.cat( + [img_mod, mask_valid], dim=0 + ) # Concat image with mask + imgs.append(img_mod) imgs = torch.stack(imgs) return imgs, tokens_path + def get_feature_extractor(args): - if args.task == 'CLIP-B16': - teacher_model, _ = clip.load("ViT-B/16", device='cpu', jit=False) + if args.task == "CLIP-B16": + teacher_model, _ = clip.load("ViT-B/16", device="cpu", jit=False) teacher_model = teacher_model.visual return teacher_model.eval() else: return None + def main(args): utils.init_distributed_mode(args) device = torch.device(args.device) @@ -203,7 +250,9 @@ def main(args): np.random.seed(seed) random.seed(seed) - model = get_image_tokenizer(args.tokenizer_id, tokenizers_root=args.tokenizers_root, encoder_only=True) + model = get_image_tokenizer( + args.tokenizer_id, tokenizers_root=args.tokenizers_root, encoder_only=True + ) feature_extractor = get_feature_extractor(args) num_tasks = utils.get_world_size() @@ -211,16 +260,31 @@ def main(args): global_rank = utils.get_rank() sampler_rank = global_rank - loader_task = 'rgb' if args.task in FEATURE_TASKS else args.task - dataset = SaveVQDataset(root=os.path.join(args.data_root, args.split), crop_settings_dir='crop_settings', - tokens_dir=f'{args.task}_{args.folder_suffix}', task=loader_task, - min_crop_scale=args.min_crop_scale, n_crops=args.n_crops, - input_size=args.input_size, mask_value=args.mask_value, - resample_mode=args.resample_mode, corrupt_samples_log=args.corrupt_samples_log, force_load_crop=args.force_load_crop) - - sampler = torch.utils.data.DistributedSampler(dataset, num_replicas=num_tasks, rank=sampler_rank, shuffle=False) - data_loader = torch.utils.data.DataLoader(dataset, sampler=sampler, batch_size=args.batch_size_dataloader, - num_workers=args.num_workers, drop_last=False) + loader_task = "rgb" if args.task in FEATURE_TASKS else args.task + dataset = SaveVQDataset( + root=os.path.join(args.data_root, args.split), + crop_settings_dir="crop_settings", + tokens_dir=f"{args.task}_{args.folder_suffix}", + task=loader_task, + min_crop_scale=args.min_crop_scale, + n_crops=args.n_crops, + input_size=args.input_size, + mask_value=args.mask_value, + resample_mode=args.resample_mode, + corrupt_samples_log=args.corrupt_samples_log, + force_load_crop=args.force_load_crop, + ) + + sampler = torch.utils.data.DistributedSampler( + dataset, num_replicas=num_tasks, rank=sampler_rank, shuffle=False + ) + data_loader = torch.utils.data.DataLoader( + dataset, + sampler=sampler, + batch_size=args.batch_size_dataloader, + num_workers=args.num_workers, + 
drop_last=False, + ) model.to(device) if feature_extractor is not None: @@ -235,7 +299,6 @@ def main(args): pbar = None for imgs_batch, tokens_paths in data_loader: - # Filter out already saved images imgs_batch_filtered, tokens_paths_filtered = [], [] for imgs, tokens_path in zip(imgs_batch, tokens_paths): @@ -248,139 +311,169 @@ def main(args): continue imgs_batch = torch.stack(imgs_batch_filtered) tokens_paths = tokens_paths_filtered - - + # Merge batch and number of augmentation dimensions - if 'semseg' in args.task: - imgs_batch = rearrange(imgs_batch, 'b n h w -> (b n) h w') + if "semseg" in args.task: + imgs_batch = rearrange(imgs_batch, "b n h w -> (b n) h w") else: - imgs_batch = rearrange(imgs_batch, 'b n c h w -> (b n) c h w') - + imgs_batch = rearrange(imgs_batch, "b n c h w -> (b n) c h w") + # For efficiency, process images with batch size that might be different from loader batch size or num augmentations sub_batches = imgs_batch.split(args.batch_size, dim=0) - + all_tokens = [] - + for sub_batch in sub_batches: sub_batch = sub_batch.to(device) - + with torch.no_grad(): - if 'CLIP' in args.task: + if "CLIP" in args.task: B, C, H, W = sub_batch.shape P_H, P_W = feature_extractor.conv1.kernel_size N_H, N_W = H // P_H, W // P_W - sub_batch = feature_extractor(sub_batch, return_final_tokens_no_cls=True) - sub_batch = rearrange(sub_batch, 'b (nh nw) d -> b d nh nw', nh=N_H, nw=N_W) + sub_batch = feature_extractor( + sub_batch, return_final_tokens_no_cls=True + ) + sub_batch = rearrange( + sub_batch, "b (nh nw) d -> b d nh nw", nh=N_H, nw=N_W + ) tokens = model.tokenize(sub_batch) tokens = rearrange(tokens, "b h w -> b (h w)") tokens = tokens.detach().cpu().numpy().astype(np.int16) all_tokens.append(tokens) - + all_tokens = np.concatenate(all_tokens) - all_tokens = rearrange(all_tokens, '(b n) d -> b n d', n=args.n_crops) - + all_tokens = rearrange(all_tokens, "(b n) d -> b n d", n=args.n_crops) + for tokens, tokens_path in zip(all_tokens, tokens_paths): if args.dryrun: - print(f'Dryrun: rank {global_rank} -> {tokens_path}') + print(f"Dryrun: rank {global_rank} -> {tokens_path}") else: np.save(tokens_path, tokens) if pbar is not None: pbar.update(1) - #torch.distributed.barrier() + # torch.distributed.barrier() total_time = time.time() - start_time total_time_str = str(datetime.timedelta(seconds=int(total_time))) - print('Tokenization time {}'.format(total_time_str)) + print("Tokenization time {}".format(total_time_str)) -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser(prog="VQ token saver") parser.add_argument( - "--tokenizer_id", type=str, default='cc12m/rgb_ViTB-UNetP4_16k_224-448', - help="ID of tokenizer to load." + "--tokenizer_id", + type=str, + default="cc12m/rgb_ViTB-UNetP4_16k_224-448", + help="ID of tokenizer to load.", ) parser.add_argument( - "--tokenizers_root", type=str, default='./tokenizer_ckpts', - help="Path where tokenizer checkpoints are saved." 
+ "--tokenizers_root", + type=str, + default="./tokenizer_ckpts", + help="Path where tokenizer checkpoints are saved.", ) parser.add_argument( - "--data_root", type=str, default='/path/to/dataset', - help="Path to dataset root" + "--data_root", type=str, default="/path/to/dataset", help="Path to dataset root" ) + parser.add_argument("--split", type=str, default="train", help="train or val") parser.add_argument( - "--split", type=str, default='train', - help="train or val" - ) - parser.add_argument( - "--n_crops", type=int, default='1', + "--n_crops", + type=int, + default="1", help="Number of crops to save. If 1, only a center crop will be saved. \ - If > 1, first image will be center cropped, the subsequent ones will be randomly cropped." + If > 1, first image will be center cropped, the subsequent ones will be randomly cropped.", ) parser.add_argument( - "--min_crop_scale", type=float, default=0.8, - help="Minimum crop scale (Only for n_crops > 1)" + "--min_crop_scale", + type=float, + default=0.8, + help="Minimum crop scale (Only for n_crops > 1)", ) + parser.add_argument("--input_size", type=int, default=224, help="Image size") + parser.add_argument("--task", type=str, default="rgb", help="Task name") parser.add_argument( - "--input_size", type=int, default=224, - help="Image size" + "--mask_value", + type=float, + default=None, + help="Optionally set masked-out regions to this value after data augs (default: %(default)s)", ) parser.add_argument( - "--task", type=str, default='rgb', - help="Task name" + "--resample_mode", + type=str, + default=None, + help="PIL resample mode for resizing loaded images. One out of ['bilinear', 'bicubic', 'nearest', None]. (default: %(default)s)", ) parser.add_argument( - "--mask_value", type=float, default=None, - help="Optionally set masked-out regions to this value after data augs (default: %(default)s)" + "--corrupt_samples_log", + type=str, + default=None, + help="Path to log file with corrupted samples from find_corrupted_pseudolabels.py. \ + If provided, only corrupted samples will be re-tokenized.", ) parser.add_argument( - "--resample_mode", type=str, default=None, - help="PIL resample mode for resizing loaded images. One out of ['bilinear', 'bicubic', 'nearest', None]. (default: %(default)s)" + "--verbose", + action="store_true", + default=False, + help="Set to enable progress bar", ) parser.add_argument( - "--corrupt_samples_log", type=str, default=None, - help="Path to log file with corrupted samples from find_corrupted_pseudolabels.py. \ - If provided, only corrupted samples will be re-tokenized." + "--dryrun", + action="store_true", + default=False, + help="Set to do a dry run that creates the tokens and prints the paths without saving them to disk.", ) parser.add_argument( - "--verbose", action='store_true', default=False, - help="Set to enable progress bar" + "--device", default="cuda", help="Device to use for tokenization" ) + parser.add_argument("--seed", default=0, type=int, help="Random seed") parser.add_argument( - "--dryrun", action='store_true', default=False, - help="Set to do a dry run that creates the tokens and prints the paths without saving them to disk." 
+ "--folder_suffix", + type=str, + default="dvae_BUa_224", + help="Suffix to add to the folder under which the tokens are saved.", ) - parser.add_argument('--device', default='cuda', help='Device to use for tokenization') - parser.add_argument('--seed', default=0, type=int, help='Random seed') + parser.add_argument("--num_workers", default=16, type=int) parser.add_argument( - "--folder_suffix", type=str, - default='dvae_BUa_224', - help="Suffix to add to the folder under which the tokens are saved." + "--pin_mem", + action="store_true", + help="Pin CPU memory in DataLoader for more efficient (sometimes) transfer to GPU.", ) - parser.add_argument('--num_workers', default=16, type=int) - parser.add_argument('--pin_mem', action='store_true', - help='Pin CPU memory in DataLoader for more efficient (sometimes) transfer to GPU.') - parser.add_argument('--no_pin_mem', action='store_false', dest='pin_mem', - help='') + parser.add_argument("--no_pin_mem", action="store_false", dest="pin_mem", help="") parser.set_defaults(pin_mem=True) - parser.add_argument('--batch_size_dataloader', default=64, type=int, - help='Dataloader batch size (default: %(default)s)') - parser.add_argument('--batch_size', default=64, type=int, - help='Batch size per GPU (default: %(default)s)') + parser.add_argument( + "--batch_size_dataloader", + default=64, + type=int, + help="Dataloader batch size (default: %(default)s)", + ) + parser.add_argument( + "--batch_size", + default=64, + type=int, + help="Batch size per GPU (default: %(default)s)", + ) # Distributed parameters - parser.add_argument('--world_size', default=1, type=int, - help='number of distributed processes') - parser.add_argument('--local_rank', default=-1, type=int) - parser.add_argument('--dist_on_itp', action='store_true') - parser.add_argument('--dist_url', default='env://', help='url used to set up distributed training') - - parser.add_argument('--force_load_crop', action='store_true', - help='Make sure to load crops locally, otherwise break the code.') + parser.add_argument( + "--world_size", default=1, type=int, help="number of distributed processes" + ) + parser.add_argument("--local_rank", default=-1, type=int) + parser.add_argument("--dist_on_itp", action="store_true") + parser.add_argument( + "--dist_url", default="env://", help="url used to set up distributed training" + ) + + parser.add_argument( + "--force_load_crop", + action="store_true", + help="Make sure to load crops locally, otherwise break the code.", + ) args = parser.parse_args() print("Force loading existing crop settings: {}".format(args.force_load_crop)) diff --git a/pseudolabeling/v2d_to_metadata.py b/pseudolabeling/v2d_to_metadata.py index fb083ab..ec1853c 100644 --- a/pseudolabeling/v2d_to_metadata.py +++ b/pseudolabeling/v2d_to_metadata.py @@ -48,7 +48,9 @@ def process_tar_files(source_directory, target_directory, dataset, skip_existing for root, dirs, files in os.walk(temp_dir): for file in files: if file.endswith(".json"): - process_json_file(os.path.join(root, file), temp_dir, dataset) + process_json_file( + os.path.join(root, file), temp_dir, dataset + ) with tarfile.open(target_tar_path, "w") as out_tar: for root, dirs, files in os.walk(temp_dir): @@ -66,7 +68,7 @@ def process_json_file(json_file_path, output_dir, dataset): # remove filepath of json os.remove(json_file_path) video_key = os.path.splitext(os.path.basename(json_file_path))[0] - + json_content = {} if data["status"] != "success": @@ -83,7 +85,7 @@ def process_json_file(json_file_path, output_dir, dataset): for 
key, value in METADATA_MAPPING.items(): if value in data["yt_meta_dict"]["info"]: json_content[key] = data["yt_meta_dict"]["info"][value] - + json_content["dataset"] = dataset json_filename = f"{video_key}.json" with open(os.path.join(output_dir, json_filename), "w") as outfile: @@ -128,7 +130,7 @@ def main(args): "--dataset", type=str, required=True, - help="Which dataset tar is coming from (HDVILA/HowTo100M)" + help="Which dataset tar is coming from (HDVILA/HowTo100M)", ) args = parser.parse_args() diff --git a/pseudolabeling/v2d_to_transcript.py b/pseudolabeling/v2d_to_transcript.py index aba290e..95e4037 100644 --- a/pseudolabeling/v2d_to_transcript.py +++ b/pseudolabeling/v2d_to_transcript.py @@ -84,9 +84,11 @@ def process_json_file(json_file_path, output_dir): { "word": word["word"], "start": timestamp_to_frames(word["start"], fps) - if "start" in word.keys() else None, + if "start" in word.keys() + else None, "end": timestamp_to_frames(word["end"], fps) - if "end" in word.keys() else None, + if "end" in word.keys() + else None, } ) From e3e5fd44bff72004d3d33adf58f06d63a40f41aa Mon Sep 17 00:00:00 2001 From: markus583 Date: Wed, 31 Jul 2024 14:40:29 +0200 Subject: [PATCH 09/14] adapt paths + langs --- .gitignore | 4 +++ pseudolabeling/v2d_to_metadata.py | 44 +++++++++++++------------------ 2 files changed, 23 insertions(+), 25 deletions(-) diff --git a/.gitignore b/.gitignore index e2f7103..61abb8f 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,8 @@ wandb/ tokenizer_ckpts/ *pkl *.egg-info +<<<<<<< HEAD *.log +======= +build/** +>>>>>>> da82c44 (adapt paths + langs) diff --git a/pseudolabeling/v2d_to_metadata.py b/pseudolabeling/v2d_to_metadata.py index ec1853c..ae1383f 100644 --- a/pseudolabeling/v2d_to_metadata.py +++ b/pseudolabeling/v2d_to_metadata.py @@ -21,14 +21,10 @@ def process_tar_files(source_directory, target_directory, dataset, skip_existing=True): """Extract, process, and re-package JSON files in TAR archives.""" - # TODO: this path - # source_directory = os.path.join(source_directory, "video_rgb") - target_directory = os.path.join(target_directory, "video_metadata") os.makedirs(target_directory, exist_ok=True) for tar_path in os.listdir(source_directory): - print(source_directory) if tar_path.endswith(".tar"): shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" target_tar_path = os.path.join(target_directory, shard_name) @@ -73,15 +69,20 @@ def process_json_file(json_file_path, output_dir, dataset): if data["status"] != "success": # errored while downloading + print(data["status"]) return elif "subtitles" not in data["yt_meta_dict"]: - print(data) - # XXX: always ensure to only write metadata where we have everything we need - # (transcript, video, ...) - return - if data["yt_meta_dict"]["subtitles"].keys() != {"en"}: - # XXX: for now, we decided to only exclude non-English videos + print("NO SUBTITLES: ", data) + # indeed, there are some videos without subtitles (np speech) return + if ( + data["yt_meta_dict"]["subtitles"].keys() != {"en"} + and len(data["yt_meta_dict"]["subtitles"].keys()) > 0 + ): + # XXX: for now, we decided to only exclude non-English videos. 
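# --- Editor's aside (illustrative, not part of the patch) --------------------
# Behaviour of the language check above for a few example subtitle key sets:
#   {"en"}       -> metadata is written (English-only subtitles)
#   {}           -> metadata is written (no subtitle tracks at all)
#   {"de"}       -> rejected via the ValueError below
#   {"en", "de"} -> rejected via the ValueError below
# ------------------------------------------------------------------------------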
+ raise ValueError( + f"Non-English subtitles found: {data['yt_meta_dict']['subtitles'].keys()}" + ) for key, value in METADATA_MAPPING.items(): if value in data["yt_meta_dict"]["info"]: json_content[key] = data["yt_meta_dict"]["info"][value] @@ -93,18 +94,12 @@ def process_json_file(json_file_path, output_dir, dataset): def main(args): - for folder in os.listdir(args.data_root): - if folder in ["train", "val", "test"]: - print(f"Processing {folder}.") - process_tar_files( - source_directory=os.path.join( - args.data_root, - folder, - ), - target_directory=os.path.join(args.data_root, folder), - dataset=args.dataset, - skip_existing=args.skip_existing, - ) + process_tar_files( + source_directory=args.data_root, + target_directory=os.path.join(args.data_root, "..", "video_metadata"), + dataset=args.dataset, + skip_existing=args.skip_existing, + ) if __name__ == "__main__": @@ -115,9 +110,8 @@ def main(args): parser.add_argument( "--data_root", type=str, - # FIXME: default dir - # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", - default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", + default="/store/swissai/a08/data/4m/video_rgb", + # default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", help="Dir containing the JSON files to process.", ) parser.add_argument( From 691eece5ecf91132032f7f89ac0dd6fa8449f31a Mon Sep 17 00:00:00 2001 From: markus583 Date: Wed, 31 Jul 2024 18:07:41 +0200 Subject: [PATCH 10/14] rename --- pseudolabeling/{merge_data.py => train_val_test_split.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pseudolabeling/{merge_data.py => train_val_test_split.py} (100%) diff --git a/pseudolabeling/merge_data.py b/pseudolabeling/train_val_test_split.py similarity index 100% rename from pseudolabeling/merge_data.py rename to pseudolabeling/train_val_test_split.py From 74e103878b70d33c60702ec1f9cc9afee169577a Mon Sep 17 00:00:00 2001 From: Kevin Du Date: Fri, 2 Aug 2024 00:55:34 +0200 Subject: [PATCH 11/14] Modify train_val_test_split to invert from modality/train to train/modality structure --- pseudolabeling/train_val_test_split.py | 45 +++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 4 deletions(-) diff --git a/pseudolabeling/train_val_test_split.py b/pseudolabeling/train_val_test_split.py index a434c47..e22953a 100644 --- a/pseudolabeling/train_val_test_split.py +++ b/pseudolabeling/train_val_test_split.py @@ -13,6 +13,7 @@ def main(args): for d in os.listdir(args.source_dir) if os.path.isdir(os.path.join(args.source_dir, d)) ] + print(f"Subdirectories of source dir {args.source_dir}: {dset_dirs}") for dset_dir in dset_dirs: dset_path = os.path.join(args.source_dir, dset_dir) @@ -44,16 +45,46 @@ def main(args): for dataset, files in zip( ["train", "val", "test"], [train_files, val_files, test_files] ): - split_path = os.path.join(args.output_dir, dset_dir, dataset) + split_path = os.path.join(args.output_dir, dataset, dset_dir) print(f"Move {dset_path} -----------> {split_path}") os.makedirs(split_path, exist_ok=True) # Create class directory in split for file in files: - shutil.move( - os.path.join(dset_path, file), os.path.join(split_path, file) - ) + if args.copy: + shutil.copy(os.path.join(dset_path, file), os.path.join(split_path, file)) + else: + shutil.move( + os.path.join(dset_path, file), os.path.join(split_path, file) + ) if __name__ == "__main__": + """ + Given a source directory containing the data for multiple modalities, e.g., + + ``` + |--source_dir/ + | |--modality_a/ + | |--modality_b/ + | 
|--modality_c/ + ``` + + move the files into a specified output_dir/ with the structure: + ``` + |--source_dir/ + | |--train/ + | | |--modality_a/ + | | |--modality_b/ + | | |--modality_c/ + | |--val/ + | | |--modality_a/ + | | |--modality_b/ + | | |--modality_c/ + | |--test/ + | | |--modality_a/ + | | |--modality_b/ + | | |--modality_c/ + ``` + """ parser = argparse.ArgumentParser( description="Partition datasets into train, val, and test splits." ) @@ -87,6 +118,12 @@ def main(args): default=False, help="Whether to shuffle shards befores splitting. Otherwise, train is 0, 1, 2, etc.", ) + parser.add_argument( + "--copy", + type=bool, + default=False, + help="Whether to copy the files instead of move. Defaults to False.", + ) args = parser.parse_args() main(args) From 6a51d4732abb5e24b0ddccc8a90e6b1896597885 Mon Sep 17 00:00:00 2001 From: Kevin Du Date: Mon, 5 Aug 2024 01:49:15 +0200 Subject: [PATCH 12/14] Change v2d to metadata and transcript filepaths. Transcript is untested because we don't have the container working yet to use whisper (cry) --- pseudolabeling/v2d_to_metadata.py | 30 ++++++++++++++----- pseudolabeling/v2d_to_transcript.py | 45 +++++++++++++++++------------ 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/pseudolabeling/v2d_to_metadata.py b/pseudolabeling/v2d_to_metadata.py index ae1383f..3777604 100644 --- a/pseudolabeling/v2d_to_metadata.py +++ b/pseudolabeling/v2d_to_metadata.py @@ -4,6 +4,7 @@ import shutil import tarfile import tempfile +from tqdm import tqdm from datetime import timedelta # FIXME: may need adaptation @@ -24,7 +25,7 @@ def process_tar_files(source_directory, target_directory, dataset, skip_existing os.makedirs(target_directory, exist_ok=True) - for tar_path in os.listdir(source_directory): + for tar_path in tqdm(os.listdir(source_directory)): if tar_path.endswith(".tar"): shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" target_tar_path = os.path.join(target_directory, shard_name) @@ -94,9 +95,14 @@ def process_json_file(json_file_path, output_dir, dataset): def main(args): + output_dir = ( + args.output_dir + if args.output_dir is not None + else os.path.join(args.input_dir.replace("filtered_raw", "4m"), "video_metadata") + ) process_tar_files( - source_directory=args.data_root, - target_directory=os.path.join(args.data_root, "..", "video_metadata"), + source_directory=args.input_dir, + target_directory=output_dir, dataset=args.dataset, skip_existing=args.skip_existing, ) @@ -104,23 +110,33 @@ def main(args): if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Process tarfiles containing JSONs and convert to structured JSONL format." + description="Process tarfiles from `filtered_raw` format containing JSONs and extract relevant metadata into the `video_metadata` modality." 
) parser.add_argument( - "--data_root", + "-I", + "--input_dir", type=str, - default="/store/swissai/a08/data/4m/video_rgb", + default="/store/swissai/a08/data/filtered_raw/howto100m/v2d_5000/", # default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", - help="Dir containing the JSON files to process.", + help="A `filtered_raw` dir containing the JSON files to process.", + ) + parser.add_argument( + "-O", + "--output_dir", + type=str, + default=None, + help="Output dir to save the pseudolabeled metadata.", ) parser.add_argument( + "-S", "--skip_existing", default=False, # FIXME help="Skip tarfiles already processed (exist in the target directory).", ) # TODO: is this also in filestructure or do we have to provide it like this? parser.add_argument( + "-D", "--dataset", type=str, required=True, diff --git a/pseudolabeling/v2d_to_transcript.py b/pseudolabeling/v2d_to_transcript.py index 95e4037..59c68cb 100644 --- a/pseudolabeling/v2d_to_transcript.py +++ b/pseudolabeling/v2d_to_transcript.py @@ -17,10 +17,6 @@ def timestamp_to_frames(timestamp, fps): def process_tar_files(source_directory, target_directory, skip_existing=True): """Extract, process, and re-package JSON files in TAR archives.""" - # TODO: this path - # source_directory = os.path.join(source_directory, "video_rgb") - target_directory = os.path.join(target_directory, "video_transcript") - os.makedirs(target_directory, exist_ok=True) for tar_path in os.listdir(source_directory): @@ -107,37 +103,48 @@ def process_json_file(json_file_path, output_dir): def main(args): - for folder in os.listdir(args.data_root): - if folder in ["train", "val", "test"]: - current_folder = os.path.join(args.data_root, folder, args.whisper_dir) - print(f"Processing {current_folder}.") - process_tar_files( - source_directory=current_folder, - target_directory=os.path.join(args.data_root, folder), - skip_existing=args.skip_existing, - ) + current_folder = os.path.join(args.input_dir, args.whisper_dir) + output_dir = ( + args.output_dir + if args.output_dir is not None + else os.path.join(args.input_dir.replace("filtered_raw", "4m"), "video_transcript") + ) + print(f"Processing {current_folder}.") + process_tar_files( + source_directory=current_folder, + target_directory=output_dir, + skip_existing=args.skip_existing, + ) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Process tarfiles containing JSONs and convert to structured JSONL format." 
) - parser.add_argument( - "--data_root", + "-I", + "--input_dir", + type=str, + default="/store/swissai/a08/data/filtered_raw/howto100m/v2d_5000/", + # default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", + help="A `filtered_raw` dir containing the JSON files to process.", + ) + parser.add_argument( + "-O", + "--output_dir", type=str, - # FIXME: default dir - # default="/store/swissai/a08/data/4m-data/train/DEBUG/v2d_40k", - default="/cluster/work/cotterell/mm_swissai/raw/v2d_500/howto100m", - help="Dir containing the JSON files to process.", + default=None, + help="Output dir to save the pseudolabeled transcripts.", ) parser.add_argument( + "-W", "--whisper_dir", type=str, default="whisperx", help="Dir containing the WhisperX transcripts.", ) parser.add_argument( + "-S", "--skip_existing", default=False, # FIXME help="Skip tarfiles already processed (exist in the target directory).", From 90fd7c835ea14a06f4d2dd18de60581c5afcbef7 Mon Sep 17 00:00:00 2001 From: Kevin Du Date: Mon, 5 Aug 2024 02:00:16 +0200 Subject: [PATCH 13/14] Remove shard- from v2d_to_metadata, transcript --- pseudolabeling/v2d_to_metadata.py | 9 ++++----- pseudolabeling/v2d_to_transcript.py | 10 +++++----- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pseudolabeling/v2d_to_metadata.py b/pseudolabeling/v2d_to_metadata.py index 3777604..e821a85 100644 --- a/pseudolabeling/v2d_to_metadata.py +++ b/pseudolabeling/v2d_to_metadata.py @@ -25,17 +25,16 @@ def process_tar_files(source_directory, target_directory, dataset, skip_existing os.makedirs(target_directory, exist_ok=True) - for tar_path in tqdm(os.listdir(source_directory)): - if tar_path.endswith(".tar"): - shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" - target_tar_path = os.path.join(target_directory, shard_name) + for filename in tqdm(os.listdir(source_directory)): + if filename.endswith(".tar"): + target_tar_path = os.path.join(target_directory, filename) print(target_tar_path) if skip_existing and os.path.exists(target_tar_path): print(f"Skipping already processed file: {target_tar_path}") continue - source_tar_path = os.path.join(source_directory, tar_path) + source_tar_path = os.path.join(source_directory, filename) with tarfile.open(source_tar_path, "r") as tar: temp_dir = tempfile.mkdtemp() try: diff --git a/pseudolabeling/v2d_to_transcript.py b/pseudolabeling/v2d_to_transcript.py index 59c68cb..cccde55 100644 --- a/pseudolabeling/v2d_to_transcript.py +++ b/pseudolabeling/v2d_to_transcript.py @@ -4,6 +4,7 @@ import shutil import tarfile import tempfile +from tqdm import tqdm from datetime import timedelta @@ -19,17 +20,16 @@ def process_tar_files(source_directory, target_directory, skip_existing=True): """Extract, process, and re-package JSON files in TAR archives.""" os.makedirs(target_directory, exist_ok=True) - for tar_path in os.listdir(source_directory): - if tar_path.endswith(".tar"): - shard_name = "shard-" + os.path.splitext(tar_path)[0] + ".tar" - target_tar_path = os.path.join(target_directory, shard_name) + for filename in tqdm(os.listdir(source_directory)): + if filename.endswith(".tar"): + target_tar_path = os.path.join(target_directory, filename) print(target_tar_path) if skip_existing and os.path.exists(target_tar_path): print(f"Skipping already processed file: {target_tar_path}") continue - source_tar_path = os.path.join(source_directory, tar_path) + source_tar_path = os.path.join(source_directory, filename) with tarfile.open(source_tar_path, "r") as tar: temp_dir = tempfile.mkdtemp() try: From 
1168fa07b094869dcef9f60417c242ddaddbf2cc Mon Sep 17 00:00:00 2001 From: Kevin Du Date: Mon, 5 Aug 2024 02:14:29 +0200 Subject: [PATCH 14/14] Validate that inputdir is a filtered raw subdir --- pseudolabeling/v2d_to_metadata.py | 3 +++ pseudolabeling/v2d_to_transcript.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pseudolabeling/v2d_to_metadata.py b/pseudolabeling/v2d_to_metadata.py index e821a85..16764fe 100644 --- a/pseudolabeling/v2d_to_metadata.py +++ b/pseudolabeling/v2d_to_metadata.py @@ -94,6 +94,9 @@ def process_json_file(json_file_path, output_dir, dataset): def main(args): + if "filtered_raw" not in args.input_dir: + raise ValueError(f"Expected input dir to be a subdir of `filtered_raw/`, instead received {args.input_dir}.") + output_dir = ( args.output_dir if args.output_dir is not None diff --git a/pseudolabeling/v2d_to_transcript.py b/pseudolabeling/v2d_to_transcript.py index cccde55..f24d53b 100644 --- a/pseudolabeling/v2d_to_transcript.py +++ b/pseudolabeling/v2d_to_transcript.py @@ -103,6 +103,9 @@ def process_json_file(json_file_path, output_dir): def main(args): + if "filtered_raw" not in args.input_dir: + raise ValueError(f"Expected input dir to be a subdir of `filtered_raw/`, instead received {args.input_dir}.") + current_folder = os.path.join(args.input_dir, args.whisper_dir) output_dir = ( args.output_dir