A MongoDB plugin for akka-persistence
- Three projects: a core and two driver implementations. To use, pull in the jar for only one of the drivers; common will be pulled in as a transitive dependency:
- common provides integration with Akka persistence, implementing the plugin API
- casbah provides an implementation against the casbah driver
- rxmongo provides an implementation against the ReactiveMongo driver
- The tests will automatically download mongodb via flapdoodle's embedded mongo utility, do not be alarmed :)
- Supports MongoDB major versions 2.6 and 3.0
- Cross compiled to the `2.11` and `2.10` series
- Be aware that there is a maximum `16MB` size limit for snapshots and journal entries
- Tracked in issue log
- Fixes issue #34 with sporadic failure to authenticate w/ ReactiveMongo driver. Thanks @krchniam for the PR!
- Re-enables authentication test with ReactiveMongo driver
- Fixes issue #32 with handling of legacy configuration format
- Please note, 0.4.x requires a minimum version of 0.11 if using the reactive mongo driver
- Support for the `ErrorsIgnored` write concern has been dropped
- Now supporting MongoDB version 3.0, dropping support for version 2.4
- The Casbah driver fully supports 3.0, including `SCRAM-SHA1` authentication
- The RxMongo driver (0.11) does not fully support 3.0: `SCRAM-SHA1` is not supported yet; please refer to this Mongo ticket for details on a workaround
  - Use without authentication seems to be solid
- Now supporting URI-based connection, instead of `urls` + `db` + (optional) `user` and `password`. Details below
- The legacy configuration is still supported for now
- Built against akka `2.3.11`, cross-compiled to scala `2.11.7` and `2.10.5`
- This will likely be one of the last releases supporting akka `2.3.x`
- Make sure deletes block until complete to be compatible with akka snapshot api, addresses occasional test failure
- Remove noisy print statement
- Migrated to build.sbt
- Direct passing of bson documents into journal/snapshotter (#16), much thanks to @alari for the PR.
- Closes issue #18, thanks for the report and fixing pull request @marcuslinke!
- If the replay size exceeded `Int.MaxValue`, conversion was causing negative limits to be passed to rxmongo, which are transformed to 101 - much smaller than 4 billion
- D'oh. Skipped this one.
- Fixes #17, configuration no longer eagerly loads (this was providing no additional value)
- Snapshots are no longer forced to use Java serialization
  - `SelectedSnapshot` is not serialized directly
  - Payload is now wrapped in a `Snapshot` object
- Fixes #15
- Fix for auth test that fails irregularly
- Reactive Mongo driver now supported as an alternative to Casbah
- Requires minimum of `0.10.5.0.akka23` of driver
- Measurably (factor of 2ish) less performant than Casbah in limited load testing - not sure what to make of this yet, possibly different dispatcher tuning is necessary
- Closes issue #7
- It is important that you use only one driver sub-project at a time. Using both in the same project is unsupported.
- Compiled against Akka 2.3.8 and updated to latest of TCK
- Now supports configuration of `wtimeout` and `fsync` for Journal and Snapshot write concerns
- Ignore sequence# hint during journaling max sequence query - in some cases it's quite wrong; Closes issue #14
- Verify against Mongo 2.6 - add support for both 2.6 and 2.4; Closes issue #13
- DRY out usage of circuit breakers in preparation for RxMongo driver; Closes issue #6
- Close out connection pool with actor system shutdown; should fix leaking connections for the use case of reusing a single JVM with multiple `ActorSystem`s; Fixes issue #12
- Add support for authentication against Mongo (currently MONGO CR supported) - Issue #10
- Compile against Akka 2.3.4, which is binary incompatible to 2.3.3 for akka-persistence module
- Update to support 0.3.4 of TCK - fixing issue #8 indirectly
- Cross compile to 2.11 (Issue #9)
- Update to 2.11 compatible versions of libraries (scalatest, casbah); Mark akka dependencies `provided`
- Update to support 0.3.1 of TCK, which supports Scala 2.11
- Eliminate publish message from root project
- Remove build and publish of rxmongo project for now
- Use batch mode for inserts to journal for very significant performance gain
- Currently batch-mode inserts are not atomic, and so there is an opportunity for a failure to leave the journal in a partially completed state; Looking to address this with Issue #5
- Update to support 0.3 of TCK, which added snapshot coverage
- Fix metrics bug by which all timers were shunted to the JE timer
- Fixed goofy name for extension - now is `MongoPersistenceExtension`, matching the metrics docs below
- A "what's new" section in the README :)
- Metrics Scala introduced on casbah journal ... details
Version `0.4.2` is tracking Akka `2.3.12` as a `provided` dependency and passing the TCK now delivered with Akka.

Driver dependencies are also `provided`, meaning they must be included in the application project's dependencies.
libraryDependencies += "com.github.scullxbones" %% "akka-persistence-mongo-casbah" % "0.4.2"
libraryDependencies += "com.github.scullxbones" %% "akka-persistence-mongo-rxmongo" % "0.4.2"
<dependency>
<groupId>com.github.scullxbones</groupId>
<artifactId>akka-persistence-mongo-casbah_2.10</artifactId>
<version>0.4.2</version>
</dependency>
<dependency>
<groupId>com.github.scullxbones</groupId>
<artifactId>akka-persistence-mongo-rxmongo_2.10</artifactId>
<version>0.4.2</version>
</dependency>
compile 'com.github.scullxbones:akka-persistence-mongo-casbah_2.10:0.4.2'
compile 'com.github.scullxbones:akka-persistence-mongo-rxmongo_2.10:0.4.2'
Inside of your `application.conf` file, add the following line if you want to use the journal:
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
Add the following line if you want to use the snapshotting functionality:
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
The defaults are fairly sane, but a common setting to change is the location of the MongoDB server. These changes can be made in the `application.conf` file.
The only thing you have to provide is the driver implementation:
akka.contrib.persistence.mongodb.mongo.driver = "akka.contrib.persistence.mongodb.RxMongoPersistenceExtension" // or CasbahPersistenceExtension
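Putting these settings together, a minimal `application.conf` that enables both plugins and selects the Casbah driver (all values taken from the lines above) might look like:

```hocon
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
akka.contrib.persistence.mongodb.mongo.driver = "akka.contrib.persistence.mongodb.CasbahPersistenceExtension"
```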
A mongo URI can be specified. This must meet the requirements of Mongo's URI format. For example `mongodb://user:secret@localhost:27017/my-akka-persistence`. If the database name is unspecified, it will be defaulted to `akka-persistence`.
akka.contrib.persistence.mongodb.mongo.mongouri = "mongodb://user:password@192.168.0.1:27017,192.168.0.2:27017/replicated-database"
If the `mongouri` is used, the legacy fields (`urls`, `db`, `username`, `password`) will be ignored.
A list of urls can be supplied. These are specified as `host:port`, for example `localhost:27017`. If these are used, the db should also be specified.
akka.contrib.persistence.mongodb.mongo.urls = ["192.168.0.1:27017","192.168.0.2:27017"]
akka.contrib.persistence.mongodb.mongo.db = "my_akka_persistence"
If authentication credentials are required, both username and password can be specified.
akka.contrib.persistence.mongodb.mongo.username = "my.mongo.user"
akka.contrib.persistence.mongodb.mongo.password = "secret"
A DB name can be specified, as can the names of the collections and indices used (one for journal, one for snapshots).
akka.contrib.persistence.mongodb.mongo.journal-collection = "my_persistent_journal"
akka.contrib.persistence.mongodb.mongo.journal-index = "my_journal_index"
akka.contrib.persistence.mongodb.mongo.snaps-collection = "my_persistent_snapshots"
akka.contrib.persistence.mongodb.mongo.snaps-index = "my_snaps_index"
akka.contrib.persistence.mongodb.mongo.journal-write-concern = "Acknowledged"
This is well described in the MongoDB Write Concern Documentation
The write concern can be set both for the journal plugin as well as the snapshot plugin. Every level of write concern is supported. The possible concerns are listed below in decreasing safety:
- `ReplicaAcknowledged` - requires a replica to acknowledge the write; this confirms that at least two servers have seen the write
- `Journaled` - requires that the change be journaled on the server that was written to. Other replicas may not see this write on a network partition
- `Acknowledged` - also known as "Safe"; requires that the MongoDB server acknowledges the write. This does not require that the change be persistent anywhere but memory.
- `Unacknowledged` - does not require the MongoDB server to acknowledge the write. It may raise an error in cases of network issues. This was the default setting that MongoDB caught a lot of flak for, as it masked errors in exchange for straight-line speed. This is no longer a default in the driver.
- `ErrorsIgnored` - !WARNING! Extremely unsafe. This level may not be able to detect if the MongoDB server is even running. It will also not detect errors such as key collisions. This makes data loss likely. In general, don't use this.
- Errors ignored is no longer supported as a write concern.
It is a bad idea™ to use anything less safe than `Acknowledged` on the journal. The snapshots can be played a bit more fast and loose, but the recommendation is not to go below `Acknowledged` for any serious work.
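As an illustrative sketch of that advice, the write concern can be set independently per plugin so the journal stays safe while snapshots run a little faster (the exact values here are an example, not a prescription):

```hocon
akka.contrib.persistence.mongodb.mongo.journal-write-concern = "Journaled"
akka.contrib.persistence.mongodb.mongo.snaps-write-concern = "Acknowledged"
```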
As a single data point (complete with grain of salt!) on a MBP i5 w/ SSD with Kodemaniak's testbed and mongodb running on the same physical machine, the performance difference between:

- `Journaled` and `Acknowledged` write concerns is two orders of magnitude
- `Acknowledged` and both `Unacknowledged` and `ErrorsIgnored` write concerns is one order of magnitude

`Journaled` is a significant trade-off of straight-line performance for safety, and should be benchmarked vs. `Acknowledged` for your specific use case and treated as an engineering cost-benefit decision. The argument for the unsafe pair of `Unacknowledged` and `ErrorsIgnored` vs. `Acknowledged` seems to be much weaker, although the functionality is left for the user to apply supersonic lead projectiles to their phalanges as necessary :).
In addition to the mode of write concern, the `wtimeout` and `fsync` parameters may be configured separately for the journal and snapshot. FSync cannot be used with Journaling, so it will be disabled in that case.
Default values below:
akka.contrib.persistence.mongodb.mongo.journal-wtimeout = 3s
akka.contrib.persistence.mongodb.mongo.journal-fsync = false
akka.contrib.persistence.mongodb.mongo.snaps-wtimeout = 3s
akka.contrib.persistence.mongodb.mongo.snaps-fsync = false
By default the circuit breaker is set up with a `maxTries` of 5, and a `callTimeout` and `resetTimeout` of 5s. Akka's circuit breaker documentation covers in detail what these settings are used for. In the context of this plugin, you can set these in the following way:
akka.contrib.persistence.mongodb.mongo.breaker.maxTries = 3
akka.contrib.persistence.mongodb.mongo.breaker.timeout.call = 3s
akka.contrib.persistence.mongodb.mongo.breaker.timeout.reset = 10s
The name `akka-contrib-persistence-dispatcher` is mapped to a typically configured `ThreadPoolExecutor`-based dispatcher. This is needed to support the `Future`s used to interact with MongoDB via Casbah. More details on these settings can be found in the Akka Dispatcher documentation. For example the (by core-scaled) pool sizes can be set:
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-min = 10
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-factor = 10
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-max = 20
akka {
contrib {
persistence {
mongodb {
mongo {
urls = [ "localhost:27017" ]
db = "akka-persistence"
journal-collection = "akka_persistence_journal"
journal-index = "akka_persistence_journal_index"
# Write concerns are one of: Unacknowledged, Acknowledged, Journaled, ReplicaAcknowledged
journal-write-concern = "Journaled"
journal-wtimeout = 3s
journal-fsync = false
snaps-collection = "akka_persistence_snaps"
snaps-index = "akka_persistence_snaps_index"
snaps-write-concern = "Journaled"
snaps-wtimeout = 3s
snaps-fsync = false
breaker {
maxTries = 5
timeout {
call = 5s
reset = 5s
}
}
}
}
}
}
}
akka-contrib-persistence-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
If you need to see the contents of your events directly in the database in non-binary form, you can call `persist()` with a `DBObject` (using the casbah driver) or `BSONDocument` (using reactivemongo).
import akka.persistence.PersistentActor
import com.mongodb.casbah.Imports._

case class Command(value: String)
case class SampleState(counter: Int, lastValue: Option[String])

class SampleActor extends PersistentActor {
  override def persistenceId = "sample-actor" // unique id for this persistent actor

  var state = SampleState(0, None)

  def updateState(event: DBObject): Unit = {
    state = state.copy(counter = state.counter + 1, lastValue = event.getAs[String]("value"))
  }

  val receiveCommand: Receive = {
    case Command(value) =>
      // The raw DBObject is stored as-is, so its fields are readable in the journal collection
      persist(DBObject("value" -> value))(updateState)
  }

  // receiveRecover implementation, etc rest of class
}
During replay, events will be sent to your actor as-is. It is the application's duty to handle BSON (de)serialization in this case.
This functionality is also exposed for snapshots.
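For example, a sketch of saving and recovering a snapshot as a raw `DBObject` with the Casbah driver - the actor, persistence id, and field names here are illustrative, not part of the plugin API:

```scala
import akka.persistence.{PersistentActor, SnapshotOffer}
import com.mongodb.casbah.Imports._

class CounterActor extends PersistentActor {
  override def persistenceId = "counter-actor" // illustrative id

  var counter = 0

  val receiveCommand: Receive = {
    case "inc" =>
      counter += 1
    case "snap" =>
      // Stored as-is, so the snapshot contents are readable in the snaps collection
      saveSnapshot(DBObject("counter" -> counter))
  }

  val receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: DBObject) =>
      // The application handles BSON deserialization itself during recovery
      counter = snapshot.getAs[Int]("counter").getOrElse(0)
  }
}
```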
Depends on the excellent Metrics-Scala library which in turn stands on the shoulders of codahale's excellent Metrics library.
For this implementation, no assumptions are made about how the results are reported. Unfortunately this means you need to inject your own reporters. This will require you to refer to the extension in your own code, e.g.:
import akka.actor.ActorSystem
import akka.contrib.persistence.mongodb.MongoPersistenceExtension
import com.codahale.metrics.JmxReporter

object MyApplication {
  val actorSystem = ActorSystem("my-application")
  val registry = MongoPersistenceExtension(actorSystem).registry
  val jmxReporter = JmxReporter.forRegistry(registry).build()
  jmxReporter.start()
}
Timers:
- Single journal entry
- Journal entry range
- Journal append
- Journal range delete
- Journal multi delete
- Journal confirms
- Journal replays
- Journal max sequence number for processor query
Meters:
- Rate of usage of the `permanent` flag in deletes
Histograms:
- Batch sizes used for: range read, appends, deletes, confirms
- Adding metrics to snapshotter
- Adding health checks to both