
Commit

batch size updates
xando committed Jul 6, 2024
1 parent a1c41ed · commit 05247b1
Showing 3 changed files with 8 additions and 16 deletions.
README.md (7 changes: 2 additions & 5 deletions)
@@ -148,8 +148,8 @@ Writes a PyArrow Table to a BigQuery Table. No return value.
 - `worker_count`: `int`, *default* `os.cpu_count()`
   The number of threads or processes to use for fetching data from BigQuery.
 
-- `batch_size`: `int`, *default* `100`
-  The batch size for fetched rows.
+- `batch_size`: `int`, *default* `10`
+  The batch size used to upload.
 
 ```python
 bq.write_table(table, 'gcp_project.dataset.table')
@@ -185,9 +185,6 @@ Context manager version of the write method. Useful when the PyArrow table is la
 - `worker_count`: `int`, *default* `os.cpu_count()`
   The number of threads or processes to use for writing data to BigQuery.
 
-- `batch_size`: `int`, *default* `100`
-  The batch size used for writes. The table will be automatically split to this value.
-
 Depending on your use case, you might want to use one of the methods below to write your data to a BigQuery table, using either `pa.Table` or `pa.RecordBatch`.
 
 #### `pyarrow.bigquery.writer.write_table` (Context Manager Method)
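For reference, a minimal usage sketch of `write_table` with the updated `batch_size` parameter described in the README excerpt above. The `import pyarrow.bigquery as bq` alias and the sample table are assumptions; the destination string and the `batch_size` keyword come from this commit.

```python
import pyarrow as pa
import pyarrow.bigquery as bq  # assumed import alias, matching the README's `bq`

# Hypothetical sample data; any PyArrow table with a BigQuery-compatible schema works.
table = pa.table({"value": list(range(100))})

# batch_size now controls how many rows go into each upload chunk (default 10 after this commit).
bq.write_table(table, "gcp_project.dataset.table", batch_size=50)
```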
src/pyarrow/bigquery/write/__init__.py (15 changes: 5 additions & 10 deletions)
@@ -131,7 +131,6 @@ def __init__(
         table_overwrite: bool = False,
         worker_count: int = multiprocessing.cpu_count(),
         worker_type: type[threading.Thread] | type[multiprocessing.Process] = threading.Thread,
-        batch_size: int = 100,
     ):
         self.project = project
         self.where = where
@@ -144,8 +143,6 @@ def __init__(
         self.worker_count = worker_count
         self.worker_type = worker_type
 
-        self.batch_size = batch_size
-
         project_id, dataset_id, table_id = where.split(".")
 
         self.parent = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
@@ -187,10 +184,9 @@ def __enter__(self):
         return self
 
     def write_table(self, table):
-        for table_chunk in some_itertools.to_chunks(table, self.batch_size):
-            element = tempfile.mktemp(dir=self.temp_dir)
-            fa.write_feather(table_chunk, element)
-            self.queue_results.put(element)
+        element = tempfile.mktemp(dir=self.temp_dir)
+        fa.write_feather(table, element)
+        self.queue_results.put(element)
 
     def write_batch(self, batch):
         element = tempfile.mktemp(dir=self.temp_dir)
@@ -219,7 +215,7 @@ def write_table(
     table_overwrite: bool = False,
     worker_count: int = multiprocessing.cpu_count(),
     worker_type: type[threading.Thread] | type[multiprocessing.Process] = threading.Thread,
-    batch_size: int = 100,
+    batch_size: int = 10,
 ):
     assert table.num_rows > 0, "Table is empty"
 
@@ -232,7 +228,6 @@ def write_table(
         table_overwrite=table_overwrite,
         worker_count=worker_count,
         worker_type=worker_type,
-        batch_size=batch_size,
     ) as w:
-        for table_chunk in some_itertools.to_split(table, w.worker_count):
+        for table_chunk in some_itertools.to_chunks(table, batch_size):
             w.write_table(table_chunk)
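The `some_itertools.to_split` and `some_itertools.to_chunks` helpers are not part of this diff; the sketch below shows what they plausibly do, assuming both slice a `pa.Table` by row ranges (the names match the diff, the implementations are assumptions). After this commit the chunk size handed to the workers is driven by `batch_size` directly, and the writer's `write_table` no longer re-chunks internally.

```python
import pyarrow as pa


def to_chunks(table: pa.Table, batch_size: int):
    """Yield consecutive slices of at most `batch_size` rows (the new upload path)."""
    for offset in range(0, table.num_rows, batch_size):
        yield table.slice(offset, batch_size)


def to_split(table: pa.Table, worker_count: int):
    """Split the table into roughly `worker_count` equal pieces (the old upload path)."""
    split_size = -(-table.num_rows // worker_count)  # ceiling division
    for offset in range(0, table.num_rows, split_size):
        yield table.slice(offset, split_size)
```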
src/pyarrow/bigquery/write/upload.py (2 changes: 1 addition & 1 deletion)
@@ -18,7 +18,7 @@ def _send(stream, serialized_rows, offset):
     request.offset = offset
     request.proto_rows = proto_data
 
-    stream.append_rows_stream.send(request).result()
+    stream.append_rows_stream.send(request)
 
 
 def upload_data(stream, pa_table, protobuf_definition, offset):
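The `upload.py` change drops the per-request `.result()` call, so `_send` no longer blocks until the server acknowledges each append. A minimal sketch of the difference, assuming the `google-cloud-bigquery-storage` `AppendRowsStream` API that this module appears to use (the stream setup itself is omitted):

```python
from google.cloud.bigquery_storage_v1 import types, writer


def send_blocking(append_rows_stream: writer.AppendRowsStream, request: types.AppendRowsRequest) -> None:
    # Pre-commit behaviour: send() returns an AppendRowsFuture, and result()
    # waits for the server to acknowledge (or reject) the appended rows.
    append_rows_stream.send(request).result()


def send_non_blocking(append_rows_stream: writer.AppendRowsStream, request: types.AppendRowsRequest) -> None:
    # Post-commit behaviour: queue the request on the stream and return immediately.
    append_rows_stream.send(request)
```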
