Akka Streams connector using Firebase as a message queue.
There is API for Scala and Java provided in packages com.elkozmon.akka.firebase.scaladsl
and com.elkozmon.akka.firebase.javadsl
. Factory methods reside in Consumer
and Producer
objects in those packages.
See ScalaDoc for more information on Consumers and Producers.
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Keep}
import com.elkozmon.akka.firebase.Document
import com.elkozmon.akka.firebase.scaladsl._
import com.google.firebase.database.FirebaseDatabase
object Test {
implicit val mat: Materializer = ???
val transform: Flow[Document, Document, NotUsed] = ???
val parallelism: Int = 512
val consumer = Consumer.asyncSource(
sourceNode = FirebaseDatabase.getInstance().getReference("my-source"),
bufferSize = 256
)
val producer = Producer.asyncFlow(
targetNode = FirebaseDatabase.getInstance().getReference("my-sink")
)
val (consumerControl, futureDone) = consumer
.mapAsync(parallelism)(identity)
.via(transform)
.via(producer)
.mapAsync(parallelism)(identity)
.toMat(Sink.ignore)(Keep.both)
.run()
}