Reservoir sampling implementation with Akka Streams support
Scala 2.13
// sbt dependency settings for the library's published modules (all at version 0.4.0).
// NOTE(review): these look like alternatives rather than a set to add together — the last
// artifact is described as covering all parts of the library; confirm before adding more than one.
libraryDependencies += "lgbt.princess" %% "reservoir-core" % "0.4.0" // the core library supporting synchronous reservoir sampling
libraryDependencies += "lgbt.princess" %% "reservoir-akka-stream" % "0.4.0" // the library for akka-stream operators
libraryDependencies += "lgbt.princess" %% "reservoir" % "0.4.0" // all parts of the library
import lgbt.princess.reservoir.Sampler
// Example element type; the samplers below only read `id`.
final case class User(id: String, displayName: String)
// Sampler over `User` elements with a maximum sample size of 100, given a `User => String`
// function that selects the id.
// NOTE(review): presumably the sample stores the mapped ids rather than whole users — confirm
// against the library documentation.
val sampler = Sampler[User, String](maxSampleSize = 100)(_.id)
// Feed every element returned by `onlineUsers()` (defined elsewhere) to the sampler.
sampler.sampleAll(onlineUsers())
// Retrieve the sample accumulated so far.
val sampleIds = sampler.result()
// Same flow using the `distinct` variant.
// NOTE(review): presumably this keeps duplicate values out of the sample — confirm against the
// library documentation.
val distinctSampler = Sampler.distinct[User, String](maxSampleSize = 100)(_.id)
distinctSampler.sampleAll(onlineUsers())
val distinctSampleIds = distinctSampler.result()
import akka.stream.scaladsl.{Keep, Sink}
import lgbt.princess.reservoir.akkasupport.Sample
// Example element type; the sampling operators below only read `id`.
final case class User(id: String, displayName: String)

// `onlineUsers()` (defined elsewhere) must be an Akka Streams source of `User` here, and running
// the graph needs an implicit Materializer/ActorSystem in scope, as for any Akka Streams graph.
//
// Materialized-value wiring:
//   - `viaMat(...)(Keep.right)` keeps the materialized value of the `Sample` operator
//     (presumably the sample of ids — confirm its exact type against the library docs);
//   - `toMat(Sink.seq)(Keep.both)` pairs that value (left) with `Sink.seq`'s materialized
//     `Future` of every element that reached the sink (right).
// The sample is therefore the FIRST element of the tuple and the collected users the second.
val (sampleIds, users1) = onlineUsers()
  .viaMat(Sample[User, String](maxSampleSize = 100)(_.id))(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()

// Same wiring with the `distinct` variant of the operator; the tuple order is again
// (materialized value of the sampling operator, materialized value of `Sink.seq`).
val (distinctSampleIds, users2) = onlineUsers()
  .viaMat(Sample.distinct[User, String](maxSampleSize = 100)(_.id))(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()