-
Notifications
You must be signed in to change notification settings - Fork 1
/
analyze_impact.py
executable file
·174 lines (148 loc) · 5.12 KB
/
analyze_impact.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env python3
import argparse
import datetime
import sys
from collections import defaultdict
from typing import Annotated, List, Tuple

import networkx
import rich
from rich.console import Console
from rich.markup import escape
from typer import Argument, Option

from common import (
    NotFoundError,
    QUERIES,
    client,
    execute_stored_query,
    load_query,
    sql_quote_path,
)
# Module-level Rich console used for styled terminal output (e.g. the
# per-column tag lines printed by get_linked_columns).
console: Console = Console()
def get_linked_columns(
    data_source_id: int,
    table_path: Tuple[str, ...],
    columns: List[str],
    columns_by_tag: str,
):
    """Print the downstream lineage of a table, optionally narrowed to columns.

    The table is looked up by path inside the given data source and the
    column selection is resolved to column UIDs. The downstream lineage is
    then fetched, and every downstream entity is printed together with those
    of its columns that are connected to the selection. When nothing is
    selected, every column that appears on some lineage edge is printed.

    Args:
        data_source_id: ID of the data source that contains the table.
        table_path: Table path components, e.g. ``('DB', 'SCHEMA', 'TABLE')``.
            Any sequence of strings is accepted.
        columns: Case-sensitive names of the table's columns to restrict
            the output to.
        columns_by_tag: When non-empty, select every column of the table that
            carries this tag; this replaces the ``columns`` selection.

    Raises:
        NotFoundError: The table, or one of ``columns``, does not exist.
        ValueError: ``columns_by_tag`` is set but no column carries that tag.
    """
    # Resolve the table and its columns to UIDs.
    table_data = execute_stored_query(
        QUERIES / 'table_by_path.gql',
        path=sql_quote_path((str(data_source_id),) + tuple(table_path)),
    )
    table = table_data['table']
    if table is None:
        raise NotFoundError('Table "%s" not found in data source #%d' % (
            '.'.join(table_path),
            data_source_id,
        ))
    column_uids = _resolve_column_uids(table, columns, columns_by_tag)

    # Fetch every downstream of the table (no upstreams).
    # NOTE(review): the depth / popularity / BI-usage values are passed
    # through unchanged; their exact meaning is defined by the stored query.
    lineage = execute_stored_query(
        QUERIES / 'lineage_analyze_impact.gql',
        primaryUid=table['uid'],
        depthDownstream=1000,
        depthUpstream=0,
        popularity=[0, 4],
        biLastUsedDays=90,
        allowedList=column_uids,
    )['lineage']

    edges = [
        (edge['sourceUid'], edge['destinationUid'])
        for edge in lineage['edges']
        # Filter out "off-chart" edges, i.e. those with an empty endpoint.
        if edge['sourceUid'] and edge['destinationUid']
    ]
    connected_columns = _connected_column_uids(edges, column_uids)
    _print_entities(lineage['entities'], connected_columns)


def _resolve_column_uids(table: dict, columns: List[str], columns_by_tag: str) -> list:
    """Return the UIDs of the selected columns of ``table``.

    Names in ``columns`` are validated first. A non-empty ``columns_by_tag``
    then replaces that selection with every column carrying the tag.
    """
    column_name_to_uid = {
        column['prop']['name']: column['uid'] for column in table['columns']
    }
    column_uids = []
    for name in columns:
        uid = column_name_to_uid.get(name)
        if uid is None:
            raise NotFoundError('Column %s is not found' % name)
        column_uids.append(uid)

    if columns_by_tag:
        column_uids = [
            column['uid']
            for column in table['columns']
            if any(tag['name'] == columns_by_tag for tag in column['tags'])
        ]
        if not column_uids:
            raise ValueError(
                f'No columns found for tag `{columns_by_tag}`.',
            )
    return column_uids


