Skip to content

Commit

Permalink
AWS V2 init
Browse files Browse the repository at this point in the history
  • Loading branch information
teroxik committed Feb 19, 2020
1 parent e6d8d72 commit 110ddf1
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 29 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ crossVersion := CrossVersion.binary
val akkaVersion = "2.5.25"
val amzVersion = "2.10.65"

scalacOptions += "-target:8"
scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= Seq(
"software.amazon.awssdk" % "dynamodb" % amzVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ object AsyncDynamoDBJournalSpec {
| "java.io.Serializable" = test
| }
|}
""".stripMargin
).withFallback(ConfigFactory.load())
""".stripMargin).withFallback(ConfigFactory.load())

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,15 @@ class BackwardsCompatibilityV1Spec extends TestKit(ActorSystem("PartialAsyncSeri
"ChEIARINrO0ABXQABmEtMDAxNxARGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA==",
"ChEIARINrO0ABXQABmEtMDAxOBASGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA==",
"ChEIARINrO0ABXQABmEtMDAxORATGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA==",
"ChEIARINrO0ABXQABmEtMDAyMBAUGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA=="
)
"ChEIARINrO0ABXQABmEtMDAyMBAUGg9PbGRGb3JtYXRFdmVudHNqJDI5NWZhMTE2LWZhOTUtNGY1Yi1hZjk2LTgwZDk4NjFhODk4ZA==")

val table = dynamoDB.getTable(tableName)

def createItem(number: Int, data: String): Unit = {
table.putItem(
new Item()
.withPrimaryKey("par", persistenceId, "num", number)
.withBinary("pay", Base64.getDecoder.decode(data))
)
.withBinary("pay", Base64.getDecoder.decode(data)))
}

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class FailureReportingSpec extends TestKit(ActorSystem("FailureReportingSpec"))
.withFallback(ConfigFactory.load())
implicit val system = ActorSystem("FailureReportingSpec-test1", config)
try
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
Persistence(system).journalFor("")
}
finally system.terminate()
Expand All @@ -75,13 +75,13 @@ class FailureReportingSpec extends TestKit(ActorSystem("FailureReportingSpec"))
implicit val system = ActorSystem("FailureReportingSpec-test2", config)
try {
val journal =
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter.error(pattern = ".*requests will fail.*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter.error(pattern = ".*requests will fail.*ThisTableDoesNotExist.*", occurrences = 1).intercept {
Persistence(system).journalFor("")
}
}
EventFilter[ResourceNotFoundException](pattern = ".*BatchGetItemRequest.*", occurrences = 1).intercept {
EventFilter[DynamoDBJournalFailure](pattern = s".*failed.*read-highest-sequence-number.*$persistenceId.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*BatchGetItemRequest.*", occurrences = 1).intercept {
EventFilter[DynamoDBJournalFailure](pattern = s".*failed.*read-highest-sequence-number.*$persistenceId.*", occurrences = 1).intercept {
journal ! ReplayMessages(0, Long.MaxValue, 0, persistenceId, testActor)
expectMsgType[ReplayMessagesFailure]
}
Expand All @@ -95,7 +95,7 @@ class FailureReportingSpec extends TestKit(ActorSystem("FailureReportingSpec"))
.withFallback(ConfigFactory.load())
implicit val system = ActorSystem("FailureReportingSpec-test3", config)
try
EventFilter.info(pattern = ".*protocol:https.*", occurrences = 1).intercept {
EventFilter.info(pattern = ".*protocol:https.*", occurrences = 1).intercept {
Persistence(system).journalFor("")
}
finally system.terminate()
Expand All @@ -116,7 +116,7 @@ akka.loggers = ["akka.testkit.TestEventListener"]
try {
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[Logging.LogEvent])
EventFilter[ResourceNotFoundException](pattern = ".*akka-persistence.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*akka-persistence.*", occurrences = 1).intercept {
Persistence(system).journalFor("")
}
probe.expectMsgType[Logging.Error].message.toString should include("DescribeTableRequest(akka-persistence)")
Expand All @@ -132,21 +132,21 @@ akka.loggers = ["akka.testkit.TestEventListener"]
implicit val system = ActorSystem("FailureReportingSpec-test5", config)
try {
val journal =
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter.error(pattern = ".*requests will fail.*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter.error(pattern = ".*requests will fail.*ThisTableDoesNotExist.*", occurrences = 1).intercept {
Persistence(system).journalFor("")
}
}

val msgs = (1 to 3).map(i => persistentRepr(f"w-$i"))

EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
journal ! WriteMessages(AtomicWrite(msgs(0)) :: Nil, testActor, 42)
expectMsgType[WriteMessagesFailed].cause shouldBe a[DynamoDBJournalFailure]
expectFailure[DynamoDBJournalFailure]("ThisTableDoesNotExist", msgs(0))
}

EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
EventFilter[ResourceNotFoundException](pattern = ".*ThisTableDoesNotExist.*", occurrences = 1).intercept {
journal ! WriteMessages(AtomicWrite(msgs(1)) :: AtomicWrite(msgs(2)) :: Nil, testActor, 42)
expectMsgType[WriteMessagesFailed].cause shouldBe a[DynamoDBJournalFailure]
expectFailure[DynamoDBJournalFailure]("ThisTableDoesNotExist", msgs(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ object PartialAsyncSerializationSpec {
| "akka.persistence.dynamodb.journal.SerializeAsync" = test
| }
|}
""".stripMargin
).withFallback(ConfigFactory.load())
""".stripMargin).withFallback(ConfigFactory.load())
}

class PartialAsyncSerializationSpec extends TestKit(ActorSystem("PartialAsyncSerializationSpec", PartialAsyncSerializationSpec.config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,15 @@ trait DynamoDBUtils {
.withAttributeDefinitions(
new AttributeDefinition().withAttributeName(Key).withAttributeType("S"),
new AttributeDefinition().withAttributeName(SequenceNr).withAttributeType("N"),
new AttributeDefinition().withAttributeName(Timestamp).withAttributeType("N")
)
new AttributeDefinition().withAttributeName(Timestamp).withAttributeType("N"))
.withKeySchema(
new KeySchemaElement().withAttributeName(Key).withKeyType(KeyType.HASH),
new KeySchemaElement().withAttributeName(SequenceNr).withKeyType(KeyType.RANGE)
)
new KeySchemaElement().withAttributeName(SequenceNr).withKeyType(KeyType.RANGE))
.withLocalSecondaryIndexes(
new LocalSecondaryIndex()
.withIndexName(TimestampIndex).withKeySchema(
new KeySchemaElement().withAttributeName(Key).withKeyType(KeyType.HASH),
new KeySchemaElement().withAttributeName(Timestamp).withKeyType(KeyType.RANGE)
).withProjection(new Projection().withProjectionType(ProjectionType.ALL))
)
new KeySchemaElement().withAttributeName(Timestamp).withKeyType(KeyType.RANGE)).withProjection(new Projection().withProjectionType(ProjectionType.ALL)))

def ensureSnapshotTableExists(read: Long = 10L, write: Long = 10L): Unit = {
val create = schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.collection.immutable.Seq

class SnapshotStoreTckSpec extends SnapshotStoreSpec(
ConfigFactory.load()
) with DynamoDBUtils {
ConfigFactory.load()) with DynamoDBUtils {
override def beforeAll(): Unit = {
super.beforeAll()
ensureSnapshotTableExists()
Expand Down

0 comments on commit 110ddf1

Please sign in to comment.