-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdedicated-sql-utilities.sql
74 lines (60 loc) · 2.64 KB
/
dedicated-sql-utilities.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
-- Create the delta schema that holds the utility procedure(s), but only when it
-- does not exist yet, so the script stays safe to re-run.
-- An explicit existence check is used instead of an empty CATCH block so that
-- genuine failures (for example, missing CREATE SCHEMA permission) are reported
-- here rather than being swallowed.
-- CREATE SCHEMA has to be the first statement of its batch, which is why it is
-- executed through EXEC() instead of being written inline after the IF.
IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = 'delta')
BEGIN
    EXEC('CREATE SCHEMA delta');
END
GO
-- Instructions:
-- 1. Execute the script below to create the delta.load procedure in your dedicated SQL pool
-- 2. Execute the stored procedure by providing delta folder path and the target table name:
-- EXEC delta.load 'https://mystorage.dfs.core.windows.net/delta-log/green/', 'GreenTable', null
-- 3. If you are not using AAD passthrough, provide credential info in the @credential parameter, for example:
-- EXEC delta.load 'https://mystorage.dfs.core.windows.net/delta-log/green/', 'GreenTable', 'IDENTITY= ''Shared Access Signature'', SECRET=''<Your_SAS_Token>'''
-- EXEC delta.load 'https://mystorage.dfs.core.windows.net/delta-log/green/', 'GreenTable', 'IDENTITY = ''Managed Identity'''
-- delta.load
-- (Re)creates @table_name from the current snapshot of a Delta Lake folder.
--
-- Parameters:
--   @folder      URL of the Delta table root folder (the one containing _delta_log/).
--                A trailing '/' is optional; it is appended when missing.
--   @table_name  Target table. It is dropped if it already exists and re-created by
--                COPY INTO ... AUTO_CREATE_TABLE. The name is concatenated into
--                dynamic SQL as-is, so only pass trusted identifiers.
--   @credential  Content of the COPY INTO CREDENTIAL(...) option, e.g.
--                'IDENTITY = ''Managed Identity'''. NULL or '' omits the option.
--                It is spliced into dynamic SQL verbatim, so only pass trusted text.
--
-- Steps:
--   1. Load every line of _delta_log/*.json into a temp table, one JSON document per row.
--   2. Keep the add.path entries that have no matching remove.path entry.
--   3. Drop the target table (if any) and run one COPY INTO over the remaining parquet files.
--
-- NOTE(review): both generated statements are PRINTed for troubleshooting; the output
-- therefore includes the @credential text (e.g. a SAS token) when one is supplied.
CREATE PROCEDURE delta.load @folder varchar(8000), @table_name varchar(128), @credential varchar(8000)
AS BEGIN
    -- varchar(max): the final COPY statement lists every active data file, so it can
    -- easily exceed 8000 characters; a bounded variable would truncate it silently.
    DECLARE @tsql varchar(max);
    DECLARE @folder_literal varchar(8000);
    DECLARE @message varchar(8000);

    -- Turn the optional credential into a ready-to-splice "CREDENTIAL(...)," fragment.
    IF (@credential IS NULL)
        SET @credential = '';
    IF (@credential <> '')
        SET @credential = ' CREDENTIAL(' + @credential + '),';

    -- Paths below are built as @folder + relative path, so the folder must end with '/'.
    IF (RIGHT(@folder, 1) <> '/')
        SET @folder = @folder + '/';

    -- The folder is embedded inside single-quoted literals of the dynamic SQL;
    -- double any embedded quote so it cannot terminate the literal.
    SET @folder_literal = REPLACE(@folder, '''', '''''');

    IF OBJECT_ID('tempdb..#delta_json') IS NOT NULL
        DROP TABLE #delta_json;

    CREATE TABLE #delta_json
    (
        jsoninput varchar(max)
    )
    WITH (DISTRIBUTION = ROUND_ROBIN, HEAP);

    -- Read all the delta transaction logs.
    -- The files are parsed as "CSV" with 0x0b as both field terminator and quote and
    -- 0x0a as row terminator, so each log line is loaded whole into jsoninput
    -- (this assumes log lines never contain a 0x0b character).
    SET @tsql = '
COPY INTO #delta_json (jsoninput)
FROM ''' + @folder_literal + '_delta_log/*.json''
WITH ( ' + @credential + '
FILE_TYPE = ''CSV''
,fieldterminator =''0x0b''
,fieldquote = ''0x0b''
,rowterminator = ''0x0a'' ) ';
    PRINT(@tsql);
    EXEC(@tsql);

    -- Drop the existing target only now, after the log has been read: if reading the
    -- log fails and aborts the procedure (bad path, bad credential), the previous
    -- table is left in place.
    IF OBJECT_ID(@table_name) IS NOT NULL
    BEGIN
        SET @tsql = 'DROP TABLE ' + @table_name;
        EXEC(@tsql);
    END

    SET @tsql = NULL;

    -- Active files = paths that were added and never removed.
    -- The IS NOT NULL filter inside the subquery keeps NOT IN safe from NULLs.
    WITH files(location) AS (
        SELECT location = JSON_VALUE(jsoninput, '$.add.path')
        FROM #delta_json
        WHERE JSON_VALUE(jsoninput, '$.add.path') IS NOT NULL
          AND JSON_VALUE(jsoninput, '$.add.path') NOT IN (
                SELECT JSON_VALUE(jsoninput, '$.remove.path')
                FROM #delta_json
                WHERE JSON_VALUE(jsoninput, '$.remove.path') IS NOT NULL
              )
    )
    SELECT @tsql = 'COPY INTO ' + @table_name + ' FROM '
        + STRING_AGG(CAST('''' + @folder_literal + REPLACE(location, '''', '''''') + '''' AS varchar(max)), ',')
        + ' WITH ( ' + @credential + ' FILE_TYPE = ''PARQUET'', AUTO_CREATE_TABLE = ''ON'' )'
    FROM files;

    -- STRING_AGG over zero rows yields NULL, which makes the whole statement NULL.
    -- Say so explicitly instead of silently executing nothing.
    IF (@tsql IS NULL)
    BEGIN
        SET @message = 'No active data files found in the Delta log under '
            + COALESCE(@folder, '(null)') + '; nothing was loaded.';
        PRINT(@message);
        RETURN;
    END

    PRINT(@tsql);
    EXEC(@tsql);
END