Support MySQL #170

Status: Closed (wants to merge 34 commits)

Commits (34)
1a5e00a
Add mysql dll and docker compose files
ptrdom Oct 21, 2024
1164b4e
Setup core-mysql module, improve docker compose for mysql
ptrdom Oct 24, 2024
21f10f4
Make journal spec pass for mysql implementation
ptrdom Oct 26, 2024
a4d1edd
Small improvements
ptrdom Oct 27, 2024
58fc2e3
Implement mysql snapshot store
ptrdom Oct 27, 2024
c05ffe0
Implement persist timestamp spec for mysql
ptrdom Oct 27, 2024
00cb262
Implement many actors perf spec for mysql
ptrdom Oct 27, 2024
50c012f
Implement perf spec for mysql
ptrdom Oct 27, 2024
b9007e6
Disable tags support for mysql
ptrdom Oct 27, 2024
80685b8
Make query specs pass for mysql implementation
ptrdom Oct 28, 2024
3a1f5f4
Make durable state specs pass for mysql implementation
ptrdom Oct 28, 2024
84a1c7e
Implement projection support for mysql
ptrdom Oct 29, 2024
c9c2271
Simplify mysql journal dao
ptrdom Nov 2, 2024
202b55a
Simplify mysql query dao
ptrdom Nov 2, 2024
cf68c06
Simplify mysql snapshot dao
ptrdom Nov 2, 2024
122c957
Simplify mysql durable state dao
ptrdom Nov 2, 2024
8e0f100
Simplifiy mysql module reference configurations
ptrdom Nov 2, 2024
6d87171
Simplifiy mysql module database config
ptrdom Nov 2, 2024
3b87c73
Fix Scala 2.12 compilation issues
ptrdom Nov 2, 2024
f76f7ed
Setup github workflow job for mysql tests
ptrdom Nov 2, 2024
a023e31
Make migration part of postgres sbt aggregate
ptrdom Nov 2, 2024
961a167
Fix pekko imports
ptrdom Nov 2, 2024
d9a9643
Fix github workflow job for mysql tests
ptrdom Nov 3, 2024
7d6ffcc
Try make mysql auth work in github actions
ptrdom Nov 3, 2024
876b87d
Fix docker healthcheck for mysql
ptrdom Nov 3, 2024
32b0dc5
Bump timeouts for certain specs
ptrdom Nov 3, 2024
9f3b62d
Disable mysql tests for jdk 8
ptrdom Nov 3, 2024
6174dd9
Run mysql tests for all LTS JDKs, but only compile on JDK 8
ptrdom Nov 5, 2024
b460799
Fix mysql CI job
ptrdom Nov 5, 2024
c202d29
Fix NPE in long duration DB call logging
ptrdom Nov 5, 2024
0b3a72a
Remove additional options for connection factory and use ConnectionFa…
ptrdom Nov 7, 2024
b6d6ac7
Add scala 2.12 compatible null value handling for db call log
ptrdom Nov 7, 2024
ff28e43
Fix headers
ptrdom Nov 9, 2024
f19f9fa
Revert "Fix headers"
ptrdom Nov 9, 2024
.github/workflows/build-test.yml (59 changes: 57 additions, 2 deletions)

@@ -84,7 +84,7 @@ jobs:
docker exec -i docker-postgres-db-1 psql -U postgres -t < ddl-scripts/create_tables_postgres.sql

- name: test
- run: sbt ++${{ matrix.SCALA_VERSION }} test
+ run: sbt ++${{ matrix.SCALA_VERSION }} postgres/test

test-yugabyte:
name: Run tests with Yugabyte
@@ -130,7 +130,62 @@ jobs:
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t < ddl-scripts/create_tables_yugabyte.sql

- name: test
- run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte -Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test
+ run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte -Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} yugabyte/test

test-mysql:
name: Run test with MySQL
runs-on: ubuntu-latest
strategy:
matrix:
SCALA_VERSION: [ 2.12, 2.13, 3.3 ]
JAVA_VERSION: [ 11, 17, 21 ]
# only compiling on JDK 8 because of lower timestamp precision, which causes flakiness in certain tests
include:
- JAVA_VERSION: 8
SCALA_VERSION: 2.12
COMPILE_ONLY: true
- JAVA_VERSION: 8
SCALA_VERSION: 2.13
COMPILE_ONLY: true
- JAVA_VERSION: 8
SCALA_VERSION: 3.3
COMPILE_ONLY: true
if: github.repository == 'apache/pekko-persistence-r2dbc'
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
fetch-tags: true

- name: Checkout GitHub merge
if: github.event.pull_request
run: |-
git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch
git checkout scratch

- name: Setup Java ${{ matrix.JAVA_VERSION }}
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: ${{ matrix.JAVA_VERSION }}

- name: Install sbt
uses: sbt/setup-sbt@v1

- name: Cache Coursier cache
uses: coursier/cache-action@v6

- name: Enable jvm-opts
run: cp .jvmopts-ci .jvmopts

- name: Start DB
run: |-
docker compose -f docker/docker-compose-mysql.yml up -d --wait
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql

- name: test
run: sbt ++${{ matrix.SCALA_VERSION }} mysql/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}

test-docs:
name: Docs
build.sbt (28 changes: 27 additions, 1 deletion)

@@ -45,16 +45,33 @@ lazy val root = (project in file("."))
.settings(dontPublish)
.settings(
name := "pekko-persistence-r2dbc-root")
- .aggregate(core, projection, migration, docs)
+ .aggregate(core, `core-mysql`, projection, migration, docs)

def suffixFileFilter(suffix: String): FileFilter = new SimpleFileFilter(f => f.getAbsolutePath.endsWith(suffix))

lazy val postgres = (project in file("postgres"))
.aggregate(core, projection, migration)

lazy val yugabyte = (project in file("yugabyte"))
.aggregate(core, projection)

lazy val mysql = (project in file("mysql"))
.aggregate(`core-mysql`, `projection-mysql`)

lazy val core = (project in file("core"))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(
name := "pekko-persistence-r2dbc",
libraryDependencies ++= Dependencies.core)

lazy val `core-mysql` = (project in file("core-mysql"))
.dependsOn(core % "compile->compile;test->test") // TODO core-testkit would be ideal, though some classes are problematic to move
.enablePlugins(ReproducibleBuildsPlugin)
.disablePlugins(MimaPlugin) // TODO enable after future release
.settings(
name := "pekko-persistence-r2dbc-mysql",
libraryDependencies ++= Dependencies.`core-mysql`)

lazy val projection = (project in file("projection"))
.dependsOn(core)
.enablePlugins(ReproducibleBuildsPlugin)
@@ -63,6 +80,15 @@ lazy val projection = (project in file("projection"))
libraryDependencies ++= Dependencies.projection,
dependencyOverrides ++= Dependencies.pekkoTestDependencyOverrides)

lazy val `projection-mysql` = (project in file("projection-mysql"))
.dependsOn(`core-mysql` % "compile->compile;test->test", projection % "compile->compile;test->test") // TODO projection-testkit would be ideal, though some classes are problematic to move
.enablePlugins(ReproducibleBuildsPlugin)
.disablePlugins(MimaPlugin) // TODO enable after future release
.settings(
name := "pekko-projection-r2dbc-mysql",
libraryDependencies ++= Dependencies.projection,
dependencyOverrides ++= Dependencies.pekkoTestDependencyOverrides)

lazy val migration = (project in file("migration"))
.enablePlugins(ReproducibleBuildsPlugin)
.settings(
core-mysql/src/main/resources/reference.conf (31 changes: 31 additions, 0 deletions)

@@ -0,0 +1,31 @@
# SPDX-License-Identifier: Apache-2.0

// #connection-settings
pekko.persistence.r2dbc {

# setting dialect to empty toggles use of the following class config
Contributor: The values allowed are empty and "mysql"? Could we list the allowed values in the comment?

Member Author (ptrdom): No, the valid dialect values are declared here: https://github.com/apache/pekko-persistence-r2dbc/blob/main/core/src/main/resources/reference.conf#L105. Setting dialect to empty enables loading of the DAO classes specified by the journalDaoClass, queryDaoClass, snapshotDaoClass and durableStateDaoClass configs instead of using the default Postgres/Yugabyte implementation. This is the only way I could see to plug the MySQL implementation in as a separate sbt module. It also gives the library's users the flexibility to plug in their own DAO implementations and add support for new databases without changing the library (see the sketch after the config block below).

If you have any suggestions for improving the implementation or the docs, please let me know.

Contributor: Your link says postgres and yugabyte are the only valid values.

I am lost. I do not understand what you are trying to do here.

Member Author (ptrdom): I am not sure how else to explain it; you would need to get more familiar with the code to either understand my solution or propose a better alternative.

dialect = ""
journalDaoClass = "org.apache.pekko.persistence.r2dbc.mysql.journal.MySQLJournalDao"
queryDaoClass = "org.apache.pekko.persistence.r2dbc.mysql.query.MySQLQueryDao"
snapshotDaoClass = "org.apache.pekko.persistence.r2dbc.mysql.snapshot.MySQLSnapshotDao"
durableStateDaoClass = "org.apache.pekko.persistence.r2dbc.mysql.state.MySQLDurableStateDao"

schema = ""

connection-factory {
driver = "mysql"

host = "localhost"
port = 3306
database = "mysql"
user = "root"
password = "root"

connection-factory-options-customizer = "org.apache.pekko.persistence.r2dbc.mysql.MySQLConnectionFactoryOptionsCustomizer"
}

db-timestamp-monotonic-increasing = on

use-app-timestamp = on
}
// #connection-settings
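
To illustrate the pluggable DAO mechanism discussed in the thread above, here is a minimal, hypothetical sketch of how a user could point the library at their own journal DAO. The package, class name, and application.conf snippet are illustrative only; the constructor shape mirrors MySQLJournalDao further down in this PR, and it is assumed the library instantiates the class named by journalDaoClass with that constructor.

package com.example.mydb

import scala.concurrent.ExecutionContext

import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.journal.JournalDao

// Hypothetical custom DAO: override only the SQL fragments that differ for the
// target database, as MySQLJournalDao does below with NOW(6).
class MyDbJournalDao(
    settings: R2dbcSettings,
    connectionFactory: ConnectionFactory
)(implicit ec: ExecutionContext, system: ActorSystem[_])
    extends JournalDao(settings, connectionFactory) {
  override lazy val timestampSql: String = "NOW(6)"
}

// application.conf (illustrative):
//   pekko.persistence.r2dbc {
//     dialect = ""
//     journalDaoClass = "com.example.mydb.MyDbJournalDao"
//   }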
New file (25 additions)

@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.r2dbc.mysql

import com.typesafe.config.Config
import io.r2dbc.spi.ConnectionFactoryOptions
import io.r2dbc.spi.Option
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer

class MySQLConnectionFactoryOptionsCustomizer(system: ActorSystem[_]) extends ConnectionFactoryOptionsCustomizer {
override def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder = {
// Either `connectionTimeZone = SERVER` or `forceConnectionTimeZoneToSession = true` needs to be set for time zones to work correctly,
// likely because of a bug in https://github.com/asyncer-io/r2dbc-mysql/pull/240.
builder.option(Option.valueOf("connectionTimeZone"), "SERVER")
}
}
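
The comment above also mentions forceConnectionTimeZoneToSession = true as an alternative workaround. A customizer taking that route might look like the sketch below; it assumes the same package and imports as MySQLConnectionFactoryOptionsCustomizer above, and the accepted value type for the option is an assumption, not verified against the driver.

// Hypothetical alternative: force the connection time zone to the session
// instead of using the server time zone (option name taken from the comment above).
class SessionTimeZoneCustomizer(system: ActorSystem[_]) extends ConnectionFactoryOptionsCustomizer {
  override def apply(builder: ConnectionFactoryOptions.Builder, config: Config): ConnectionFactoryOptions.Builder =
    builder.option(Option.valueOf("forceConnectionTimeZoneToSession"), "true")
}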
New file (57 additions)

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
Contributor: We need to change the source header on all the new files - see point 5 in https://github.com/apache/pekko/blob/main/CONTRIBUTING.md#pull-request-requirements

Contributor: Apologies @ptrdom - can you remove your last header change commit?

These new classes are copies of old classes, so they must preserve the header of the old class. Only absolutely new classes need the standard ASF header.

Member Author (ptrdom): Okay, I can revert. I do not understand the definition of "copy" and "absolutely new" in this context, so I will need some further explanation to avoid header issues in the future; or just continued handholding is honestly fine.

Contributor:
  • If you copy/paste code from another source file (a whole class or just some lines), you also need to copy the source header from the file you copied from (even if you modify that copied code).
  • If you add new code to a source file and that file does not have a source header matching the license under which your new code is licensed, then you should add the right source header (but don't remove any existing source headers).
  • You can avoid duplication:
    • You might have 2 Apache source headers with different copyright lines - you can merge them into one header with 2 copyright lines.
    • In Pekko, we use 2 forms of the Apache header: one that mentions Akka, and the standard Apache header if none of the code in the file is derived from Akka. We don't need both Apache headers in the same file; we just use the Pekko-specific one (mentioning Akka).

Contributor: Reviewing the new files on a case-by-case basis, it looks like some files should have the standard Apache header and some should have the special one that mentions Akka.
Maybe we can get back to this just before we merge. I can make the changes if the PR allows Pekko members to edit it (the default setting).

Member Author (ptrdom): Yeah, I was about to comment on that, because most of the changes are subclass implementations without anything that could, without a doubt, be called a copy/paste.

*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.r2dbc.mysql.journal

import scala.concurrent.ExecutionContext
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.Sql
import pekko.persistence.r2dbc.internal.Sql.ConfigurableInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import org.slf4j.Logger
import org.slf4j.LoggerFactory

object MySQLJournalDao {
val log: Logger = LoggerFactory.getLogger(classOf[MySQLJournalDao])

def settingRequirements(journalSettings: R2dbcSettings): Unit = {
// Application timestamps are used because MySQL does not have transaction_timestamp like Postgres. Emulating it could
// be attempted in future releases, but the benefits are questionable - no matter where the timestamps are generated,
// the risk of clock skew remains.
require(journalSettings.useAppTimestamp,
"use-app-timestamp config must be on for MySQL support")
// Supporting non-monotonically increasing timestamps by incrementing the timestamp within the insert queries, based on
// the latest row in the database, seems to cause deadlocks when running tests like PersistTimestampSpec. Possibly this
// could be fixed.
require(journalSettings.dbTimestampMonotonicIncreasing,
"db-timestamp-monotonic-increasing config must be on for MySQL support")
// Also, the missing RETURNING implementation makes grabbing the timestamp generated by the database less efficient -
// this applies to both of the requirements above.
}
}

class MySQLJournalDao(
journalSettings: R2dbcSettings,
connectionFactory: ConnectionFactory
)(implicit ec: ExecutionContext, system: ActorSystem[_])
extends JournalDao(journalSettings, connectionFactory) {
MySQLJournalDao.settingRequirements(journalSettings)

override implicit lazy val sqlReplacements: Sql.Replacements = Sql.Replacements.None
override lazy val timestampSql: String = "NOW(6)"

override val insertEventWithParameterTimestampSql: String =
sql"INSERT INTO $journalTable " +
"(slice, entity_type, persistence_id, seq_nr, writer, adapter_manifest, event_ser_id, event_ser_manifest, " +
"event_payload, tags, meta_ser_id, meta_ser_manifest, meta_payload, db_timestamp) " +
s"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
}
New file (80 additions)

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.r2dbc.mysql.query

import java.time.Instant

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.Sql
import pekko.persistence.r2dbc.internal.Sql.ConfigurableInterpolation
import pekko.persistence.r2dbc.query.scaladsl.QueryDao

class MySQLQueryDao(
journalSettings: R2dbcSettings,
connectionFactory: ConnectionFactory
)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends QueryDao(journalSettings, connectionFactory) {

override implicit lazy val sqlReplacements: Sql.Replacements = Sql.Replacements.None
override lazy val statementTimestampSql: String = "NOW(6)"

override def eventsBySlicesRangeSql(
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= ?" else ""

def behindCurrentTimeIntervalCondition =
if (behindCurrentTime > Duration.Zero)
s"AND db_timestamp < DATE_SUB($statementTimestampSql, INTERVAL '${behindCurrentTime.toMicros}' MICROSECOND)"
else ""

val selectColumns = {
if (backtracking)
s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp "
else
s"SELECT slice, persistence_id, seq_nr, db_timestamp, $statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, event_payload, meta_ser_id, meta_ser_manifest, meta_payload "
}

sql"""
$selectColumns
FROM $journalTable
WHERE entity_type = ?
AND slice BETWEEN $minSlice AND $maxSlice
AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
}

override def selectBucketsSql(minSlice: Int, maxSlice: Int): String = {
sql"""
SELECT CAST(UNIX_TIMESTAMP(db_timestamp) AS SIGNED) / 10 AS bucket, count(*) AS count
FROM $journalTable
WHERE entity_type = ?
AND slice BETWEEN $minSlice AND $maxSlice
AND db_timestamp >= ? AND db_timestamp <= ?
AND deleted = false
GROUP BY bucket ORDER BY bucket LIMIT ?
"""
}

override def currentDbTimestamp(): Future[Instant] = Future.successful(Instant.now())
}
New file (39 additions)

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

package org.apache.pekko.persistence.r2dbc.mysql.snapshot

import scala.concurrent.ExecutionContext
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.Sql
import pekko.persistence.r2dbc.snapshot.SnapshotDao

class MySQLSnapshotDao(
settings: R2dbcSettings, connectionFactory: ConnectionFactory
)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends SnapshotDao(settings, connectionFactory) {

override implicit lazy val sqlReplacements: Sql.Replacements = Sql.Replacements.None

override val upsertSql = s"""
INSERT INTO $snapshotTable
(slice, entity_type, persistence_id, seq_nr, write_timestamp, snapshot, ser_id, ser_manifest, meta_payload, meta_ser_id, meta_ser_manifest)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS excluded
ON DUPLICATE KEY UPDATE
seq_nr = excluded.seq_nr,
write_timestamp = excluded.write_timestamp,
snapshot = excluded.snapshot,
ser_id = excluded.ser_id,
ser_manifest = excluded.ser_manifest,
meta_payload = excluded.meta_payload,
meta_ser_id = excluded.meta_ser_id,
meta_ser_manifest = excluded.meta_ser_manifest"""
}