diff --git a/cdxj_indexer/bufferiter.py b/cdxj_indexer/bufferiter.py
index ee11029..52bff53 100644
--- a/cdxj_indexer/bufferiter.py
+++ b/cdxj_indexer/bufferiter.py
@@ -91,7 +91,7 @@ def concur_req_resp(rec_1, rec_2):
 
 # ============================================================================
 def buffer_record_content(record):
-    spool = tempfile.SpooledTemporaryFile()
+    spool = tempfile.SpooledTemporaryFile(BUFF_SIZE)
     shutil.copyfileobj(record.content_stream(), spool)
     spool.seek(0)
     record.buffered_stream = spool
diff --git a/cdxj_indexer/main.py b/cdxj_indexer/main.py
index fcfe196..208a7cb 100644
--- a/cdxj_indexer/main.py
+++ b/cdxj_indexer/main.py
@@ -7,10 +7,12 @@
 import sys
 import zlib
 import hashlib
+import heapq
 
 from argparse import ArgumentParser, RawTextHelpFormatter
 from io import BytesIO
 from copy import copy
+from tempfile import NamedTemporaryFile
 
 
 from warcio.indexer import Indexer
@@ -19,7 +21,7 @@
 from warcio.archiveiterator import ArchiveIterator
 from warcio.utils import open_or_default
 
-from cdxj_indexer.bufferiter import buffering_record_iter
+from cdxj_indexer.bufferiter import buffering_record_iter, BUFF_SIZE
 
 
 # ============================================================================
@@ -61,6 +63,7 @@ def __init__(
         sort=False,
         compress=None,
         lines=DEFAULT_NUM_LINES,
+        max_sort_buff_size=None,
         data_out_name=None,
         filename=None,
         fields=None,
@@ -91,6 +94,7 @@ def __init__(
         self.dir_root = dir_root
 
         self.num_lines = lines
+        self.max_sort_buff_size = max_sort_buff_size
         self.sort = sort
         self.compress = compress
         self.data_out_name = data_out_name
@@ -203,7 +207,7 @@ def process_all(self):
                )
 
            if self.sort:
-                fh = SortingWriter(fh)
+                fh = SortingWriter(fh, self.max_sort_buff_size)
 
            self.output = fh
 
@@ -360,22 +364,59 @@ class CDX09Indexer(CDXLegacyIndexer):
 
 # ============================================================================
 class SortingWriter:
-    def __init__(self, out):
+    MAX_SORT_BUFF_SIZE = 1024 * 1024 * 32
+
+    def __init__(self, out, max_sort_buff_size=None):
         self.out = out
         self.sortedlist = []
+        self.count = 0
+        self.max_sort_buff_size = max_sort_buff_size or self.MAX_SORT_BUFF_SIZE
+
+        self.tmp_files = []
 
     def write(self, line):
         self.sortedlist.append(line)
+        self.count += len(line)
+
+        if self.count > self.max_sort_buff_size:
+            self.tmp_files.append(self.write_to_temp())
+            self.sortedlist = []
+            self.count = 0
 
     def flush(self):
+        if not len(self.tmp_files):
+            self.sortedlist.sort()
+            self.write_to_file(self.sortedlist, self.out)
+            return
+
+        if len(self.sortedlist) > 0:
+            self.tmp_files.append(self.write_to_temp())
+            self.sortedlist = []
+            self.count = 0
+
+        open_files = [open(name, "rt", encoding="utf-8") for name in self.tmp_files]
+
+        self.write_to_file(heapq.merge(*open_files), self.out)
+
+        for out, name in zip(open_files, self.tmp_files):
+            out.close()
+            os.remove(name)
+
+    def write_to_temp(self):
         self.sortedlist.sort()
+        with NamedTemporaryFile(mode="wt", delete=False) as out:
+            self.write_to_file(self.sortedlist, out)
+
+        return out.name
+
+    def write_to_file(self, iter_, out):
         lastline = None
-        for line in self.sortedlist:
+        for line in iter_:
             if lastline != line:
-                self.out.write(line)
+                out.write(line)
                 lastline = line
 
-        self.out.flush()
+        out.flush()
 
 
 # ============================================================================
diff --git a/test/test_indexer.py b/test/test_indexer.py
index 0763a61..dee7ea0 100644
--- a/test/test_indexer.py
+++ b/test/test_indexer.py
@@ -226,6 +226,7 @@ def test_warc_cdxj_compressed_2_with_digest(self):
             compress=temp_fh,
             data_out_name="comp_2.cdxj.gz",
             lines=11,
+            max_sort_buff_size=1000,
             digest_records=True,
         )
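
Note on the change: SortingWriter now performs an external merge sort. write() buffers lines in memory and, once their combined size exceeds max_sort_buff_size (default 32 MiB), spills a sorted, deduplicated chunk to a NamedTemporaryFile; flush() then streams heapq.merge() over all chunk files into the real output, dropping adjacent duplicates, and removes the temp files. The sketch below is only a usage illustration under stated assumptions: it assumes the patched cdxj_indexer.main is importable, and the output filename, buffer size, and sample CDXJ lines are invented for the example, not part of the patch.

# Minimal sketch: force SortingWriter to spill sorted chunks to temp files, then merge.
# Assumes the patched cdxj_indexer.main is importable; values are illustrative only.
from cdxj_indexer.main import SortingWriter

with open("sorted.cdxj", "wt", encoding="utf-8") as out_fh:
    writer = SortingWriter(out_fh, max_sort_buff_size=64)  # tiny buffer to trigger spills
    for line in [
        "org,example)/b 20210102030405 {}\n",
        "org,example)/a 20210102030405 {}\n",
    ] * 10:
        writer.write(line)  # a write that exceeds the buffer spills a sorted chunk
    writer.flush()  # k-way merge of the sorted chunks; adjacent duplicate lines dropped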