diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index fc91051bd..5198ef49a 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -249,6 +249,7 @@ private[projection] class DynamoDBOffsetStore( } private def readTimestampOffset(): Future[TimestampOffsetBySlice] = { + implicit val sys = system // for implicit stream materializer val oldState = state.get() // retrieve latest timestamp for each slice, and use the earliest val offsetBySliceFut = @@ -263,7 +264,7 @@ private[projection] class DynamoDBOffsetStore( .mapConcat(identity) .runWith(Sink.fold(Map.empty[Int, TimestampOffset]) { (offsetMap, sliceAndOffset: (Int, TimestampOffset)) => offsetMap + sliceAndOffset - })(matFromSystem(system)) + }) offsetBySliceFut.map { offsetBySlice => val newState = State(offsetBySlice)