Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug][Or a Feature Request]: Overwrite BigQuery TestClient to work with another API endpoint #23598

Open
ruodingt opened this issue Oct 12, 2022 · 2 comments

Comments

@ruodingt
Copy link

What happened?

Thanks for reviewing my issue:)

I am trying to use the test_client arg while declaring a WriteToBigQuery instance, in the hope that I can make the BigQuery client work with the BigQuery emulator.

I did notice this one:

self.bigquery_wrapper = bigquery_tools.BigQueryWrapper(

which led to the assumption that test_client is an instance of BigqueryV2.

Here is the pipeline code:

def test_write_big_query():
    project_id = "test-project"
    dataset_id = 'poc-test-dataset'
    table_1_id = 'poc-test-table-1'

    pipeline_options = PipelineOptions(save_main_session=True)
    tp = TestPipeline(options=pipeline_options)

    x = tp | Create([
        {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())},
        {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())}
    ])

    from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
    from apache_beam.internal.http_client import get_new_http
    from apache_beam.internal.gcp import auth

    test_client_bq = BigqueryV2(... # I tried different versions here, trying to make this work

    x = x | WriteToBigQuery(table=table_resolve,
                            dataset=dataset_id,
                            project=project_id,
                            test_client=test_client_bq,
                            method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False)

    tp.run()

I keep running into trouble while trying different ways to construct a working BigqueryV2 instance, but it keeps failing...

Experiment 1

test_client_bq is defined as :

test_client_bq = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
error msg
unit/test_io.py:301 (test_write_big_query)
encoded = b'QlpoOTFBWSZTWURKFI8ABwF/6v////ve///1v///ov////pz5K4AIABIAEU1YAgfelh9D32+fON6ALc55NTzVnlLoaSGphoU2p6ZRP0DU0ynqPE0ymEa...T9hSQuqjjMVZ+10rH+O4z7uw6tc2/eQw6eVNTFVY9LZqrermFypf6cBJfsdjNmqjKJjfaP/RsCy366TEl+MGJiyaqHmRJ+5hGaEiD/4u5IpwoSCIlCkeA='
enable_trace = True, use_zlib = False

    def loads(encoded, enable_trace=True, use_zlib=False):
      """For internal use only; no backwards-compatibility guarantees."""
    
      c = base64.b64decode(encoded)
    
      if use_zlib:
        s = zlib.decompress(c)
      else:
        s = bz2.decompress(c)
    
      del c  # Free up some possibly large and no-longer-needed memory.
    
      with _pickle_lock:
        try:
>         return dill.loads(s)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

str = b'\x80\x04\x956\x10\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.'
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x13392df90>

    def loads(str, ignore=None, **kwds):
        """unpickle an object from a string"""
        file = StringIO(str)
>       return load(file, ignore, **kwds)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file = <_io.BytesIO object at 0x13392df90>, ignore = None, kwds = {}

    def load(file, ignore=None, **kwds):
        """unpickle an object from a file"""
>       return Unpickler(file, ignore=ignore, **kwds).load()

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dill._dill.Unpickler object at 0x133a08630>

    def load(self): #NOTE: if settings change, need to update attributes
>       obj = StockUnpickler.load(self)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = [], sequence = ['*/*']

    def extend(self, sequence):
        """Validate extension of list."""
>       self.__field.validate(sequence)
E       AttributeError: 'FieldList' object has no attribute '_FieldList__field'

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError

During handling of the above exception, another exception occurred:

    def test_write_big_query():
        project_id = "test-project"
        dataset_id = 'poc-test-dataset'
        table_1_id = 'poc-test-table-1'
    
        pipeline_options = PipelineOptions(save_main_session=True)
        tp = TestPipeline(options=pipeline_options)
    
        x = tp | Create([
            {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())},
            {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())}
        ])
    
        from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
        from apache_beam.internal.http_client import get_new_http
        from apache_beam.internal.gcp import auth
        # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
        # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
        # bq_test_client = BigQueryWrapper(client=bqc_)
    
        test_client_bq = BigqueryV2(
            url='http://localhost:9050/bigquery/v2/'
            # http=get_new_http(),
            # credentials=auth.get_service_credentials(None),
            # response_encoding='utf8',
            # additional_http_headers={
            #     "user-agent": "apache-beam-x"
            # }
        )
    
