diff --git a/.jvmopts b/.jvmopts index 60c96d09..61edfa41 100644 --- a/.jvmopts +++ b/.jvmopts @@ -2,4 +2,4 @@ -Xms2G -Xmx2G --Xss2M +-Xss2M \ No newline at end of file diff --git a/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala b/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala index 2465a5ac..839b3d9d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala @@ -17,9 +17,9 @@ import io.r2dbc.spi.ConnectionFactory import java.time.{ Duration => JDuration } import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.duration.Duration +import scala.jdk.CollectionConverters._ import scala.util.Failure import scala.util.Success diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 05d7e746..c52e7f9d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -4,7 +4,10 @@ package akka.persistence.r2dbc +import scala.concurrent.duration._ import scala.collection.immutable +import scala.jdk.DurationConverters._ + import akka.annotation.InternalApi import akka.annotation.InternalStableApi import akka.persistence.r2dbc.internal.codec.IdentityAdapter @@ -14,12 +17,9 @@ import akka.persistence.r2dbc.internal.codec.QueryAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec import akka.persistence.r2dbc.internal.ConnectionFactorySettings -import akka.util.JavaDurationConverters._ import com.typesafe.config.Config import java.util.Locale -import scala.collection.immutable -import scala.concurrent.duration._ /** * INTERNAL API @@ -61,7 +61,7 @@ object R2dbcSettings { configToMap(config.getConfig("state.custom-table")) val durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ val cfg = config.getConfig("state.additional-columns") cfg.root.unwrapped.asScala.toMap.map { case (k, v: java.util.List[_]) => k -> v.iterator.asScala.map(_.toString).toVector @@ -126,7 +126,7 @@ object R2dbcSettings { val logDbCallsExceeding: FiniteDuration = config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match { case "off" => -1.millis - case _ => config.getDuration("log-db-calls-exceeding").asScala + case _ => config.getDuration("log-db-calls-exceeding").toScala } val codecSettings = { @@ -190,7 +190,7 @@ object R2dbcSettings { } private def configToMap(cfg: Config): Map[String, String] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString } } @@ -484,11 +484,11 @@ final class R2dbcSettings private ( */ @InternalStableApi final class QuerySettings(config: Config) { - val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").asScala - val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").asScala + val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala + val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").toScala val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled") - val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").asScala - val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").asScala + val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").toScala + val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").toScala val bufferSize: Int = config.getInt("buffer-size") val persistenceIdsBufferSize: Int = config.getInt("persistence-ids.buffer-size") val deduplicateCapacity: Int = config.getInt("deduplicate-capacity") @@ -502,10 +502,10 @@ final class QuerySettings(config: Config) { final class ConnectionPoolSettings(config: Config) { val initialSize: Int = config.getInt("initial-size") val maxSize: Int = config.getInt("max-size") - val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").asScala - val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").asScala + val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").toScala + val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").toScala - val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").asScala + val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").toScala val acquireRetry: Int = config.getInt("acquire-retry") val validationQuery: String = config.getString("validation-query") @@ -513,7 +513,7 @@ final class ConnectionPoolSettings(config: Config) { val closeCallsExceeding: Option[FiniteDuration] = config.getString("close-calls-exceeding").toLowerCase(Locale.ROOT) match { case "off" => None - case _ => Some(config.getDuration("close-calls-exceeding").asScala) + case _ => Some(config.getDuration("close-calls-exceeding").toScala) } } @@ -523,7 +523,7 @@ final class ConnectionPoolSettings(config: Config) { @InternalStableApi final class PublishEventsDynamicSettings(config: Config) { val throughputThreshold: Int = config.getInt("throughput-threshold") - val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala + val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").toScala } /** diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala index 73357eaf..6357e93e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala @@ -7,8 +7,8 @@ package akka.persistence.r2dbc.cleanup.javadsl import java.util.concurrent.CompletionStage import java.util.{ List => JList } -import scala.collection.JavaConverters._ -import scala.compat.java8.FutureConverters._ +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ import akka.Done import akka.actor.ClassicActorSystemProvider @@ -43,14 +43,14 @@ final class DurableStateCleanup private (delegate: scaladsl.DurableStateCleanup) * Delete the state related to one single `persistenceId`. */ def deleteState(persistenceId: String, resetRevisionNumber: Boolean): CompletionStage[Done] = { - delegate.deleteState(persistenceId, resetRevisionNumber).toJava + delegate.deleteState(persistenceId, resetRevisionNumber).asJava } /** * Delete all states related to the given list of `persistenceIds`. */ def deleteStates(persistenceIds: JList[String], resetRevisionNumber: Boolean): CompletionStage[Done] = { - delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).toJava + delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).asJava } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala index 3e94e8fd..811a96e5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala @@ -8,8 +8,8 @@ import java.time.Instant import java.util.concurrent.CompletionStage import java.util.{ List => JList } -import scala.collection.JavaConverters._ -import scala.compat.java8.FutureConverters._ +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ import akka.Done import akka.actor.ClassicActorSystemProvider @@ -53,19 +53,19 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) * sequence nr (inclusive) to delete up to */ def deleteEventsTo(persistenceId: String, toSequenceNr: Long): CompletionStage[Done] = - delegate.deleteEventsTo(persistenceId, toSequenceNr).toJava + delegate.deleteEventsTo(persistenceId, toSequenceNr).asJava /** * Delete all events related to one single `persistenceId`. Snapshots are not deleted. */ def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAllEvents(persistenceId, resetSequenceNumber).toJava + delegate.deleteAllEvents(persistenceId, resetSequenceNumber).asJava /** * Delete all events related to the given list of `persistenceIds`. Snapshots are not deleted. */ def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).toJava + delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).asJava /** * Delete events before a timestamp for the given persistence id. Snapshots are not deleted. @@ -83,7 +83,7 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) * timestamp (exclusive) to delete up to */ def deleteEventsBefore(persistenceId: String, timestamp: Instant): CompletionStage[Done] = - delegate.deleteEventsBefore(persistenceId, timestamp).toJava + delegate.deleteEventsBefore(persistenceId, timestamp).asJava /** * Delete events before a timestamp for the given entityType and slice. Snapshots are not deleted. @@ -103,43 +103,43 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) * timestamp (exclusive) to delete up to */ def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): CompletionStage[Done] = - delegate.deleteEventsBefore(entityType, slice, timestamp).toJava + delegate.deleteEventsBefore(entityType, slice, timestamp).asJava /** * Delete snapshots related to one single `persistenceId`. Events are not deleted. */ def deleteSnapshot(persistenceId: String): CompletionStage[Done] = - delegate.deleteSnapshot(persistenceId).toJava + delegate.deleteSnapshot(persistenceId).asJava /** * Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted. */ def deleteSnapshots(persistenceIds: JList[String]): CompletionStage[Done] = - delegate.deleteSnapshots(persistenceIds.asScala.toVector).toJava + delegate.deleteSnapshots(persistenceIds.asScala.toVector).asJava /** * Deletes all events for the given persistence id from before the snapshot. The snapshot is not deleted. The event * with the same sequence number as the remaining snapshot is deleted. */ def cleanupBeforeSnapshot(persistenceId: String): CompletionStage[Done] = - delegate.cleanupBeforeSnapshot(persistenceId).toJava + delegate.cleanupBeforeSnapshot(persistenceId).asJava /** * See single persistenceId overload for what is done for each persistence id */ def cleanupBeforeSnapshot(persistenceIds: JList[String]): CompletionStage[Done] = - delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).toJava + delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).asJava /** * Delete everything related to one single `persistenceId`. All events and snapshots are deleted. */ def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAll(persistenceId, resetSequenceNumber).toJava + delegate.deleteAll(persistenceId, resetSequenceNumber).asJava /** * Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted. */ def deleteAll(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).toJava + delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index 5d7b6cf8..cc02554c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -4,20 +4,17 @@ package akka.persistence.r2dbc.cleanup.scaladsl -import scala.collection.immutable +import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.collection.immutable import scala.util.Failure import scala.util.Success - import org.slf4j.LoggerFactory - import akka.Done import akka.actor.ClassicActorSystemProvider import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.R2dbcExecutorProvider @@ -73,7 +70,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf if (resetRevisionNumber) stateDao .deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) else { stateDao.readState(persistenceId).flatMap { case None => @@ -81,7 +78,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf case Some(s) => stateDao .deleteState(persistenceId, s.revision + 1, changeEvent = None) - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } } } @@ -106,7 +103,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf case pid :: tail => pidOperation(pid).flatMap { _ => if (n % settings.cleanupSettings.logProgressEvery == 0) - log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size) + log.info("Cleanup {} [{}] of [{}].", operationName, n, size) loop(tail, n + 1) } } @@ -116,7 +113,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf result.onComplete { case Success(_) => - log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size) + log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size) case Failure(e) => log.error(s"Cleanup {$operationName} failed.", e) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala index 18660af5..02fb4be6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory import akka.Done import akka.actor.ClassicActorSystemProvider import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.persistence.SnapshotSelectionCriteria @@ -209,7 +208,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf case pid :: tail => pidOperation(pid).flatMap { _ => if (n % settings.cleanupSettings.logProgressEvery == 0) - log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size) + log.info("Cleanup {} [{}] of [{}].", operationName, n, size) loop(tail, n + 1) } } @@ -219,7 +218,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf result.onComplete { case Success(_) => - log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size) + log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size) case Failure(e) => log.error(s"Cleanup {$operationName} failed.", e) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 6f9164d1..ff14e3bf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -13,7 +13,6 @@ import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import akka.NotUsed -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset @@ -250,7 +249,7 @@ import org.slf4j.Logger } if (state.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "{} next query [{}] from slices [{} - {}], between time [{} - {}]. Found [{}] rows in previous query.", logPrefix, state.queryCount, @@ -277,7 +276,7 @@ import org.slf4j.Logger .via(deserializeAndAddOffset(state.latest))) } else { if (log.isDebugEnabled) - log.debugN( + log.debug( "{} query [{}] from slices [{} - {}] completed. Found [{}] rows in previous query.", logPrefix, state.queryCount, @@ -297,7 +296,7 @@ import org.slf4j.Logger .futureSource[Envelope, NotUsed] { currentTimestamp.map { currentTime => if (log.isDebugEnabled()) - log.debugN( + log.debug( "{} query slices [{} - {}], from time [{}] until now [{}].", logPrefix, minSlice, @@ -326,7 +325,7 @@ import org.slf4j.Logger val initialOffset = toTimestampOffset(offset) if (log.isDebugEnabled()) - log.debugN( + log.debug( "Starting {} query from slices [{} - {}], from time [{}].", logPrefix, minSlice, @@ -359,7 +358,7 @@ import org.slf4j.Logger if (log.isDebugEnabled()) { if (state.latestBacktracking.seen.nonEmpty && offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow))) - log.debugN( + log.debug( "{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]", logPrefix, state.latestBacktracking, @@ -382,7 +381,7 @@ import org.slf4j.Logger if (log.isDebugEnabled) delay.foreach { d => - log.debugN( + log.debug( "{} query [{}] from slices [{} - {}] delay next [{}] ms.", logPrefix, state.queryCount, @@ -466,7 +465,7 @@ import org.slf4j.Logger " in backtracking mode," else "" - log.debugN( + log.debug( "{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. {}", logPrefix, newState.queryCount, @@ -540,7 +539,7 @@ import org.slf4j.Logger val newState = state.copy(buckets = newBuckets) if (log.isDebugEnabled) { val sum = counts.iterator.map { case Bucket(_, count) => count }.sum - log.debugN( + log.debug( "{} retrieved [{}] event count buckets, with a total of [{}], from slices [{} - {}], from time [{}]", logPrefix, counts.size, @@ -571,7 +570,7 @@ import org.slf4j.Logger throw new IllegalStateException( s"Too many events stored with the same timestamp [$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]") } - log.traceN( + log.trace( "filtering [{}] [{}] as db timestamp is the same as last offset and is in seen [{}]", row.persistenceId, row.seqNr, diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala index d192e52f..9c1467a9 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala @@ -4,8 +4,8 @@ package akka.persistence.r2dbc.internal -import scala.compat.java8.FutureConverters._ import scala.concurrent.Future +import scala.jdk.FutureConverters._ import scala.util.Try import akka.Done @@ -29,7 +29,7 @@ import akka.persistence.r2dbc.state.scaladsl.ChangeHandler override def process(session: R2dbcSession, change: DurableStateChange[Any]): Future[Done] = { val javadslSession = new akka.persistence.r2dbc.session.javadsl.R2dbcSession(session.connection)(session.ec, session.system) - delegate.process(javadslSession, change).toScala + delegate.process(javadslSession, change).asScala } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala index 5eb297c3..5d93e034 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala @@ -4,15 +4,14 @@ package akka.persistence.r2dbc.internal +import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.NotUsed import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.stream.Attributes import akka.stream.Outlet import akka.stream.SourceShape @@ -117,7 +116,7 @@ final private[r2dbc] class ContinuousQuery[S, T]( beforeQuery(state) match { case None => runNextQuery() case Some(beforeQueryFuture) => - beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic) + beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContext.parasitic) } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index cd4391b4..5bad2781 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -9,11 +9,11 @@ import java.util.function.BiConsumer import scala.collection.immutable import scala.collection.immutable.IntMap import scala.collection.mutable -import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ +import scala.jdk.FutureConverters._ import scala.util.Failure import scala.util.Success import scala.util.control.Exception.Catcher @@ -21,9 +21,7 @@ import scala.util.control.NonFatal import akka.Done import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalStableApi -import akka.dispatch.ExecutionContexts import io.r2dbc.spi.Connection import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Result @@ -44,17 +42,17 @@ import akka.persistence.r2dbc.R2dbcSettings @InternalStableApi object R2dbcExecutor { final implicit class PublisherOps[T](val publisher: Publisher[T]) extends AnyVal { def asFuture(): Future[T] = - Mono.from(publisher).toFuture.toScala + Mono.from(publisher).toFuture.asScala def asFutureDone(): Future[Done] = { val mono: Mono[Done] = Mono.from(publisher).map(_ => Done) - mono.defaultIfEmpty(Done).toFuture.toScala + mono.defaultIfEmpty(Done).toFuture.asScala } } def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = stmt.execute().asFuture().flatMap { result => - result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic) + result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContext.parasitic) } def updateOneReturningInTx[A](stmt: Statement, mapRow: Row => A)(implicit ec: ExecutionContext): Future[A] = @@ -79,7 +77,7 @@ import akka.persistence.r2dbc.R2dbcSettings statements.foldLeft(Future.successful(Vector.empty[Long])) { (acc, stmt) => acc.flatMap { seq => stmt.execute().asFuture().flatMap { res => - res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContexts.parasitic) + res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContext.parasitic) } } } @@ -142,7 +140,7 @@ class R2dbcExecutor( if (durationMicros >= logDbCallsExceedingMicros) log.info("{} - getConnection took [{}] µs", logPrefix, durationMicros) connection - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } /** @@ -220,7 +218,7 @@ class R2dbcExecutor( def updateInBatchReturning[A](logPrefix: String)( statementFactory: Connection => Statement, mapRow: Row => A): Future[immutable.IndexedSeq[A]] = { - import scala.collection.JavaConverters._ + import scala.jdk.CollectionConverters._ withConnection(logPrefix) { connection => val stmt = statementFactory(connection) Flux @@ -251,7 +249,7 @@ class R2dbcExecutor( } mappedRows.failed.foreach { exc => - log.debug2("{} - Select failed: {}", logPrefix, exc) + log.debug("{} - Select failed: {}", logPrefix, exc) connection.close().asFutureDone() } @@ -259,7 +257,7 @@ class R2dbcExecutor( connection.close().asFutureDone().map { _ => val durationMicros = durationInMicros(startTime) if (durationMicros >= logDbCallsExceedingMicros) - log.infoN("{} - Selected [{}] rows in [{}] µs", logPrefix, r.size, durationMicros) + log.info("{} - Selected [{}] rows in [{}] µs", logPrefix, r.size, durationMicros) r } } @@ -290,7 +288,7 @@ class R2dbcExecutor( result.failed.foreach { exc => if (log.isDebugEnabled()) - log.debug2("{} - DB call failed: {}", logPrefix, exc.toString) + log.debug("{} - DB call failed: {}", logPrefix, exc.toString) // ok to rollback async like this, or should it be before completing the returned Future? val done = rollbackAndClose(connection) timeoutTask.foreach { task => done.onComplete(_ => task.cancel()) } @@ -332,7 +330,7 @@ class R2dbcExecutor( } result.failed.foreach { exc => - log.debug2("{} - DB call failed: {}", logPrefix, exc) + log.debug("{} - DB call failed: {}", logPrefix, exc) // auto-commit so nothing to rollback val done = connection.close().asFutureDone() timeoutTask.foreach { task => done.onComplete(_ => task.cancel()) } @@ -342,7 +340,7 @@ class R2dbcExecutor( val done = connection.close().asFutureDone().map { _ => val durationMicros = durationInMicros(startTime) if (durationMicros >= logDbCallsExceedingMicros) - log.infoN("{} - DB call completed [{}] in [{}] µs", logPrefix, r, durationMicros) + log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, r, durationMicros) r } timeoutTask.foreach { task => done.onComplete(_ => task.cancel()) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index c93dffa9..f89e5e05 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -14,7 +14,6 @@ import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.util.ccompat.JavaConverters._ import com.typesafe.config.Config import io.r2dbc.h2.H2ConnectionConfiguration import io.r2dbc.h2.H2ConnectionFactory @@ -23,6 +22,7 @@ import io.r2dbc.spi.ConnectionFactory import java.util.Locale import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ import akka.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsProvider import akka.persistence.r2dbc.internal.R2dbcExecutorProvider diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index 6a548399..db12b37b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -5,16 +5,13 @@ package akka.persistence.r2dbc.internal.h2 import java.time.Instant - +import scala.concurrent.ExecutionContext import scala.concurrent.Future - import io.r2dbc.spi.Connection import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider @@ -81,7 +78,7 @@ private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider) result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) } - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => events.head.dbTimestamp)(ExecutionContext.parasitic) } override def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] = { @@ -95,7 +92,7 @@ private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider) result.foreach { _ => log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) } - result.map(_ => event.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => event.dbTimestamp)(ExecutionContext.parasitic) } private def bindInsertStatement(stmt: Statement, write: SerializedJournalRow): Statement = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 7c1c7aab..16087f26 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -9,6 +9,7 @@ import java.util.Locale import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration +import scala.jdk.DurationConverters._ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -18,7 +19,6 @@ import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao -import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider import io.r2dbc.postgresql.client.SSLMode @@ -57,13 +57,13 @@ private[r2dbc] object PostgresDialect extends Dialect { val sslKey: String = config.getString("ssl.key") val sslPassword: String = config.getString("ssl.password") - val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").asScala + val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").toScala val statementCacheSize: Int = config.getInt("statement-cache-size") val statementTimeout: Option[FiniteDuration] = config.getString("statement-timeout").toLowerCase(Locale.ROOT) match { case "off" => None - case _ => Some(config.getDuration("statement-timeout").asScala) + case _ => Some(config.getDuration("statement-timeout").toScala) } } @@ -95,8 +95,8 @@ private[r2dbc] object PostgresDialect extends Dialect { Integer.valueOf(settings.statementCacheSize)) settings.statementTimeout.foreach { timeout => - import akka.util.JavaDurationConverters._ - builder.option(PostgresqlConnectionFactoryProvider.STATEMENT_TIMEOUT, timeout.asJava) + import scala.jdk.DurationConverters._ + builder.option(PostgresqlConnectionFactoryProvider.STATEMENT_TIMEOUT, timeout.toJava) } if (settings.sslEnabled) { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 74faf857..1fcd12f5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -25,9 +25,7 @@ import org.slf4j.LoggerFactory import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.query.DeletedDurableState import akka.persistence.query.DurableStateChange @@ -475,7 +473,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { if (revision == 0) { hardDeleteState(persistenceId) - .map(_ => None)(ExecutionContexts.parasitic) + .map(_ => None)(ExecutionContext.parasitic) } else { val slice = persistenceExt.sliceForPersistenceId(persistenceId) val executor = executorProvider.executorFor(slice) @@ -612,7 +610,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv if (log.isDebugEnabled()) result.foreach(_ => log.debug("Hard deleted durable state for persistenceId [{}]", persistenceId)) - result.map(_ => Done)(ExecutionContexts.parasitic) + result.map(_ => Done)(ExecutionContext.parasitic) } override def currentDbTimestamp(slice: Int): Future[Instant] = { @@ -708,8 +706,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv )) if (log.isDebugEnabled) - result.foreach(rows => - log.debugN("Read [{}] durable states from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] durable states from slices [{} - {}]", rows.size, minSlice, maxSlice)) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -890,7 +887,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv }) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) result diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 56ab776b..7f6990a7 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -16,9 +16,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.JournalDao @@ -202,7 +200,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, persistenceId) } - result.map(_.head)(ExecutionContexts.parasitic) + result.map(_.head)(ExecutionContext.parasitic) } } @@ -295,7 +293,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) val seqNr = row.get(0, classOf[java.lang.Long]) if (seqNr eq null) 0L else seqNr.longValue }) - .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic) + .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic) if (log.isDebugEnabled) result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr)) @@ -316,7 +314,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) val seqNr = row.get(0, classOf[java.lang.Long]) if (seqNr eq null) 0L else seqNr.longValue }) - .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic) + .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic) if (log.isDebugEnabled) result.foreach(seqNr => log.debug("Lowest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr)) @@ -378,13 +376,13 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) } }).map(deletedRows => if (log.isDebugEnabled) { - log.debugN( + log.debug( "Deleted [{}] events for persistenceId [{}], from seq num [{}] to [{}]", deletedRows, persistenceId, from, to) - })(ExecutionContexts.parasitic) + })(ExecutionContext.parasitic) } val batchSize = settings.cleanupSettings.eventsJournalDeleteBatchSize @@ -416,8 +414,8 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) .bindTimestamp(1, timestamp) } .map(deletedRows => - log.debugN("Deleted [{}] events for persistenceId [{}], before [{}]", deletedRows, persistenceId, timestamp))( - ExecutionContexts.parasitic) + log.debug("Deleted [{}] events for persistenceId [{}], before [{}]", deletedRows, persistenceId, timestamp))( + ExecutionContext.parasitic) } override def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] = { @@ -431,12 +429,12 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) .bindTimestamp(2, timestamp) } .map(deletedRows => - log.debugN( + log.debug( "Deleted [{}] events for entityType [{}], slice [{}], before [{}]", deletedRows, entityType, slice, - timestamp))(ExecutionContexts.parasitic) + timestamp))(ExecutionContext.parasitic) } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index 84b97c03..c6f73e49 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory import akka.NotUsed import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings @@ -299,7 +298,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e metadata = readMetadata(row))) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] events from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] events from slices [{} - {}]", rows.size, minSlice, maxSlice)) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -348,7 +347,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e }) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) result } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index f43386d2..1c7703ef 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -17,9 +17,7 @@ import org.slf4j.LoggerFactory import akka.NotUsed import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.SnapshotSelectionCriteria import akka.persistence.r2dbc.R2dbcSettings @@ -273,7 +271,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider statement }, collectSerializedSnapshot(entityType, _)) - .map(_.headOption)(ExecutionContexts.parasitic) + .map(_.headOption)(ExecutionContext.parasitic) } protected def bindUpsertSql(statement: Statement, serializedRow: SerializedSnapshotRow): Statement = { @@ -322,7 +320,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider bindUpsertSql(statement, serializedRow) } - .map(_ => ())(ExecutionContexts.parasitic) + .map(_ => ())(ExecutionContext.parasitic) } def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { @@ -352,7 +350,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider } statement } - }.map(_ => ())(ExecutionContexts.parasitic) + }.map(_ => ())(ExecutionContext.parasitic) /** * This is used from `BySliceQuery`, i.e. only if settings.querySettings.startFromSnapshotEnabled @@ -406,7 +404,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider collectSerializedSnapshot(entityType, _)) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] snapshots from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] snapshots from slices [{} - {}]", rows.size, minSlice, maxSlice)) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -465,7 +463,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider }) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) result diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala index ee6d5125..17723302 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala @@ -8,6 +8,7 @@ import java.time.{ Duration => JDuration } import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration +import scala.jdk.DurationConverters._ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -17,7 +18,6 @@ import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao -import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config import io.r2dbc.mssql.MssqlConnectionFactoryProvider import io.r2dbc.spi.ConnectionFactories @@ -44,7 +44,7 @@ private[r2dbc] object SqlServerDialect extends Dialect { val user: String = config.getString("user") val password: String = config.getString("password") val database: String = config.getString("database") - val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").asScala + val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").toScala } diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala index 68ef5d79..dbd07a27 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala @@ -18,7 +18,6 @@ import akka.actor.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.persistence.AtomicWrite import akka.persistence.Persistence @@ -196,7 +195,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends writeAndPublishResult.onComplete { _ => self ! WriteFinished(persistenceId, writeAndPublishResult) } - writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic) + writeAndPublishResult.map(_ => Nil)(ExecutionContext.parasitic) } private def publish(messages: immutable.Seq[AtomicWrite], dbTimestamp: Future[Instant]): Future[Done] = @@ -211,7 +210,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends } case None => - dbTimestamp.map(_ => Done)(ExecutionContexts.parasitic) + dbTimestamp.map(_ => Done)(ExecutionContext.parasitic) } override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { @@ -226,7 +225,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends case Some(f) => log.debug("Write in progress for [{}], deferring replayMessages until write completed", persistenceId) // we only want to make write - replay sequential, not fail if previous write failed - f.recover { case _ => Done }(ExecutionContexts.parasitic) + f.recover { case _ => Done }(ExecutionContext.parasitic) case None => FutureDone } pendingWrite.flatMap { _ => diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala index e49c258b..6093ddda 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala @@ -9,11 +9,11 @@ import java.util import java.util.Optional import java.util.concurrent.CompletionStage -import scala.compat.java8.OptionConverters._ -import scala.compat.java8.FutureConverters._ +import scala.concurrent.ExecutionContext +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import akka.NotUsed -import akka.dispatch.ExecutionContexts import akka.japi.Pair import akka.persistence.query.{ EventEnvelope => ClassicEventEnvelope } import akka.persistence.query.Offset @@ -152,7 +152,7 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) delegate.eventsBySlicesStartingFromSnapshots(entityType, minSlice, maxSlice, offset, transformSnapshot(_)).asJava override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer, Integer]] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ delegate .sliceRanges(numberOfRanges) .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) @@ -232,9 +232,9 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) delegate.currentPersistenceIds(entityType, afterId, limit).asJava override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = - delegate.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava + delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = - delegate.loadEnvelope[Event](persistenceId, sequenceNr).toJava + delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 5dfc49b5..a6758ce1 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -15,7 +15,6 @@ import scala.concurrent.duration.FiniteDuration import akka.NotUsed import akka.actor.ExtendedActorSystem import akka.actor.typed.pubsub.Topic -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.annotation.ApiMayChange import akka.annotation.InternalApi @@ -525,7 +524,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat latestBacktracking = t.timestamp env :: Nil } else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) { - log.trace2( + log.trace( "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.", env.persistenceId, env.sequenceNr) @@ -534,7 +533,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat .between(latestBacktracking, t.timestamp) .compareTo(maxAheadOfBacktracking) > 0) { // drop from pubsub when too far ahead from backtracking - log.debug2( + log.debug( "Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.", env.persistenceId, env.sequenceNr) @@ -584,7 +583,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) if (state.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "currentEventsByPersistenceId query [{}] for persistenceId [{}], from [{}] to [{}]. Found [{}] rows in previous query.", state.queryCount, persistenceId, @@ -596,7 +595,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat queryDao .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr, includeDeleted)) } else { - log.debugN( + log.debug( "currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.", state.queryCount, persistenceId, @@ -607,7 +606,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat } if (log.isDebugEnabled()) - log.debugN( + log.debug( "currentEventsByPersistenceId query for persistenceId [{}], from [{}] to [{}].", persistenceId, fromSequenceNr, @@ -680,7 +679,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat settings.querySettings.refreshInterval) delay.foreach { d => - log.debugN( + log.debug( "eventsByPersistenceId query [{}] for persistenceId [{}] delay next [{}] ms.", state.queryCount, persistenceId, @@ -693,7 +692,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat def nextQuery( state: ByPersistenceIdState): (ByPersistenceIdState, Option[Source[SerializedJournalRow, NotUsed]]) = { if (state.latestSeqNr >= toSequenceNr) { - log.debugN( + log.debug( "eventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.", state.queryCount, persistenceId, @@ -702,7 +701,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat } else { val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) - log.debugN( + log.debug( "eventsByPersistenceId query [{}] for persistenceId [{}], from [{}]. Found [{}] rows in previous query.", newState.queryCount, persistenceId, @@ -809,7 +808,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) if (state.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "persistenceIds query [{}] after [{}]. Found [{}] rows in previous query.", state.queryCount, state.latestPid, diff --git a/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala b/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala index e86cddc9..f8ae2290 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala @@ -8,10 +8,10 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.{ Function => JFunction } -import scala.collection.JavaConverters._ -import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange @@ -39,9 +39,9 @@ object R2dbcSession { fun: JFunction[R2dbcSession, CompletionStage[A]]): CompletionStage[A] = { scaladsl.R2dbcSession.withSession(system, connectionFactoryConfigPath) { scaladslSession => val javadslSession = new R2dbcSession(scaladslSession.connection)(system.executionContext, system) - fun(javadslSession).toScala + fun(javadslSession).asScala } - }.toJava + }.asJava } @@ -52,18 +52,18 @@ final class R2dbcSession(val connection: Connection)(implicit ec: ExecutionConte connection.createStatement(sql) def updateOne(statement: Statement): CompletionStage[java.lang.Long] = - R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContexts.parasitic).toJava + R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContext.parasitic).asJava def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[java.lang.Long]] = R2dbcExecutor .updateInTx(statements.asScala.toVector) .map(results => results.map(java.lang.Long.valueOf).asJava) - .toJava + .asJava def selectOne[A](statement: Statement)(mapRow: JFunction[Row, A]): CompletionStage[Optional[A]] = - R2dbcExecutor.selectOneInTx(statement, mapRow(_)).map(_.asJava)(ExecutionContexts.parasitic).toJava + R2dbcExecutor.selectOneInTx(statement, mapRow(_)).map(_.toJava)(ExecutionContext.parasitic).asJava def select[A](statement: Statement)(mapRow: JFunction[Row, A]): CompletionStage[java.util.List[A]] = - R2dbcExecutor.selectInTx(statement, mapRow(_)).map(_.asJava).toJava + R2dbcExecutor.selectInTx(statement, mapRow(_)).map(_.asJava).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala index 40b3f20d..29d938fe 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala @@ -9,6 +9,7 @@ import java.util.Optional import java.util.concurrent.CompletionStage import scala.concurrent.ExecutionContext +import scala.jdk.FutureConverters.FutureOps import akka.Done import akka.NotUsed @@ -20,7 +21,6 @@ import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR2dbcDurableStateStore } import akka.persistence.state.javadsl.GetObjectResult import akka.stream.javadsl.Source -import scala.compat.java8.FutureConverters.FutureOps import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore @@ -37,10 +37,10 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl scalaStore .getObject(persistenceId) .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)) - .toJava + .asJava override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = - scalaStore.upsertObject(persistenceId, revision, value, tag).toJava + scalaStore.upsertObject(persistenceId, revision, value, tag).asJava override def upsertObject( persistenceId: String, @@ -48,17 +48,17 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl value: A, tag: String, changeEvent: Any): CompletionStage[Done] = - scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).toJava + scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).asJava @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): CompletionStage[Done] = deleteObject(persistenceId, revision = 0) override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = - scalaStore.deleteObject(persistenceId, revision).toJava + scalaStore.deleteObject(persistenceId, revision).asJava override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] = - scalaStore.deleteObject(persistenceId, revision, changeEvent).toJava + scalaStore.deleteObject(persistenceId, revision, changeEvent).asJava override def currentChangesBySlices( entityType: String, @@ -78,7 +78,7 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl scalaStore.sliceForPersistenceId(persistenceId) override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer, Integer]] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ scalaStore .sliceRanges(numberOfRanges) .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) @@ -103,12 +103,12 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl * A source containing all the persistence ids, limited as specified. */ def currentPersistenceIds(entityType: String, afterId: Optional[String], limit: Long): Source[String, NotUsed] = { - import scala.compat.java8.OptionConverters._ + import scala.jdk.OptionConverters._ scalaStore.currentPersistenceIds(entityType, afterId.asScala, limit).asJava } override def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed] = { - import scala.compat.java8.OptionConverters._ + import scala.jdk.OptionConverters._ scalaStore.currentPersistenceIds(afterId.asScala, limit).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 110599c8..995bfa61 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -14,7 +14,6 @@ import scala.concurrent.Future import akka.Done import akka.NotUsed import akka.actor.ExtendedActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.persistence.Persistence import akka.persistence.SerializedEvent @@ -353,7 +352,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg val newState2 = newState.copy(rowCount = 0, queryCount = newState.queryCount + 1) if (newState.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "persistenceIds query [{}] after [{}]. Found [{}] rows in previous query.", newState.queryCount, newState.latestPid, diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala b/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala index 57afa503..65ea53d3 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala @@ -7,7 +7,6 @@ package akka.persistence.r2dbc import akka.Done import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId @@ -92,14 +91,14 @@ object TestActors { { (state, command) => command match { case command: Persist => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, EventSourcedBehavior.lastSequenceNumber(context) + 1) Effect.persist(command.payload) case command: PersistWithAck => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, @@ -107,7 +106,7 @@ object TestActors { Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done) case command: PersistAll => if (context.log.isDebugEnabled) - context.log.debugN( + context.log.debug( "PersistAll [{}], pid [{}], seqNr [{}]", command.payloads.mkString(","), pid.id, @@ -157,14 +156,14 @@ object TestActors { { (state, command) => command match { case command: Persist => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, DurableStateBehavior.lastSequenceNumber(context) + 1) Effect.persist(command.payload) case command: PersistWithAck => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index 6e8dd420..16f26b6d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery @@ -69,7 +68,7 @@ class EventsBySliceBacktrackingSpec // to be able to store events with specific timestamps private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { - log.debugN("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) + log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) val insertEventSql = sql""" INSERT INTO ${settings.journalTableWithSchema(slice)} (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload) diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index dd75ada3..c87aebb6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -12,7 +12,6 @@ import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.{ ActorRef, ActorSystem } import akka.persistence.FilteredPayload import akka.persistence.query.NoOffset @@ -98,7 +97,7 @@ class EventsBySliceSpec // to be able to store events with specific timestamps private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { - log.debugN("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) + log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) val insertEventSql = sql""" INSERT INTO ${settings.journalTableWithSchema(slice)} diff --git a/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java index 542f360f..8ee8e7cf 100644 --- a/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java +++ b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java @@ -9,7 +9,8 @@ import akka.persistence.query.javadsl.CurrentPersistenceIdsQuery; import akka.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup; import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal; -import scala.compat.java8.FutureConverters; + +import scala.jdk.javaapi.FutureConverters; public class CleanupDocExample { @@ -30,7 +31,7 @@ public static void example() { queries .currentPersistenceIds() .mapAsync(persistenceIdParallelism, pid -> - FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid))) + FutureConverters.asJava(cleanup.cleanupBeforeSnapshot(pid))) .run(system); //#cleanup diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala index 69fcaad7..56028870 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala @@ -5,19 +5,16 @@ package akka.persistence.r2dbc.migration import java.time.Instant - +import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.Done import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.LoggerOps -import akka.dispatch.ExecutionContexts import akka.pattern.ask import akka.persistence.Persistence import akka.persistence.SelectedSnapshot @@ -45,7 +42,6 @@ import akka.stream.scaladsl.Sink import akka.util.Timeout import io.r2dbc.spi.R2dbcDataIntegrityViolationException import org.slf4j.LoggerFactory - import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.state.DurableStateStoreRegistry @@ -198,7 +194,7 @@ class MigrationTool(system: ActorSystem[_]) { } yield persistenceId -> Result(1, eventCount, snapshotCount) } .map { case (pid, result @ Result(_, events, snapshots)) => - log.debugN( + log.debug( "Migrated persistenceId [{}] with [{}] events{}.", pid, events, @@ -208,7 +204,7 @@ class MigrationTool(system: ActorSystem[_]) { .runWith(Sink.fold(Result.empty) { case (acc, Result(_, events, snapshots)) => val result = Result(acc.persistenceIds + 1, acc.events + events, acc.snapshots + snapshots) if (result.persistenceIds % 100 == 0) - log.infoN( + log.info( "Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.", result.persistenceIds, result.events, @@ -218,7 +214,7 @@ class MigrationTool(system: ActorSystem[_]) { result.transform { case s @ Success(Result(persistenceIds, events, snapshots)) => - log.infoN( + log.info( "Migration successful. Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.", persistenceIds, events, @@ -331,7 +327,7 @@ class MigrationTool(system: ActorSystem[_]) { val serializedRow = serializedSnapotRow(selectedSnapshot) targetSnapshotDao .store(serializedRow) - .map(_ => snapshotMetadata.sequenceNr)(ExecutionContexts.parasitic) + .map(_ => snapshotMetadata.sequenceNr)(ExecutionContext.parasitic) } _ <- migrationDao.updateSnapshotProgress(persistenceId, seqNr) } yield 1 @@ -399,22 +395,19 @@ class MigrationTool(system: ActorSystem[_]) { } yield persistenceId -> DurableStateResult(1, stateCount) } .map { case (pid, result @ DurableStateResult(_, states)) => - log.debugN("Migrated persistenceId [{}] with [{}] durable state.", pid, states) + log.debug("Migrated persistenceId [{}] with [{}] durable state.", pid, states) result } .runWith(Sink.fold(DurableStateResult.empty) { case (acc, DurableStateResult(_, states)) => val result = DurableStateResult(acc.persistenceIds + 1, acc.states + states) if (result.persistenceIds % 100 == 0) - log.infoN("Migrated [{}] persistenceIds with [{}] durable states.", result.persistenceIds, result.states) + log.info("Migrated [{}] persistenceIds with [{}] durable states.", result.persistenceIds, result.states) result }) result.transform { case s @ Success(DurableStateResult(persistenceIds, states)) => - log.infoN( - "Migration successful. Migrated [{}] persistenceIds with [{}] durable states.", - persistenceIds, - states) + log.info("Migration successful. Migrated [{}] persistenceIds with [{}] durable states.", persistenceIds, states) s case f @ Failure(exc) => log.error("Migration failed.", exc) @@ -443,7 +436,7 @@ class MigrationTool(system: ActorSystem[_]) { val serializedRow = serializedDurableStateRow(selectedDurableState) durableStateMigrationToolDao .upsertState(serializedRow, selectedDurableState.value, None) - .map(_ => selectedDurableState.revision)(ExecutionContexts.parasitic) + .map(_ => selectedDurableState.revision)(ExecutionContext.parasitic) } _ <- migrationDao.updateDurableStateProgress(persistenceId, revision) } yield 1 diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala index 87bfd2b9..ebe37008 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala @@ -95,7 +95,7 @@ import akka.persistence.typed.PersistenceId val stmt = connection.createStatement(baseUpsertMigrationProgressSql("event_seq_nr")) bindBaseUpsertSql(stmt, persistenceId, seqNr) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def updateSnapshotProgress(persistenceId: String, seqNr: Long): Future[Done] = { @@ -104,7 +104,7 @@ import akka.persistence.typed.PersistenceId val stmt = connection.createStatement(baseUpsertMigrationProgressSql("snapshot_seq_nr")) bindBaseUpsertSql(stmt, persistenceId, seqNr) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def updateDurableStateProgress(persistenceId: String, revision: Long): Future[Done] = { @@ -113,7 +113,7 @@ import akka.persistence.typed.PersistenceId val stmt = connection.createStatement(baseUpsertMigrationProgressSql("state_revision")) bindBaseUpsertSql(stmt, persistenceId, revision) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def currentProgress(persistenceId: String): Future[Option[CurrentProgress]] = { diff --git a/native-image-tests/build.sbt b/native-image-tests/build.sbt index 72a6f11d..acfe20c1 100644 --- a/native-image-tests/build.sbt +++ b/native-image-tests/build.sbt @@ -6,7 +6,7 @@ scalaVersion := "2.13.14" resolvers += "Akka library repository".at("https://repo.akka.io/maven") -lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.4") +lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.10.0-M1") lazy val akkaR2dbcVersion = sys.props.getOrElse("akka.r2dbc.version", "1.2.3") fork := true @@ -28,7 +28,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "com.lightbend.akka" %% "akka-persistence-r2dbc" % akkaR2dbcVersion, - "ch.qos.logback" % "logback-classic" % "1.2.13", + "ch.qos.logback" % "logback-classic" % "1.5.7", // H2 "com.h2database" % "h2" % "2.2.224", "io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7b3d44c4..6d04dce7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,9 +9,9 @@ object Dependencies { val Scala3 = "3.3.3" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.4") + val AkkaVersion = System.getProperty("override.akka.version", "2.10.0-M1") val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } - val AkkaPersistenceJdbcVersion = "5.4.0" // only in migration tool tests + val AkkaPersistenceJdbcVersion = "5.5.0-M1" // only in migration tool tests val AkkaProjectionVersionInDocs = "current" val H2Version = "2.2.224" val R2dbcH2Version = "1.0.0.RELEASE" @@ -48,7 +48,7 @@ object Dependencies { val postgresql = "org.postgresql" % "postgresql" % "42.7.3" % Test // BSD-2-Clause - val logback = "ch.qos.logback" % "logback-classic" % "1.2.13" % Test // EPL 1.0 / LGPL 2.1 + val logback = "ch.qos.logback" % "logback-classic" % "1.5.7" % Test // EPL 1.0 / LGPL 2.1 val scalaTest = "org.scalatest" %% "scalatest" % "3.2.12" % Test // ApacheV2 val junit = "junit" % "junit" % "4.12" % Test // Eclipse Public License 1.0 val junitInterface = "com.novocode" % "junit-interface" % "0.11" % Test // "BSD 2-Clause" @@ -75,9 +75,7 @@ object Dependencies { val migrationTests = Seq( - ("com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test) - // Unsupported SLF4J 2 transitively pulled in by Slick 3.5.0 - .exclude("org.slf4j", "slf4j-api"), + "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test, "com.microsoft.sqlserver" % "mssql-jdbc" % SqlServerJdbcVersion % Test, TestDeps.postgresql, TestDeps.logback,