From 783b8153c4a2ba4d2401c4d1d8479ef65d3343b5 Mon Sep 17 00:00:00 2001 From: Aaron O'Mullan Date: Mon, 29 Jan 2024 05:03:03 +0900 Subject: [PATCH 1/4] feat: first pass at d1 support --- src/instrumentation/d1.ts | 152 +++++++++++++++++++++++++++++++++++++ src/instrumentation/env.ts | 7 ++ 2 files changed, 159 insertions(+) create mode 100644 src/instrumentation/d1.ts diff --git a/src/instrumentation/d1.ts b/src/instrumentation/d1.ts new file mode 100644 index 0000000..d6a3611 --- /dev/null +++ b/src/instrumentation/d1.ts @@ -0,0 +1,152 @@ +import { Attributes, SpanKind, SpanOptions, SpanStatusCode, Exception, trace } from '@opentelemetry/api' +import { SemanticAttributes } from '@opentelemetry/semantic-conventions' +import { wrap } from '../wrap.js' + +const dbSystem = 'Cloudflare D1' + +// We need to peak into D1 "internals" to instrument batch queries +// See: https://github.com/cloudflare/workerd/blob/5d27f8f7f1f9b584f673d2f11c9032f5a776ec55/src/cloudflare/internal/d1-api.ts#L173 +interface D1StatementInternals { + statement: string + params: unknown[] +} + +function metaAttributes(meta: D1Meta): Attributes { + return { + 'db.cf.d1.rows_read': meta.rows_read, + 'db.cf.d1.rows_written': meta.rows_written, + 'db.cf.d1.duration': meta.duration, + 'db.cf.d1.size_after': meta.size_after, + 'db.cf.d1.last_row_id': meta.last_row_id, + 'db.cf.d1.changed_db': meta.changed_db, + 'db.cf.d1.changes': meta.changes, + } +} +function spanOptions(dbName: string, operation: string, sql?: string): SpanOptions { + const attributes: Attributes = { + binding_type: 'D1', + [SemanticAttributes.DB_NAME]: dbName, + [SemanticAttributes.DB_SYSTEM]: dbSystem, + [SemanticAttributes.DB_OPERATION]: operation, + } + if (sql) { + attributes[SemanticAttributes.DB_STATEMENT] = sql + } + return { + kind: SpanKind.CLIENT, + attributes, + } +} + +function instrumentD1StatementFn(fn: Function, dbName: string, operation: string, sql: string) { + const tracer = trace.getTracer('D1') + const fnHandler: ProxyHandler = { + apply: (target, thisArg, argArray) => { + const options = spanOptions(dbName, operation, sql) + return tracer.startActiveSpan(`${dbName} ${operation}`, options, async (span) => { + try { + const result = await Reflect.apply(target, thisArg, argArray) + if (operation === 'all' || operation === 'run') { + span.setAttributes(metaAttributes((result as D1Result).meta)) + } + span.setStatus({ code: SpanStatusCode.OK }) + return result + } catch (error) { + span.recordException(error as Exception) + span.setStatus({ code: SpanStatusCode.ERROR }) + throw error + } finally { + span.end() + } + }) + }, + } + return wrap(fn, fnHandler) +} + +function instrumentD1PreparedStatement( + stmt: D1PreparedStatement, + dbName: string, + statement: string, +): D1PreparedStatement { + const statementHandler: ProxyHandler = { + get: (target, prop, receiver) => { + const operation = String(prop) + const fn = Reflect.get(target, prop, receiver) + if (typeof fn === 'function') { + return instrumentD1StatementFn(fn, dbName, operation, statement as string) + } + return fn + }, + } + return wrap(stmt, statementHandler) +} + +export function instrumentD1Fn(fn: Function, dbName: string, operation: string) { + const tracer = trace.getTracer('D1') + const fnHandler: ProxyHandler = { + apply: (target, thisArg, argArray) => { + if (operation === 'prepare') { + const sql = argArray[0] as string + const stmt = Reflect.apply(target, thisArg, argArray) as D1PreparedStatement + return instrumentD1PreparedStatement(stmt, dbName, sql) + } else if (operation === 'exec') { + const sql = argArray[0] as string + const options = spanOptions(dbName, operation, sql) + return tracer.startActiveSpan(`${dbName} ${operation}`, options, async (span) => { + try { + const result = await Reflect.apply(target, thisArg, argArray) + span.setStatus({ code: SpanStatusCode.OK }) + return result + } catch (error) { + span.recordException(error as Exception) + span.setStatus({ code: SpanStatusCode.ERROR }) + throw error + } finally { + span.end() + } + }) + } else if (operation === 'batch') { + // Create span for each statement, requires peeaking into D1 internals ... + const statements = argArray[0] as D1StatementInternals[] + return tracer.startActiveSpan(`${dbName} ${operation}`, async (span) => { + // Create a span per query in the batch + const subSpans = statements.map((_s) => + tracer.startSpan(`${dbName} ${operation} > query`, spanOptions(dbName, operation)), + ) + + try { + const result = (await Reflect.apply(target, thisArg, argArray)) as D1Result[] + result.forEach((r, i) => subSpans[i]?.setAttributes(metaAttributes(r.meta))) + span.setStatus({ code: SpanStatusCode.OK }) + return result + } catch (error) { + span.recordException(error as Exception) + span.setStatus({ code: SpanStatusCode.ERROR }) + throw error + } finally { + subSpans.forEach((s) => s.end()) + span.end() + } + }) + } else { + return Reflect.apply(target, thisArg, argArray) + } + }, + } + return wrap(fn, fnHandler) +} + +export function instrumentD1(database: D1Database, dbName: string): D1Database { + const dbHandler: ProxyHandler = { + get: (target, prop, receiver) => { + const operation = String(prop) + const fn = Reflect.get(target, prop, receiver) + if (typeof fn === 'function') { + return instrumentD1Fn(fn, dbName, operation) + } + return fn + }, + } + return wrap(database, dbHandler) +} diff --git a/src/instrumentation/env.ts b/src/instrumentation/env.ts index e361843..db24e96 100644 --- a/src/instrumentation/env.ts +++ b/src/instrumentation/env.ts @@ -4,6 +4,7 @@ import { instrumentKV } from './kv.js' import { instrumentQueueSender } from './queue.js' import { instrumentServiceBinding } from './service.js' import { instrumentAnalyticsEngineDataset } from './analytics-engine' +import { instrumentD1 } from './d1' const isKVNamespace = (item?: unknown): item is KVNamespace => { return !!(item as KVNamespace)?.getWithMetadata @@ -26,6 +27,10 @@ const isAnalyticsEngineDataset = (item?: unknown): item is AnalyticsEngineDatase return !!(item as AnalyticsEngineDataset)?.writeDataPoint } +const isD1Database = (item?: unknown): item is D1Database => { + return !!(item as D1Database)?.exec && !!(item as D1Database)?.prepare +} + const instrumentEnv = (env: Record): Record => { const envHandler: ProxyHandler> = { get: (target, prop, receiver) => { @@ -43,6 +48,8 @@ const instrumentEnv = (env: Record): Record => return instrumentServiceBinding(item, String(prop)) } else if (isAnalyticsEngineDataset(item)) { return instrumentAnalyticsEngineDataset(item, String(prop)) + } else if (isD1Database(item)) { + return instrumentD1(item, String(prop)) } else { return item } From 6ba9cc06ee90216d4987c28432c5f5b887b9417b Mon Sep 17 00:00:00 2001 From: Aaron O'Mullan Date: Mon, 29 Jan 2024 05:38:20 +0900 Subject: [PATCH 2/4] fix: track sql for batch sub-statements --- src/instrumentation/d1.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/instrumentation/d1.ts b/src/instrumentation/d1.ts index d6a3611..70e4c90 100644 --- a/src/instrumentation/d1.ts +++ b/src/instrumentation/d1.ts @@ -111,8 +111,8 @@ export function instrumentD1Fn(fn: Function, dbName: string, operation: string) const statements = argArray[0] as D1StatementInternals[] return tracer.startActiveSpan(`${dbName} ${operation}`, async (span) => { // Create a span per query in the batch - const subSpans = statements.map((_s) => - tracer.startSpan(`${dbName} ${operation} > query`, spanOptions(dbName, operation)), + const subSpans = statements.map((s) => + tracer.startSpan(`${dbName} ${operation} > query`, spanOptions(dbName, operation, s.statement)), ) try { From f66a38a6acf80aea47eeab0eef91b434cbb2b31b Mon Sep 17 00:00:00 2001 From: Aaron O'Mullan Date: Tue, 30 Jan 2024 03:18:25 +0900 Subject: [PATCH 3/4] fix(d1): instrument bind callls --- src/instrumentation/d1.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/instrumentation/d1.ts b/src/instrumentation/d1.ts index 70e4c90..578a378 100644 --- a/src/instrumentation/d1.ts +++ b/src/instrumentation/d1.ts @@ -42,6 +42,11 @@ function instrumentD1StatementFn(fn: Function, dbName: string, operation: string const tracer = trace.getTracer('D1') const fnHandler: ProxyHandler = { apply: (target, thisArg, argArray) => { + if (operation === 'bind') { + const newStmt = Reflect.apply(target, thisArg, argArray) as D1PreparedStatement + return instrumentD1PreparedStatement(newStmt, dbName, sql) + } + const options = spanOptions(dbName, operation, sql) return tracer.startActiveSpan(`${dbName} ${operation}`, options, async (span) => { try { @@ -74,7 +79,7 @@ function instrumentD1PreparedStatement( const operation = String(prop) const fn = Reflect.get(target, prop, receiver) if (typeof fn === 'function') { - return instrumentD1StatementFn(fn, dbName, operation, statement as string) + return instrumentD1StatementFn(fn, dbName, operation, statement) } return fn }, From 150efef8e06b332de3f7e68853c14112169668e7 Mon Sep 17 00:00:00 2001 From: Erwin van der Koogh <890386+evanderkoogh@users.noreply.github.com> Date: Mon, 16 Sep 2024 12:33:09 +1000 Subject: [PATCH 4/4] Create brown-cobras-whisper.md --- .changeset/brown-cobras-whisper.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/brown-cobras-whisper.md diff --git a/.changeset/brown-cobras-whisper.md b/.changeset/brown-cobras-whisper.md new file mode 100644 index 0000000..6369456 --- /dev/null +++ b/.changeset/brown-cobras-whisper.md @@ -0,0 +1,5 @@ +--- +'@microlabs/otel-cf-workers': minor +--- + +Add initial support for D1 (experimental)