We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Thanks for reviewing my issue:)
I am trying to use the test_client arg while declaring a WriteToBigQuery instance, in the hope that I can make the BigQuery client work with the BigQuery emulator.
test_client
WriteToBigQuery
I did notice this one:
beam/sdks/python/apache_beam/io/gcp/bigquery.py
Line 1443 in 786ba8b
which led me to an assumption: test_client is an instance of BigqueryV2
BigqueryV2
Here is the pipeline code:
def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp import auth test_client_bq = BigqueryV2(... # I tried different versions her and trying to make this work x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) tp.run()
I keep running into trouble while trying different ways to construct a working BigqueryV2 instance, but it keeps failing...
test_client_bq is defined as:
test_client_bq
test_client_bq = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
unit/test_io.py:301 (test_write_big_query) encoded = b'QlpoOTFBWSZTWURKFI8ABwF/6v////ve///1v///ov////pz5K4AIABIAEU1YAgfelh9D32+fON6ALc55NTzVnlLoaSGphoU2p6ZRP0DU0ynqPE0ymEa...T9hSQuqjjMVZ+10rH+O4z7uw6tc2/eQw6eVNTFVY9LZqrermFypf6cBJfsdjNmqjKJjfaP/RsCy366TEl+MGJiyaqHmRJ+5hGaEiD/4u5IpwoSCIlCkeA=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: try: > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x956\x10\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' 
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x13392df90> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x13392df90>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <dill._dill.Unpickler object at 0x133a08630> def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [], sequence = ['*/*'] def extend(self, sequence): """Validate extension of list.""" > self.__field.validate(sequence) E AttributeError: 'FieldList' object has no attribute '_FieldList__field' /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError During handling of the above exception, another exception occurred: def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp 
import auth # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/') # bq_test_client = BigQueryWrapper(client=bqc_) test_client_bq = BigqueryV2( url='http://localhost:9050/bigquery/v2/' # http=get_new_http(), # credentials=auth.get_service_credentials(None), # response_encoding='utf8', # additional_http_headers={ # "user-agent": "apache-beam-x" # } ) > x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) test_io.py:332: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand outputs = pcoll | _StreamToBigQuery( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand | 'StreamInsertRows' >> ParDo( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__ super().__init__(fn, *args, **kwargs) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: in __init__ self.fn = pickler.loads(pickler.dumps(self.fn)) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: in loads return desired_pickle_lib.loads( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: in loads return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: in loads return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: in load return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: in load obj = StockUnpickler.load(self) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = [], sequence = ['*/*'] def extend(self, sequence): """Validate extension of list.""" > self.__field.validate(sequence) E AttributeError: 'FieldList' object has no attribute '_FieldList__field' /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError
since I see this line:
beam/sdks/python/apache_beam/io/gcp/bigquery_tools.py
Line 332 in 786ba8b
I set test_client_bq as:
bigquery.BigqueryV2( http=get_new_http(), credentials=auth.get_service_credentials(None), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ })
unit/test_io.py:301 (test_write_big_query) encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: try: > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' 
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133cd2a40> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x133cd2a40>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <dill._dill.Unpickler object at 0x133c90500> def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880> attr = '__setstate__' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = 
<apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) E RecursionError: maximum recursion depth exceeded /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError !!! Recursion detected (same locals & position) During handling of the above exception, another exception occurred: self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190> fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0> args = (), kwargs = {} def __init__(self, fn, *args, **kwargs): # type: (WithTypeHints, *Any, **Any) -> None if isinstance(fn, type) and issubclass(fn, WithTypeHints): # Don't treat Fn class objects as callables. raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__)) self.fn = self.make_fn(fn, bool(args or kwargs)) # Now that we figure out the label, initialize the super-class. super().__init__() if (any(isinstance(v, pvalue.PCollection) for v in args) or any(isinstance(v, pvalue.PCollection) for v in kwargs.values())): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form '<function <lambda> at ...>' self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. 
try: > self.fn = pickler.loads(pickler.dumps(self.fn)) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" > return desired_pickle_lib.loads( encoded, enable_trace=enable_trace, use_zlib=use_zlib) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A=' enable_trace = True, use_zlib = False def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) if use_zlib: s = zlib.decompress(c) else: s = bz2.decompress(c) del c # Free up some possibly large and no-longer-needed memory. 
with _pickle_lock: try: return dill.loads(s) except Exception: # pylint: disable=broad-except if enable_trace: dill.dill._trace(True) # pylint: disable=protected-access > return dill.loads(s) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.' ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133d2d4f0> def loads(str, ignore=None, **kwds): """unpickle an object from a string""" file = StringIO(str) > return load(file, ignore, **kwds) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ file = <_io.BytesIO object at 0x133d2d4f0>, ignore = None, kwds = {} def load(file, ignore=None, **kwds): """unpickle an object from a file""" > return Unpickler(file, ignore=ignore, **kwds).load() /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <dill._dill.Unpickler object at 0x133c90630> def load(self): #NOTE: if settings change, need to update attributes > obj = StockUnpickler.load(self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0> attr = '__setstate__' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0> attr = '_google_auth_credentials' def __getattr__(self, attr): """Delegate attribute access to underlying google-auth credentials.""" > return getattr(self._google_auth_credentials, attr) E RecursionError: maximum recursion depth exceeded while calling a Python object /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError !!! 
Recursion detected (same locals & position) During handling of the above exception, another exception occurred: def test_write_big_query(): project_id = "test-project" dataset_id = 'poc-test-dataset' table_1_id = 'poc-test-table-1' pipeline_options = PipelineOptions(save_main_session=True) tp = TestPipeline(options=pipeline_options) x = tp | Create([ {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())}, {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())} ]) from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2 from apache_beam.internal.http_client import get_new_http from apache_beam.internal.gcp import auth # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/') # bq_test_client = BigQueryWrapper(client=bqc_) # test_client_bq = BigqueryV2( # url='http://localhost:9050/bigquery/v2/' # # http=get_new_http(), # # credentials=auth.get_service_credentials(None), # # response_encoding='utf8', # # additional_http_headers={ # # "user-agent": "apache-beam-x" # # } # ) test_client_bq = BigqueryV2( http=get_new_http(), credentials=auth.get_service_credentials(None), response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-2.41.0" }) > x = x | WriteToBigQuery(table=table_resolve, dataset=dataset_id, project=project_id, test_client=test_client_bq, method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False) test_io.py:340: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in 
apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand outputs = pcoll | _StreamToBigQuery( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__ return self.pipeline.apply(ptransform, self) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply return m(transform, input, options) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform return transform.expand(input) /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand | 'StreamInsertRows' >> ParDo( /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__ super().__init__(fn, *args, **kwargs) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190> fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0> args = (), kwargs = {} def __init__(self, fn, *args, **kwargs): # type: (WithTypeHints, *Any, **Any) -> None if isinstance(fn, type) and issubclass(fn, WithTypeHints): # Don't treat Fn class objects as callables. raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__)) self.fn = self.make_fn(fn, bool(args or kwargs)) # Now that we figure out the label, initialize the super-class. 
super().__init__() if (any(isinstance(v, pvalue.PCollection) for v in args) or any(isinstance(v, pvalue.PCollection) for v in kwargs.values())): raise error.SideInputError( 'PCollection used directly as side input argument. Specify ' 'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the ' 'PCollection is to be used.') self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args( args, kwargs, pvalue.AsSideInput) self.raw_side_inputs = args, kwargs # Prevent name collisions with fns of the form '<function <lambda> at ...>' self._cached_fn = self.fn # Ensure fn and side inputs are picklable for remote execution. try: self.fn = pickler.loads(pickler.dumps(self.fn)) except RuntimeError as e: > raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e)) E RuntimeError: Unable to pickle fn <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>: maximum recursion depth exceeded while calling a Python object /Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:866: RuntimeError
allure-pytest==2.9.45 allure-python-commons==2.9.45 anyio==3.6.1 apache-beam==2.41.0 appdirs==1.4.4 attrs==22.1.0 avro==1.11.0 bigquery-schema-generator==1.5 cachetools==4.2.4 certifi==2022.6.15 charset-normalizer==2.1.1 click==8.1.3 cloudevents==1.2.0 cloudpickle==2.1.0 confluent-kafka==1.8.2 coverage==6.4.4 crcmod==1.7 deprecation==2.1.0 dill==0.3.1.1 docopt==0.6.2 fastapi==0.79.0 fastavro==1.6.0 fasteners==0.18 google-api-core==2.10.0 google-apitools==0.5.32 google-auth==2.11.0 google-auth-httplib2==0.1.0 google-cloud-bigquery==2.34.4 google-cloud-bigquery-storage==2.13.2 google-cloud-bigtable==1.7.2 google-cloud-core==2.3.2 google-cloud-datastore==1.15.5 google-cloud-dlp==3.9.2 google-cloud-language==1.3.2 google-cloud-pubsub==2.13.9 google-cloud-pubsublite==1.5.0 google-cloud-recommendations-ai==0.7.1 google-cloud-spanner==1.19.3 google-cloud-videointelligence==1.16.3 google-cloud-vision==1.0.2 google-crc32c==1.5.0 google-resumable-media==2.4.0 googleapis-common-protos==1.56.4 greenlet==1.1.3 grpc-google-iam-v1==0.12.4 grpcio==1.48.1 grpcio-gcp==0.2.2 grpcio-status==1.48.1 h11==0.13.0 hdfs==2.7.0 httplib2==0.20.4 idna==3.3 importlib-metadata==4.12.0 libcst==0.4.7 more-itertools==8.14.0 MouseInfo==0.1.3 mypy-extensions==0.4.3 numpy==1.22.4 oauth2client==4.1.3 orjson==3.7.12 overrides==6.5.0 packaging==21.3 pandas==1.4.4 Pillow==9.2.0 pluggy==0.13.1 proto-plus==1.22.0 protobuf==3.20.3 py==1.11.0 pyarrow==7.0.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 PyAutoGUI==0.9.53 pydantic==1.10.1 pydot==1.4.2 pyee==8.2.2 PyGetWindow==0.0.9 PyJWT==2.4.0 pymongo==3.12.3 PyMsgBox==1.0.9 pyobjc==8.5.1 pyobjc-core==8.5.1 pyobjc-framework-Accessibility==8.5.1 pyobjc-framework-Accounts==8.5.1 pyobjc-framework-AddressBook==8.5.1 pyobjc-framework-AdServices==8.5.1 pyobjc-framework-AdSupport==8.5.1 pyobjc-framework-AppleScriptKit==8.5.1 pyobjc-framework-AppleScriptObjC==8.5.1 pyobjc-framework-ApplicationServices==8.5.1 pyobjc-framework-AppTrackingTransparency==8.5.1 
pyobjc-framework-AudioVideoBridging==8.5.1 pyobjc-framework-AuthenticationServices==8.5.1 pyobjc-framework-AutomaticAssessmentConfiguration==8.5.1 pyobjc-framework-Automator==8.5.1 pyobjc-framework-AVFoundation==8.5.1 pyobjc-framework-AVKit==8.5.1 pyobjc-framework-BusinessChat==8.5.1 pyobjc-framework-CalendarStore==8.5.1 pyobjc-framework-CallKit==8.5.1 pyobjc-framework-CFNetwork==8.5.1 pyobjc-framework-ClassKit==8.5.1 pyobjc-framework-CloudKit==8.5.1 pyobjc-framework-Cocoa==8.5.1 pyobjc-framework-Collaboration==8.5.1 pyobjc-framework-ColorSync==8.5.1 pyobjc-framework-Contacts==8.5.1 pyobjc-framework-ContactsUI==8.5.1 pyobjc-framework-CoreAudio==8.5.1 pyobjc-framework-CoreAudioKit==8.5.1 pyobjc-framework-CoreBluetooth==8.5.1 pyobjc-framework-CoreData==8.5.1 pyobjc-framework-CoreHaptics==8.5.1 pyobjc-framework-CoreLocation==8.5.1 pyobjc-framework-CoreMedia==8.5.1 pyobjc-framework-CoreMediaIO==8.5.1 pyobjc-framework-CoreMIDI==8.5.1 pyobjc-framework-CoreML==8.5.1 pyobjc-framework-CoreMotion==8.5.1 pyobjc-framework-CoreServices==8.5.1 pyobjc-framework-CoreSpotlight==8.5.1 pyobjc-framework-CoreText==8.5.1 pyobjc-framework-CoreWLAN==8.5.1 pyobjc-framework-CryptoTokenKit==8.5.1 pyobjc-framework-DataDetection==8.5.1 pyobjc-framework-DeviceCheck==8.5.1 pyobjc-framework-DictionaryServices==8.5.1 pyobjc-framework-DiscRecording==8.5.1 pyobjc-framework-DiscRecordingUI==8.5.1 pyobjc-framework-DiskArbitration==8.5.1 pyobjc-framework-DVDPlayback==8.5.1 pyobjc-framework-EventKit==8.5.1 pyobjc-framework-ExceptionHandling==8.5.1 pyobjc-framework-ExecutionPolicy==8.5.1 pyobjc-framework-ExternalAccessory==8.5.1 pyobjc-framework-FileProvider==8.5.1 pyobjc-framework-FileProviderUI==8.5.1 pyobjc-framework-FinderSync==8.5.1 pyobjc-framework-FSEvents==8.5.1 pyobjc-framework-GameCenter==8.5.1 pyobjc-framework-GameController==8.5.1 pyobjc-framework-GameKit==8.5.1 pyobjc-framework-GameplayKit==8.5.1 pyobjc-framework-ImageCaptureCore==8.5.1 pyobjc-framework-IMServicePlugIn==8.5.1 
pyobjc-framework-InputMethodKit==8.5.1 pyobjc-framework-InstallerPlugins==8.5.1 pyobjc-framework-InstantMessage==8.5.1 pyobjc-framework-Intents==8.5.1 pyobjc-framework-IntentsUI==8.5.1 pyobjc-framework-IOSurface==8.5.1 pyobjc-framework-iTunesLibrary==8.5.1 pyobjc-framework-KernelManagement==8.5.1 pyobjc-framework-LatentSemanticMapping==8.5.1 pyobjc-framework-LaunchServices==8.5.1 pyobjc-framework-libdispatch==8.5.1 pyobjc-framework-LinkPresentation==8.5.1 pyobjc-framework-LocalAuthentication==8.5.1 pyobjc-framework-LocalAuthenticationEmbeddedUI==8.5.1 pyobjc-framework-MailKit==8.5.1 pyobjc-framework-MapKit==8.5.1 pyobjc-framework-MediaAccessibility==8.5.1 pyobjc-framework-MediaLibrary==8.5.1 pyobjc-framework-MediaPlayer==8.5.1 pyobjc-framework-MediaToolbox==8.5.1 pyobjc-framework-Metal==8.5.1 pyobjc-framework-MetalKit==8.5.1 pyobjc-framework-MetalPerformanceShaders==8.5.1 pyobjc-framework-MetalPerformanceShadersGraph==8.5.1 pyobjc-framework-MetricKit==8.5.1 pyobjc-framework-MLCompute==8.5.1 pyobjc-framework-ModelIO==8.5.1 pyobjc-framework-MultipeerConnectivity==8.5.1 pyobjc-framework-NaturalLanguage==8.5.1 pyobjc-framework-NetFS==8.5.1 pyobjc-framework-Network==8.5.1 pyobjc-framework-NetworkExtension==8.5.1 pyobjc-framework-NotificationCenter==8.5.1 pyobjc-framework-OpenDirectory==8.5.1 pyobjc-framework-OSAKit==8.5.1 pyobjc-framework-OSLog==8.5.1 pyobjc-framework-PassKit==8.5.1 pyobjc-framework-PencilKit==8.5.1 pyobjc-framework-Photos==8.5.1 pyobjc-framework-PhotosUI==8.5.1 pyobjc-framework-PreferencePanes==8.5.1 pyobjc-framework-PushKit==8.5.1 pyobjc-framework-Quartz==8.5.1 pyobjc-framework-QuickLookThumbnailing==8.5.1 pyobjc-framework-ReplayKit==8.5.1 pyobjc-framework-SafariServices==8.5.1 pyobjc-framework-SceneKit==8.5.1 pyobjc-framework-ScreenCaptureKit==8.5.1 pyobjc-framework-ScreenSaver==8.5.1 pyobjc-framework-ScreenTime==8.5.1 pyobjc-framework-ScriptingBridge==8.5.1 pyobjc-framework-SearchKit==8.5.1 pyobjc-framework-Security==8.5.1 
pyobjc-framework-SecurityFoundation==8.5.1 pyobjc-framework-SecurityInterface==8.5.1 pyobjc-framework-ServiceManagement==8.5.1 pyobjc-framework-ShazamKit==8.5.1 pyobjc-framework-Social==8.5.1 pyobjc-framework-SoundAnalysis==8.5.1 pyobjc-framework-Speech==8.5.1 pyobjc-framework-SpriteKit==8.5.1 pyobjc-framework-StoreKit==8.5.1 pyobjc-framework-SyncServices==8.5.1 pyobjc-framework-SystemConfiguration==8.5.1 pyobjc-framework-SystemExtensions==8.5.1 pyobjc-framework-UniformTypeIdentifiers==8.5.1 pyobjc-framework-UserNotifications==8.5.1 pyobjc-framework-UserNotificationsUI==8.5.1 pyobjc-framework-VideoSubscriberAccount==8.5.1 pyobjc-framework-VideoToolbox==8.5.1 pyobjc-framework-Virtualization==8.5.1 pyobjc-framework-Vision==8.5.1 pyobjc-framework-WebKit==8.5.1 pyparsing==3.0.9 pyperclip==1.8.2 pyppeteer==1.0.2 PyRect==0.2.0 PyScreeze==0.1.28 pytest==5.4.3 pytest-order==1.0.1 python-dateutil==2.8.2 pytweening==1.0.4 pytz==2022.2.1 PyYAML==6.0 regex==2022.9.13 requests==2.28.1 rsa==4.9 rubicon-objc==0.4.2 six==1.16.0 sniffio==1.3.0 SQLAlchemy==1.4.39 sqlparse==0.4.2 starlette==0.19.1 tqdm==4.64.1 typing-inspect==0.8.0 typing_extensions==4.3.0 urllib3==1.26.11 uvicorn==0.18.2 wcwidth==0.2.5 websockets==10.3 zipp==3.8.1 zstandard==0.18.0
docker-compose to set up the emulator
docker-compose
version: '3.6' services: bigquery-emulator: hostname: bigquery container_name: bigquery image: ghcr.io/goccy/bigquery-emulator:latest ports: - 9050:9050 command: --project=${PROJECT_ID} --port=${BIGQUERY_PORT}
The Python code to set up the dataset/table
def set_up_bq_instance(): from google.cloud.bigquery import TableReference, DatasetReference from google.api_core.client_options import ClientOptions from google.auth.credentials import AnonymousCredentials from google.cloud import bigquery project_id = "test-project" client_options = ClientOptions(api_endpoint="http://localhost:9050") bq_test_client = bq_test_client = bigquery.Client( project=project_id, client_options=client_options, credentials=AnonymousCredentials(), ) dataset_ref = DatasetReference(project=project_id, dataset_id='poc-test-dataset') try: bq_test_client.create_dataset(dataset=dataset_ref, exists_ok=True, retry=None) except InternalServerError: pass table_ref_1 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-1') table_ref_2 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-2') schema_1 = [ bigquery.SchemaField("type2", "STRING", mode="REQUIRED"), bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("id2", "STRING", mode="REQUIRED"), ] schema_2 = [ bigquery.SchemaField("type2", "STRING", mode="REQUIRED"), bigquery.SchemaField("full_addr", "STRING", mode="REQUIRED"), bigquery.SchemaField("id2", "STRING", mode="REQUIRED"), ] table_1 = bigquery.Table(table_ref=table_ref_1, schema=schema_1) table_2 = bigquery.Table(table_ref=table_ref_2, schema=schema_2) try: _ = bq_test_client.create_table(table_1, exists_ok=True, retry=None) except InternalServerError: pass try: _ = bq_test_client.create_table(table_2, exists_ok=True, retry=None) except InternalServerError: pass list_dataset = list(bq_test_client.list_datasets()) list_table = list(bq_test_client.list_tables(dataset_ref)) return bq_test_client
Priority: 2
Component: io-py-gcp
The text was updated successfully, but these errors were encountered:
@johnjcasey @Abacn do you understand the question here?
Sorry, something went wrong.
Relates to #28149 (comment)
No branches or pull requests
What happened?
Thanks for reviewing my issue:)
I am trying to use the
test_client
arg while declaring a WriteToBigQuery
instance, in the hope that I can make the BigQuery client work with the BigQuery emulator. I did notice this one:
beam/sdks/python/apache_beam/io/gcp/bigquery.py
Line 1443 in 786ba8b
which led me to an assumption:
test_client
is an instance of BigqueryV2
Here is the pipeline code:
I keep running into trouble while trying different ways to construct a working
BigqueryV2
instance, but it keeps failing... Experiment 1
test_client_bq
is defined as: error msg
Experiment 2
since I see this line:
beam/sdks/python/apache_beam/io/gcp/bigquery_tools.py
Line 332 in 786ba8b
I set
test_client_bq
as: error
ENV
pip3 freeze
Other context
code to set up bigquery emulator
docker-compose
to set up the emulator. The Python code to set up the dataset/table
Issue Priority
Priority: 2
Issue Component
Component: io-py-gcp
The text was updated successfully, but these errors were encountered: