hjfruit / zio-pulsar   0.4.3

Scala versions: 3.x

ZIO Pulsar

Scala 3

libraryDependencies += "io.github.jxnu-liguobin" %% "zio-pulsar" % "latest version"

Scala 2.13.6+ (sbt 1.5.x)

libraryDependencies += 
  ("io.github.jxnu-liguobin" %% "zio-pulsar" % "latest version").cross(CrossVersion.for2_13Use3)

These dependencies are required in the project classpath (ZIO projects only need to pay attention to whether they have imported zio-streams):

libraryDependencies ++= Seq(
  "dev.zio" %% "zio"         % zioVersion,
  "dev.zio" %% "zio-streams" % zioVersion,
  "dev.zio" %% "zio-json"    % zioJsonVersion

Example 1

object SingleMessageExample extends ZIOAppDefault:

  // Note: Do not duplicate the construction of this object, just keep one instance!
  lazy val pulsarClient = PulsarClient.live("localhost", 6650)
  // val pulsarClient = PulsarClient.live(""pulsar://localhost:6650,localhost:6651,localhost:6652"")

  val topic = "single-topic"

  // The client should be a long-term/on-demand object, while the consumer/producer is perishable after use
  val app: ZIO[PulsarClient & Scope, PulsarClientException, Unit] =
      builder  <- ConsumerBuilder.make(JSchema.STRING)
      consumer <- builder
                    .subscription(Subscription("my-subscription", SubscriptionType.Shared))
      producer <- Producer.make(topic, JSchema.STRING)
      _        <- producer.send("Hello!")
      m        <- consumer.receive
      _ = println(m.getValue)
    yield ()

  override def run = app.provideLayer(pulsarClient ++ Scope.default).exitCode

Example 2

If you need producer and consumer, or it is not convenient to propagate R, then we can use constructor injection:

object UserService {
  val live = ZLayer.fromFunction(UserService.apply)

final case class UserService(zioPulsar: ZioPulsar) {
  def sendPulsar(): ZIO[Scope, PulsarClientException, Message[String]] =
    for {
      consumerBuilder <- zioPulsar
      consumer        <- consumerBuilder
            name = "zio-subscription",
            `type` = SubscriptionType.Shared
        .build // or use `unsafeBuild` to not close the consumer
      msg             <- consumer.receive(10, TimeUnit.SECONDS)
    } yield msg

object ZioPulsarExample extends ZIOAppDefault:

  val app: ZIO[UserService with Scope, IOException, Unit] = 

  override def run = app