Skip to content

Commit

Permalink
handle_list_relations_without_caching_failure (#190)
Browse files Browse the repository at this point in the history
Co-authored-by: Vamshi Kolanu <vkolanu@vamshikolanu.local>
  • Loading branch information
vamshikolanu and Vamshi Kolanu authored Feb 9, 2024
1 parent c429927 commit 55fbd86
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 16 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/impala/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

version = "1.4.2"
version = "1.4.3"
34 changes: 20 additions & 14 deletions dbt/adapters/impala/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

LIST_SCHEMAS_MACRO_NAME = "list_schemas"
LIST_RELATIONS_MACRO_NAME = "list_relations_without_caching"
LIST_TABLES_IN_RELATION_MACRO_NAME = "list_tables_in_relation"
GET_RELATIONSHIP_TYPE_MACRO_NAME = "get_relation_type"

KEY_TABLE_OWNER = "Owner"
KEY_TABLE_STATISTICS = "Statistics"
Expand Down Expand Up @@ -95,31 +97,35 @@ def list_schemas(self, database: str) -> List[str]:

return schemas

def fetch_relation_type(self, relation: Relation) -> str:
try:
kwargs = {"relation": relation}
relation_type = self.execute_macro(GET_RELATIONSHIP_TYPE_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.DbtRuntimeError as e:
logger.error(
f"Unable to fetch relation type {relation.schema}.{relation.identifier}: {e.msg}"
)
return None
return relation_type

def list_relations_without_caching(
self, schema_relation: ImpalaRelation
) -> List[ImpalaRelation]:
kwargs = {"schema_relation": schema_relation}

try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
kwargs = {"relation": schema_relation}
table_relations = self.execute_macro(LIST_TABLES_IN_RELATION_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.DbtRuntimeError as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
if f"Database does not exist" in errmsg:
return []
else:
description = "Error while retrieving information about"
logger.error(f"{description} {schema_relation}: {e.msg}")
logger.error(f"Unable to extract tables in relation {schema_relation}: {errmsg}")
raise e

relations = []
for row in results:
if len(row) != 2:
raise dbt.exceptions.DbtRuntimeError(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
)
_identifier = row[0]
_rel_type = row[1]
for table_relation in table_relations:
_rel_type = self.fetch_relation_type(table_relation)
_identifier = table_relation.identifier

relation = self.Relation.create(
database=None,
Expand Down
15 changes: 15 additions & 0 deletions dbt/include/impala/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@
{{ return(load_result('list_schemas').table) }}
{% endmacro %}

{% macro list_tables_in_relation(relation) %}
{% set result_set = run_query('show tables in ' ~ relation) %}

{% set tables = [] %}

{% if execute %}
{%- for rs in result_set -%}
{% do tables.append(relation.new_copy(relation.schema, rs[0])) %}
{%- endfor -%}
{% endif %}

{{ return(tables) }}
{% endmacro %}


{# Note: This function currently needs to query each object to determine its type. Depending on the schema, this function could be expensive. #}
{% macro impala__list_relations_without_caching(relation) %}
{% set result_set = run_query('show tables in ' ~ relation) %}
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def _get_dbt_core_version():

package_name = "dbt-impala"
# make sure this always matches dbt/adapters/dbt_impala/__version__.py
package_version = "1.4.2"
package_version = "1.4.3"
description = """The Impala adapter plugin for dbt"""

dbt_core_version = _get_dbt_core_version()
Expand Down

0 comments on commit 55fbd86

Please sign in to comment.