Merge pull request #142 from SwissBorg/unique-indices
Partitioned schema - unique index on (persistence_id, sequence_number).
fredfp authored Feb 25, 2021
2 parents 2928ec1 + c04fd4f commit 4a81fdf
Showing 6 changed files with 123 additions and 11 deletions.
24 changes: 18 additions & 6 deletions README.md
@@ -113,20 +113,21 @@ Example partition names: `j_myActor_0`, `j_myActor_1`, `j_worker_0` etc.
Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ascii characters in your `persistenceId`s and keep the `prefix` reasonably short.

> :warning: Once any of the partitioning settings under the `postgres-journal.tables.journal.partitions` branch is settled, you should never change it. Otherwise you might end up with PostgresExceptions caused by table name or range conflicts.
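
For illustration, a minimal partitioning configuration might look as follows. The `size` and `prefix` key names under `partitions` are assumptions inferred from the settings mentioned above, and the values are placeholders — check the plugin's `reference.conf` for the authoritative keys and defaults.

```hocon
# Sketch only: key names and values are illustrative, verify against reference.conf.
postgres-journal {
  tables {
    journal {
      partitions {
        # assumed meaning: how many events go into a single partition
        size = 10000000
        # partition table names are derived from this prefix, e.g. j_myActor_0
        prefix = "j"
      }
    }
  }
}
```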
## Migration

## Migration from akka-persistence-jdbc 4.0.0
### Migration from akka-persistence-jdbc 4.0.0
It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0.
Since we decided to extract metadata from the serialized payload and store it in a separate column, it is not possible to migrate the existing journal and snapshot store using plain SQL scripts.

### How migration works
#### How migration works
Each journal event and snapshot has to be read and deserialized, its metadata and tags must be extracted, and then everything must be stored in the new table.

We provide an optional artifact, `akka-persistence-postgres-migration`, that brings to your project the necessary classes to automate the above process.

**Important**: Our util classes neither drop nor update any old data. The original tables will still be there, but renamed with an `old_` prefix. It's up to you when to drop them.

### How to use plugin provided migrations
#### Add akka-persistence-migration to your project
#### How to use plugin provided migrations
##### Add akka-persistence-migration to your project
Add the following to your `build.sbt`
```
libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.4.1"
@@ -141,7 +142,7 @@ For a maven project add:
```
to your `pom.xml`.

#### Create and run migrations:
##### Create and run migrations:
```scala
import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration
@@ -155,9 +156,20 @@ _ <- new Jdbc4SnapshotStoreMigration(config).run()
```

It's your choice whether you want to trigger the migration manually or (recommended) leverage a database version control system such as Flyway.
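
A complete, minimal sketch of wiring both migrations together is shown below. It assumes the migrations only need a Typesafe `Config` (plus an implicit `ActorSystem`/`ExecutionContext` in scope) and that `run()` returns a `Future`; adapt it to your own bootstrap code.

```scala
import akka.actor.ActorSystem
import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration

import scala.concurrent.Future
import scala.util.{ Failure, Success }

object RunMigrations extends App {
  implicit val system: ActorSystem = ActorSystem("migration")
  import system.dispatcher

  // Use the same configuration that your journal and snapshot plugins read.
  val config = system.settings.config

  // Run the journal migration first, then the snapshot store migration.
  val migration: Future[Unit] = for {
    _ <- new Jdbc4JournalMigration(config).run()
    _ <- new Jdbc4SnapshotStoreMigration(config).run()
  } yield ()

  migration.onComplete {
    case Success(_)  => system.log.info("Migration completed successfully"); system.terminate()
    case Failure(ex) => system.log.error(ex, "Migration failed"); system.terminate()
  }
}
```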

### Examples
#### Examples
An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala

### Migration from akka-persistence-postgres 0.4.0 to 0.5.0
New indices need to be created on each partition. To avoid locking production databases for too long, this should be done in two steps:
1. manually create indices CONCURRENTLY,
2. deploy new release with migration scripts.

#### Manually create indices CONCURRENTLY
Execute the DDL statements produced by the [sample migration script](scripts/migratrion-0.5.0/partitioned/1-add-indices-manually.sql), adapting the top-level variables to match your journal configuration before executing.
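
For example, for a partition named `j_0` living in the `public` schema (names here are purely illustrative), the script prints a statement of the following shape, which you then execute manually:

```sql
-- Shape of the DDL printed by the script (partition and schema names are illustrative).
-- CONCURRENTLY cannot run inside a transaction block, so execute each statement separately.
CREATE UNIQUE INDEX CONCURRENTLY j_0_persistence_sequence_idx
    ON public.j_0 USING BTREE (persistence_id, sequence_number);
```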

#### Deploy new release with migration scripts
See the [sample Flyway migration script](scripts/migratrion-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt the top-level variables to match your journal configuration.

## Contributing
We are also always looking for contributions and new ideas, so if you’d like to join the project, check out the [open issues](https://github.com/SwissBorg/akka-persistence-postgres/issues), or post your own suggestions!

15 changes: 15 additions & 0 deletions core/src/main/scala/akka/persistence/postgres/db/DbErrors.scala
@@ -12,6 +12,7 @@ object DbErrors {
import ExtendedPostgresProfile.api._

val PgDuplicateTable: String = "42P07"
val PgUniqueViolation: String = "23505"

def withHandledPartitionErrors(logger: Logger, partitionDetails: String)(dbio: DBIOAction[_, NoStream, Effect])(
implicit ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
@@ -26,4 +27,18 @@
logger.debug(s"Created missing journal partition for $partitionDetails")
DBIO.successful(())
}

def withHandledIndexErrors(logger: Logger, indexDetails: String)(dbio: DBIOAction[_, NoStream, Effect])(implicit
ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
dbio.asTry.flatMap {
case Failure(ex: SQLException) if ex.getSQLState == PgUniqueViolation =>
logger.debug(s"Index $indexDetails already exists")
DBIO.successful(())
case Failure(ex) =>
logger.error(s"Cannot create index $indexDetails", ex)
DBIO.failed(ex)
case Success(_) =>
logger.debug(s"Created missing index $indexDetails")
DBIO.successful(())
}
}
@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference

import akka.persistence.postgres.JournalRow
import akka.persistence.postgres.config.JournalConfig
import akka.persistence.postgres.db.DbErrors.withHandledPartitionErrors
import akka.persistence.postgres.db.DbErrors.{ withHandledIndexErrors, withHandledPartitionErrors }
import akka.serialization.Serialization
import akka.stream.Materializer
import slick.jdbc.JdbcBackend.Database
@@ -69,9 +69,13 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ
val name = s"${partitionPrefix}_$partitionNumber"
val minRange = partitionNumber * partitionSize
val maxRange = minRange + partitionSize
withHandledPartitionErrors(logger, s"ordering between $minRange and $maxRange") {
sqlu"""CREATE TABLE IF NOT EXISTS #${schema + name} PARTITION OF #${schema + journalTableCfg.tableName} FOR VALUES FROM (#$minRange) TO (#$maxRange)"""
}
val partitionName = s"${schema + name}"
val indexName = s"${name}_persistence_sequence_idx"
withHandledPartitionErrors(logger, s"$partitionName (ordering between $minRange and $maxRange)") {
sqlu"""CREATE TABLE IF NOT EXISTS #$partitionName PARTITION OF #${schema + journalTableCfg.tableName} FOR VALUES FROM (#$minRange) TO (#$maxRange)"""
}.andThen(withHandledIndexErrors(logger, s"$indexName for partition $partitionName") {
sqlu"""CREATE UNIQUE INDEX IF NOT EXISTS #$indexName ON #$partitionName USING BTREE (#${journalTableCfg.columnNames.persistenceId}, #${journalTableCfg.columnNames.sequenceNumber});"""
})
}
}
DBIO
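
To illustrate the change above: with a journal table `public.journal`, partition prefix `j`, partition size 2000000 and partition number 0 (all values illustrative), the DAO now issues roughly the following statements when it creates a missing partition:

```sql
-- Partition creation as before, now followed by a per-partition unique index
-- on (persistence_id, sequence_number); duplicate-index errors are swallowed
-- by withHandledIndexErrors.
CREATE TABLE IF NOT EXISTS public.j_0 PARTITION OF public.journal
    FOR VALUES FROM (0) TO (2000000);
CREATE UNIQUE INDEX IF NOT EXISTS j_0_persistence_sequence_idx
    ON public.j_0 USING BTREE (persistence_id, sequence_number);
```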
@@ -42,7 +42,6 @@ CREATE SEQUENCE journal_ordering_seq OWNED BY public.journal.ordering;

CREATE EXTENSION IF NOT EXISTS intarray WITH SCHEMA public;
CREATE INDEX journal_tags_idx ON public.journal USING GIN (tags public.gin__int_ops);
CREATE INDEX journal_persistence_sequence_idx ON public.journal USING BTREE (persistence_id, sequence_number);

DROP TABLE IF EXISTS public.tags;

41 changes: 41 additions & 0 deletions scripts/migratrion-0.5.0/partitioned/1-add-indices-manually.sql
@@ -0,0 +1,41 @@
DO $$
DECLARE
-- adapt those to match journal configuration
v_journal_table_name constant text := 'journal';
v_schema constant text := 'public';
v_column_persistence_id text = 'persistence_id';
v_column_sequence_number text = 'sequence_number';
-- do not change values below
v_persistence_seq_idx constant text := '_persistence_sequence_idx';
v_rec record;
v_sql text;
BEGIN
FOR v_rec IN
-- get list of partitions
SELECT
child.relname AS child
FROM
pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
WHERE
parent.relname = v_journal_table_name AND
nmsp_parent.nspname = v_schema
LOOP
PERFORM
FROM
pg_indexes
WHERE
schemaname = v_schema
AND tablename = v_rec.child
AND indexname = v_rec.child || v_persistence_seq_idx;
IF NOT FOUND THEN
-- unique btree on (persistence_id, sequence_number)
v_sql := 'CREATE UNIQUE INDEX CONCURRENTLY ' || quote_ident(v_rec.child || v_persistence_seq_idx) || ' ON ' || quote_ident(v_schema) || '.' || quote_ident(v_rec.child) || ' USING BTREE (' || quote_ident(v_column_persistence_id) || ',' || quote_ident(v_column_sequence_number) || ');';
RAISE notice 'Run DDL: %', v_sql;
END IF;

END LOOP;
END;
$$;
41 changes: 41 additions & 0 deletions scripts/migratrion-0.5.0/partitioned/2-add-indices-flyway.sql
@@ -0,0 +1,41 @@
-- Ensure indexes exist on all partitions; the actual indexes are created manually, before this migration runs, using the CONCURRENTLY option.
-- This block is needed to avoid missing indexes in case a new partition was created between the manual index creation and
-- the actual migration. We cannot create indexes CONCURRENTLY here, as it is not possible to create indexes CONCURRENTLY
-- inside a transaction, and this function is executed inside a transaction.
DO $$
DECLARE
-- adapt those to match journal configuration
v_journal_table_name constant text := 'journal';
v_column_persistence_id text = 'persistence_id';
v_column_sequence_number text = 'sequence_number';
-- do not change values below
v_persistence_seq_idx constant text := '_persistence_sequence_idx';
-- detect which schema Flyway uses
v_schema constant text := (select trim(both '"' from split_part(setting,',',1)) FROM pg_settings WHERE name = 'search_path');
v_rec record;
v_sql text;
BEGIN
FOR v_rec IN
-- get list of partitions
SELECT
child.relname AS child
FROM
pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
WHERE
parent.relname = v_journal_table_name AND
nmsp_parent.nspname = v_schema
LOOP
-- unique btree on (persistence_id, sequence_number)
v_sql := 'CREATE UNIQUE INDEX IF NOT EXISTS ' || quote_ident(v_rec.child || v_persistence_seq_idx) || ' ON ' || quote_ident(v_schema) || '.' || quote_ident(v_rec.child) || ' USING BTREE (' || quote_ident(v_column_persistence_id) || ',' || quote_ident(v_column_sequence_number) || ');';
RAISE notice 'Running DDL: %', v_sql;
EXECUTE v_sql;

END LOOP;
END;
$$;

-- drop global, non-unique index
DROP INDEX IF EXISTS journal_persistence_id_sequence_number_idx;
