forked from transferwise/pipelinewise-tap-oracle
-
Notifications
You must be signed in to change notification settings - Fork 13
/
log_miner.py
165 lines (135 loc) · 5.68 KB
/
log_miner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/home/oracle/local/bin/python
"""
------------------------------------------------------------------------
Author: Steve Howard
Date: December 10, 2010
Purpose: Simple program to process redo log changes
------------------------------------------------------------------------
"""
import _thread
import datetime
import string
import sys
from threading import Lock, Thread
from singer import utils
from tap_oracle.connection_helper import SQLNET_ORA_CONFIG, oracledb
# Lock shared by all reader threads; a thread holds it while printing a change.
plock = _thread.allocate_lock()

# Components of the timestamp at which the mining window opens.
startYear, startMonth, startDay = 2018, 1, 23
startHour, startMinute = 10, 0
startTime = datetime.datetime(
    year=startYear,
    month=startMonth,
    day=startDay,
    hour=startHour,
    minute=startMinute,
    second=0,
)

# Components of the timestamp at which the mining window closes.
endYear, endMonth, endDay = 2018, 2, 1
endHour, endMinute = 23, 0
endTime = datetime.datetime(
    year=endYear,
    month=endMonth,
    day=endDay,
    hour=endHour,
    minute=endMinute,
    second=0,
)
#-----------------------------------------------------------------------
class readRedoThread(Thread):
    """Thread that prints the redo SQL found in one redo thread's archived logs.

    ``run`` walks the module-level ``startTime``..``endTime`` window in
    one-hour steps. For every archived log that ``v$archived_log`` lists for
    this redo thread in the current step, it registers the file with LogMiner,
    starts LogMiner against the online catalog and prints each row of
    ``v$logmnr_contents`` to stdout while holding the module-level ``plock``.

    Failures while mining a single log file are reported on stderr and the
    thread moves on to the next file.

    Attributes:
        t: The ``thread#`` value this worker filters on.
    """

    # Archived logs of one redo thread whose first change falls inside the
    # one-hour window that starts at the bound timestamp (:1 and :2 are both
    # bound to the window start, :3 to the redo thread number).
    _ARCHIVED_LOG_SQL = (
        "select name"
        " from v$archived_log"
        " where first_time between :1 and :2 + 60/1440"
        " and thread# = :3"
        " and deleted = 'NO'"
        " and name is not null"
        " and dest_id = 1"
    )

    _ADD_LOGFILE_SQL = "begin sys.dbms_logmnr.add_logfile(:1); end;"

    # An "offline" catalog may be needed instead if this is run against a
    # standby or read-only database, e.g.
    # start_logmnr(dictfilename => '/tmp/dictionary.ora').
    _START_LOGMNR_SQL = (
        "begin sys.dbms_logmnr.start_logmnr("
        "options => dbms_logmnr.dict_from_online_catalog"
        " + dbms_logmnr.no_rowid_in_stmt); end;"
    )

    _LOGMNR_CONTENTS_SQL = (
        "select sql_redo, table_name from v$logmnr_contents where thread# = :1"
    )

    # Width of each query window; matches the ``60/1440`` day fraction in
    # ``_ARCHIVED_LOG_SQL``.
    _WINDOW_STEP = datetime.timedelta(minutes=60)

    def __init__(self, threadNum):
        """Create a worker for the redo thread numbered ``threadNum``."""
        Thread.__init__(self)
        self.t = threadNum

    def run(self):
        """Connect to the database and mine the whole configured window.

        The connection is always closed when the thread finishes, whether
        mining completed or an error escaped.
        """
        conn = oracledb.connect(**self._connection_config())
        try:
            self._mine_window(conn)
        finally:
            conn.close()

    @staticmethod
    def _connection_config():
        """Return the keyword arguments passed to ``oracledb.connect``."""
        # NOTE(review): credentials and DSN are hard-coded here, whereas
        # get_logs() builds its connection from the tap config — confirm
        # whether the worker should use the configured values instead.
        conn_config = {
            'user': 'root',
            'password': 'BiouTaSeCtOmPavA',
            'dsn': '(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=127.0.0.1)(PORT=1521))(CONNECT_DATA=(SID=ORCL)))',
        }
        if SQLNET_ORA_CONFIG is not None:
            conn_config.update(SQLNET_ORA_CONFIG)
        return conn_config

    @staticmethod
    def _report(message, error):
        """Write a diagnostic line to stderr while holding the print lock."""
        with plock:
            print(message, error, file=sys.stderr)

    def _mine_window(self, conn):
        """Step through the window hour by hour and mine each archived log."""
        # Snapshot the module-level bounds once so the loop is not affected
        # if they are rebound while the thread is running.
        window_start = startTime
        window_end = endTime

        log_names = conn.cursor()
        contents = conn.cursor()
        try:
            log_names.prepare(self._ARCHIVED_LOG_SQL)
            # ...and loop until we are past the ending time of interest.
            while window_start < window_end:
                log_names.execute(
                    self._ARCHIVED_LOG_SQL,
                    [window_start, window_start, self.t],
                )
                for row in log_names:
                    print("Row:", row)
                    self._mine_log_file(conn, contents, row[0])
                window_start = window_start + self._WINDOW_STEP
        finally:
            contents.close()
            log_names.close()

    def _mine_log_file(self, conn, contents, log_name):
        """Register ``log_name`` with LogMiner and print the changes it holds."""
        # NOTE(review): add_logfile is called without an ``options`` argument,
        # so files presumably accumulate in the LogMiner session and earlier
        # files are read again on each start_logmnr — confirm whether
        # dbms_logmnr.new was intended.
        log_add = conn.cursor()
        try:
            log_add.execute(self._ADD_LOGFILE_SQL, [log_name])
        except oracledb.DatabaseError as ex:
            self._report("Could not add log file %s to LogMiner:" % log_name, ex)
            return
        finally:
            log_add.close()

        log_start = conn.cursor()
        try:
            log_start.execute(self._START_LOGMNR_SQL)
            contents.execute(self._LOGMNR_CONTENTS_SQL, [self.t])
            for sql_redo, table_name in contents:
                # ``with`` guarantees the lock is released even if printing
                # fails; otherwise every reader thread would block forever.
                with plock:
                    print("SQL redo:", sql_redo)
                    print("table name:", table_name)
        except Exception as ex:
            # Mining is deliberately best-effort per log file: report the
            # failure instead of hiding it, then carry on with the next file.
            self._report("Could not mine log file %s:" % log_name, ex)
        finally:
            log_start.close()
