Merge pull request #93 from SwissBorg/improve-read-efficiency
Improve events by tag query
mkubala authored Sep 8, 2020
2 parents 350a548 + 18d72b3 commit 3d81a9e
Showing 8 changed files with 138 additions and 43 deletions.
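
At its core, this change removes the SQL-level `limit ?` from the events-by-tag query and bounds the query window on the caller side instead: each round now fetches events with offset < ordering <= min(offset + max, maxOrdering), letting the index on `ordering` do the limiting. A minimal sketch of the window arithmetic, assuming the same semantics as the diff below (the helper name `queryWindow` is ours, not part of the commit):

    // Derive the window for one events-by-tag round.
    // `maxOrdering` is the highest ordering value known to the journal.
    def queryWindow(offset: Long, max: Long, maxOrdering: Long): (Long, Long) =
      (offset, math.min(offset + max, maxOrdering))

    // e.g. offset = 100, max = 500, maxOrdering = 10000
    // => this round reads events with 100 < ordering <= 600
    require(queryWindow(100L, 500L, 10000L) == (100L, 600L))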
@@ -20,7 +20,7 @@ import slick.jdbc.JdbcBackend._

import scala.collection.immutable._
import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Failure, Success, Try }
+import scala.util.Try

trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWithReadMessages {
def db: Database
@@ -40,8 +40,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
maxOffset: Long,
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
val publisher: Int => DatabasePublisher[JournalRow] = tagId =>
-      db.stream(queries.eventsByTag(List(tagId), offset, maxOffset, max).result)
-    // applies workaround for https://github.com/akka/akka-persistence-jdbc/issues/168
+      db.stream(queries.eventsByTag(List(tagId), offset, maxOffset).result)
Source
.future(tagIdResolver.lookupIdFor(tag))
.flatMapConcat(_.fold(Source.empty[JournalRow])(tagId => Source.fromPublisher(publisher(tagId))))
@@ -40,13 +40,11 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
protected def _eventsByTag(
tag: Rep[List[Int]],
offset: ConstColumn[Long],
-      maxOffset: ConstColumn[Long],
-      max: ConstColumn[Long]) = {
+      maxOffset: ConstColumn[Long]): Query[JournalTable, JournalRow, Seq] = {
baseTableQuery()
.filter(_.tags @> tag)
.sortBy(_.ordering.asc)
.filter(row => row.ordering > offset && row.ordering <= maxOffset)
-      .take(max)
}

val eventsByTag = Compiled(_eventsByTag _)
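
With `max` and `.take(max)` gone, the compiled statement no longer emits a `limit ?` clause; the `ordering <= ?` bound caps the result set instead. The generated SQL, as asserted in ReadJournalQueriesTest at the bottom of this diff, becomes:

    select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags"
    from "journal"
    where ("tags" @> ?) and (("ordering" > ?) and ("ordering" <= ?))
    order by "ordering"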
@@ -22,6 +22,7 @@ import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.{ Materializer, SystemMaterializer }
import akka.util.Timeout
import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend._

import scala.collection.immutable._
@@ -42,6 +43,8 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
with CurrentEventsByTagQuery
with EventsByTagQuery {

+  private val log = LoggerFactory.getLogger(this.getClass)
+
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat: Materializer = SystemMaterializer(system).materializer
val readJournalConfig = new ReadJournalConfig(config)
@@ -203,11 +206,14 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
latestOrdering: MaxOrderingId): Source[EventEnvelope, NotUsed] = {
if (latestOrdering.maxOrdering < offset) Source.empty
else {
-      readJournalDao.eventsByTag(tag, offset, latestOrdering.maxOrdering, max).mapAsync(1)(Future.fromTry).mapConcat {
-        case (repr, ordering) =>
-          adaptEvents(repr).map(r =>
-            EventEnvelope(Sequence(ordering), r.persistenceId, r.sequenceNr, r.payload, r.timestamp))
-      }
+      readJournalDao
+        .eventsByTag(tag, offset, math.min(offset + max, latestOrdering.maxOrdering), max)
+        .mapAsync(1)(Future.fromTry)
+        .mapConcat {
+          case (repr, ordering) =>
+            adaptEvents(repr).map(r =>
+              EventEnvelope(Sequence(ordering), r.persistenceId, r.sequenceNr, r.payload, r.timestamp))
+        }
}
}
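
The next hunk changes the paging criterion to match: a round no longer "has more events" because it returned a full batch, but because its highest returned offset still trails the journal's known maximum. A toy restatement of the old and new predicates (function names are ours):

    // Old: a full batch implies more events may follow. Once the LIMIT is
    // gone, a window can legitimately return fewer than `batchSize` rows
    // while events still exist further along the ordering, so a full batch
    // is no longer a reliable "more to read" signal.
    def hasMoreOld(returned: Int, batchSize: Int): Boolean =
      returned == batchSize

    // New: compare progress against the journal's known max ordering.
    def hasMoreNew(highestOffset: Long, maxOrdering: Long): Boolean =
      highestOffset < maxOrdering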

@@ -227,14 +233,21 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
val batchSize = readJournalConfig.maxBufferSize

Source
-      .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((math.max(0L, offset), Continue)) {
case (from, control) =>
def retrieveNextBatch() = {
for {
queryUntil <- journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId]
xs <- currentJournalEventsByTag(tag, from, batchSize, queryUntil).runWith(Sink.seq)
} yield {
-            val hasMoreEvents = xs.size == batchSize
+            val to = from + batchSize
+            val highestOffset = xs.map(_.offset.value) match {
+              case Nil     => to
+              case offsets => offsets.max
+            }
+            val hasMoreEvents = {
+              highestOffset < queryUntil.maxOrdering
+            }
val nextControl: FlowControl =
terminateAfterOffset match {
// we may stop if target is behind queryUntil and we don't have more events to fetch
@@ -247,15 +260,25 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
if (hasMoreEvents) Continue else ContinueDelayed
}

-            val nextStartingOffset = if (xs.isEmpty) {
-              /* If no events matched the tag between `from` and `maxOrdering` then there is no need to execute the exact
-               * same query again. We can continue querying from `maxOrdering`, which will save some load on the db.
-               * (Note: we may never return a value smaller than `from`, otherwise we might return duplicate events) */
-              math.max(from, queryUntil.maxOrdering)
-            } else {
-              // Continue querying from the largest offset
-              xs.map(_.offset.value).max
-            }
+            val nextStartingOffset = (xs, queryUntil.maxOrdering) match {
+              case (Nil, 0L) =>
+                /* If `maxOrdering` is not known yet or journal is empty (min value for Postgres' bigserial is 1)
+                 * then we should not move the query window forward. Otherwise we might miss (skip) some events.
+                 * By setting nextStartingOffset to `from` we wait for either maxOrdering to be discovered or first
+                 * event to be persisted in the journal. */
+                from
+              case (Nil, maxOrdering) =>
+                /* If no events matched the tag between `from` and `to` (`from + batchSize`) and `maxOrdering` then
+                 * there is no need to execute the exact same query again. We can continue querying from `to`,
+                 * which will save some load on the db. */
+                math.min(to, maxOrdering)
+              case _ =>
+                // Continue querying from the largest offset
+                highestOffset
+            }
+
+            log.trace(
+              s"tag = $tag => ($nextStartingOffset, $nextControl), [highestOffset = $highestOffset, maxOrdering = ${queryUntil.maxOrdering}, hasMoreEvents = $hasMoreEvents, results = ${xs.size}, from = $from]")
Some((nextStartingOffset, nextControl), xs)
}
}
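
Distilled, the offset-advance rule above has three cases; the standalone function below is our restatement of it, not code from the commit:

    // How far to move the query window after one round.
    // `from` = lower bound just queried, `to` = from + batchSize,
    // `maxOrdering` = highest ordering known to the JournalSequenceActor.
    def nextStartingOffset(offsets: Seq[Long], from: Long, to: Long, maxOrdering: Long): Long =
      (offsets, maxOrdering) match {
        case (Nil, 0L) => from                      // max not discovered yet or journal empty: stay put
        case (Nil, _)  => math.min(to, maxOrdering) // empty window: skip ahead, but never past maxOrdering
        case _         => offsets.max               // resume from the highest offset actually seen
      }

The first case matters because Postgres' bigserial starts at 1: `maxOrdering == 0` can only mean the maximum has not been discovered yet (or nothing has been persisted), and advancing past `from` at that point could permanently skip events.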
@@ -7,9 +7,9 @@ package akka.persistence.postgres

import akka.actor.ActorSystem
import akka.persistence.postgres.config.{ JournalConfig, ReadJournalConfig, SlickConfiguration }
+import akka.persistence.postgres.db.SlickDatabase
import akka.persistence.postgres.query.javadsl.PostgresReadJournal
import akka.persistence.postgres.util.DropCreate
-import akka.persistence.postgres.db.SlickDatabase
import akka.util.Timeout
import com.typesafe.config.{ Config, ConfigFactory, ConfigValue }
import org.scalatest.BeforeAndAfterEach
@@ -29,7 +29,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config)
implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
implicit val timeout: Timeout = Timeout(1.minute)

-  val cfg = config.getConfig("postgres-journal")
+  val cfg: Config = config.getConfig("postgres-journal")
val journalConfig = new JournalConfig(cfg)
val readJournalConfig = new ReadJournalConfig(config.getConfig(PostgresReadJournal.Identifier))

@@ -70,4 +70,10 @@ abstract class SingleActorSystemPerTestSpec(val config: Config)
f(system)
system.terminate().futureValue
}

+  def withActorSystem(config: Config = config)(f: ActorSystem => Unit): Unit = {
+    implicit val system: ActorSystem = ActorSystem("test", config)
+    f(system)
+    system.terminate().futureValue
+  }
}
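
The new overload lets a single test case run against a tweaked configuration without touching the shared config files; CurrentEventsByTagTest below uses it like this:

    it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem(
      withMaxBufferSize(1000)) { implicit system =>
      // the whole test runs against an ActorSystem whose read journal
      // uses a max-buffer-size of 1000
    }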
@@ -11,22 +11,26 @@ import akka.persistence.postgres.query.CurrentEventsByTagTest._
import akka.persistence.postgres.query.EventAdapterTest.{ Event, TaggedAsyncEvent }
import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType }
import akka.persistence.query.{ EventEnvelope, NoOffset, Sequence }
-import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+import com.typesafe.config.{ Config, ConfigFactory, ConfigValue, ConfigValueFactory }

import scala.concurrent.Future
import scala.concurrent.duration._

object CurrentEventsByTagTest {
val maxBufferSize = 20
-  val refreshInterval = 500.milliseconds
+  val refreshInterval: FiniteDuration = 500.milliseconds

val configOverrides: Map[String, ConfigValue] = Map(
"postgres-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
"postgres-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString()))

+  case class TestEvent(greetings: String)

}

abstract class CurrentEventsByTagTest(val schemaType: SchemaType)
extends QueryTestSpec(s"${schemaType.resourceNamePrefix}-shared-db-application.conf", configOverrides) {

it should "not find an event by tag for unknown tag" in withActorSystem { implicit system =>
val journalOps = new ScalaPostgresReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
@@ -164,8 +168,15 @@ abstract class CurrentEventsByTagTest(val schemaType: SchemaType)
}
}

-  it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem {
-    implicit system =>
-      {
+  val numOfActors = 3
+  val batch1Size = 200
+  val batch2Size = 10000
+
+  import scala.collection.JavaConverters._
+
+  it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem(
+    withMaxBufferSize(1000)) { implicit system =>
val journalOps = new JavaDslPostgresReadJournalOperations(system)
import system.dispatcher
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
@@ -201,7 +212,53 @@ abstract class CurrentEventsByTagTest(val schemaType: SchemaType)
}
batch2.futureValue
}
}

+  it should "complete without omitting any events in case events are being persisted when the query is executed" in withActorSystem(
+    withMaxBufferSize((batch1Size + batch2Size) * (numOfActors + 1))) { implicit system =>
+    val journalOps = new JavaDslPostgresReadJournalOperations(system)
+    import system.dispatcher
+    withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+      def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = {
+        val futures = for {
+          actor <- Seq(actor1, actor2, actor3)
+          i <- 1 to numberOfMessagesPerActor
+        } yield {
+          actor ? TaggedAsyncEvent(Event(i.toString), tag)
+        }
+        Future.sequence(futures).map(_ => Done)
+      }
+
+      val tag = "someTag"
+      // send a batch of 3 * 200
+      val batch1 = sendMessagesWithTag(tag, 200)
+      // Try to persist a large batch of events per actor. Some of these may be returned, but not all!
+      val batch2 = sendMessagesWithTag(tag, 10000)
+
+      // wait for acknowledgement of the first batch only
+      batch1.futureValue
+      // Sanity check, all events in the first batch must be in the journal
+      journalOps.countJournal.futureValue should be >= 600L
+
+      // start the query before the last batch completes
+      journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp =>
+        // The stream must complete within the given amount of time
+        // This may take a while in case the journal sequence actor detects gaps
+        val allEvents = tp.toStrict(atMost = 30.seconds)
+        allEvents.size should be >= 600
+      }
+      batch2.futureValue
+      journalOps.withCurrentEventsByTag()(tag, Sequence(600)) { tp =>
+        val allEvents = tp.toStrict(atMost = 3.minutes)
+        allEvents.size should equal(3 * 10000)
+      }
+    }
+  }
+
+  def withMaxBufferSize(size: Long): Config =
+    ConfigFactory.parseMap(Map("postgres-read-journal.max-buffer-size" -> size.toString).asJava).withFallback(config)
}

}

// Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config
@@ -9,11 +9,23 @@ import akka.persistence.postgres.util.Schema.SchemaType
import akka.persistence.query.NoOffset
import akka.serialization.SerializationExtension
import akka.stream.scaladsl.{ Sink, Source }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }

import scala.concurrent.duration._

+object CurrentEventsByTagWithGapsTest {
+  private val maxBufferSize = 10000
+  private val refreshInterval = 500.milliseconds
+
+  val configOverrides: Map[String, ConfigValue] = Map(
+    "postgres-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
+    "postgres-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString()))
+}
+
class CurrentEventsByTagWithGapsTest
-  extends QueryTestSpec(s"${Schema.Partitioned.resourceNamePrefix}-shared-db-application.conf") {
+  extends QueryTestSpec(
+    s"${Schema.Partitioned.resourceNamePrefix}-shared-db-application.conf",
+    CurrentEventsByTagWithGapsTest.configOverrides) {

// We are using Partitioned variant because it does not override values for an `ordering` field
override val schemaType: SchemaType = Schema.Partitioned
@@ -63,7 +75,7 @@ class CurrentEventsByTagWithGapsTest
.futureValue

journalOps.withCurrentEventsByTag(5.minutes)(tag, NoOffset) { tp =>
-      val allEvents = tp.toStrict(atMost = 3.minutes)
+      val allEvents = tp.toStrict(atMost = 5.minutes)
allEvents.size should equal(expectedTotalNumElements)
}

@@ -7,35 +7,35 @@ package akka.persistence.postgres.query

import java.util.concurrent.atomic.AtomicLong

-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.{ ActorRef, ActorSystem }
import akka.pattern.ask
import akka.persistence.postgres.config.JournalSequenceRetrievalConfig
import akka.persistence.postgres.db.ExtendedPostgresProfile
-import akka.persistence.postgres.query.JournalSequenceActor.{GetMaxOrderingId, MaxOrderingId}
-import akka.persistence.postgres.query.dao.{ByteArrayReadJournalDao, TestProbeReadJournalDao}
-import akka.persistence.postgres.tag.{CachedTagIdResolver, SimpleTagDao}
-import akka.persistence.postgres.util.Schema.{NestedPartitions, Partitioned, Plain, SchemaType}
-import akka.persistence.postgres.{JournalRow, SharedActorSystemTestSpec}
+import akka.persistence.postgres.query.JournalSequenceActor.{ GetMaxOrderingId, MaxOrderingId }
+import akka.persistence.postgres.query.dao.{ ByteArrayReadJournalDao, TestProbeReadJournalDao }
+import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao }
+import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType }
+import akka.persistence.postgres.{ JournalRow, SharedActorSystemTestSpec }
import akka.serialization.SerializationExtension
-import akka.stream.scaladsl.{Sink, Source}
-import akka.stream.{Materializer, SystemMaterializer}
+import akka.stream.scaladsl.{ Sink, Source }
+import akka.stream.{ Materializer, SystemMaterializer }
import akka.testkit.TestProbe
import org.scalatest.time.Span
import org.slf4j.LoggerFactory
-import slick.jdbc.{JdbcBackend, JdbcCapabilities}
+import slick.jdbc.{ JdbcBackend, JdbcCapabilities }

import scala.concurrent.Future
import scala.concurrent.duration._

abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends QueryTestSpec(schemaType.configName) {
private val log = LoggerFactory.getLogger(classOf[JournalSequenceActorTest])

-  val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
-  val journalTable = schemaType.table(journalConfig.journalTableConfiguration)
+  private val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
+  private val journalTable = schemaType.table(journalConfig.journalTableConfiguration)

import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

-  implicit val askTimeout = 50.millis
+  implicit val askTimeout: FiniteDuration = 50.millis

private val orderingSeq = new AtomicLong(0L)
def generateId: Long = orderingSeq.incrementAndGet()
@@ -192,7 +192,7 @@ class MockDaoJournalSequenceActorTest extends SharedActorSystemTestSpec

val almostQueryDelay = queryDelay - 50.millis
val almostImmediately = 50.millis
-    withTestProbeJournalSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, actor) =>
+    withTestProbeJournalSequenceActor(batchSize, maxTries, queryDelay) { (daoProbe, _) =>
daoProbe.expectMsg(almostImmediately, TestProbeReadJournalDao.JournalSequence(0, batchSize))
val firstBatch = (1L to 40L) ++ (51L to 110L)
daoProbe.reply(firstBatch)
@@ -13,7 +13,7 @@ class ReadJournalQueriesTest extends BaseQueryTest {
}

it should "create SQL query for eventsByTag" in withReadJournalQueries { queries =>
-    queries.eventsByTag(List(11), 23L, 2L, 3L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags" from "journal" where ("tags" @> ?) and (("ordering" > ?) and ("ordering" <= ?)) order by "ordering" limit ?"""
+    queries.eventsByTag(List(11), 23L, 25L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags" from "journal" where ("tags" @> ?) and (("ordering" > ?) and ("ordering" <= ?)) order by "ordering""""
}

it should "create SQL query for journalSequenceQuery" in withReadJournalQueries { queries =>