>       x = x | WriteToBigQuery(table=table_resolve,
                                dataset=dataset_id,
                                project=project_id,
                                test_client=test_client_bq,
                                method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False)

test_io.py:332: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__
    return self.pipeline.apply(ptransform, self)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply
    return m(transform, input, options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform
    return transform.expand(input)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand
    outputs = pcoll | _StreamToBigQuery(
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__
    return self.pipeline.apply(ptransform, self)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply
    return m(transform, input, options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform
    return transform.expand(input)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand
    | 'StreamInsertRows' >> ParDo(
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__
    super().__init__(fn, *args, **kwargs)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: in __init__
    self.fn = pickler.loads(pickler.dumps(self.fn))
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: in loads
    return desired_pickle_lib.loads(
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: in loads
    return dill.loads(s)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: in loads
    return load(file, ignore, **kwds)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: in load
    return Unpickler(file, ignore=ignore, **kwds).load()
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: in load
    obj = StockUnpickler.load(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = [], sequence = ['*/*']

    def extend(self, sequence):
        """Validate extension of list."""
>       self.__field.validate(sequence)
E       AttributeError: 'FieldList' object has no attribute '_FieldList__field'

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apitools/base/protorpclite/messages.py:1147: AttributeError



Experiment 2

since I see this line:

self.client = client or bigquery.BigqueryV2(

I put test_client_bq as :

bigquery.BigqueryV2(
        http=get_new_http(),
        credentials=auth.get_service_credentials(None),
        response_encoding='utf8',
        additional_http_headers={
            "user-agent": "apache-beam-%s" % apache_beam.__version__
        })
error
unit/test_io.py:301 (test_write_big_query)
encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A='
enable_trace = True, use_zlib = False

    def loads(encoded, enable_trace=True, use_zlib=False):
      """For internal use only; no backwards-compatibility guarantees."""
    
      c = base64.b64decode(encoded)
    
      if use_zlib:
        s = zlib.decompress(c)
      else:
        s = bz2.decompress(c)
    
      del c  # Free up some possibly large and no-longer-needed memory.
    
      with _pickle_lock:
        try:
>         return dill.loads(s)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:285: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.'
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133cd2a40>

    def loads(str, ignore=None, **kwds):
        """unpickle an object from a string"""
        file = StringIO(str)
>       return load(file, ignore, **kwds)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file = <_io.BytesIO object at 0x133cd2a40>, ignore = None, kwds = {}

    def load(file, ignore=None, **kwds):
        """unpickle an object from a file"""
>       return Unpickler(file, ignore=ignore, **kwds).load()

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dill._dill.Unpickler object at 0x133c90500>

    def load(self): #NOTE: if settings change, need to update attributes
>       obj = StockUnpickler.load(self)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880>
attr = '__setstate__'

    def __getattr__(self, attr):
      """Delegate attribute access to underlying google-auth credentials."""
>     return getattr(self._google_auth_credentials, attr)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880>
attr = '_google_auth_credentials'

    def __getattr__(self, attr):
      """Delegate attribute access to underlying google-auth credentials."""
>     return getattr(self._google_auth_credentials, attr)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca9880>
attr = '_google_auth_credentials'

    def __getattr__(self, attr):
      """Delegate attribute access to underlying google-auth credentials."""
>     return getattr(self._google_auth_credentials, attr)
E     RecursionError: maximum recursion depth exceeded

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError
!!! Recursion detected (same locals & position)

During handling of the above exception, another exception occurred:

self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190>
fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>
args = (), kwargs = {}

    def __init__(self, fn, *args, **kwargs):
      # type: (WithTypeHints, *Any, **Any) -> None
      if isinstance(fn, type) and issubclass(fn, WithTypeHints):
        # Don't treat Fn class objects as callables.
        raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
      self.fn = self.make_fn(fn, bool(args or kwargs))
      # Now that we figure out the label, initialize the super-class.
      super().__init__()
    
      if (any(isinstance(v, pvalue.PCollection) for v in args) or
          any(isinstance(v, pvalue.PCollection) for v in kwargs.values())):
        raise error.SideInputError(
            'PCollection used directly as side input argument. Specify '
            'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
            'PCollection is to be used.')
      self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args(
          args, kwargs, pvalue.AsSideInput)
      self.raw_side_inputs = args, kwargs
    
      # Prevent name collisions with fns of the form '<function <lambda> at ...>'
      self._cached_fn = self.fn
    
      # Ensure fn and side inputs are picklable for remote execution.
      try:
>       self.fn = pickler.loads(pickler.dumps(self.fn))

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:864: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A='
enable_trace = True, use_zlib = False

    def loads(encoded, enable_trace=True, use_zlib=False):
      """For internal use only; no backwards-compatibility guarantees."""
    
>     return desired_pickle_lib.loads(
          encoded, enable_trace=enable_trace, use_zlib=use_zlib)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/pickler.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

encoded = b'QlpoOTFBWSZTWWgkjdgAB3X/+/////v////1v///ov////rj7S4IAEAgAEAAAQQMAGAIn30+56Cp9vV573du719OefHOtoNs9HffPvDRCExA1MnqmaMk...dj9dkHpxsfg1V9mODbtHdYiyVO4UjKY1QG3OhDMGUiXjpbDNEUcrtRipaYUdeOKZH9XoMTt8LOY7Dtna2a4isad47qLu83S3xyJ/6f/F3JFOFCQaCSN2A='
enable_trace = True, use_zlib = False

    def loads(encoded, enable_trace=True, use_zlib=False):
      """For internal use only; no backwards-compatibility guarantees."""
    
      c = base64.b64decode(encoded)
    
      if use_zlib:
        s = zlib.decompress(c)
      else:
        s = bz2.decompress(c)
    
      del c  # Free up some possibly large and no-longer-needed memory.
    
      with _pickle_lock:
        try:
          return dill.loads(s)
        except Exception:  # pylint: disable=broad-except
          if enable_trace:
            dill.dill._trace(True)  # pylint: disable=protected-access
>           return dill.loads(s)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py:289: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

str = b'\x80\x04\x95a\x11\x00\x00\x00\x00\x00\x00\x8c\x1bapache_beam.io.gcp.bigquery\x94\x8c\x0fBigQueryWriteFn\x94\x93\x94)...#streaming_api_logging_frequency_sec\x94M,\x01\x8c\x16ignore_unknown_columns\x94\x89\x8c\x0c_max_retries\x94M\x10\'ub.'
ignore = None, kwds = {}, file = <_io.BytesIO object at 0x133d2d4f0>

    def loads(str, ignore=None, **kwds):
        """unpickle an object from a string"""
        file = StringIO(str)
>       return load(file, ignore, **kwds)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:275: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file = <_io.BytesIO object at 0x133d2d4f0>, ignore = None, kwds = {}

    def load(file, ignore=None, **kwds):
        """unpickle an object from a file"""
>       return Unpickler(file, ignore=ignore, **kwds).load()

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:270: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <dill._dill.Unpickler object at 0x133c90630>

    def load(self): #NOTE: if settings change, need to update attributes
>       obj = StockUnpickler.load(self)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/dill/_dill.py:472: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0>
attr = '__setstate__'

    def __getattr__(self, attr):
      """Delegate attribute access to underlying google-auth credentials."""
>     return getattr(self._google_auth_credentials, attr)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0>
attr = '_google_auth_credentials'

    def __getattr__(self, attr):
      """Delegate attribute access to underlying google-auth credentials."""
>     return getattr(self._google_auth_credentials, attr)

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter object at 0x133ca33d0>
attr = '_google_auth_credentials'

    def __getattr__(self, attr):
      """Delegate attribute access to underlying google-auth credentials."""
>     return getattr(self._google_auth_credentials, attr)
E     RecursionError: maximum recursion depth exceeded while calling a Python object

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/internal/gcp/auth.py:119: RecursionError
!!! Recursion detected (same locals & position)

During handling of the above exception, another exception occurred:

    def test_write_big_query():
        project_id = "test-project"
        dataset_id = 'poc-test-dataset'
        table_1_id = 'poc-test-table-1'
    
        pipeline_options = PipelineOptions(save_main_session=True)
        tp = TestPipeline(options=pipeline_options)
    
        x = tp | Create([
            {'type2': 'a', "full_name": 'firstname lastname', 'id2': str(uuid.uuid4())},
            {'type2': 'b', "full_addr": 'addr st, vic 3000', 'id2': str(uuid.uuid4())}
        ])
    
        from apache_beam.io.gcp.internal.clients.bigquery import BigqueryV2
        from apache_beam.internal.http_client import get_new_http
        from apache_beam.internal.gcp import auth
        # from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
        # bqc_ = BigqueryV2(url='http://localhost:9050/bigquery/v2/')
        # bq_test_client = BigQueryWrapper(client=bqc_)
    
        # test_client_bq = BigqueryV2(
        #     url='http://localhost:9050/bigquery/v2/'
        #     # http=get_new_http(),
        #     # credentials=auth.get_service_credentials(None),
        #     # response_encoding='utf8',
        #     # additional_http_headers={
        #     #     "user-agent": "apache-beam-x"
        #     # }
        # )
    
        test_client_bq = BigqueryV2(
            http=get_new_http(),
            credentials=auth.get_service_credentials(None),
            response_encoding='utf8',
            additional_http_headers={
                "user-agent": "apache-beam-2.41.0"
            })
    
>       x = x | WriteToBigQuery(table=table_resolve,
                                dataset=dataset_id,
                                project=project_id,
                                test_client=test_client_bq,
                                method=WriteToBigQuery.Method.STREAMING_INSERTS, validate=False)

test_io.py:340: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__
    return self.pipeline.apply(ptransform, self)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply
    return m(transform, input, options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform
    return transform.expand(input)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2308: in expand
    outputs = pcoll | _StreamToBigQuery(
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pvalue.py:137: in __or__
    return self.pipeline.apply(ptransform, self)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/pipeline.py:709: in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:185: in apply
    return m(transform, input, options)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/runners/runner.py:215: in apply_PTransform
    return transform.expand(input)
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery.py:2031: in expand
    | 'StreamInsertRows' >> ParDo(
/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/core.py:1416: in __init__
    super().__init__(fn, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <ParDo(PTransform) label=[ParDo(BigQueryWriteFn)] at 0x133ca9190>
fn = <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>
args = (), kwargs = {}

    def __init__(self, fn, *args, **kwargs):
      # type: (WithTypeHints, *Any, **Any) -> None
      if isinstance(fn, type) and issubclass(fn, WithTypeHints):
        # Don't treat Fn class objects as callables.
        raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
      self.fn = self.make_fn(fn, bool(args or kwargs))
      # Now that we figure out the label, initialize the super-class.
      super().__init__()
    
      if (any(isinstance(v, pvalue.PCollection) for v in args) or
          any(isinstance(v, pvalue.PCollection) for v in kwargs.values())):
        raise error.SideInputError(
            'PCollection used directly as side input argument. Specify '
            'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
            'PCollection is to be used.')
      self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args(
          args, kwargs, pvalue.AsSideInput)
      self.raw_side_inputs = args, kwargs
    
      # Prevent name collisions with fns of the form '<function <lambda> at ...>'
      self._cached_fn = self.fn
    
      # Ensure fn and side inputs are picklable for remote execution.
      try:
        self.fn = pickler.loads(pickler.dumps(self.fn))
      except RuntimeError as e:
>       raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e))
E       RuntimeError: Unable to pickle fn <apache_beam.io.gcp.bigquery.BigQueryWriteFn object at 0x133ca36d0>: maximum recursion depth exceeded while calling a Python object

/Users/tianr1/.pyenv/versions/x86-397/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py:866: RuntimeError



ENV

pip3 freeze
allure-pytest==2.9.45
allure-python-commons==2.9.45
anyio==3.6.1
apache-beam==2.41.0
appdirs==1.4.4
attrs==22.1.0
avro==1.11.0
bigquery-schema-generator==1.5
cachetools==4.2.4
certifi==2022.6.15
charset-normalizer==2.1.1
click==8.1.3
cloudevents==1.2.0
cloudpickle==2.1.0
confluent-kafka==1.8.2
coverage==6.4.4
crcmod==1.7
deprecation==2.1.0
dill==0.3.1.1
docopt==0.6.2
fastapi==0.79.0
fastavro==1.6.0
fasteners==0.18
google-api-core==2.10.0
google-apitools==0.5.32
google-auth==2.11.0
google-auth-httplib2==0.1.0
google-cloud-bigquery==2.34.4
google-cloud-bigquery-storage==2.13.2
google-cloud-bigtable==1.7.2
google-cloud-core==2.3.2
google-cloud-datastore==1.15.5
google-cloud-dlp==3.9.2
google-cloud-language==1.3.2
google-cloud-pubsub==2.13.9
google-cloud-pubsublite==1.5.0
google-cloud-recommendations-ai==0.7.1
google-cloud-spanner==1.19.3
google-cloud-videointelligence==1.16.3
google-cloud-vision==1.0.2
google-crc32c==1.5.0
google-resumable-media==2.4.0
googleapis-common-protos==1.56.4
greenlet==1.1.3
grpc-google-iam-v1==0.12.4
grpcio==1.48.1
grpcio-gcp==0.2.2
grpcio-status==1.48.1
h11==0.13.0
hdfs==2.7.0
httplib2==0.20.4
idna==3.3
importlib-metadata==4.12.0
libcst==0.4.7
more-itertools==8.14.0
MouseInfo==0.1.3
mypy-extensions==0.4.3
numpy==1.22.4
oauth2client==4.1.3
orjson==3.7.12
overrides==6.5.0
packaging==21.3
pandas==1.4.4
Pillow==9.2.0
pluggy==0.13.1
proto-plus==1.22.0
protobuf==3.20.3
py==1.11.0
pyarrow==7.0.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
PyAutoGUI==0.9.53
pydantic==1.10.1
pydot==1.4.2
pyee==8.2.2
PyGetWindow==0.0.9
PyJWT==2.4.0
pymongo==3.12.3
PyMsgBox==1.0.9
pyobjc==8.5.1
pyobjc-core==8.5.1
pyobjc-framework-Accessibility==8.5.1
pyobjc-framework-Accounts==8.5.1
pyobjc-framework-AddressBook==8.5.1
pyobjc-framework-AdServices==8.5.1
pyobjc-framework-AdSupport==8.5.1
pyobjc-framework-AppleScriptKit==8.5.1
pyobjc-framework-AppleScriptObjC==8.5.1
pyobjc-framework-ApplicationServices==8.5.1
pyobjc-framework-AppTrackingTransparency==8.5.1
pyobjc-framework-AudioVideoBridging==8.5.1
pyobjc-framework-AuthenticationServices==8.5.1
pyobjc-framework-AutomaticAssessmentConfiguration==8.5.1
pyobjc-framework-Automator==8.5.1
pyobjc-framework-AVFoundation==8.5.1
pyobjc-framework-AVKit==8.5.1
pyobjc-framework-BusinessChat==8.5.1
pyobjc-framework-CalendarStore==8.5.1
pyobjc-framework-CallKit==8.5.1
pyobjc-framework-CFNetwork==8.5.1
pyobjc-framework-ClassKit==8.5.1
pyobjc-framework-CloudKit==8.5.1
pyobjc-framework-Cocoa==8.5.1
pyobjc-framework-Collaboration==8.5.1
pyobjc-framework-ColorSync==8.5.1
pyobjc-framework-Contacts==8.5.1
pyobjc-framework-ContactsUI==8.5.1
pyobjc-framework-CoreAudio==8.5.1
pyobjc-framework-CoreAudioKit==8.5.1
pyobjc-framework-CoreBluetooth==8.5.1
pyobjc-framework-CoreData==8.5.1
pyobjc-framework-CoreHaptics==8.5.1
pyobjc-framework-CoreLocation==8.5.1
pyobjc-framework-CoreMedia==8.5.1
pyobjc-framework-CoreMediaIO==8.5.1
pyobjc-framework-CoreMIDI==8.5.1
pyobjc-framework-CoreML==8.5.1
pyobjc-framework-CoreMotion==8.5.1
pyobjc-framework-CoreServices==8.5.1
pyobjc-framework-CoreSpotlight==8.5.1
pyobjc-framework-CoreText==8.5.1
pyobjc-framework-CoreWLAN==8.5.1
pyobjc-framework-CryptoTokenKit==8.5.1
pyobjc-framework-DataDetection==8.5.1
pyobjc-framework-DeviceCheck==8.5.1
pyobjc-framework-DictionaryServices==8.5.1
pyobjc-framework-DiscRecording==8.5.1
pyobjc-framework-DiscRecordingUI==8.5.1
pyobjc-framework-DiskArbitration==8.5.1
pyobjc-framework-DVDPlayback==8.5.1
pyobjc-framework-EventKit==8.5.1
pyobjc-framework-ExceptionHandling==8.5.1
pyobjc-framework-ExecutionPolicy==8.5.1
pyobjc-framework-ExternalAccessory==8.5.1
pyobjc-framework-FileProvider==8.5.1
pyobjc-framework-FileProviderUI==8.5.1
pyobjc-framework-FinderSync==8.5.1
pyobjc-framework-FSEvents==8.5.1
pyobjc-framework-GameCenter==8.5.1
pyobjc-framework-GameController==8.5.1
pyobjc-framework-GameKit==8.5.1
pyobjc-framework-GameplayKit==8.5.1
pyobjc-framework-ImageCaptureCore==8.5.1
pyobjc-framework-IMServicePlugIn==8.5.1
pyobjc-framework-InputMethodKit==8.5.1
pyobjc-framework-InstallerPlugins==8.5.1
pyobjc-framework-InstantMessage==8.5.1
pyobjc-framework-Intents==8.5.1
pyobjc-framework-IntentsUI==8.5.1
pyobjc-framework-IOSurface==8.5.1
pyobjc-framework-iTunesLibrary==8.5.1
pyobjc-framework-KernelManagement==8.5.1
pyobjc-framework-LatentSemanticMapping==8.5.1
pyobjc-framework-LaunchServices==8.5.1
pyobjc-framework-libdispatch==8.5.1
pyobjc-framework-LinkPresentation==8.5.1
pyobjc-framework-LocalAuthentication==8.5.1
pyobjc-framework-LocalAuthenticationEmbeddedUI==8.5.1
pyobjc-framework-MailKit==8.5.1
pyobjc-framework-MapKit==8.5.1
pyobjc-framework-MediaAccessibility==8.5.1
pyobjc-framework-MediaLibrary==8.5.1
pyobjc-framework-MediaPlayer==8.5.1
pyobjc-framework-MediaToolbox==8.5.1
pyobjc-framework-Metal==8.5.1
pyobjc-framework-MetalKit==8.5.1
pyobjc-framework-MetalPerformanceShaders==8.5.1
pyobjc-framework-MetalPerformanceShadersGraph==8.5.1
pyobjc-framework-MetricKit==8.5.1
pyobjc-framework-MLCompute==8.5.1
pyobjc-framework-ModelIO==8.5.1
pyobjc-framework-MultipeerConnectivity==8.5.1
pyobjc-framework-NaturalLanguage==8.5.1
pyobjc-framework-NetFS==8.5.1
pyobjc-framework-Network==8.5.1
pyobjc-framework-NetworkExtension==8.5.1
pyobjc-framework-NotificationCenter==8.5.1
pyobjc-framework-OpenDirectory==8.5.1
pyobjc-framework-OSAKit==8.5.1
pyobjc-framework-OSLog==8.5.1
pyobjc-framework-PassKit==8.5.1
pyobjc-framework-PencilKit==8.5.1
pyobjc-framework-Photos==8.5.1
pyobjc-framework-PhotosUI==8.5.1
pyobjc-framework-PreferencePanes==8.5.1
pyobjc-framework-PushKit==8.5.1
pyobjc-framework-Quartz==8.5.1
pyobjc-framework-QuickLookThumbnailing==8.5.1
pyobjc-framework-ReplayKit==8.5.1
pyobjc-framework-SafariServices==8.5.1
pyobjc-framework-SceneKit==8.5.1
pyobjc-framework-ScreenCaptureKit==8.5.1
pyobjc-framework-ScreenSaver==8.5.1
pyobjc-framework-ScreenTime==8.5.1
pyobjc-framework-ScriptingBridge==8.5.1
pyobjc-framework-SearchKit==8.5.1
pyobjc-framework-Security==8.5.1
pyobjc-framework-SecurityFoundation==8.5.1
pyobjc-framework-SecurityInterface==8.5.1
pyobjc-framework-ServiceManagement==8.5.1
pyobjc-framework-ShazamKit==8.5.1
pyobjc-framework-Social==8.5.1
pyobjc-framework-SoundAnalysis==8.5.1
pyobjc-framework-Speech==8.5.1
pyobjc-framework-SpriteKit==8.5.1
pyobjc-framework-StoreKit==8.5.1
pyobjc-framework-SyncServices==8.5.1
pyobjc-framework-SystemConfiguration==8.5.1
pyobjc-framework-SystemExtensions==8.5.1
pyobjc-framework-UniformTypeIdentifiers==8.5.1
pyobjc-framework-UserNotifications==8.5.1
pyobjc-framework-UserNotificationsUI==8.5.1
pyobjc-framework-VideoSubscriberAccount==8.5.1
pyobjc-framework-VideoToolbox==8.5.1
pyobjc-framework-Virtualization==8.5.1
pyobjc-framework-Vision==8.5.1
pyobjc-framework-WebKit==8.5.1
pyparsing==3.0.9
pyperclip==1.8.2
pyppeteer==1.0.2
PyRect==0.2.0
PyScreeze==0.1.28
pytest==5.4.3
pytest-order==1.0.1
python-dateutil==2.8.2
pytweening==1.0.4
pytz==2022.2.1
PyYAML==6.0
regex==2022.9.13
requests==2.28.1
rsa==4.9
rubicon-objc==0.4.2
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.39
sqlparse==0.4.2
starlette==0.19.1
tqdm==4.64.1
typing-inspect==0.8.0
typing_extensions==4.3.0
urllib3==1.26.11
uvicorn==0.18.2
wcwidth==0.2.5
websockets==10.3
zipp==3.8.1
zstandard==0.18.0

Other context

code to set up bigquery emulator

docker-compose to set up the emulator

version: '3.6'
services:
  bigquery-emulator:
    hostname: bigquery
    container_name: bigquery
    image: ghcr.io/goccy/bigquery-emulator:latest
    ports:
      - 9050:9050
    command: --project=${PROJECT_ID} --port=${BIGQUERY_PORT}

The python code to setup dataset/table

def set_up_bq_instance():
    from google.cloud.bigquery import TableReference, DatasetReference

    from google.api_core.client_options import ClientOptions
    from google.auth.credentials import AnonymousCredentials
    from google.cloud import bigquery

    project_id = "test-project"

    client_options = ClientOptions(api_endpoint="http://localhost:9050")
    bq_test_client = bq_test_client = bigquery.Client(
        project=project_id,
        client_options=client_options,
        credentials=AnonymousCredentials(),
    )


    dataset_ref = DatasetReference(project=project_id, dataset_id='poc-test-dataset')

    try:
        bq_test_client.create_dataset(dataset=dataset_ref,
                                      exists_ok=True,
                                      retry=None)
    except InternalServerError:
        pass

    table_ref_1 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-1')
    table_ref_2 = TableReference(dataset_ref=dataset_ref, table_id='poc-test-table-2')

    schema_1 = [
        bigquery.SchemaField("type2", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("id2", "STRING", mode="REQUIRED"),
    ]

    schema_2 = [
        bigquery.SchemaField("type2", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("full_addr", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("id2", "STRING", mode="REQUIRED"),
    ]

    table_1 = bigquery.Table(table_ref=table_ref_1, schema=schema_1)
    table_2 = bigquery.Table(table_ref=table_ref_2, schema=schema_2)

    try:
        _ = bq_test_client.create_table(table_1, exists_ok=True, retry=None)
    except InternalServerError:
        pass

    try:
        _ = bq_test_client.create_table(table_2, exists_ok=True, retry=None)
    except InternalServerError:
        pass

    list_dataset = list(bq_test_client.list_datasets())
    list_table = list(bq_test_client.list_tables(dataset_ref))
    return bq_test_client

Issue Priority

Priority: 2

Issue Component

Component: io-py-gcp

@kennknowles
Copy link
Member

@johnjcasey @Abacn do you understand the question here?

@martinbjeldbak
Copy link

Relates to #28149 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants