From c604a959623132bbd6fe22ae353ea7ca0e3dbec0 Mon Sep 17 00:00:00 2001 From: Yadu Nand B Date: Mon, 10 Feb 2025 20:45:29 +0000 Subject: [PATCH 1/6] Hack changes to allow insertion skip --- deduplication/args.py | 5 +++++ deduplication/lshbloom.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/deduplication/args.py b/deduplication/args.py index ce7e792..91681d0 100644 --- a/deduplication/args.py +++ b/deduplication/args.py @@ -97,5 +97,10 @@ def parse_args(): help="If set, will skip the minhashing step of each workflow (useful if minhashes have been precomputed at minhash_dir)", action="store_true" ) + parser.add_argument( + "--skip-insertion", + help="If set, will skip inserting entries to index. THis is a MOCK ARG", + action="store_true" + ) return parser.parse_args() diff --git a/deduplication/lshbloom.py b/deduplication/lshbloom.py index 7a0b5d6..59a8c19 100644 --- a/deduplication/lshbloom.py +++ b/deduplication/lshbloom.py @@ -67,7 +67,8 @@ def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]: # insert if not duplicated in index if not result: - self.lsh.insert(m_query) + # WARNING YADU: Hack! We are skipping insertion + # self.lsh.insert(m_query) return None return [(key,)] From 34157562c6899dfd688f6fa0edd1da2ffa71ed9e Mon Sep 17 00:00:00 2001 From: Yadu Nand B Date: Wed, 3 Sep 2025 19:52:10 +0000 Subject: [PATCH 2/6] Minor type fixes --- deduplication/__main__.py | 3 +++ deduplication/args.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/deduplication/__main__.py b/deduplication/__main__.py index ee61c36..7c2bc2b 100644 --- a/deduplication/__main__.py +++ b/deduplication/__main__.py @@ -1,8 +1,11 @@ from deduplication.workflows import * from deduplication.args import parse_args + args = parse_args() +args.sim_threshold = float(args.sim_threshold) + if args.mode == "bloom": if args.single: assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" diff --git a/deduplication/args.py b/deduplication/args.py index 91681d0..5f4bc86 100644 --- a/deduplication/args.py +++ b/deduplication/args.py @@ -50,11 +50,13 @@ def parse_args(): parser.add_argument( "--sim-threshold", help="Jaccard Similarity threshold for deduplication, should be in [0, 1]. Default is 0.8", + type=float, default=0.8, ) parser.add_argument( "--num-perm", help="Number of hash functions for MinHashing. 
Default is 128", + type=int, default=128, ) parser.add_argument( From de320ea40f0aabebf1658812cccd7ea26e9d8080 Mon Sep 17 00:00:00 2001 From: Robert Underwood Date: Thu, 11 Sep 2025 13:26:54 -0400 Subject: [PATCH 3/6] Improve deduplication to support many file formats robustly --- deduplication/minhash.py | 103 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/deduplication/minhash.py b/deduplication/minhash.py index f905a0c..e62d427 100644 --- a/deduplication/minhash.py +++ b/deduplication/minhash.py @@ -1,15 +1,116 @@ from tqdm.autonotebook import tqdm from multiprocessing import Pool from datasketch import MinHash -from typing import Optional +from typing import Optional, cast from glob import glob +import logging + import pickle import json from functools import partial import os +import collections.abc +from pathlib import Path +import zstandard +import gzip +import io +import pyarrow.parquet +import itertools # TODO check if minhashes already exist, recompute only if forced +def compute_minhash_text(t: tuple[int, str], fname: str, num_perm: int) -> tuple[str, MinHash] | None: + lineNo, line = t + s = set(line.split()) + if not s: + return None + m = MinHash(num_perm=num_perm) + for d in s: + m.update(d) + # generate a unique key for this document + key = f"{fname}-{lineNo}" + return (key, m) + +SUPPORTED_FILETYPES = set([ + *[ + ((format, encoding) if encoding != "" else (format,)) + for format, encoding + in itertools.product( + [".jsonl", ".json"], + [".gz", ".zstd", ".zst", ""] + )], + (".parquet",) + ]) + +def is_supported_dataset(p :Path) -> bool: + return tuple(p.suffixes[-2:]) in SUPPORTED_FILETYPES + + +def open_anydataset(p: Path) -> collections.abc.Generator[tuple[int, str]]: + match p.suffixes[-2:]: + case [(".json" |".jsonl")]: + with open(p, "r") as f: + for lineNo, line in enumerate(f): + try: + yield lineNo, cast(str, json.loads(line)["text"]) + except GeneratorExit as e: + raise e + except: + logging.exception("failed to parse lineNo %s of \"%s\"", lineNo, p) + case [(".jsonl" | ".json"), (".zst"| ".zstd")]: + with open(p, "rb") as f: + dctx = zstandard.ZstdDecompressor() + stream_reader = dctx.stream_reader(f) + text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8') + for lineNo, line in enumerate(text_stream): + try: + yield lineNo, cast(str, json.loads(line)["text"]) + except GeneratorExit as e: + raise e + except: + logging.exception("failed to parse lineNo %s of \"%s\"", lineNo, p) + case [(".jsonl" | ".json"), ".gz"]: + with gzip.open(p, "r") as f: + for lineNo, line in enumerate(f): + try: + yield lineNo, cast(str, json.loads(line)["text"]) + except GeneratorExit as e: + raise e + except: + logging.exception("failed to parse lineNo %s of \"%s\"", lineNo, p) + case ['.parquet']: + pq = pyarrow.parquet.ParquetFile(p) + idx = 0 + for row_group in range(pq.num_row_groups): + table = pq.read_row_group(row_group, columns=["text"]) + for v in table["text"]: + try: + yield idx, v.as_py() + idx += 1 + except GeneratorExit as e: + raise e + except: + logging.exception("failed to parse lineNo %s of \"%s\"", idx, p) + + case _: + raise NotImplementedError(f"{p.suffixes[-2:]} is not a supported filetype") + +def compute_minhash_for_anyfile(infile: str, output_dir: str, num_perm: int): + n = 50000 + path = Path(infile) + fin = open_anydataset(path) + with Pool(32) as p, tqdm(total=n, desc=path.stem) as pbar: + minhash_list = [] + partial_compute_minhash = partial(compute_minhash_text, fname=path.stem, 
num_perm=num_perm) + for result in p.imap_unordered(partial_compute_minhash, fin): + if result: + minhash_list.append(result) + pbar.update() + with open(f"{output_dir}/{path.stem[:-6]}.pkl", "wb") as fp: + pickle.dump(minhash_list, fp) + print(f"Generated MinHash for {len(minhash_list):,} documents in {path.stem}") + + def compute_minhash_jsonl(t, fname, num_perm): lineNo, line = t lineNo += 1 From 6dbcfbcc58172f17d0757ec4e06b635ebc480285 Mon Sep 17 00:00:00 2001 From: Robert Underwood Date: Thu, 11 Sep 2025 13:37:15 -0400 Subject: [PATCH 4/6] update dependencies --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6018ec4..6776b7e 100644 --- a/setup.py +++ b/setup.py @@ -34,5 +34,7 @@ 'datasketch @ git+https://github.com/123epsilon/datasketch.git@060a32b4b4a2272d77480dd633a1bf770678ba49', 'pybloomfiltermmap3==0.5.7', 'tqdm>=4.60.0', + 'zstandard>=0.23.0', + 'pyarrow>=18.0.0', ] -) \ No newline at end of file +) From 131f83fe9af36dec98fdbb09114d023a0a25b528 Mon Sep 17 00:00:00 2001 From: Yadu Nand B Date: Wed, 3 Sep 2025 19:52:10 +0000 Subject: [PATCH 5/6] Minor type fixes --- deduplication/__main__.py | 3 +++ deduplication/args.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/deduplication/__main__.py b/deduplication/__main__.py index ee61c36..7c2bc2b 100644 --- a/deduplication/__main__.py +++ b/deduplication/__main__.py @@ -1,8 +1,11 @@ from deduplication.workflows import * from deduplication.args import parse_args + args = parse_args() +args.sim_threshold = float(args.sim_threshold) + if args.mode == "bloom": if args.single: assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" diff --git a/deduplication/args.py b/deduplication/args.py index ce7e792..98f4b7a 100644 --- a/deduplication/args.py +++ b/deduplication/args.py @@ -50,11 +50,13 @@ def parse_args(): parser.add_argument( "--sim-threshold", help="Jaccard Similarity threshold for deduplication, should be in [0, 1]. Default is 0.8", + type=float, default=0.8, ) parser.add_argument( "--num-perm", help="Number of hash functions for MinHashing. Default is 128", + type=int, default=128, ) parser.add_argument( From 0f263c18754ab4d8c22ec9fa9471d1311ac00982 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Mon, 22 Sep 2025 19:16:20 -0500 Subject: [PATCH 6/6] Add `skip_insertion` kwarg to all LSHBloom based workflows and components * When `skip_insertion` is enabled, unique entries found are not inserted into the index. This enables deduplicating against an index without modifying it. 
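
For example, a read-only pass that checks precomputed minhashes against an
existing index without growing it could look roughly like the sketch below.
The import path, the minhash directory, and the `lsh_params` contents are
placeholders/assumptions for illustration, not values taken from this series:

    from deduplication.lshbloom import LSHBloom

    # Placeholder parameters for MinHashLSHBloom; the real keys are whatever
    # the workflows in this repo pass (similarity threshold, num_perm, index
    # capacity, false-positive rate, ...). Match how the index was built.
    lsh_params = {"threshold": 0.8, "num_perm": 128}

    # Point at a directory of precomputed minhash pickles (placeholder path).
    index = LSHBloom("/path/to/minhash_dir", lsh_params)

    # skip_insertion=True: duplicates are still reported, but unique documents
    # are not inserted, so the on-disk index is left unchanged.
    duplicates = index.deduplicate_corpus(skip_insertion=True)
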
--- deduplication/__main__.py | 6 +++--- deduplication/args.py | 5 +++++ deduplication/lshbloom.py | 13 +++++++------ deduplication/workflows.py | 10 +++++++--- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/deduplication/__main__.py b/deduplication/__main__.py index 7c2bc2b..91bf67e 100644 --- a/deduplication/__main__.py +++ b/deduplication/__main__.py @@ -9,12 +9,12 @@ if args.mode == "bloom": if args.single: assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" - dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing) + dedup_single_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion) elif args.multi: - dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing) + dedup_multi_bloom(args.input, args.minhash_dir, args.num, args.fp, args.output_file, args.name, args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion) else: assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" - dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing) + dedup_single_file_bloom(args.input[0], args.minhash_dir[0], args.num, args.fp, args.output_file, args.name[0], args.sim_threshold, args.num_perm, args.save_dir, not args.skip_minhashing, skip_insertion=args.skip_insertion) else: if args.single: assert len(args.input) == 1 and len(args.minhash_dir) == 1 and len(args.name) == 1, "Expected single input argument but got a list" diff --git a/deduplication/args.py b/deduplication/args.py index 98f4b7a..9de9429 100644 --- a/deduplication/args.py +++ b/deduplication/args.py @@ -99,5 +99,10 @@ def parse_args(): help="If set, will skip the minhashing step of each workflow (useful if minhashes have been precomputed at minhash_dir)", action="store_true" ) + parser.add_argument( + "--skip-insertion", + help="If set, will skip inserting unique documents into the index (works only with LSHBloom)", + action="store_true" + ) return parser.parse_args() diff --git a/deduplication/lshbloom.py b/deduplication/lshbloom.py index 7a0b5d6..78e8739 100644 --- a/deduplication/lshbloom.py +++ b/deduplication/lshbloom.py @@ -31,7 +31,7 @@ def __init__(self, minhash_dir: str, lsh_params: Dict): self.minhash_dir = minhash_dir self.lsh = MinHashLSHBloom(**lsh_params) - def deduplicate_corpus(self) -> List[Tuple[str]]: + def deduplicate_corpus(self, skip_insertion: bool = False) -> List[Tuple[str]]: """ Deduplicates documents in the given corpus and adds them to the LSH index if appropriate. Documents without existing duplicates will be stored in the LSH index for future deduplication. 
@@ -45,12 +45,12 @@ def deduplicate_corpus(self) -> List[Tuple[str]]: if f.endswith(".pkl") ] for minhashfile in minhash_files: - dups = self.deduplicate_minhash_file(minhashfile) + dups = self.deduplicate_minhash_file(minhashfile, skip_insertion=skip_insertion) duplicate_list.extend(dups) return duplicate_list - def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]: + def deduplicate_and_insert(self, params: Tuple, skip_insertion: bool = False) -> List[Tuple[str]]: """ Deduplicates a MinHash signature corresponding to a document using the provided LSH index. If the document is not duplicated in the LSH index, it is added to the index. @@ -67,12 +67,13 @@ def deduplicate_and_insert(self, params: Tuple) -> List[Tuple[str]]: # insert if not duplicated in index if not result: - self.lsh.insert(m_query) + if not skip_insertion: + self.lsh.insert(m_query) return None return [(key,)] - def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]: + def deduplicate_minhash_file(self, minhashfile: str, skip_insertion: bool = False) -> List[Tuple[str]]: """ Deduplicate documents in the given minhash file and adds them to the LSH index if appropriate. Documents without existing duplicates will be stored in the LSH index for future deduplication. @@ -91,7 +92,7 @@ def deduplicate_minhash_file(self, minhashfile: str) -> List[Tuple[str]]: # can't multiprocess here as insertion requires C++ dependencies that are not compatible with pickle with tqdm(total=len(minhash_list), desc=fname) as pbar: for i in range(len(minhash_list)): - result = self.deduplicate_and_insert(minhash_list[i]) + result = self.deduplicate_and_insert(minhash_list[i], skip_insertion=skip_insertion) if result: duplicate_list.extend(result) pbar.update() diff --git a/deduplication/workflows.py b/deduplication/workflows.py index f0e4be9..ca512cb 100644 --- a/deduplication/workflows.py +++ b/deduplication/workflows.py @@ -121,6 +121,7 @@ def dedup_single_bloom( save_dir: str = "./", compute_minhashes: bool = True, clear: bool = False, + skip_insertion: bool = False, ): if clear: clear_dir(save_dir) @@ -138,7 +139,7 @@ def dedup_single_bloom( m.process() index = LSHBloom(minhash_dir, lsh_params) - duplicates = index.deduplicate_corpus() + duplicates = index.deduplicate_corpus(skip_insertion=skip_insertion) write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"]) @@ -155,6 +156,7 @@ def dedup_multi_bloom( save_dir: str = "./", compute_minhashes: bool = True, clear: bool = False, + skip_insertion: bool = False, ): assert len(input_dirs) == len(minhash_dirs) == len(corpus_names), \ f"Expected len(input_dirs) == len(minhash_dirs) == len(corpus_names), got {len(input_dirs)}, {len(minhash_dirs)}, {len(corpus_names)}" @@ -174,7 +176,8 @@ def dedup_multi_bloom( n_hash_funcs, save_dir, compute_minhashes, - clear=False + clear=False, + skip_insertion=skip_insertion ) def dedup_single_file_bloom( @@ -189,6 +192,7 @@ def dedup_single_file_bloom( save_dir: str = "./", compute_minhashes: bool = True, clear: bool = False, + skip_insertion: bool = False, ): if clear: clear_dir(save_dir) @@ -208,5 +212,5 @@ def dedup_single_file_bloom( fname = input_file.split("/")[-1] minhash_file = f"{minhash_dir}/{fname[:-6]}.pkl" index = LSHBloom(minhash_dir, lsh_params) - duplicates = index.deduplicate_minhash_file(minhash_file) + duplicates = index.deduplicate_minhash_file(minhash_file, skip_insertion=skip_insertion) write_duplicates_to_csv(duplicates, csvfile, corpus_name, header=["dup_key"])
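
A usage sketch tying the pieces of this series together: PATCH 3/6 adds
`is_supported_dataset`, `open_anydataset`, and `compute_minhash_for_anyfile`
to deduplication/minhash.py, so minhashes can be precomputed for .json/.jsonl
(plain, .gz, or .zst/.zstd compressed) and .parquet inputs before running an
LSHBloom workflow. A minimal driver might look roughly like the following;
the directory paths and the `num_perm` value are placeholders, and the import
path is assumed from the file layout in the diffs above:

    from pathlib import Path

    from deduplication.minhash import compute_minhash_for_anyfile, is_supported_dataset

    def precompute_minhashes(corpus_dir: str, minhash_dir: str, num_perm: int = 128) -> None:
        # Walk the corpus and minhash every file whose extension is supported
        # (.json/.jsonl, optionally .gz/.zst/.zstd compressed, and .parquet).
        for p in sorted(Path(corpus_dir).iterdir()):
            if is_supported_dataset(p):
                compute_minhash_for_anyfile(str(p), minhash_dir, num_perm=num_perm)

    if __name__ == "__main__":
        # compute_minhash_for_anyfile uses a multiprocessing Pool internally,
        # so keep the driver under the __main__ guard.
        precompute_minhashes("/path/to/corpus", "/path/to/minhash_dir")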