def _connected_column_uids(edges: list, column_uids: list) -> set:
    """Return the node UIDs of the lineage relevant to ``column_uids``.

    With a selection, only nodes on DFS tree edges starting at a selected
    column are kept. Without one, both endpoints of every edge are kept.
    """
    if not column_uids:
        return {uid for edge in edges for uid in edge}

    graph = networkx.DiGraph()
    graph.add_edges_from(edges)
    connected = set()
    for start in column_uids:
        # A selected column without downstream edges never becomes a graph
        # node, and networkx.dfs_edges fails (KeyError) for such a source.
        if start not in graph:
            continue
        for edge in networkx.dfs_edges(graph, source=start):
            connected.update(edge)
    return connected


def _print_entities(entities: list, connected_columns: set) -> None:
    """Print each lineage entity and those of its columns in ``connected_columns``.

    Names from the API are markup-escaped so that square brackets in them are
    shown literally instead of being parsed as Rich markup tags.
    """
    for entity in entities:
        name = escape(str(
            (entity.get('prop') or {}).get('path') or entity.get('name') or '???'
        ))
        # Entities with a project name (e.g. Tableau) get it as an italic prefix.
        if project_name := entity.get('projectName'):
            name = f'[i]{escape(str(project_name))}[/i]/{name}'
        entity_type = entity['__typename']
        console.print(f'[purple]{entity_type}[/purple] [green]{name}[/green]')

        for column in entity.get('columns') or []:
            uid = column.get('uid')
            # Entries without a UID are not columns; the rest are shown only
            # when linked to the selection.
            if uid is None or uid not in connected_columns:
                continue
            column_name = (
                (column.get('prop') or {}).get('name') or column.get('name') or uid
            )
            console.print(f' • {escape(str(column_name))}')

            tags = column.get('tags') or []
            if tags:
                console.print(' └ [i]🏷️ Tags:[/i] ', end='')
                for index, tag in enumerate(tags):
                    if index:
                        # Without a separator consecutive tag names run together.
                        console.print(', ', end='')
                    console.print(
                        escape(str(tag['name'])),
                        style=tag.get('color'),
                        end='',
                    )
                console.print()
def analyze_impact(
    data_source_id: Annotated[int, Argument(help='Data source ID.')],
    table_path: Annotated[str, Argument(
        help='Full table path, case sensitive. Format: DB.SCHEMA.TABLE',
    )],
    columns: Annotated[str, Option(help=(
        'Print downstreams only for these columns of the table in question. '
        'Format: `ID,COL1,ANOTHER_COL`. Case sensitive.'
    ))] = '',
    columns_by_tag: Annotated[str, Option(help=(
        'Print downstreams only for the columns of the primary table which are '
        'tagged with the specified tag.'
    ))] = '',
):
    """
    Print downstream dependencies of a given table.
    How to obtain Data Source ID:
    * Open https://app.datafold.com,
    * Go to **Settings** → Data Sources,
    * Find your Data Source; the leftmost column in the Data Sources list is ID.
    """
    # NOTE: the docstring above is presumably rendered as the CLI help text
    # (typer-style Argument/Option parameters), so implementation notes are
    # kept in comments rather than in the docstring.

    # Strip each entry *before* filtering, so inputs such as "A, B, " or
    # "A, ,B" do not yield empty column names.
    column_names = [
        name for name in (part.strip() for part in columns.split(',')) if name
    ]
    if columns_by_tag and column_names:
        raise ValueError(
            'Please specify only one of `--columns-by-tag` & `--columns`.',
        )
    try:
        get_linked_columns(
            data_source_id=data_source_id,
            table_path=tuple(table_path.split('.')),
            columns=column_names,
            columns_by_tag=columns_by_tag,
        )
    except NotFoundError as e:
        # Diagnostics go to stderr; exit status 1 is the conventional failure
        # code (-1 is truncated to 255 by POSIX shells).
        print('ERROR:', e, file=sys.stderr)
        print('Names are case sensitive', file=sys.stderr)
        sys.exit(1)