Skip to content

Commit

Permalink
Add before/after columns, use GIN index, document Destination DB
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Feb 9, 2024
1 parent 8eca5ef commit 61d9a7b
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 34 deletions.
4 changes: 2 additions & 2 deletions core/src/change-message.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { RequiredEntityData } from '@mikro-orm/core';

import { Change, Operation } from "./entities/Change"
import { logger } from './logger'
import { Message, decodeData } from './nats'

export const MESSAGE_PREFIX_CONTEXT = '_bemi'
Expand Down Expand Up @@ -31,7 +30,8 @@ const parseDebeziumData = (debeziumChange: any, now: Date) => {

return {
primaryKey: (operation === Operation.DELETE ? before : after)?.id?.toString(),
values: after || {},
before: before || {},
after: after || {},
context,
database,
schema,
Expand Down
19 changes: 12 additions & 7 deletions core/src/entities/Change.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@ export enum Operation {
@Entity({ tableName: 'changes' })

@Index({ properties: ['primaryKey'] })
@Index({ properties: ['values'] })
@Index({ properties: ['context'] })
@Index({ properties: ['committedAt'] })
@Unique({ properties: ['position', 'database', 'schema', 'table', 'values'] })
@Index({ properties: ['context'], type: 'GIN' })
@Index({ properties: ['before'], type: 'GIN' })
@Index({ properties: ['after'], type: 'GIN' })
@Unique({ properties: ['position', 'operation', 'table', 'schema', 'database'] })

export class Change extends BaseEntity {
@Property({ nullable: true })
primaryKey: string | undefined;

@Property({ type: JsonType, default: '{}' })
values: object;
before: object;

@Property({ type: JsonType, default: '{}' })
after: object;

@Property({ type: JsonType, default: '{}' })
context: object;
Expand Down Expand Up @@ -53,12 +57,13 @@ export class Change extends BaseEntity {
position: number;

constructor(
{ primaryKey, values, context, database, schema, table, operation, committedAt, queuedAt, transactionId, position }:
{ primaryKey?: string, values: object, context: object, database: string, schema: string, table: string, operation: Operation, committedAt: Date, queuedAt: Date, transactionId: number, position: number }
{ primaryKey, before, after, context, database, schema, table, operation, committedAt, queuedAt, transactionId, position }:
{ primaryKey?: string, before: object, after: object, context: object, database: string, schema: string, table: string, operation: Operation, committedAt: Date, queuedAt: Date, transactionId: number, position: number }
) {
super();
this.primaryKey = primaryKey;
this.values = values;
this.before = before;
this.after = after;
this.context = context;
this.database = database;
this.schema = schema;
Expand Down
46 changes: 34 additions & 12 deletions core/src/migrations/.snapshot-db_799e557d9277.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
"nullable": true,
"mappedType": "string"
},
"values": {
"name": "values",
"after": {
"name": "after",
"type": "jsonb",
"unsigned": false,
"autoincrement": false,
Expand Down Expand Up @@ -135,19 +135,40 @@
"primary": false,
"nullable": false,
"mappedType": "bigint"
},
"before": {
"name": "before",
"type": "jsonb",
"unsigned": false,
"autoincrement": false,
"primary": false,
"nullable": false,
"default": "'{}'",
"mappedType": "json"
}
},
"name": "changes",
"schema": "public",
"indexes": [
{
"keyName": "changes_committed_at_index",
"keyName": "changes_after_index",
"columnNames": [
"committed_at"
"after"
],
"composite": false,
"primary": false,
"unique": false
"unique": false,
"type": "GIN"
},
{
"keyName": "changes_before_index",
"columnNames": [
"before"
],
"composite": false,
"primary": false,
"unique": false,
"type": "GIN"
},
{
"keyName": "changes_context_index",
Expand All @@ -156,12 +177,13 @@
],
"composite": false,
"primary": false,
"unique": false
"unique": false,
"type": "GIN"
},
{
"keyName": "changes_values_index",
"keyName": "changes_committed_at_index",
"columnNames": [
"values"
"committed_at"
],
"composite": false,
"primary": false,
Expand All @@ -177,13 +199,13 @@
"unique": false
},
{
"keyName": "changes_position_database_schema_table_values_unique",
"keyName": "changes_position_operation_table_schema_database_unique",
"columnNames": [
"position",
"database",
"schema",
"operation",
"table",
"values"
"schema",
"database"
],
"composite": true,
"primary": false,
Expand Down
41 changes: 41 additions & 0 deletions core/src/migrations/Migration20240208205640_before_and_after.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20240208205640_before_and_after extends Migration {

async up(): Promise<void> {
// before - new column
this.addSql('alter table "changes" add column "before" jsonb not null default \'{}\';');
this.addSql('create index "changes_before_index" on "changes" using GIN ("before" jsonb_path_ops);');

// after - rename
this.addSql('drop index "changes_values_index";');
this.addSql('alter table "changes" rename column "values" to "after";');
this.addSql('create index "changes_after_index" on "changes" using GIN ("after" jsonb_path_ops);');

// context - new index
this.addSql('drop index "changes_context_index";');
this.addSql('create index "changes_context_index" on "changes" using GIN ("context" jsonb_path_ops);');

// unique constraint
this.addSql('alter table "changes" drop constraint "changes_position_database_schema_table_values_unique";');
this.addSql('alter table "changes" add constraint "changes_position_operation_table_schema_database_unique" unique ("position", "operation", "table", "schema", "database");');
}

async down(): Promise<void> {
// before - drop column
this.addSql('alter table "changes" drop column "before";');

// after - rename
this.addSql('drop index "changes_after_index";');
this.addSql('alter table "changes" rename column "after" to "values";');
this.addSql('create index "changes_values_index" on "changes" ("values");');

// context - new index
this.addSql('drop index "changes_context_index";');
this.addSql('create index "changes_context_index" on "changes" ("context");');

// unique constraint
this.addSql('alter table "changes" drop constraint "changes_position_operation_table_schema_database_unique";');
this.addSql('alter table "changes" add constraint "changes_position_database_schema_table_values_unique" unique ("position", "database", "schema", "table", "values");');
}
}
24 changes: 16 additions & 8 deletions core/src/specs/fixtures/change-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "public",
"table": "todo",
"transactionId": 768,
"values": { "id": 2, "isCompleted": false, "task": "Test" }
"before": {},
"after": { "id": 2, "isCompleted": false, "task": "Test" },
},
CREATE_MESSAGE: {
"committedAt": MOCKED_DATE,
Expand All @@ -32,7 +33,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "",
"table": "",
"transactionId": 768,
"values": {},
"before": {},
"after": {},
},
UPDATE: {
"committedAt": MOCKED_DATE,
Expand All @@ -46,7 +48,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "public",
"table": "todo",
"transactionId": 769,
"values": { "id": 2, "isCompleted": false, "task": "2023-11-28THH:06:22:437" },
"before": { "id": 2, "isCompleted": false, "task": "Test" },
"after": { "id": 2, "isCompleted": true, "task": "Test" },
},
UPDATE_MESSAGE: {
"committedAt": MOCKED_DATE,
Expand All @@ -60,7 +63,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "",
"table": "",
"transactionId": 769,
"values": {},
"before": {},
"after": {},
},
DELETE: {
"committedAt": MOCKED_DATE,
Expand All @@ -74,7 +78,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "public",
"table": "todo",
"transactionId": 767,
"values": {},
"before": { "id": 2, "isCompleted": true, "task": "Test" },
"after": {},
},
DELETE_MESSAGE: {
"committedAt": MOCKED_DATE,
Expand All @@ -88,7 +93,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "",
"table": "",
"transactionId": 767,
"values": {},
"before": {},
"after": {},
},
HEARTBEAT_MESSAGE: {
"committedAt": MOCKED_DATE,
Expand All @@ -102,7 +108,8 @@ export const CHANGE_ATTRIBUTES = {
"schema": "",
"table": "",
"transactionId": 769,
"values": {},
"before": {},
"after": {},
},
TRUNCATE: {
"committedAt": MOCKED_DATE,
Expand All @@ -116,6 +123,7 @@ export const CHANGE_ATTRIBUTES = {
"schema": "public",
"table": "todo",
"transactionId": 770,
"values": {},
"before": {},
"after": {},
},
}
6 changes: 3 additions & 3 deletions core/src/specs/fixtures/nats-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ export const MESSAGE_DATA = {
message: { prefix: '_bemi', content: 'eyJvcCI6ICJjIn0=' },
},
UPDATE: {
before: null,
after: { id: 2, task: '2023-11-28THH:06:22:437', isCompleted: false },
before: { id: 2, task: 'Test', isCompleted: false },
after: { id: 2, task: 'Test', isCompleted: true },
source: {
version: '2.4.1.Final',
connector: 'postgresql',
Expand Down Expand Up @@ -90,7 +90,7 @@ export const MESSAGE_DATA = {
message: { prefix: '_bemi', content: 'eyJvcCI6ICJ1In0=' },
},
DELETE: {
before: { id: 2, task: '', isCompleted: false },
before: { id: 2, task: 'Test', isCompleted: true },
after: null,
source: {
version: '2.4.1.Final',
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/orms/prisma.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

[Bemi](https://bemi.io) plugs into [Prisma](https://github.com/prisma/prisma) and PostgreSQL to track database changes automatically. It unlocks robust context-aware audit trails and time travel querying inside your application.

This package is an optional Prisma integration, enabling you to pass application-specific context when performing database changes. This can include context such as the 'where' (API endpoint, worker, etc.), 'who' (user, cron job, etc.), and 'how' behind a change, thereby enriching the information captured by Bemi.
This package is an recommended Prisma integration, enabling you to pass application-specific context when performing database changes. This can include context such as the 'where' (API endpoint, worker, etc.), 'who' (user, cron job, etc.), and 'how' behind a change, thereby enriching the information captured by Bemi.

See [this repo](https://github.com/BemiHQ/bemi-prisma-example) as an Todo app example with Prisma that automatically tracks all changes.

Expand Down
2 changes: 1 addition & 1 deletion docs/docs/orms/typeorm.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

[Bemi](https://bemi.io/) plugs into [TypeORM](https://github.com/typeorm/typeorm) and PostgreSQL to track database changes automatically. It unlocks robust context-aware audit trails and time travel querying inside your application.

This package is an optional TypeORM integration, enabling you to pass application-specific context when performing database changes. This can include context such as the 'where' (API endpoint, worker, etc.), 'who' (user, cron job, etc.), and 'how' behind a change, thereby enriching the information captured by Bemi.
This package is an recommended TypeORM integration, enabling you to pass application-specific context when performing database changes. This can include context such as the 'where' (API endpoint, worker, etc.), 'who' (user, cron job, etc.), and 'how' behind a change, thereby enriching the information captured by Bemi.

See [this repo](https://github.com/BemiHQ/bemi-typeorm-example) as an Todo app example with TypeORM that automatically tracks all changes.

Expand Down
53 changes: 53 additions & 0 deletions docs/docs/postgresql/destination-database.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Destination Database

Bemi automatically provisions a cloud-hosted PostgreSQL destination database to store all changes made in a source database.
You have full control over this database which comes with additional features:

* Autoscaling, managed table partitioning and index optimization to improve performance
* Automatic failover, read-replica, and backups for high availability and fault tolerance
* Automatic data retention: 15 days, 30 days, or unlimited
* Automatic PostgreSQL version upgrades
* Standard cloud support
* Control plane and monitoring through Bemi Dashboard (coming soon)

## Data Structure

Changes performed by creating, updating, or deleting each row are stored in a table called `changes` and have the following structure:

| Column | Type | Description |
| ---------------- | ---------------- | ------------------------------------------------------------ |
| `id` | `uuid` | A unique identifier of the change record |
| `database` | `varchar(255)` | Database name where the changed record was stored |
| `schema` | `varchar(255)` | Schema name where the changed record was stored |
| `table` | `varchar(255)` | Table name where the changed record was stored |
| `primary_key` | `varchar(255)` | A unique identifier of the changed record (optional) |
| `operation` | `text` | Enum that can be either `CREATE`, `UPDATE`, or `DELETE` |
| `before` | `jsonb` | Record's values before the change |
| `after` | `jsonb` | Record's values after the change |
| `context` | `jsonb` | App context passed by using our recommended [ORM packages](/#supported-nodejs-orms) |
| `committed_at` | `timestamptz(0)` | When the record was changed |
| `queued_at` | `timestamptz(0)` | When the changed record was ingested from WAL |
| `created_at` | `timestamptz(0)` | When the change record was stored in the database |
| `transaction_id` | `integer` | PostgreSQL transaction ID |
| `position` | `bigint` | PostgreSQL WAL position |

## Querying Changes

You can query changes by using our [ORM packages](/#supported-nodejs-orms) or by directly connecting and executing SQL queries.
For example, if you need to find when and how a user record with ID `b7267340-5011-40f4-ab9a-902b68fc5b25` had its email updated to `new@example.com` in the last 3 months:

```sql
SELECT *
FROM "changes"
WHERE
"database" = 'postgres' AND "schema" = 'public' AND "table" = 'users' AND
"primary_key" = 'b7267340-5011-40f4-ab9a-902b68fc5b25' AND "operation" = 'UPDATE' AND
"after" @> '{"email": "new@example.com"}' AND NOT ("before" @> '{"email": "new@example.com"}') AND
"committed_at" BETWEEN (NOW() - INTERVAL '3 months') AND NOW()
LIMIT 1;`
```

The JSONB columns are indexed with [GIN Index](https://www.postgresql.org/docs/current/indexes-types.html#INDEXES-TYPES-GIN) with `jsonb_path_ops` operator class that is perfomance-optimized for operators like:

* `jsonb @> '{"key": value}'` to check if a key/value pair matches JSONB
* `jsonb @? '$.key'` to check if a key exists in JSONB
1 change: 1 addition & 0 deletions docs/sidebars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const sidebars: SidebarsConfig = {
collapsed: false,
items: [
'postgresql/source-database',
'postgresql/destination-database',
],
},
{
Expand Down

0 comments on commit 61d9a7b

Please sign in to comment.