FS2 Kafka JsonSchema


Provides FS2 Kafka Serializers and Deserializers that integrate with Confluent Schema Registry for JSON messages described by JSON Schemas.

Note: This library only works with Scala 3.3.x and above. For Scala 2.x, see here.

This functionality is backed by Tapir's Pickler (for JSON codecs and JSON Schema derivation) and Confluent's Schema Registry client.

Usage

Add the following to your build.sbt:

resolvers ++= Seq("confluent" at "https://packages.confluent.io/maven")
libraryDependencies += "io.kaizen-solutions" %% "fs2-kafka-jsonschema" % "<latest-version>"

Example

Define the datatype that you would like to send and receive over Kafka in the JSON + JSON Schema format. You do this by defining your datatype and providing a Pickler instance for it; the Pickler instance comes from the Tapir library.

import sttp.tapir.Schema.annotations.*
import sttp.tapir.json.pickler.*

final case class Book(
  @description("name of the book") name: String,
  @description("international standard book number") isbn: Int
)
object Book:
  given Pickler[Book] = Pickler.derived
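The Pickler supplies both the JSON codec and the schema for the datatype. As a quick sanity check, you can encode a value to JSON directly (a sketch assuming Tapir's pickler module; the sample values are illustrative):

```scala
import sttp.tapir.json.pickler.*

// Obtain the JSON codec derived from the Pickler and encode a sample Book.
// The same derivation also drives the JSON Schema that is registered.
val codec = summon[Pickler[Book]].toCodec
val json  = codec.encode(Book("Programming in Scala", 1234567)) // a JSON string
```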

Next, you can create an FS2 Kafka Serializer and Deserializer for this datatype and use them when building your FS2 Kafka producer and consumer.

import io.kaizensolutions.jsonschema.*
import cats.effect.*
import fs2.kafka.*

def bookSerializer[F[_]: Sync]: Resource[F, ValueSerializer[F, Book]] =
  JsonSchemaSerializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]

def bookDeserializer[F[_]: Sync]: Resource[F, ValueDeserializer[F, Book]] =
  JsonSchemaDeserializerSettings.default
    .withSchemaRegistryUrl("http://localhost:8081")
    .forValue[F, Book]
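These resources plug into fs2-kafka's ProducerSettings and ConsumerSettings, which accept serializers and deserializers wrapped in Resource. The sketch below wires them up, assuming fs2-kafka 3.x; the topic name (`books`), group id, and broker address are illustrative assumptions:

```scala
import cats.effect.*
import fs2.kafka.*

// Produce a single Book to the (assumed) "books" topic.
def produceOne[F[_]: Async](book: Book): F[Unit] =
  val settings =
    ProducerSettings(
      keySerializer = Resource.pure(Serializer[F, String]),
      valueSerializer = bookSerializer[F] // validates against the schema registry
    ).withBootstrapServers("localhost:9092")
  KafkaProducer
    .resource(settings)
    .use { producer =>
      producer
        .produce(ProducerRecords.one(ProducerRecord("books", book.isbn.toString, book)))
        .flatten
        .void
    }

// Stream Books from the same topic, deserializing and validating via the registry.
def consumeBooks[F[_]: Async]: fs2.Stream[F, Book] =
  val settings =
    ConsumerSettings(
      keyDeserializer = Resource.pure(Deserializer[F, String]),
      valueDeserializer = bookDeserializer[F]
    )
      .withBootstrapServers("localhost:9092")
      .withGroupId("books-example")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
  KafkaConsumer
    .stream(settings)
    .subscribeTo("books")
    .records
    .map(_.record.value)
```

Both functions acquire the serializer/deserializer lazily via Resource, so schema registration and fetching happen on first use and the underlying registry client is closed when the producer or consumer shuts down.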