Skip to content

Commit

Permalink
Merge pull request #147 from SwissBorg/import-upstream-changes
Browse files Browse the repository at this point in the history
Import upstream changes
  • Loading branch information
fredfp authored Feb 24, 2021
2 parents a4565cc + cd48ce7 commit 621d3c9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
val readJournalConfig = new ReadJournalConfig(config)

private val writePluginId = config.getString("write-plugin")
private val eventAdapters = Persistence(system).adaptersFor(writePluginId)
// If 'config' is empty, or if the plugin reference is not found, then the write plugin will be resolved from the
// ActorSystem configuration. Otherwise, it will be resolved from the provided 'config'.
private val eventAdapters = Persistence(system).adaptersFor(writePluginId, config)

val readJournalDao: ReadJournalDao = {
val slickDb = SlickExtension(system).database(config)
Expand Down Expand Up @@ -294,10 +296,13 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
.mapConcat(identity)
}

def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] =
Source.future(readJournalDao.maxJournalSequence()).flatMapConcat { maxOrderingInDb =>
eventsByTag(tag, offset, terminateAfterOffset = Some(maxOrderingInDb))
}
def currentEventsByTag(tag: String, offset: Long): Source[EventEnvelope, NotUsed] = {
Source
.futureSource(readJournalDao.maxJournalSequence().map { maxOrderingInDb =>
eventsByTag(tag, offset, terminateAfterOffset = Some(maxOrderingInDb))
})
.mapMaterializedValue(_ => NotUsed)
}

/**
* Query events that have a specific tag.
Expand Down
2 changes: 1 addition & 1 deletion project/ProjectAutoPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object ProjectAutoPlugin extends AutoPlugin {
override val projectSettings: Seq[Setting[_]] = Seq(
crossVersion := CrossVersion.binary,
crossScalaVersions := Dependencies.ScalaVersions,
scalaVersion := Dependencies.Scala212,
scalaVersion := Dependencies.Scala213,
Test / fork := true,
Test / parallelExecution := false,
Test / logBuffered := true,
Expand Down

0 comments on commit 621d3c9

Please sign in to comment.