Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: first pass at d1 support #83

Merged
merged 5 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-cobras-whisper.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@microlabs/otel-cf-workers': minor
---

Add initial support for D1 (experimental)
157 changes: 157 additions & 0 deletions src/instrumentation/d1.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { Attributes, SpanKind, SpanOptions, SpanStatusCode, Exception, trace } from '@opentelemetry/api'
import { SemanticAttributes } from '@opentelemetry/semantic-conventions'
import { wrap } from '../wrap.js'

const dbSystem = 'Cloudflare D1'

// We need to peak into D1 "internals" to instrument batch queries
// See: https://github.com/cloudflare/workerd/blob/5d27f8f7f1f9b584f673d2f11c9032f5a776ec55/src/cloudflare/internal/d1-api.ts#L173
interface D1StatementInternals {
statement: string
params: unknown[]
}

function metaAttributes(meta: D1Meta): Attributes {
return {
'db.cf.d1.rows_read': meta.rows_read,
'db.cf.d1.rows_written': meta.rows_written,
'db.cf.d1.duration': meta.duration,
'db.cf.d1.size_after': meta.size_after,
'db.cf.d1.last_row_id': meta.last_row_id,
'db.cf.d1.changed_db': meta.changed_db,
'db.cf.d1.changes': meta.changes,
}
}
function spanOptions(dbName: string, operation: string, sql?: string): SpanOptions {
const attributes: Attributes = {
binding_type: 'D1',
[SemanticAttributes.DB_NAME]: dbName,
[SemanticAttributes.DB_SYSTEM]: dbSystem,
[SemanticAttributes.DB_OPERATION]: operation,
}
if (sql) {
attributes[SemanticAttributes.DB_STATEMENT] = sql
}
return {
kind: SpanKind.CLIENT,
attributes,
}
}

function instrumentD1StatementFn(fn: Function, dbName: string, operation: string, sql: string) {
const tracer = trace.getTracer('D1')
const fnHandler: ProxyHandler<any> = {
apply: (target, thisArg, argArray) => {
if (operation === 'bind') {
const newStmt = Reflect.apply(target, thisArg, argArray) as D1PreparedStatement
return instrumentD1PreparedStatement(newStmt, dbName, sql)
}

const options = spanOptions(dbName, operation, sql)
return tracer.startActiveSpan(`${dbName} ${operation}`, options, async (span) => {
try {
const result = await Reflect.apply(target, thisArg, argArray)
if (operation === 'all' || operation === 'run') {
span.setAttributes(metaAttributes((result as D1Result).meta))
}
span.setStatus({ code: SpanStatusCode.OK })
return result
} catch (error) {
span.recordException(error as Exception)
span.setStatus({ code: SpanStatusCode.ERROR })
throw error
} finally {
span.end()
}
})
},
}
return wrap(fn, fnHandler)
}

function instrumentD1PreparedStatement(
stmt: D1PreparedStatement,
dbName: string,
statement: string,
): D1PreparedStatement {
const statementHandler: ProxyHandler<D1PreparedStatement> = {
get: (target, prop, receiver) => {
const operation = String(prop)
const fn = Reflect.get(target, prop, receiver)
if (typeof fn === 'function') {
return instrumentD1StatementFn(fn, dbName, operation, statement)
}
return fn
},
}
return wrap(stmt, statementHandler)
}

export function instrumentD1Fn(fn: Function, dbName: string, operation: string) {
const tracer = trace.getTracer('D1')
const fnHandler: ProxyHandler<any> = {
apply: (target, thisArg, argArray) => {
if (operation === 'prepare') {
const sql = argArray[0] as string
const stmt = Reflect.apply(target, thisArg, argArray) as D1PreparedStatement
return instrumentD1PreparedStatement(stmt, dbName, sql)
} else if (operation === 'exec') {
const sql = argArray[0] as string
const options = spanOptions(dbName, operation, sql)
return tracer.startActiveSpan(`${dbName} ${operation}`, options, async (span) => {
try {
const result = await Reflect.apply(target, thisArg, argArray)
span.setStatus({ code: SpanStatusCode.OK })
return result
} catch (error) {
span.recordException(error as Exception)
span.setStatus({ code: SpanStatusCode.ERROR })
throw error
} finally {
span.end()
}
})
} else if (operation === 'batch') {
// Create span for each statement, requires peeaking into D1 internals ...
const statements = argArray[0] as D1StatementInternals[]
return tracer.startActiveSpan(`${dbName} ${operation}`, async (span) => {
// Create a span per query in the batch
const subSpans = statements.map((s) =>
tracer.startSpan(`${dbName} ${operation} > query`, spanOptions(dbName, operation, s.statement)),
)

try {
const result = (await Reflect.apply(target, thisArg, argArray)) as D1Result[]
result.forEach((r, i) => subSpans[i]?.setAttributes(metaAttributes(r.meta)))
span.setStatus({ code: SpanStatusCode.OK })
return result
} catch (error) {
span.recordException(error as Exception)
span.setStatus({ code: SpanStatusCode.ERROR })
throw error
} finally {
subSpans.forEach((s) => s.end())
span.end()
}
})
} else {
return Reflect.apply(target, thisArg, argArray)
}
},
}
return wrap(fn, fnHandler)
}

export function instrumentD1(database: D1Database, dbName: string): D1Database {
const dbHandler: ProxyHandler<D1Database> = {
get: (target, prop, receiver) => {
const operation = String(prop)
const fn = Reflect.get(target, prop, receiver)
if (typeof fn === 'function') {
return instrumentD1Fn(fn, dbName, operation)
}
return fn
},
}
return wrap(database, dbHandler)
}
7 changes: 7 additions & 0 deletions src/instrumentation/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { instrumentDOBinding } from './do.js'
import { instrumentKV } from './kv.js'
import { instrumentQueueSender } from './queue.js'
import { instrumentServiceBinding } from './service.js'
import { instrumentD1 } from './d1'
import { instrumentAnalyticsEngineDataset } from './analytics-engine.js'

const isJSRPC = (item?: unknown): item is Service => {
Expand Down Expand Up @@ -34,6 +35,10 @@ const isAnalyticsEngineDataset = (item?: unknown): item is AnalyticsEngineDatase
return !isJSRPC(item) && !!(item as AnalyticsEngineDataset)?.writeDataPoint
}

const isD1Database = (item?: unknown): item is D1Database => {
return !!(item as D1Database)?.exec && !!(item as D1Database)?.prepare
}

const instrumentEnv = (env: Record<string, unknown>): Record<string, unknown> => {
const envHandler: ProxyHandler<Record<string, unknown>> = {
get: (target, prop, receiver) => {
Expand All @@ -54,6 +59,8 @@ const instrumentEnv = (env: Record<string, unknown>): Record<string, unknown> =>
return item
} else if (isAnalyticsEngineDataset(item)) {
return instrumentAnalyticsEngineDataset(item, String(prop))
} else if (isD1Database(item)) {
return instrumentD1(item, String(prop))
} else {
return item
}
Expand Down