From 7d2016fce0b75799238b455f3ac4be3aab4d918d Mon Sep 17 00:00:00 2001 From: Marcin Kubala Date: Tue, 20 Oct 2020 16:45:18 +0200 Subject: [PATCH 1/2] Cover the eventsByTag offset bugfix with a test case --- .../query/scaladsl/PostgresReadJournal.scala | 5 +- .../postgres/query/EventsByTagTest.scala | 78 ++++++++++++------- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala b/core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala index fb8afab3..56bbd7f9 100644 --- a/core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala +++ b/core/src/main/scala/akka/persistence/postgres/query/scaladsl/PostgresReadJournal.scala @@ -269,8 +269,9 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste * event to be persisted in the journal. */ from case (Nil, maxOrdering) if maxOrdering < from => - /* In case of either `maxOrdering` is staled or journal didn't reach the offset - we should wait - * for either maxOrdering to be discovered or journal to reach the requested offset */ + /* In case of either `maxOrdering` is not yet discovered or journal didn't reach the offset - we should + * wait for either maximum ordering value to be eventually discovered or journal to reach + * the requested offset */ from case (Nil, maxOrdering) => /* If no events matched the tag between `from` and `to` (`from + batchSize`) and `maxOrdering` then diff --git a/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala b/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala index faefdcfb..9a22cee1 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala @@ -120,31 +120,6 @@ abstract class EventsByTagTest(val schemaType: SchemaType) } } - it should "find all events by tag starting from an offset" in withActorSystem { implicit system => - val testJournalSize = 200 - val journalOps = new ScalaPostgresReadJournalOperations(system) - withTestActors(replyToMessages = true) { (actor1, _, _) => - for { - n <- (1 to testJournalSize).inclusive - } (actor1 ? withTags(n, "number")).futureValue - - journalOps.withEventsByTag()("number", Sequence(testJournalSize / 2)) { tp => - tp.request(Int.MaxValue) - for { - n <- ((testJournalSize / 2) + 1 to testJournalSize).inclusive - } tp.expectNext(EventEnvelope(Sequence(n), "my-1", n, n)) - - tp.cancel() - } - - journalOps.withEventsByTag()("number", Sequence(testJournalSize)) { tp => - tp.request(Int.MaxValue) - tp.expectNoMessage(NoMsgTime) - tp.cancel() - } - } - } - it should "deliver EventEnvelopes non-zero timestamps" in withActorSystem { implicit system => val journalOps = new ScalaPostgresReadJournalOperations(system) @@ -276,6 +251,53 @@ abstract class EventsByTagTest(val schemaType: SchemaType) } } + it should "not find any events by tag from an offset == maximum ordering" in withActorSystem { implicit system => + val journalOps = new ScalaPostgresReadJournalOperations(system) + withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => + (actor1 ? withTags(1, "number")).futureValue + (actor2 ? withTags(2, "number")).futureValue + (actor3 ? withTags(3, "number")).futureValue + + eventually { + journalOps.countJournal.futureValue shouldBe 3 + } + + journalOps.withEventsByTag()("number", Sequence(3)) { tp => + tp.request(Int.MaxValue) + tp.expectNoMessage(1.second) + + actor1 ? withTags(1, "number") + tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 1)) + tp.expectNoMessage(1.second) + + tp.cancel() + } + } + } + + it should "not find any events by tag from an offset > maximum ordering" in withActorSystem { implicit system => + val journalOps = new ScalaPostgresReadJournalOperations(system) + withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => + (actor1 ? withTags(1, "number")).futureValue + (actor2 ? withTags(2, "number")).futureValue + (actor3 ? withTags(3, "number")).futureValue + + eventually { + journalOps.countJournal.futureValue shouldBe 3 + } + + journalOps.withEventsByTag()("number", Sequence(500)) { tp => + tp.request(Int.MaxValue) + tp.expectNoMessage(1.second) + + actor1 ? withTags(1, "number") + tp.expectNoMessage(1.second) + + tp.cancel() + } + } + } + it should "persist and find tagged event for one tag" in withActorSystem { implicit system => val journalOps = new JavaDslPostgresReadJournalOperations(system) withTestActors() { (actor1, actor2, actor3) => @@ -442,8 +464,8 @@ abstract class EventsByTagTest(val schemaType: SchemaType) } } -class NestedPartitionsScalaEventsByTagTest extends EventsByTagTest(NestedPartitions) //with BaseDbCleaner +class NestedPartitionsScalaEventsByTagTest extends EventsByTagTest(NestedPartitions) -class PartitionedScalaEventsByTagTest extends EventsByTagTest(Partitioned) //with BaseDbCleaner +class PartitionedScalaEventsByTagTest extends EventsByTagTest(Partitioned) -class PlainScalaEventsByTagTest extends EventsByTagTest(Plain) //with BaseDbCleaner +class PlainScalaEventsByTagTest extends EventsByTagTest(Plain) From b4691ecd030356c05646791384583f8c8e6a072f Mon Sep 17 00:00:00 2001 From: Marcin Kubala Date: Tue, 20 Oct 2020 16:51:38 +0200 Subject: [PATCH 2/2] Better names for the test cases --- .../akka/persistence/postgres/query/EventsByTagTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala b/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala index 9a22cee1..fa9034e5 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/EventsByTagTest.scala @@ -251,7 +251,7 @@ abstract class EventsByTagTest(val schemaType: SchemaType) } } - it should "not find any events by tag from an offset == maximum ordering" in withActorSystem { implicit system => + it should "find newly stored event only when offset == maximum ordering" in withActorSystem { implicit system => val journalOps = new ScalaPostgresReadJournalOperations(system) withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => (actor1 ? withTags(1, "number")).futureValue @@ -275,7 +275,7 @@ abstract class EventsByTagTest(val schemaType: SchemaType) } } - it should "not find any events by tag from an offset > maximum ordering" in withActorSystem { implicit system => + it should "not find any events when offset >> maximum ordering" in withActorSystem { implicit system => val journalOps = new ScalaPostgresReadJournalOperations(system) withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => (actor1 ? withTags(1, "number")).futureValue