Skip to content

Commit

Permalink
Projection implementation for MySQL support
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrdom committed Nov 18, 2024
1 parent 719e9eb commit e65f0c7
Show file tree
Hide file tree
Showing 17 changed files with 228 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ jobs:
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}

test-docs:
name: Docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ final class R2dbcSettings(config: Config) {

val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer")

val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].")
}
val dialect: Dialect = Dialect.fromString(config.getString("dialect"))

val querySettings = new QuerySettings(config.getConfig("query"))

Expand Down Expand Up @@ -96,6 +90,18 @@ object Dialect {

/** @since 1.1.0 */
case object MySQL extends Dialect

/** @since 1.1.0 */
def fromString(value: String): Dialect = {
toRootLowerCase(value) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(
s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres, mysql].")
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object Sql {
* INTERNAL API
*/
@InternalApi
private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
private[pekko] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
def sql(args: Any*)(implicit dialect: Dialect): String =
dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
}
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ object Dependencies {
pekkoPersistenceQuery,
r2dbcSpi,
r2dbcPool,
r2dbcMysql % "provided,test",
pekkoProjectionCore,
TestDeps.pekkoProjectionEventSourced,
TestDeps.pekkoProjectionDurableState,
Expand Down
2 changes: 1 addition & 1 deletion projection/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

//#projection-config
pekko.projection.r2dbc {
# postgres or yugabyte
# postgres, yugabyte or mysql
dialect = ${pekko.persistence.r2dbc.dialect}

offset-store {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import java.time.{ Duration => JDuration }
import java.util.Locale

import scala.concurrent.duration._

import com.typesafe.config.Config
import org.apache.pekko
import pekko.util.JavaDurationConverters._
import pekko.actor.typed.ActorSystem
import com.typesafe.config.Config
import pekko.persistence.r2dbc.Dialect
import pekko.util.JavaDurationConverters._

object R2dbcProjectionSettings {

Expand All @@ -44,11 +44,37 @@ object R2dbcProjectionSettings {
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval = config.getDuration("offset-store.delete-interval"),
logDbCallsExceeding)
logDbCallsExceeding,
dialect = Dialect.fromString(config.getString("dialect")))
}

def apply(system: ActorSystem[_]): R2dbcProjectionSettings =
apply(system.settings.config.getConfig(DefaultConfigPath))

def apply(
schema: Option[String],
offsetTable: String,
timestampOffsetTable: String,
managementTable: String,
useConnectionFactory: String,
timeWindow: JDuration,
keepNumberOfEntries: Int,
evictInterval: JDuration,
deleteInterval: JDuration,
logDbCallsExceeding: FiniteDuration
): R2dbcProjectionSettings = R2dbcProjectionSettings(
schema,
offsetTable,
timestampOffsetTable,
managementTable,
useConnectionFactory,
timeWindow,
keepNumberOfEntries,
evictInterval,
deleteInterval,
logDbCallsExceeding,
Dialect.Postgres
)
}

// FIXME remove case class, and add `with` methods
Expand All @@ -62,7 +88,8 @@ final case class R2dbcProjectionSettings(
keepNumberOfEntries: Int,
evictInterval: JDuration,
deleteInterval: JDuration,
logDbCallsExceeding: FiniteDuration) {
logDbCallsExceeding: FiniteDuration,
dialect: Dialect) {
val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable
val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable
val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") + managementTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
Expand All @@ -38,8 +37,9 @@ import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.typed.PersistenceId
import pekko.projection.BySlicesSourceProvider
import pekko.projection.MergeableOffset
Expand All @@ -49,6 +49,7 @@ import pekko.projection.internal.OffsetSerialization
import pekko.projection.internal.OffsetSerialization.MultipleOffsets
import pekko.projection.internal.OffsetSerialization.SingleOffset
import pekko.projection.r2dbc.R2dbcProjectionSettings
import pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -154,6 +155,22 @@ object R2dbcOffsetStore {
val FutureDone: Future[Done] = Future.successful(Done)
val FutureTrue: Future[Boolean] = Future.successful(true)
val FutureFalse: Future[Boolean] = Future.successful(false)

def fromConfig(
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: R2dbcProjectionSettings,
r2dbcExecutor: R2dbcExecutor,
clock: Clock = Clock.systemUTC()
): R2dbcOffsetStore = {
settings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock)
case Dialect.MySQL =>
new MySQLR2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock)
}
}
}

/**
Expand All @@ -170,6 +187,9 @@ private[projection] class R2dbcOffsetStore(

import R2dbcOffsetStore._

implicit protected val dialect: Dialect = settings.dialect
protected lazy val timestampSql: String = "transaction_timestamp()"

// FIXME include projectionId in all log messages
private val logger = LoggerFactory.getLogger(this.getClass)

Expand All @@ -181,8 +201,8 @@ private[projection] class R2dbcOffsetStore(
import offsetSerialization.toStorageRepresentation

private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
private val offsetTable = settings.offsetTableWithSchema
private val managementTable = settings.managementTableWithSchema
protected val offsetTable = settings.offsetTableWithSchema
protected val managementTable = settings.managementTableWithSchema

private[projection] implicit val executionContext: ExecutionContext = system.executionContext

Expand All @@ -195,7 +215,7 @@ private[projection] class R2dbcOffsetStore(
private val insertTimestampOffsetSql: String = sql"""
INSERT INTO $timestampOffsetTable
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
VALUES (?,?,?,?,?,?, transaction_timestamp())"""
VALUES (?,?,?,?,?,?, $timestampSql)"""

// delete less than a timestamp
private val deleteOldTimestampOffsetSql: String =
Expand All @@ -211,7 +231,7 @@ private[projection] class R2dbcOffsetStore(
private val selectOffsetSql: String =
sql"SELECT projection_key, current_offset, manifest, mergeable FROM $offsetTable WHERE projection_name = ?"

private val upsertOffsetSql: String = sql"""
protected val upsertOffsetSql: String = sql"""
INSERT INTO $offsetTable
(projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
VALUES (?,?,?,?,?,?)
Expand Down Expand Up @@ -518,8 +538,10 @@ private[projection] class R2dbcOffsetStore(
} else {
// TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
val boundStatement =
records.foldLeft(statement) { (stmt, rec) =>
stmt.add()
records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) =>
if (idx != 0) {
stmt.add()
}
bindRecord(stmt, rec)
}
R2dbcExecutor.updateBatchInTx(boundStatement)
Expand Down Expand Up @@ -979,11 +1001,7 @@ private[projection] class R2dbcOffsetStore(
.bind(2, paused)
.bind(3, Instant.now(clock).toEpochMilli)
}
.flatMap {
case i if i == 1 => Future.successful(Done)
case _ =>
Future.failed(new RuntimeException(s"Failed to update management table for $projectionId"))
}
.map(_ => Done)(ExecutionContexts.parasitic)
}

private def createRecordWithOffset[Envelope](envelope: Envelope): Option[RecordWithOffset] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private[projection] object R2dbcProjectionImpl {
connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) = {
val r2dbcExecutor =
new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(system.executionContext, system)
new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor)
R2dbcOffsetStore.fromConfig(projectionId, sourceProvider, system, settings, r2dbcExecutor)
}

private val loadEnvelopeCounter = new AtomicLong
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.projection.r2dbc.internal.mysql

import java.time.Clock

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.projection.BySlicesSourceProvider
import pekko.projection.ProjectionId
import pekko.projection.r2dbc.R2dbcProjectionSettings
import pekko.projection.r2dbc.internal.R2dbcOffsetStore

/**
* INTERNAL API
*/
@InternalApi
private[projection] class MySQLR2dbcOffsetStore(
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: R2dbcProjectionSettings,
r2dbcExecutor: R2dbcExecutor,
clock: Clock = Clock.systemUTC())
extends R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock) {

override lazy val timestampSql: String = "NOW(6)"

override val upsertOffsetSql: String = sql"""
INSERT INTO $offsetTable
(projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
VALUES (?,?,?,?,?,?) AS excluded
ON DUPLICATE KEY UPDATE
current_offset = excluded.current_offset,
manifest = excluded.manifest,
mergeable = excluded.mergeable,
last_updated = excluded.last_updated"""

override val updateManagementStateSql: String = sql"""
INSERT INTO $managementTable
(projection_name, projection_key, paused, last_updated)
VALUES (?,?,?,?) AS excluded
ON DUPLICATE KEY UPDATE
paused = excluded.paused,
last_updated = excluded.last_updated"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class EventSourcedChaosSpec
(1 to expectedEventCounts).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case of failure
try {
processed :+= processedProbe.receiveMessage(15.seconds)
processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
Expand All @@ -29,8 +28,9 @@ import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
Expand Down Expand Up @@ -152,6 +152,7 @@ class EventSourcedEndToEndSpec
// to be able to store events with specific timestamps
private def writeEvent(persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr: java.lang.Long, event, timestamp)
implicit val dialect: Dialect = projectionSettings.dialect
val insertEventSql = sql"""
INSERT INTO ${journalSettings.journalTableWithSchema}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
Expand Down Expand Up @@ -269,7 +270,7 @@ class EventSourcedEndToEndSpec
(1 to numberOfEvents).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case of failure
try {
processed :+= processedProbe.receiveMessage(15.seconds)
processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class EventSourcedPubSubSpec
(1 to numberOfEvents).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case of failure
try {
processed :+= processedProbe.receiveMessage(25.seconds)
processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
Expand Down
Loading

0 comments on commit e65f0c7

Please sign in to comment.