Skip to content

Commit

Permalink
chore: bump to akka 2.10.0-M1 (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers authored Sep 30, 2024
1 parent 591ccff commit e1163f5
Show file tree
Hide file tree
Showing 33 changed files with 163 additions and 196 deletions.
2 changes: 1 addition & 1 deletion .jvmopts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

-Xms2G
-Xmx2G
-Xss2M
-Xss2M
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import io.r2dbc.spi.ConnectionFactory
import java.time.{ Duration => JDuration }
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._
import scala.util.Failure
import scala.util.Success

Expand Down
30 changes: 15 additions & 15 deletions core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package akka.persistence.r2dbc

import scala.concurrent.duration._
import scala.collection.immutable
import scala.jdk.DurationConverters._

import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.persistence.r2dbc.internal.codec.IdentityAdapter
Expand All @@ -14,12 +17,9 @@ import akka.persistence.r2dbc.internal.codec.QueryAdapter
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.ConnectionFactorySettings
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config

import java.util.Locale
import scala.collection.immutable
import scala.concurrent.duration._

/**
* INTERNAL API
Expand Down Expand Up @@ -61,7 +61,7 @@ object R2dbcSettings {
configToMap(config.getConfig("state.custom-table"))

val durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] = {
import akka.util.ccompat.JavaConverters._
import scala.jdk.CollectionConverters._
val cfg = config.getConfig("state.additional-columns")
cfg.root.unwrapped.asScala.toMap.map {
case (k, v: java.util.List[_]) => k -> v.iterator.asScala.map(_.toString).toVector
Expand Down Expand Up @@ -126,7 +126,7 @@ object R2dbcSettings {
val logDbCallsExceeding: FiniteDuration =
config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match {
case "off" => -1.millis
case _ => config.getDuration("log-db-calls-exceeding").asScala
case _ => config.getDuration("log-db-calls-exceeding").toScala
}

val codecSettings = {
Expand Down Expand Up @@ -190,7 +190,7 @@ object R2dbcSettings {
}

private def configToMap(cfg: Config): Map[String, String] = {
import akka.util.ccompat.JavaConverters._
import scala.jdk.CollectionConverters._
cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString }
}

Expand Down Expand Up @@ -484,11 +484,11 @@ final class R2dbcSettings private (
*/
@InternalStableApi
final class QuerySettings(config: Config) {
val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").asScala
val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").asScala
val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala
val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").toScala
val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled")
val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").asScala
val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").asScala
val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").toScala
val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").toScala
val bufferSize: Int = config.getInt("buffer-size")
val persistenceIdsBufferSize: Int = config.getInt("persistence-ids.buffer-size")
val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
Expand All @@ -502,18 +502,18 @@ final class QuerySettings(config: Config) {
final class ConnectionPoolSettings(config: Config) {
val initialSize: Int = config.getInt("initial-size")
val maxSize: Int = config.getInt("max-size")
val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").asScala
val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").asScala
val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").toScala
val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").toScala

val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").asScala
val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").toScala
val acquireRetry: Int = config.getInt("acquire-retry")

val validationQuery: String = config.getString("validation-query")

val closeCallsExceeding: Option[FiniteDuration] =
config.getString("close-calls-exceeding").toLowerCase(Locale.ROOT) match {
case "off" => None
case _ => Some(config.getDuration("close-calls-exceeding").asScala)
case _ => Some(config.getDuration("close-calls-exceeding").toScala)
}
}

Expand All @@ -523,7 +523,7 @@ final class ConnectionPoolSettings(config: Config) {
@InternalStableApi
final class PublishEventsDynamicSettings(config: Config) {
val throughputThreshold: Int = config.getInt("throughput-threshold")
val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala
val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").toScala
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package akka.persistence.r2dbc.cleanup.javadsl
import java.util.concurrent.CompletionStage
import java.util.{ List => JList }

import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.ClassicActorSystemProvider
Expand Down Expand Up @@ -43,14 +43,14 @@ final class DurableStateCleanup private (delegate: scaladsl.DurableStateCleanup)
* Delete the state related to one single `persistenceId`.
*/
def deleteState(persistenceId: String, resetRevisionNumber: Boolean): CompletionStage[Done] = {
delegate.deleteState(persistenceId, resetRevisionNumber).toJava
delegate.deleteState(persistenceId, resetRevisionNumber).asJava
}

/**
* Delete all states related to the given list of `persistenceIds`.
*/
def deleteStates(persistenceIds: JList[String], resetRevisionNumber: Boolean): CompletionStage[Done] = {
delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).toJava
delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).asJava
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import java.time.Instant
import java.util.concurrent.CompletionStage
import java.util.{ List => JList }

import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.ClassicActorSystemProvider
Expand Down Expand Up @@ -53,19 +53,19 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup)
* sequence nr (inclusive) to delete up to
*/
def deleteEventsTo(persistenceId: String, toSequenceNr: Long): CompletionStage[Done] =
delegate.deleteEventsTo(persistenceId, toSequenceNr).toJava
delegate.deleteEventsTo(persistenceId, toSequenceNr).asJava

/**
* Delete all events related to one single `persistenceId`. Snapshots are not deleted.
*/
def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] =
delegate.deleteAllEvents(persistenceId, resetSequenceNumber).toJava
delegate.deleteAllEvents(persistenceId, resetSequenceNumber).asJava

/**
* Delete all events related to the given list of `persistenceIds`. Snapshots are not deleted.
*/
def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] =
delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).toJava
delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).asJava

