A wrapper around Akka's reactive kafka providing resilience and re-use of Akka defined serialization for Kafka messages.
Add the dependency in the build.sbt, like:
libraryDependencies ++= Seq(
"nl.tradecloud" %% "akka-kafka" % "0.65"
)
Configure in the application.conf file, like:
tradecloud.kafka {
serviceName = "test"
brokers = "localhost:9092"
topicPrefix = ""
groupPrefix = ""
}
As this library is a wrapper around Akka's reactive kafka, you can also use the configuration options of Reactive Kafka.
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
new KafkaSubscriber(
group = "some_group_name",
topics = Set("some_topic")
).atLeastOnce(
Flow[String]
.map { wrapper: KafkaMessage[String] =>
// do something
println(wrapper.msg + "-world")
// return the offset
msg.offset
}
)
// promise is completed when publish is added to Kafka
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
val publisher = new KafkaPublisher()
publisher.publish("topic", msg)
Serialization is handled using Akka Serialization, see: Akka Serialization