-
Notifications
You must be signed in to change notification settings - Fork 0
/
bq-controller.js
120 lines (107 loc) · 3.54 KB
/
bq-controller.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
module.exports = (projectName, dataSetName, filename, installPath)=>{
var bqClient = require("./bq-client.js")(projectName, dataSetName),
fs = require("fs"),
installPath = installPath || require("os").homedir(),
failedLogEntries = {},
FAILED_FILE_PATH = require("path").join(installPath, filename),
MAX_FAILED_LOG_QUEUE = 50,
FAILED_LOG_QUEUE_PURGE_COUNT = 10,
TEN_MINUTE_MS = 60 * 1000 * 10,
FIVE_HOURS_MS = TEN_MINUTE_MS * 6 * 5,
INITIAL_FAILED_LOG_RETRY_MS = 10000,
FAILED_ENTRY_RETRY_MS = TEN_MINUTE_MS,
PERSIST_FAILURE_DEBOUNCE = 5000,
persistFailuresTimeout,
insertPending;
function addFailedLogEntry(tableName, data, date, templateSuffix) {
if (Object.keys(failedLogEntries).length >= MAX_FAILED_LOG_QUEUE) { purgeOldEntries(); }
failedLogEntries[Number(date)] = [tableName, data, date, templateSuffix];
schedulePersist();
}
function purgeOldEntries() {
Object.keys(failedLogEntries)
.sort((a, b)=>a-b)
.slice(0, FAILED_LOG_QUEUE_PURGE_COUNT)
.forEach((key)=>{
delete failedLogEntries[key];
});
}
function insertFailedLogEntries() {
insertPending = null;
log.file("Inserting failed bq log entries");
Object.keys(failedLogEntries).reduce((promiseChain, key)=>{
return promiseChain.then(()=>insert(...failedLogEntries[key]))
.then(()=>{
log.file("inserted " + key);
delete failedLogEntries[key];
});
}, Promise.resolve())
.catch(()=>{
log.file("Could not log all previously failed bq logs entries.");
scheduleLogInsert();
})
.then(()=>{
schedulePersist();
});
}
function scheduleLogInsert() {
if (!insertPending) {
insertPending = setTimeout(insertFailedLogEntries, FAILED_ENTRY_RETRY_MS);
FAILED_ENTRY_RETRY_MS = Math.min(FAILED_ENTRY_RETRY_MS * 1.5, FIVE_HOURS_MS);
}
}
function schedulePersist() {
if (persistFailuresTimeout) {clearTimeout(persistFailuresTimeout);}
persistFailuresTimeout = setTimeout(persistFailures, PERSIST_FAILURE_DEBOUNCE);
}
function persistFailures() {
persistFailuresTimeout = null;
fs.writeFile(FAILED_FILE_PATH, JSON.stringify(failedLogEntries, null, 2), {
encoding: "utf8"
}, (err)=>{
if (err) {
log.file("Could not save failed log entries. " + err.message);
}
});
}
function insert(tableName, data, date, templateSuffix) {
return bqClient.insert(tableName, data, date, templateSuffix)
.catch(e=>{
addFailedLogEntry(tableName, data, date, templateSuffix);
scheduleLogInsert();
return Promise.reject(e);
});
}
var mod = {
getBQClient() { return bqClient; },
getDateForTableName(date) {
date = new Date(date);
var year = date.getUTCFullYear(),
month = date.getUTCMonth() + 1,
day = date.getUTCDate();
if (month < 10) {month = "0" + month;}
if (day < 10) {day = "0" + day;}
return "" + year + month + day;
},
init() {
try {
failedLogEntries = require(FAILED_FILE_PATH);
if (Object.keys(failedLogEntries).length) {
insertPending = insertPending || setTimeout(insertFailedLogEntries, INITIAL_FAILED_LOG_RETRY_MS);
}
} catch(e) {
failedLogEntries = {};
}
},
log(tableName, data, date, templateSuffix) {
return insert(tableName, data, date, templateSuffix);
},
pendingEntries() {
return failedLogEntries;
},
maxQueue() {
return MAX_FAILED_LOG_QUEUE;
}
};
return mod;
};