Mocks for fs2-kafka consumers and producers wrapping the native mocks built into the apache kafka clients library. This allows for testing applications without the need for a real kafka implementation such as testcontainers or embedded kafka.
Add the following dependency to your build.sbt
:
libraryDependencies += "io.github.jchapuis" %% "fs2-kafka-mock" % "{latest version}"
The mock consumer allows covering code making use of fs2-kafka's KafkaConsumer
. Injection of the mock is done via the implicit MkConsumer
parameter (that fs2-kafka had the foresight to allow). Methods on the mock allow for publishing and redacting messages, as if these were being published on the real kafka.
Internally, the mock consumer tracks published records in an array. Note, however, that there are some limitations imposed by the native kafka mock:
- single partition support
- consumers must subscribe for publication to succeed
Here's a short example taken from the project's munit test suite: we mock publication of a message, consume it using a fs2 kafka consumer configured with the mock and verify that the consumer was able to read the record.
test("mock kafka consumer can read a published message") {
MockKafkaConsumer("topic").use { mockConsumer =>
for {
_ <- mockConsumer
.publish("topic", "key", "value")
.start // this call semantically blocks until we can publish to the consumer
// hence the need to run it in a separate fiber
record <- {
implicit val mkConsumer: MkConsumer[IO] = mockConsumer.mkConsumer
KafkaConsumer
.stream(
ConsumerSettings[IO, String, String]
.withGroupId("test")
)
.subscribeTo("topic")
.records
.map(_.record)
.map(record => (record.topic, record.key, record.value))
.take(1)
.compile
.toList
.map(_.head)
}
} yield assertEquals(record, ("topic", "key", "value"))
}
}
The mock producer allows covering code producing with fs2-kafka's KafkaProducer
. Injection of the mock is done via the implicit MkProducer
parameter, in a similar way as for the consumer. Various access methods on the mock allow for retrieving published records and iteratively checking for newer messages.
test("mock kafka producer returns next message and allows for checking full history") {
MockKafkaProducer()
.flatMap { mock =>
implicit val mkProducer: MkProducer[IO] = mock.mkProducer
val producer = KafkaProducer.resource(ProducerSettings[IO, String, String])
producer.map((mock, _))
}
.use { case (mock, producer) =>
for {
_ <- producer.produce(ProducerRecords.one(ProducerRecord[String, String]("topic", "key", "value"))).flatten
_ <- mock
.nextMessageFor[String, String]("topic")
.map(maybeKeyValue => assertEquals(maybeKeyValue, Some(("key", "value"))))
_ <- mock
.historyFor[String, String]("topic")
.map(history => assertEquals(history, List(("key", "value"))))
} yield ()
}
}