/**
* Delete events before a timestamp for the given persistence id. Snapshots are not deleted.
Expand All @@ -83,7 +83,7 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup)
* timestamp (exclusive) to delete up to
*/
def deleteEventsBefore(persistenceId: String, timestamp: Instant): CompletionStage[Done] =
delegate.deleteEventsBefore(persistenceId, timestamp).toJava
delegate.deleteEventsBefore(persistenceId, timestamp).asJava

/**
* Delete events before a timestamp for the given entityType and slice. Snapshots are not deleted.
Expand All @@ -103,43 +103,43 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup)
* timestamp (exclusive) to delete up to
*/
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): CompletionStage[Done] =
delegate.deleteEventsBefore(entityType, slice, timestamp).toJava
delegate.deleteEventsBefore(entityType, slice, timestamp).asJava

/**
* Delete snapshots related to one single `persistenceId`. Events are not deleted.
*/
def deleteSnapshot(persistenceId: String): CompletionStage[Done] =
delegate.deleteSnapshot(persistenceId).toJava
delegate.deleteSnapshot(persistenceId).asJava

/**
* Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted.
*/
def deleteSnapshots(persistenceIds: JList[String]): CompletionStage[Done] =
delegate.deleteSnapshots(persistenceIds.asScala.toVector).toJava
delegate.deleteSnapshots(persistenceIds.asScala.toVector).asJava

/**
* Deletes all events for the given persistence id from before the snapshot. The snapshot is not deleted. The event
* with the same sequence number as the remaining snapshot is deleted.
*/
def cleanupBeforeSnapshot(persistenceId: String): CompletionStage[Done] =
delegate.cleanupBeforeSnapshot(persistenceId).toJava
delegate.cleanupBeforeSnapshot(persistenceId).asJava

/**
* See single persistenceId overload for what is done for each persistence id
*/
def cleanupBeforeSnapshot(persistenceIds: JList[String]): CompletionStage[Done] =
delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).toJava
delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).asJava

/**
* Delete everything related to one single `persistenceId`. All events and snapshots are deleted.
*/
def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] =
delegate.deleteAll(persistenceId, resetSequenceNumber).toJava
delegate.deleteAll(persistenceId, resetSequenceNumber).asJava

/**
* Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted.
*/
def deleteAll(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] =
delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).toJava
delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@

package akka.persistence.r2dbc.cleanup.scaladsl

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.collection.immutable
import scala.util.Failure
import scala.util.Success

import org.slf4j.LoggerFactory

import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider

Expand Down Expand Up @@ -73,15 +70,15 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
if (resetRevisionNumber)
stateDao
.deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check
.map(_ => Done)(ExecutionContexts.parasitic)
.map(_ => Done)(ExecutionContext.parasitic)
else {
stateDao.readState(persistenceId).flatMap {
case None =>
Future.successful(Done) // already deleted
case Some(s) =>
stateDao
.deleteState(persistenceId, s.revision + 1, changeEvent = None)
.map(_ => Done)(ExecutionContexts.parasitic)
.map(_ => Done)(ExecutionContext.parasitic)
}
}
}
Expand All @@ -106,7 +103,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf
case pid :: tail =>
pidOperation(pid).flatMap { _ =>
if (n % settings.cleanupSettings.logProgressEvery == 0)
log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size)
log.info("Cleanup {} [{}] of [{}].", operationName, n, size)
loop(tail, n + 1)
}
}
Expand All @@ -116,7 +113,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf

result.onComplete {
case Success(_) =>
log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size)
log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size)
case Failure(e) =>
log.error(s"Cleanup {$operationName} failed.", e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory
import akka.Done
import akka.actor.ClassicActorSystemProvider
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.SnapshotSelectionCriteria
Expand Down Expand Up @@ -209,7 +208,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
case pid :: tail =>
pidOperation(pid).flatMap { _ =>
if (n % settings.cleanupSettings.logProgressEvery == 0)
log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size)
log.info("Cleanup {} [{}] of [{}].", operationName, n, size)
loop(tail, n + 1)
}
}
Expand All @@ -219,7 +218,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf

result.onComplete {
case Success(_) =>
log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size)
log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size)
case Failure(e) =>
log.error(s"Cleanup {$operationName} failed.", e)
}
Expand Down
Loading

0 comments on commit e1163f5

Please sign in to comment.