Skip to content

Commit

Permalink
Merge pull request #2709 from kconvey/kconvey-copy-job
Browse files Browse the repository at this point in the history
Add a BigQuery adapter macro to enable usage of CopyJobs
  • Loading branch information
beckjake authored Aug 19, 2020
2 parents f043f94 + 91b0496 commit 4273cc9
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 5 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add deprecation warnings to anonymous usage tracking ([#2688](https://github.com/fishtown-analytics/dbt/issues/2688), [#2710](https://github.com/fishtown-analytics/dbt/issues/2710))

### Features
- Add a BigQuery adapter macro to enable usage of CopyJobs ([#2709](https://github.com/fishtown-analytics/dbt/pull/2709))
- Support TTL for BigQuery tables([#2711](https://github.com/fishtown-analytics/dbt/pull/2711))
- Add better retry support when using the BigQuery adapter ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), follow-up to [#1963](https://github.com/fishtown-analytics/dbt/pull/1963))
- Added a `dispatch` method to the context adapter and deprecated `adapter_macro`. ([#2302](https://github.com/fishtown-analytics/dbt/issues/2302), [#2679](https://github.com/fishtown-analytics/dbt/pull/2679))
Expand All @@ -30,7 +31,7 @@

Contributors:
- [@bbhoss](https://github.com/bbhoss) ([#2677](https://github.com/fishtown-analytics/dbt/pull/2677))
- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), [#2711], (https://github.com/fishtown-analytics/dbt/pull/2711))
- [@kconvey](https://github.com/kconvey) ([#2694](https://github.com/fishtown-analytics/dbt/pull/2694), [#2709](https://github.com/fishtown-analytics/dbt/pull/2709)), [#2711](https://github.com/fishtown-analytics/dbt/pull/2711))
- [@vogt4nick](https://github.com/vogt4nick) ([#2702](https://github.com/fishtown-analytics/dbt/issues/2702))
- [@stephen8chang](https://github.com/stephen8chang) ([docs#106](https://github.com/fishtown-analytics/dbt-docs/pull/106), [docs#108](https://github.com/fishtown-analytics/dbt-docs/pull/108), [docs#113](https://github.com/fishtown-analytics/dbt-docs/pull/113))
- [@rsenseman](https://github.com/rsenseman) ([#2708](https://github.com/fishtown-analytics/dbt/pull/2708))
Expand Down
30 changes: 29 additions & 1 deletion plugins/bigquery/dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

BQ_QUERY_JOB_SPLIT = '-----Query Job SQL Follows-----'

WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE

REOPENABLE_ERRORS = (
ConnectionResetError,
ConnectionError,
Expand Down Expand Up @@ -336,7 +338,7 @@ def create_table(self, database, schema, table_name, sql):

table_ref = self.table_ref(database, schema, table_name, conn)
job_params = {'destination': table_ref,
'write_disposition': 'WRITE_TRUNCATE'}
'write_disposition': WRITE_TRUNCATE}

timeout = self.get_timeout(conn)

Expand All @@ -352,6 +354,32 @@ def callback(table):
self.create_bigquery_table(database, schema, table_name, callback,
'CREATE DAY PARTITIONED TABLE')

def copy_bq_table(self, source, destination, write_disposition):
conn = self.get_thread_connection()
client = conn.handle

source_ref = self.table_ref(
source.database, source.schema, source.table, conn)
destination_ref = self.table_ref(
destination.database, destination.schema, destination.table, conn)

logger.debug(
'Copying table "{}" to "{}" with disposition: "{}"',
source_ref.path, destination_ref.path, write_disposition)

def copy_and_results():
job_config = google.cloud.bigquery.CopyJobConfig(
write_disposition=write_disposition)
copy_job = client.copy_table(
source_ref, destination_ref, job_config=job_config)
iterator = copy_job.result(timeout=self.get_timeout(conn))
return copy_job, iterator

self._retry_and_handle(
msg='copy table "{}" to "{}"'.format(
source_ref.path, destination_ref.path),
conn=conn, fn=copy_and_results)

@staticmethod
def dataset(database, schema, conn):
dataset_ref = conn.handle.dataset(schema, database)
Expand Down
21 changes: 21 additions & 0 deletions plugins/bigquery/dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import agate
import json

# Write dispositions for bigquery.
WRITE_APPEND = google.cloud.bigquery.job.WriteDisposition.WRITE_APPEND
WRITE_TRUNCATE = google.cloud.bigquery.job.WriteDisposition.WRITE_TRUNCATE


def sql_escape(string):
if not isinstance(string, str):
Expand Down Expand Up @@ -417,6 +421,23 @@ def _materialize_as_table(

return "CREATE TABLE"

@available.parse(lambda *a, **k: '')
def copy_table(self, source, destination, materialization):
if materialization == 'incremental':
write_disposition = WRITE_APPEND
elif materialization == 'table':
write_disposition = WRITE_TRUNCATE
else:
dbt.exceptions.raise_compiler_error(
'Copy table materialization must be "copy" or "table", but '
f"config.get('copy_materialization', 'table') was "
f'{materialization}')

self.connections.copy_bq_table(
source, destination, write_disposition)

return "COPY TABLE with materialization: {}".format(materialization)

@classmethod
def poll_until_job_completes(cls, job, timeout):
retry_count = timeout
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{% materialization copy, adapter='bigquery' -%}

{# Setup #}
{{ run_hooks(pre_hooks) }}

{# there should be exactly one ref or exactly one source #}
{% set destination = this.incorporate(type='table') %}

{% set dependency_type = none %}
{% if (model.refs | length) == 1 and (model.sources | length) == 0 %}
{% set dependency_type = 'ref' %}
{% elif (model.refs | length) == 0 and (model.sources | length) == 1 %}
{% set dependency_type = 'source' %}
{% else %}
{% set msg %}
Expected exactly one ref or exactly one source, instead got {{ model.refs | length }} models and {{ model.sources | length }} sources.
{% endset %}
{% do exceptions.raise_compiler_error(msg) %}
{% endif %}

{% if dependency_type == 'ref' %}
{% set src = ref(*model.refs[0]) %}
{% else %}
{% set src = source(*model.sources[0]) %}
{% endif %}

{%- set result_str = adapter.copy_table(
src,
destination,
config.get('copy_materialization', 'table')) -%}

{{ store_result('main', status=result_str) }}

{# Clean up #}
{{ run_hooks(post_hooks) }}
{{ adapter.commit() }}

{{ return({'relations': [destination]}) }}
{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{{ config(copy_materialization='view') }}
{{ ref('original') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{{ config(copy_materialization='incremental') }}
{{ ref('original') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{{ config(copy_materialization='table') }}
{{ ref('original') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select 1 as id
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from test.integration.base import DBTIntegrationTest, use_profile
import textwrap
import yaml


class TestBigqueryCopyTableFails(DBTIntegrationTest):

@property
def schema(self):
return "bigquery_test_022"

@property
def models(self):
return "copy-failing-models"

@property
def profile_config(self):
return self.bigquery_profile()

@property
def project_config(self):
return yaml.safe_load(textwrap.dedent('''\
config-version: 2
models:
test:
original:
materialized: table
copy_bad_materialization:
materialized: copy
'''))

@use_profile('bigquery')
def test__bigquery_copy_table_fails(self):
results = self.run_dbt(expect_pass=False)
self.assertEqual(len(results), 2)
self.assertTrue(results[1].error)
37 changes: 37 additions & 0 deletions test/integration/022_bigquery_test/test_bigquery_copy_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from test.integration.base import DBTIntegrationTest, use_profile
import textwrap
import yaml


class TestBigqueryCopyTable(DBTIntegrationTest):

@property
def schema(self):
return "bigquery_test_022"

@property
def models(self):
return "copy-models"

@property
def profile_config(self):
return self.bigquery_profile()

@property
def project_config(self):
return yaml.safe_load(textwrap.dedent('''\
config-version: 2
models:
test:
original:
materialized: table
copy_as_table:
materialized: copy
copy_as_incremental:
materialized: copy
'''))

@use_profile('bigquery')
def test__bigquery_copy_table(self):
results = self.run_dbt()
self.assertEqual(len(results), 3)
64 changes: 61 additions & 3 deletions test/unit/test_bigquery_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import unittest
from contextlib import contextmanager
from requests.exceptions import ConnectionError
from unittest.mock import patch, MagicMock, Mock, create_autospec
from unittest.mock import patch, MagicMock, Mock, create_autospec, ANY

import hologram

Expand All @@ -21,8 +21,9 @@
import dbt.exceptions
from dbt.logger import GLOBAL_LOGGER as logger # noqa

from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions
import google.cloud.bigquery

from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions

def _bq_conn():
conn = MagicMock()
Expand Down Expand Up @@ -516,8 +517,65 @@ def test_query_and_results(self, mock_bq):
self.mock_client.query.assert_called_once_with(
'sql', job_config=mock_bq.QueryJobConfig())


def test_copy_bq_table_appends(self):
self._copy_table(
write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND)
args, kwargs = self.mock_client.copy_table.call_args
self.mock_client.copy_table.assert_called_once_with(
self._table_ref('project', 'dataset', 'table1', None),
self._table_ref('project', 'dataset', 'table2', None),
job_config=ANY)
args, kwargs = self.mock_client.copy_table.call_args
self.assertEqual(
kwargs['job_config'].write_disposition,
dbt.adapters.bigquery.impl.WRITE_APPEND)

def test_copy_bq_table_truncates(self):
self._copy_table(
write_disposition=dbt.adapters.bigquery.impl.WRITE_TRUNCATE)
args, kwargs = self.mock_client.copy_table.call_args
self.mock_client.copy_table.assert_called_once_with(
self._table_ref('project', 'dataset', 'table1', None),
self._table_ref('project', 'dataset', 'table2', None),
job_config=ANY)
args, kwargs = self.mock_client.copy_table.call_args
self.assertEqual(
kwargs['job_config'].write_disposition,
dbt.adapters.bigquery.impl.WRITE_TRUNCATE)

def _table_ref(self, proj, ds, table, conn):
return google.cloud.bigquery.table.TableReference.from_string(
'{}.{}.{}'.format(proj, ds, table))

def _copy_table(self, write_disposition):

self.connections.table_ref = self._table_ref
source = BigQueryRelation.create(
database='project', schema='dataset', identifier='table1')
destination = BigQueryRelation.create(
database='project', schema='dataset', identifier='table2')
self.connections.copy_bq_table(source, destination, write_disposition)


class TestBigQueryAdapter(BaseTestBigQueryAdapter):

def test_copy_table_materialization_table(self):
adapter = self.get_adapter('oauth')
adapter.connections = MagicMock()
adapter.copy_table('source', 'destination', 'table')
adapter.connections.copy_bq_table.assert_called_once_with(
'source', 'destination',
dbt.adapters.bigquery.impl.WRITE_TRUNCATE)

def test_copy_table_materialization_incremental(self):
adapter = self.get_adapter('oauth')
adapter.connections = MagicMock()
adapter.copy_table('source', 'destination', 'incremental')
adapter.connections.copy_bq_table.assert_called_once_with(
'source', 'destination',
dbt.adapters.bigquery.impl.WRITE_APPEND)

class TestBigQueryTableOptions(BaseTestBigQueryAdapter):
def test_parse_partition_by(self):
adapter = self.get_adapter('oauth')

Expand Down

0 comments on commit 4273cc9

Please sign in to comment.