Skip to content

Commit

Permalink
Merge pull request #350 from metrico/349_kubernetes_support
Browse files Browse the repository at this point in the history
349 kubernetes support
  • Loading branch information
akvlad authored Oct 2, 2023
2 parents 1f156d6 + d689a12 commit a23c5a0
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 137 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/node-clickhouse.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ jobs:

strategy:
matrix:
node-version: [14.x, 16.x]
node-version: [18, 16.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/

steps:
- uses: EpicStep/clickhouse-github-action@v1.0.0
- run: docker run --name clickhouse -p 9000:9000 -p 8123:8123 -d clickhouse/clickhouse-server:23.8-alpine
- uses: actions/checkout@v2
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v2
Expand All @@ -42,4 +42,5 @@ jobs:
CLICKHOUSE_DB: loki
CLICKHOUSE_TSDB: loki
INTEGRATION_E2E: 1
run: node qryn.js & npm run test --forceExit
CLOKI_EXT_URL: 127.0.0.1:3100
run: node qryn.js >/dev/stdout & npm run test --forceExit
2 changes: 2 additions & 0 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,5 @@ module.exports.isCustomSamplesOrderingRule = () => {
}

module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*'

module.exports.clusterName = process.env.CLUSTER_NAME
3 changes: 2 additions & 1 deletion lib/db/alerting/alertWatcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const CallbackTimeSeriesAlertWatcher = require('./callbackTimeSeriesAlertWatcher
const CallbackCliqlAlertWatcher = require('./callbackCliqlAlertWatcher')
const MVAlertWatcher = require('./MVAlertWatcher')
const { parseCliQL } = require('../../../cliql')
const {clusterName} = require('../../../../common')
/**
* @param nsName {string}
* @param group {alerting.group | alerting.objGroup}
Expand All @@ -24,7 +25,7 @@ module.exports = (nsName, group, rule) => {
if (q.matrix) {
return new CallbackTimeSeriesAlertWatcher(nsName, group, rule)
}
if (q.stream && q.stream.length) {
if ((q.stream && q.stream.length) || clusterName) {
return new CallbackLogAlertWatcher(nsName, group, rule)
}
return new MVAlertWatcher(nsName, group, rule)
Expand Down
14 changes: 8 additions & 6 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const Zipkin = require('./zipkin')
const Otlp = require('./otlp')
const logfmt = require('logfmt')
const csql = require('@cloki/clickhouse-sql')
const clusterName = require('../../common').clusterName
const dist = clusterName ? '_dist' : ''

/* DB Helper */
const ClickHouse = require('@apla/clickhouse')
Expand Down Expand Up @@ -274,7 +276,7 @@ const tempoQueryScan = async function (query, res, traceId) {
const tempoQueryScanV2 = async function (query, res, traceId) {
logger.debug(`Scanning Tempo Fingerprints... ${traceId}`)
const _stream = await axios.post(getClickhouseUrl() + '/',
`SELECT payload_type, payload FROM ${DATABASE_NAME()}.tempo_traces WHERE oid='0' AND trace_id=unhex('${traceId}') ORDER BY timestamp_ns ASC LIMIT 2000 FORMAT JSONEachRow`,
`SELECT payload_type, payload FROM ${DATABASE_NAME()}.tempo_traces${dist} WHERE oid='0' AND trace_id=unhex('${traceId}') ORDER BY timestamp_ns ASC LIMIT 2000 FORMAT JSONEachRow`,
{
responseType: 'stream'
}
Expand Down Expand Up @@ -420,7 +422,7 @@ const queryTempoScanV2 = async function (query) {
const select = `SELECT hex(trace_id) as traceID, service_name as rootServiceName,
name as rootTraceName, timestamp_ns as startTimeUnixNano,
intDiv(duration_ns, 1000000) as durationMs`
const from = `FROM ${DATABASE_NAME()}.tempo_traces`
const from = `FROM ${DATABASE_NAME()}.tempo_traces${dist}`
const where = [
'oid = \'0\'',
`timestamp_ns >= ${parseInt(query.start)} AND timestamp_ns <= ${parseInt(query.end)}`,
Expand Down Expand Up @@ -459,7 +461,7 @@ const queryTempoScanV2 = async function (query) {

async function queryTempoTags () {
const q = `SELECT distinct key
FROM ${DATABASE_NAME()}.tempo_traces_kv
FROM ${DATABASE_NAME()}.tempo_traces_kv${dist}
WHERE oid='0' AND date >= toDate(NOW()) - interval '1 day'
FORMAT JSON`
const resp = await axios.post(getClickhouseUrl() + '/',q)
Expand All @@ -473,7 +475,7 @@ async function queryTempoTags () {
*/
async function queryTempoValues (tag) {
const q = `SELECT distinct val
FROM ${DATABASE_NAME()}.tempo_traces_kv
FROM ${DATABASE_NAME()}.tempo_traces_kv${dist}
WHERE oid='0' AND date >= toDate(NOW()) - interval '1 day' AND key = ${csql.quoteVal(tag)}
FORMAT JSON`
const resp = await axios.post(getClickhouseUrl() + '/', q)
Expand Down Expand Up @@ -1317,7 +1319,7 @@ const samplesReadTable = {
settingsVersions: async function () {
const versions = await axios.post(`${getClickhouseUrl()}/?database=${UTILS.DATABASE_NAME()}`,
`SELECT argMax(name, inserted_at) as _name, argMax(value, inserted_at) as _value
FROM settings WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`)
FROM settings${dist} WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`)
for (const version of versions.data.data) {
this.versions[version._name] = parseInt(version._value) * 1000
}
Expand Down Expand Up @@ -1353,7 +1355,7 @@ const getSettings = async (names, database) => {
'short-hash'))
const settings = await rawRequest(`SELECT argMax(name, inserted_at) as _name,
argMax(value, inserted_at) as _value
FROM settings WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
FROM settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
null, database)
return settings.data.data.reduce((sum, cur) => {
sum[cur._name] = cur._value
Expand Down
47 changes: 30 additions & 17 deletions lib/db/clickhouse_alerting.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ const { DATABASE_NAME } = require('../utils')
const UTILS = require('../utils')
const { getClickhouseUrl } = require('./clickhouse')
const Sql = require('@cloki/clickhouse-sql')
const { clusterName } = require('../../common')
const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : ''
const dist = clusterName ? '_dist' : ''
/**
* @param ns {string}
* @param group {string}
Expand All @@ -18,7 +21,7 @@ module.exports.getAlertRule = async (ns, group, name) => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` +
'GROUP BY fingerprint ' +
'HAVING name != \'\' ' +
Expand Down Expand Up @@ -47,7 +50,7 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
const groupFp = getGroupFp(namespace, group.name)
const groupVal = JSON.stringify({ name: group.name, interval: group.interval })
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
`INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' +
JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 })
)
Expand Down Expand Up @@ -99,7 +102,7 @@ module.exports.getAlertRules = async (limit, offset) => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type == 'alert_rule' AND ${mark} == ${mark} ` +
`GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
return res.data.data.map(e => {
Expand All @@ -119,7 +122,7 @@ module.exports.getAlertGroups = async (limit, offset) => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type == 'alert_group' AND ${mark} == ${mark} ` +
`GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
return res.data.data.map(e => {
Expand All @@ -134,7 +137,7 @@ module.exports.getAlertRulesCount = async () => {
const mark = Math.random()
const res = await axios.post(getClickhouseUrl(),
'SELECT COUNT(1) as count FROM (SELECT fingerprint ' +
`FROM ${DATABASE_NAME()}.settings ` +
`FROM ${DATABASE_NAME()}.settings${dist} ` +
`WHERE type=\'alert_rule\' AND ${mark} == ${mark} ` +
'GROUP BY fingerprint ' +
'HAVING argMax(name, inserted_at) != \'\') FORMAT JSON')
Expand All @@ -153,8 +156,9 @@ module.exports.deleteAlertRule = async (ns, group, rule) => {
`INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` +
JSON.stringify({ fingerprint: fp, type: 'alert_rule', name: '', value: '', inserted_at: Date.now() })
)
await axios.post(getClickhouseUrl(),
`ALTER TABLE ${DATABASE_NAME()}.settings DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
const settings = clusterName ? '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1' : ''
await axios.post(getClickhouseUrl() + settings,
`ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
)
}

Expand All @@ -166,11 +170,12 @@ module.exports.deleteAlertRule = async (ns, group, rule) => {
module.exports.deleteGroup = async (ns, group) => {
const fp = getGroupFp(ns, group)
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` +
`INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow\n` +
JSON.stringify({ fingerprint: fp, type: 'alert_group', name: '', value: '', inserted_at: Date.now() })
)
await axios.post(getClickhouseUrl(),
`ALTER TABLE ${DATABASE_NAME()}.settings DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
const settings = clusterName ? '/?allow_nondeterministic_mutations=1&mutations_execute_nondeterministic_on_initiator=1' : ''
await axios.post(getClickhouseUrl() + settings,
`ALTER TABLE ${DATABASE_NAME()}.settings ${onCluster} DELETE WHERE fingerprint=${fp} AND inserted_at <= now64(9, 'UTC')`
)
}

Expand All @@ -183,9 +188,11 @@ module.exports.deleteGroup = async (ns, group) => {
module.exports.dropAlertViews = async (ns, group, rule) => {
const fp = getRuleFP(ns, group, rule)
await axios.post(getClickhouseUrl(),
`DROP VIEW IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}`)
`DROP VIEW IF EXISTS ${DATABASE_NAME()}._alert_view_${fp} ${onCluster}`)
await axios.post(getClickhouseUrl(),
`DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark`)
`DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ${onCluster}`)
await axios.post(getClickhouseUrl(),
`DROP TABLE IF EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark_dist ${onCluster}`)
}

/**
Expand All @@ -197,9 +204,15 @@ module.exports.dropAlertViews = async (ns, group, rule) => {
module.exports.createMarksTable = async (ns, group, rule) => {
const fp = getRuleFP(ns, group, rule)
await axios.post(getClickhouseUrl(),
`CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ` +
`CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark ${onCluster}` +
'(id UInt8 default 0,mark UInt64, inserted_at DateTime default now()) ' +
'ENGINE ReplacingMergeTree(mark) ORDER BY id')
`ENGINE ${clusterName ? 'Replicated' : ''}ReplacingMergeTree(mark) ORDER BY id`)
if (clusterName) {
await axios.post(getClickhouseUrl(),
`CREATE TABLE IF NOT EXISTS ${DATABASE_NAME()}._alert_view_${fp}_mark_dist ${onCluster}` +
'(id UInt8 default 0,mark UInt64, inserted_at DateTime default now()) ' +
`ENGINE=Distributed('${clusterName}', '${DATABASE_NAME()}', '_alert_view_${fp}_mark', id)`)
}
}

/**
Expand Down Expand Up @@ -235,7 +248,7 @@ module.exports.createAlertViews = async (ns, group, rule, request) => {
module.exports.getLastMark = async (ns, group, rule) => {
const fp = getRuleFP(ns, group, rule)
const mark = await axios.post(getClickhouseUrl(),
`SELECT max(mark) as mark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = 0 FORMAT JSON`)
`SELECT max(mark) as mark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark${dist} WHERE id = 0 FORMAT JSON`)
return parseInt(mark.data.data[0].mark)
}

Expand All @@ -253,7 +266,7 @@ module.exports.incAlertMark = async (ns, group, rule, newMark, id) => {
newMark = newMark || Date.now()
id = id || 0
await axios.post(getClickhouseUrl(),
`INSERT INTO ${DATABASE_NAME()}._alert_view_${fp}_mark (mark, id) VALUES (${newMark}, ${id})`)
`INSERT INTO ${DATABASE_NAME()}._alert_view_${fp}_mark${dist} (mark, id) VALUES (${newMark}, ${id})`)
return [mark, newMark]
}

Expand Down Expand Up @@ -285,7 +298,7 @@ module.exports.getAlerts = async (ns, group, rule, mark) => {
module.exports.dropOutdatedParts = async (ns, group, rule, mark) => {
const fp = getRuleFP(ns, group, rule)
const partitions = await axios.post(getClickhouseUrl(),
`SELECT DISTINCT mark FROM ${DATABASE_NAME()}._alert_view_${fp} WHERE mark <= ${mark} FORMAT JSON`)
`SELECT DISTINCT mark FROM ${DATABASE_NAME()}._alert_view_${fp}${dist} WHERE mark <= ${mark} FORMAT JSON`)
if (!partitions.data || !partitions.data.data || !partitions.data.data.length) {
return
}
Expand Down
Loading

0 comments on commit a23c5a0

Please sign in to comment.