From 2f736c228751f58d826e8af22ba32a2d71f8557e Mon Sep 17 00:00:00 2001 From: Levi Ramsey Date: Wed, 30 Oct 2024 11:10:05 -0400 Subject: [PATCH] More idiomatic materializer --- .../projection/dynamodb/internal/DynamoDBOffsetStore.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index fc91051bd..5198ef49a 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -249,6 +249,7 @@ private[projection] class DynamoDBOffsetStore( } private def readTimestampOffset(): Future[TimestampOffsetBySlice] = { + implicit val sys = system // for implicit stream materializer val oldState = state.get() // retrieve latest timestamp for each slice, and use the earliest val offsetBySliceFut = @@ -263,7 +264,7 @@ private[projection] class DynamoDBOffsetStore( .mapConcat(identity) .runWith(Sink.fold(Map.empty[Int, TimestampOffset]) { (offsetMap, sliceAndOffset: (Int, TimestampOffset)) => offsetMap + sliceAndOffset - })(matFromSystem(system)) + }) offsetBySliceFut.map { offsetBySlice => val newState = State(offsetBySlice)