Skip to content

Commit

Permalink
Merge pull request #105 from SwissBorg/eliminate-caffeine-warnings
Browse files Browse the repository at this point in the history
Eliminate nasty warnings being logged by CachedTagIdResolver
  • Loading branch information
mkubala authored Oct 15, 2020
2 parents 6e6905a + e0a788c commit 7525310
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package akka.persistence.postgres.tag
import akka.persistence.postgres.config.TagsConfig
import com.github.blemale.scaffeine.{ AsyncLoadingCache, Scaffeine }

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Success

trait TagIdResolver {
def getOrAssignIdsFor(tags: Set[String]): Future[Map[String, Int]]
Expand All @@ -31,7 +31,14 @@ class CachedTagIdResolver(dao: TagDao, config: TagsConfig)(implicit ctx: Executi
cache.getAll(tags)

override def lookupIdFor(tagName: String): Future[Option[Int]] =
cache.getFuture(tagName, dao.find(_).map(_.get)).map(Some(_)).recover {
case _: NoSuchElementException => None
Future.sequence(cache.getIfPresent(tagName).toList).map(_.headOption).flatMap {
case Some(tagId) => Future.successful(Some(tagId))
case _ =>
val findRes = dao.find(tagName)
findRes.onComplete {
case Success(Some(tagId)) => cache.put(tagName, Future.successful(tagId))
case _ => // do nothing
}
findRes
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package akka.persistence.postgres.tag

import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }

import akka.persistence.postgres.config.TagsConfig
import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -62,6 +62,34 @@ class CachedTagIdResolverSpec
returnedTagIds should contain theSameElementsAs Map(fakeTagName -> fakeTagId)
}

"assign ids if they do not exist" in {
// given
val (existingTagName, existingTagId) = ("existing-tag", 1)
val (tagName, tagId) = ("tag", 2)
val (anotherTagName, anotherTagId) = ("another-tag", 3)

val dao = new FakeTagDao(findF = {
case n if n == existingTagName => Future.successful(Some(existingTagId))
case _ => Future.successful(None)
}, insertF = name => {
if (name == tagName) Future.successful(tagId)
else if (name == anotherTagName) Future.successful(anotherTagId)
else
fail(
s"Unwanted interaction with DAO (insert) for tagName = '$name' ($tagName, $anotherTagName, $existingTagName)")
})
val resolver = new CachedTagIdResolver(dao, config)

// when
val returnedTagIds = resolver.getOrAssignIdsFor(Set(tagName, anotherTagName, existingTagName)).futureValue

// then
returnedTagIds should contain theSameElementsAs Map(
tagName -> tagId,
anotherTagName -> anotherTagId,
existingTagName -> existingTagId)
}

"hit the DAO only once and then read from cache" in {
// given
val fakeTagName = generateTagName()
Expand Down Expand Up @@ -108,7 +136,7 @@ class CachedTagIdResolverSpec
findF = _ => fail("Unwanted interaction with DAO (find)"),
insertF = _ => fail("Unwanted interaction with DAO (insert)"))
val resolver = new CachedTagIdResolver(dao, config)
resolver.cache.put(fakeTagName, Future.successful(fakeTagId))
resolver.cache.synchronous().put(fakeTagName, fakeTagId)

// when
val returnedTagIds = resolver.getOrAssignIdsFor(Set(fakeTagName)).futureValue
Expand Down Expand Up @@ -163,7 +191,7 @@ class CachedTagIdResolverSpec
findF = _ => fail("Unwanted interaction with DAO (find)"),
insertF = _ => fail("Unwanted interaction with DAO (insert)"))
val resolver = new CachedTagIdResolver(dao, config)
resolver.cache.put(fakeTagName, Future.successful(fakeTagId))
resolver.cache.synchronous().put(fakeTagName, fakeTagId)

// when
val returnedTagId = resolver.lookupIdFor(fakeTagName).futureValue
Expand All @@ -187,6 +215,30 @@ class CachedTagIdResolverSpec
returnedTagId should not be defined
}

"eventually discover newly inserted tag id mapping and update cache" in {
// given
val fakeTagName = generateTagName()
val fakeTagId = Random.nextInt()

val lookupMissHappened = new AtomicBoolean(false)

val dao = new FakeTagDao(
findF = name =>
if (name == fakeTagName && lookupMissHappened.getAndSet(true))
Future.successful(Some(fakeTagId))
else Future.successful(None),
insertF = _ => fail("Unwanted interaction with DAO (insert)"))

val resolver = new CachedTagIdResolver(dao, config)

// when
resolver.cache.synchronous().getIfPresent(fakeTagName) should not be defined
resolver.lookupIdFor(fakeTagName).futureValue should not be defined
resolver.cache.synchronous().getIfPresent(fakeTagName) should not be defined
resolver.lookupIdFor(fakeTagName).futureValue.value should equal(fakeTagId)
resolver.cache.synchronous().getIfPresent(fakeTagName).value should equal(fakeTagId)
}

"update cache" in {
// given
val fakeTagName = generateTagName()
Expand All @@ -198,8 +250,9 @@ class CachedTagIdResolverSpec

// then
resolver.cache.synchronous().getIfPresent(fakeTagName) should not be defined
resolver.lookupIdFor(fakeTagName).futureValue
resolver.lookupIdFor(fakeTagName).futureValue.value should equal(fakeTagId)
resolver.cache.synchronous().getIfPresent(fakeTagName).value should equal(fakeTagId)
resolver.lookupIdFor(fakeTagName).futureValue.value should equal(fakeTagId)
}

"not update cache" in {
Expand All @@ -212,8 +265,9 @@ class CachedTagIdResolverSpec

// then
resolver.cache.synchronous().getIfPresent(fakeTagName) should not be defined
resolver.lookupIdFor(fakeTagName).futureValue
resolver.lookupIdFor(fakeTagName).futureValue should not be defined
resolver.cache.synchronous().getIfPresent(fakeTagName) should not be defined
resolver.lookupIdFor(fakeTagName).futureValue should not be defined
}
}
}
Expand Down

0 comments on commit 7525310

Please sign in to comment.