Skip to content

Commit

Permalink
collection records + sort optimizations (#19)
Browse files Browse the repository at this point in the history
- Fix a bug where SpooledTemporaryFile() did not have a max buffer size, resulting in reading everything into memory!

- Optimize sort: also add a max size (currently 32MB) for the number of CDXJ line bytes to hold in memory. If the size is exceeded, write sorted chunks to temp files and use heapq.merge to combine them into the final output.

- Allow setting the max sort buffer size with the 'max_sort_buff_size' param; set it to a lower number for testing

- Add encoding=utf-8 to cdxj output
  • Loading branch information
ikreymer authored Jun 25, 2022
1 parent 97d1743 commit 3df3bbc
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cdxj_indexer/bufferiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def concur_req_resp(rec_1, rec_2):

# ============================================================================
def buffer_record_content(record):
    """Buffer a record's full content into a seekable spooled temp file.

    Small payloads (up to BUFF_SIZE bytes) stay in memory; larger ones
    are rolled over to disk automatically, avoiding unbounded memory use.
    """
    # Passing BUFF_SIZE as max_size is essential: a bare
    # SpooledTemporaryFile() never rolls over to disk on its own,
    # so the whole stream would be held in memory.
    spool = tempfile.SpooledTemporaryFile(BUFF_SIZE)
    shutil.copyfileobj(record.content_stream(), spool)
    spool.seek(0)
    # Expose the rewound buffer so the record content can be re-read later.
    record.buffered_stream = spool
Expand Down
53 changes: 47 additions & 6 deletions cdxj_indexer/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import sys
import zlib
import hashlib
import heapq

from argparse import ArgumentParser, RawTextHelpFormatter
from io import BytesIO
from copy import copy
from tempfile import NamedTemporaryFile


from warcio.indexer import Indexer
Expand All @@ -19,7 +21,7 @@
from warcio.archiveiterator import ArchiveIterator
from warcio.utils import open_or_default

from cdxj_indexer.bufferiter import buffering_record_iter
from cdxj_indexer.bufferiter import buffering_record_iter, BUFF_SIZE


# ============================================================================
Expand Down Expand Up @@ -61,6 +63,7 @@ def __init__(
sort=False,
compress=None,
lines=DEFAULT_NUM_LINES,
max_sort_buff_size=None,
data_out_name=None,
filename=None,
fields=None,
Expand Down Expand Up @@ -91,6 +94,7 @@ def __init__(
self.dir_root = dir_root

self.num_lines = lines
self.max_sort_buff_size = max_sort_buff_size
self.sort = sort
self.compress = compress
self.data_out_name = data_out_name
Expand Down Expand Up @@ -203,7 +207,7 @@ def process_all(self):
)

if self.sort:
fh = SortingWriter(fh)
fh = SortingWriter(fh, self.max_sort_buff_size)

self.output = fh

Expand Down Expand Up @@ -360,22 +364,59 @@ class CDX09Indexer(CDXLegacyIndexer):

# ============================================================================
class SortingWriter:
    """File-like writer that emits its lines sorted and deduplicated.

    Lines are accumulated in memory until their total size exceeds
    ``max_sort_buff_size`` bytes; overflowing chunks are sorted and
    spilled to named temp files. On flush(), all chunks are combined
    with heapq.merge so the final output is globally sorted with
    adjacent duplicate lines removed, without loading everything into
    memory at once.
    """

    # Default cap (32MB) on buffered line bytes before spilling to disk.
    MAX_SORT_BUFF_SIZE = 1024 * 1024 * 32

    def __init__(self, out, max_sort_buff_size=None):
        self.out = out
        self.sortedlist = []
        self.count = 0
        self.max_sort_buff_size = max_sort_buff_size or self.MAX_SORT_BUFF_SIZE

        self.tmp_files = []

    def write(self, line):
        """Buffer one line, spilling the buffer to a temp file when full."""
        self.sortedlist.append(line)
        self.count += len(line)

        if self.count > self.max_sort_buff_size:
            self.tmp_files.append(self.write_to_temp())
            self.sortedlist = []
            self.count = 0

    def flush(self):
        """Write all buffered/spilled lines, sorted and deduped, to out."""
        # Fast path: everything fit in memory — sort and write directly.
        if not len(self.tmp_files):
            self.sortedlist.sort()
            self.write_to_file(self.sortedlist, self.out)
            return

        # Spill any remaining buffered lines so all data lives in temp files.
        if len(self.sortedlist) > 0:
            self.tmp_files.append(self.write_to_temp())
            self.sortedlist = []
            self.count = 0

        open_files = [open(name, "rt", encoding="utf-8") for name in self.tmp_files]

        try:
            # Each temp file is individually sorted, so heapq.merge yields a
            # globally sorted stream while reading the files incrementally.
            self.write_to_file(heapq.merge(*open_files), self.out)
        finally:
            # Always close and remove temp files, even if the write fails.
            for fh, name in zip(open_files, self.tmp_files):
                fh.close()
                os.remove(name)

    def write_to_temp(self):
        """Sort the in-memory buffer and write it to a new temp file.

        Returns the temp file's name; flush() is responsible for cleanup.
        """
        self.sortedlist.sort()
        # encoding must match the "utf-8" used when reading back in flush();
        # relying on the locale default could corrupt non-ASCII lines.
        with NamedTemporaryFile(mode="wt", delete=False, encoding="utf-8") as out:
            self.write_to_file(self.sortedlist, out)

        return out.name

    def write_to_file(self, iter_, out):
        """Write an already-sorted iterable to out, skipping adjacent dupes."""
        lastline = None
        for line in iter_:
            if lastline != line:
                out.write(line)
            lastline = line

        out.flush()


# ============================================================================
Expand Down
1 change: 1 addition & 0 deletions test/test_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def test_warc_cdxj_compressed_2_with_digest(self):
compress=temp_fh,
data_out_name="comp_2.cdxj.gz",
lines=11,
max_sort_buff_size=1000,
digest_records=True,
)

Expand Down

0 comments on commit 3df3bbc

Please sign in to comment.