#-----------------------------------------------------------------------
# TODO: add a restoreLogs() procedure that retrieves any necessary archived
# redo logs from RMAN before mining starts.
#-----------------------------------------------------------------------
def get_logs(config):
    """Mine the archived redo logs of every redo thread seen in the window.

    Queries ``v$archived_log`` for the distinct ``thread#`` values between the
    module-level ``startTime`` and ``endTime``, starts one ``readRedoThread``
    per value and blocks until all of them have finished. The lookup
    connection is closed before the workers are started.

    Args:
        config: Mapping providing the ``"user"``, ``"password"``, ``"host"``
            and ``"port"`` connection settings.

    Raises:
        KeyError: If ``config`` is a dict missing one of those keys.

    Errors raised by ``oracledb.connect`` or by the lookup query propagate to
    the caller.
    """
    # NOTE(review): the window comes from the hard-coded module-level
    # startTime/endTime; config["start_date"] is not consulted — confirm.
    conn_config = {
        'user': config["user"],
        'password': config["password"],
        # NOTE(review): the SID is fixed to 'ORCL' rather than read from
        # config — confirm this is intended.
        'dsn': oracledb.makedsn(config["host"], config["port"], 'ORCL'),
    }
    if SQLNET_ORA_CONFIG is not None:
        conn_config.update(SQLNET_ORA_CONFIG)

    conn = oracledb.connect(**conn_config)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "select distinct thread# from v$archived_log where first_time >= :1 and next_time <= :2",
                [startTime, endTime],
            )
            thread_nums = [row[0] for row in cursor]
        finally:
            cursor.close()
    finally:
        # The lookup connection is not needed once the thread numbers are
        # known; each worker opens its own connection.
        conn.close()

    workers = []
    for thread_num in thread_nums:
        worker = readRedoThread(thread_num)
        workers.append(worker)
        worker.start()

    for worker in workers:
        worker.join()