Throttling for FS2 streams, based on the token bucket algorithm.
This implementation supports:
- bursts in the processing of elements
- a per-element cost, calculated for every element of the stream
- two throttle modes (Shaping / Enforcing)
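For readers new to the token bucket algorithm, here is a minimal sketch of the idea in plain Scala. It is an illustration only and not this library's implementation: `TokenBucket`, `refill`, and `tryAcquire` are hypothetical names, and time is passed in explicitly so the behaviour is deterministic. The bucket capacity is what permits bursts, and the `cost` argument shows how an element can consume more than one token. How each throttle mode reacts to a failed acquisition is deliberately left out.

```scala
import scala.concurrent.duration._

// Hypothetical illustration of a token bucket; not the library's internals.
final case class TokenBucket(
    capacity: Long,                 // maximum tokens held, i.e. the largest possible burst
    refillInterval: FiniteDuration, // time needed to produce one new token
    tokens: Long,                   // tokens currently available
    lastRefillNanos: Long           // timestamp up to which tokens have been credited
) {

  /** Credits the tokens accrued since the last refill, capped at `capacity`. */
  def refill(nowNanos: Long): TokenBucket = {
    val intervalNanos = refillInterval.toNanos
    val accrued = (nowNanos - lastRefillNanos) / intervalNanos
    if (accrued <= 0) this
    else
      copy(
        tokens = math.min(capacity, tokens + accrued),
        // Advance only by whole intervals so partial progress is not lost.
        lastRefillNanos = lastRefillNanos + accrued * intervalNanos
      )
  }

  /** Refills, then tries to spend `cost` tokens.
    * Returns the updated bucket and whether the tokens were granted.
    */
  def tryAcquire(cost: Long, nowNanos: Long): (TokenBucket, Boolean) = {
    val refilled = refill(nowNanos)
    if (refilled.tokens >= cost) (refilled.copy(tokens = refilled.tokens - cost), true)
    else (refilled, false)
  }
}
```

With a full bucket of 2 tokens and one new token every 500 ms (that is, 2 elements per second), two cost-1 acquisitions at time 0 succeed, and a third is refused until 500 ms have passed.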
Add the following to your build.sbt file, replacing Version with the release you want to use:
libraryDependencies += "dev.kovstas" %% "fs2-throttler" % Version
To use the throttler, import the throttle function and apply it to your stream:
import cats.effect.IO
import fs2.Stream
import scala.concurrent.duration._
import dev.kovstas.fs2throttler.Throttler._

// Lift the pure stream into IO so the throttler has an effect type to run in
val stream = Stream(1, 2, 3, 4, 5).covary[IO]

// Throttle to 2 elements per second, once in each of the two modes
val shapedStream = stream.through(throttle(2, 1.second, Shaping))
val enforcedStream = stream.through(throttle(2, 1.second, Enforcing))

// Charge each element a cost equal to its own value
val costFunction: Int => Long = i => i.toLong
val throttledCostStream = stream.through(throttle(2, 1.second, Shaping, costFunction))
For more examples, please refer to the tests.