Functional Scala Caching
Part of the "do one thing and do it well" micro birds library series
libraryDependencies += "us.oyanglul" %% "jujiu" % version
There are only two simple steps to use the cache:
- Instantiate a protocol-agnostic Cache DSL, which means the DSL is only aware of what to operate on, not how.
- Use the DSL syntax (fetchF, parFetchAllF, etc.) to describe how the program uses the cache, given an instance of Cache[Key, Val]. In the example we create a Caffeine instance, which tells the DSL how to actually do the caching.

class JujiuScala3Spec extends Specification:
  given ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  "works with IO" >> {
    "normal cache" >> {
      val dsl: Cache[IO, cache.Cache, String, String] = new CaffeineCache[IO, String, String]{}
      def program(using cache.Cache[String, String]) = for
        _ <- IO(println("something"))
        _ <- dsl.putF("key1", "value1")
        r1 <- dsl.fetchF("key1")
        r2 <- dsl.fetchF("key2", _ => IO("value2"))
        r3 <- dsl.fetchAllF(List("key1", "key2"))
        r4 <- dsl.parFetchAllF[List, IO.Par](List("key1", "key2"))
        _ <- dsl.clearF("key1")
      yield (r1, r2, r3, r4)
      given cache.Cache[String, String] = Caffeine().sync[String, String]
      program.unsafeRunSync() must_== (
        (
          Some("value1"),
          "value2",
          List(Some("value1"), Some("value2")),
          List(Some("value1"), Some("value2"))
        )
      )
    }
  }
end JujiuScala3Spec
"it should able to get and set cache" >> {
  object cacheDsl extends CaffeineCache[IO, String, String] // <- (ref:dsl)
  val program = for {
    r1 <- cacheDsl.fetch("not exist yet") // <- (ref:fetch)
    r2 <- cacheDsl.fetch("not exist yet", _ => IO("default")) // <- (ref:fetchOr)
    _ <- cacheDsl.put("not exist yet", "now exist") // <- (ref:put)
    r3 <- cacheDsl.fetch("not exist yet")
    _ <- cacheDsl.clear("not exist yet")
    r4 <- cacheDsl.fetch("not exist yet")
  } yield (r1, r2, r3, r4)
  program(Caffeine().sync) // <- (ref:run)
    .unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
}
This README is a literate programming file: all the code here is used to generate the test file.
I can walk you through it line by line, though.
- line-(dsl) creates an instance of CaffeineCache whose effect type is IO, key type is String, and value type is String as well.
- line-(fetch) won’t actually trigger any effect; it just returns a DSL value, represented as the type Kleisli[IO, Cache, Option[String]], which in English means “give me a Cache and I can provide you an IO[Option[String]]”.
- line-(fetchOr) is another fetch DSL whose second parameter is a function K => IO[V]. If the key is not in the cache, it runs the function, puts the result into the cache, and returns the value.
- line-(put) updates the value of the key “not exist yet” to “now exist”.
- line-(run) is the idiomatic Scala syntax to build a synchronous Caffeine cache. Recall that program is actually a Kleisli over Cache, so when I provide it a Cache via program(Caffeine().sync), it returns an IO. Calling .unsafeRunSync() on that IO triggers all the effects described in program, and you get the actual result.
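The "describe first, run later" idea in the walkthrough above can be sketched with the standard library alone. This is only an illustration under stated assumptions: Lazy is a hypothetical stand-in for IO, Described is a hypothetical stand-in for Kleisli, and a mutable Map plays the role of the cache. None of these names come from Jujiu or Cats.

```scala
import scala.collection.mutable

// Hypothetical stand-in for IO: nothing happens until unsafeRun() is called.
final case class Lazy[A](unsafeRun: () => A) {
  def map[B](f: A => B): Lazy[B] = Lazy(() => f(unsafeRun()))
  def flatMap[B](f: A => Lazy[B]): Lazy[B] = Lazy(() => f(unsafeRun()).unsafeRun())
}

// Hypothetical stand-in for Kleisli: "give me a C and I can provide you a Lazy[A]".
final case class Described[C, A](run: C => Lazy[A]) {
  def map[B](f: A => B): Described[C, B] =
    Described((c: C) => run(c).map(f))
  def flatMap[B](f: A => Described[C, B]): Described[C, B] =
    Described((c: C) => run(c).flatMap(a => f(a).run(c)))
}

object KleisliSketch {
  // Assumption: a mutable Map plays the role of the cache.
  type Store = mutable.Map[String, String]

  def fetch(k: String): Described[Store, Option[String]] =
    Described((store: Store) => Lazy(() => store.get(k)))

  def put(k: String, v: String): Described[Store, Unit] =
    Described((store: Store) => Lazy(() => store.update(k, v)))

  // Building the program touches no store; it is only a description.
  val program: Described[Store, (Option[String], Option[String])] =
    for {
      r1 <- fetch("key")
      _  <- put("key", "value")
      r2 <- fetch("key")
    } yield (r1, r2)
}
```

Supplying a store with KleisliSketch.program.run(store) returns a Lazy value and still performs no effect; the store is only read and written once unsafeRun() is called on that value.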
It works with a plain effect type (anything that has Async) as well.
You will need to import some syntax and provide a cacheProvider implicitly: since you are not using Kleisli, you need to tell the DSL which cache it will run on.
"loading cache" >> {
  val c: LoadingCache[IO, cache.LoadingCache, String, String] = new CaffeineLoadingCache[IO, String, String] {}
  implicit val cacheProvider: cache.LoadingCache[String, String] = Caffeine().sync(identity)
  def program =
    for {
      _ <- IO(println("something"))
      r1 <- c.fetchF("1")
      r2 <- c.fetchAllF(List("2", "3"))
      r3 <- c.parFetchAllF[List, IO.Par](List("4", "5"))
    } yield (r1, r2, r3)
  program.unsafeRunSync() must_== (("1", List("2", "3"), List("4", "5")))
}
}
This is similar to ExecutionContext, where you provide the context that threads run on: here you provide the cache implicitly, and every DSL method is suffixed with F.
Dealing with a Java DSL and Java Futures is too verbose and painful in a Scala project.
Let’s see how Jujiu makes Caffeine friendly to Cats IO as well.
A good example is the Async Loading Cache.
First you will need the Caffeine builder syntax:
import us.oyanglul.jujiu.syntax.caffeine._
"it should able to get and set async loading cache" >> {
  object cache extends CaffeineAsyncLoadingCache[IO, Integer, String] {
    implicit val executionContext = global // <-- (ref:executionContext)
  }
  val program = for {
    r1 <- cache.fetch(1)
    r2 <- cache.fetch(2)
    r3 <- cache.fetchAll(List[Integer](1, 2, 3))
  } yield (r1, r2, r3)
  val caffeineA: com.github.benmanes.caffeine.cache.AsyncLoadingCache[Integer, String] = Caffeine()
    .executionContext(global) // <-- (ref:global)
    .withExpire( // <-- (ref:expire)
      (_: Integer, _: String) => 1.second,
      (_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration,
      (_: Integer, _: String, currentDuration: FiniteDuration) => currentDuration
    )
    .async((key: Integer) => IO("async string" + key)) // <-- (ref:async)
  val caffeineB = Caffeine()
    .withExpireAfterAccess(1.second)
    .withExpireAfterWrite(2.seconds)
    .withRefreshAfterWrite(3.seconds)
    .async((key: Integer) => IO("async string" + key))
  val expected = (
    "async string1",
    "async string2",
    List("async string1", "async string2", "async string3")
  )
  program(caffeineA).unsafeRunSync() must_== expected
  program(caffeineB).unsafeRunSync() must_== expected
  program(Caffeine().async(_ => IO.raiseError(new Exception("something wrong"))))
    .unsafeRunSync() must throwA[Exception]
}
- line-(executionContext): the Async Loading Cache needs an ExecutionContext to execute the Java Futures.
- line-(global): .executionContext(global) makes sure the cache uses the Scala execution context to execute Java Futures; otherwise it uses the default Java fork-join pool. Alternatively, you can also use Akka’s execution context.
- line-(expire) defines the expiry policy, here with more idiomatic Scala lambdas and Duration.
- line-(async) creates an async loading cache. The async loading function it uses is K => IO[V], so you don’t need to deal with awful Java Futures.
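For comparison, here is roughly what bridging a Java future by hand looks like when no wrapper does it for you. It is a minimal sketch that assumes Scala 2.13 or later and uses only the standard library converter scala.jdk.FutureConverters; javaLoad is a hypothetical loader standing in for whatever a Java async cache hands back, and this is not how Jujiu implements the conversion internally.

```scala
import java.util.concurrent.CompletableFuture
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.jdk.FutureConverters._

object JavaFutureBridge {
  // Hypothetical loader: returns a Java future, as a Java async API would.
  def javaLoad(key: Int): CompletableFuture[String] =
    CompletableFuture.supplyAsync(() => "async string" + key)

  // asScala converts the Java CompletionStage into a scala.concurrent.Future.
  def load(key: Int): Future[String] = javaLoad(key).asScala
}
```

A caller can then use the result like any other Scala Future, for example Await.result(JavaFutureBridge.load(1), 5.seconds). With the DSL above, this conversion step never shows up in user code because the loader is written as K => IO[V].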
No matter what style of effect abstraction your project is using, Jujiu can easily fit in,
e.g. Tagless Final:
"works with tagless final" >> {
  trait LogDsl[F[_]] {
    def log(msg: String): F[Unit]
  }
  type ProgramDsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]
  def program[F[_]: Async](dsl: ProgramDsl[F])
    (implicit ev: cache.Cache[String, String]): F[Option[String]] =
    for {
      value <- dsl.fetchF("key")
      _ <- dsl.log("something")
    } yield value
  {
    object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO] {
      def log(msg: String) = IO(println(msg))
    }
    implicit val cacheProvider: cache.Cache[String, String] = Caffeine().sync[String, String]
    program[IO](dsl).unsafeRunSync() must_== None
  }
}
Just extend CaffeineCache[F, K, V] and provide a cacheProvider.
If your code follows the ReaderT pattern, even better: it will fit in more naturally.
"works with tagless final style readerT" >> {
  // Layer 1: Environment
  trait HasLogger {
    def logger: String => Unit
  }
  trait HasCacheProvider {
    def cacheProvider: cache.Cache[String, String]
  }
  type Env = HasLogger with HasCacheProvider
  // Layer 2: DSL
  trait LogDsl[F[_]] {
    def log(msg: String)(implicit M: Applicative[F]): Kleisli[F, Env, Unit] = Kleisli(a => M.pure(a.logger(msg)))
  }
  type Dsl[F[_]] = CaffeineCache[F, String, String] with LogDsl[F]
  // Layer 3: Business
  def program[F[_]](dsl: Dsl[F])(
    implicit ev: Async[F]
  ) =
    for {
      _ <- dsl.log("something")
      value <- dsl.fetch("key").local[Env](_.cacheProvider)
    } yield value
  object dsl extends CaffeineCache[IO, String, String] with LogDsl[IO]
  program[IO](dsl)
    .run(new HasLogger with HasCacheProvider {
      def logger = println
      def cacheProvider = Caffeine().sync
    })
    .unsafeRunSync() must_== None
}
Notice that adapting the cache DSL to the larger environment (a contravariant adaptation of its input) needs .local[Env](_.cacheProvider).
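What .local does can be sketched with plain functions: a computation that only needs a cache is made to accept a whole environment by pre-composing a projection. The sketch below is stdlib-only; Env, fetchKey and fetchFromEnv are hypothetical names used for illustration, and an immutable Map stands in for the cache.

```scala
object LocalSketch {
  // Hypothetical environment, mirroring the shape of Env above.
  final case class Env(logger: String => Unit, cacheProvider: Map[String, String])

  // A computation that only knows about the cache.
  val fetchKey: Map[String, String] => Option[String] =
    cache => cache.get("key")

  // Pre-compose a projection Env => cache: the input type changes,
  // the result type stays the same. This is the role .local plays for Kleisli.
  val fetchFromEnv: Env => Option[String] =
    fetchKey.compose((env: Env) => env.cacheProvider)
}
```

The adaptation runs "backwards" along the input: a function from the big environment to the small one turns a computation on the small one into a computation on the big one, which is why it is described as contravariant.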
It is extensible by design thanks to Kleisli: if you provide another cache provider, the same DSL will work.
"run on redis" >> {
  import redis.clients.jedis._
  def program[F[_]: Async, S[_, _]](dsl: Cache[F, S, String, String]) = for {
    r1 <- dsl.fetch("not exist yet")
    r2 <- dsl.fetch("not exist yet", _ => Async[F].delay("default"))
    _ <- dsl.put("not exist yet", "now exist")
    r3 <- dsl.fetch("not exist yet")
    _ <- dsl.clear("not exist yet")
    r4 <- dsl.fetch("not exist yet")
  } yield (r1, r2, r3, r4)
  type J[A, B] = Jedis
  object dsl extends Cache[IO, J, String, String] {
    def put(k: String, v: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
      Kleisli { redis =>
        M.delay{
          redis.set(k, v)
          ()
        }
      }
    def fetch(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Option[String]] =
      Kleisli(redis => M.delay(Option(redis.get(k))))
    def clear(k: String)(implicit M: Async[IO]): Kleisli[IO, Jedis, Unit] =
      Kleisli(redis => M.delay{
        redis.del(k)
        ()
      })
  }
  program(dsl).run(
    new Jedis("localhost")
  ).unsafeRunSync() must_== ((None, "default", Some("now exist"), None))
}.pendingUntilFixed("Redis")