This client is a Scala wrapper over the standard RabbitMQ Java client. The goal of this library is to simplify basic use cases - to provide an FP-oriented API for programmers and to shield them from the underlying client.
The library is configurable both by case classes (core module) and by HOCON/Lightbend Config (pureconfig module).
The library uses the concept of a connection and derived producers and consumers. Note that the connection shields you from the underlying concept of AMQP connection and derived channels - it handles channels automatically according to best practices. Each producer and consumer can be closed separately, while closing the connection closes all derived channels and thus all producers and consumers.
SBT:
"com.avast.clients.rabbitmq" %% "rabbitmq-client-core" % "x.x.x"
Gradle:
compile 'com.avast.clients.rabbitmq:rabbitmq-client-core_$scalaVersion:x.x.x'
- api - Contains only basic traits for consumer etc.
- core - Main module. The client, configurable by case classes.
- pureconfig - Module for configuration from Config.
- extras - Module with some extra features.
- extras-circe - Allows publishing and consuming JSON events, using the circe library.
- extras-protobuf - Allows publishing and consuming events defined as Google Protocol Buffers messages (as both JSON and Protobuf), represented as standard Java classes.
- extras-scalapb - Allows publishing and consuming events defined as Google Protocol Buffers messages (as both JSON and Protobuf), generated to Scala using ScalaPB.
There exists a migration guide between versions 6.1.x and 8.0.x.
There exists a migration guide between versions 8.x and 9.0.x.
Please note that configuration from Typesafe/Lightbend config has been moved to the pureconfig module since 8.x.
The API is finally tagless (read more e.g. here) with cats.effect.Resource, which is a convenient way to manage resources in your app. In addition, there is support for streaming with fs2.Stream.
The API uses conversions for both consumer and producer, which means you don't have to work directly with Bytes (however, you still can if you want to) - you touch only your business model class, which is then (de)serialized using the provided converter.
Monitoring of the library is done via the Avast Metrics library, its Scala Effect API in particular. If you don't want the client to be monitored, feel free to pass Monitor.noOp[F] instead.
The library uses two types of executors - one for blocking (IO) operations and the other for callbacks. You have to provide both of them:
- Blocking executor as ExecutorService
- Callback executor as scala.concurrent.ExecutionContext
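For illustration, the two executors might be created e.g. like this (a sketch; sizing and lifecycle of the pools are up to your application):

import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext

// executor for blocking (IO) operations - a dedicated pool, not shared with CPU-bound work
val blockingExecutor: ExecutorService = Executors.newCachedThreadPool()

// executor for callbacks - here simply the global ExecutionContext; use your own pool in a real app
val callbackExecutor: ExecutionContext = ExecutionContext.global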
The default way is to configure the client with manually provided case classes; see the pureconfig module for configuration from HOCON (Lightbend Config).
This is a somewhat minimal setup, using Monix Task:
import java.util.concurrent.ExecutorService
import cats.effect.Resource
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.api._
import com.avast.metrics.scalaeffectapi.Monitor
import javax.net.ssl.SSLContext
import monix.eval._
import monix.execution.Scheduler
implicit val sch: Scheduler = ???
val monitor: Monitor = ???
val blockingExecutor: ExecutorService = ???
val sslContext = SSLContext.getDefault
val connectionConfig = RabbitMQConnectionConfig(
  hosts = List("localhost:5672"),
  name = "MyProductionConnection",
  virtualHost = "/",
  credentials = CredentialsConfig(username = "vogon", password = "jeltz")
)

val consumerConfig = ConsumerConfig(
  name = "MyConsumer",
  queueName = "QueueWithMyEvents",
  bindings = List(
    AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
  )
)

val producerConfig = ProducerConfig(
  name = "MyProducer",
  exchange = "MyGreatApp"
)

// see https://typelevel.org/cats-effect/tutorial/tutorial.html#acquiring-and-releasing-resources
val rabbitMQProducer: Resource[Task, RabbitMQProducer[Task, Bytes]] = {
  for {
    connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
    /*
      Here you have created the connection; it's shared by all producers/consumers towards one RabbitMQ server - they will share
      a single TCP connection but have separate channels.
      If you expect very high load, you can use separate connections for each producer/consumer, however it's usually not needed.
     */
    consumer <- connection.newConsumer[Bytes](consumerConfig, monitor) {
      case delivery: Delivery.Ok[Bytes] =>
        Task.now(DeliveryResult.Ack)

      case _: Delivery.MalformedContent =>
        Task.now(DeliveryResult.Reject)
    }

    producer <- connection.newProducer[Bytes](producerConfig, monitor)
  } yield {
    producer
  }
}
Note: this has nothing to do with RabbitMQ Streams. This client provides an fs2.Stream API instead of the callback-based one, but works on top of a normal queue.
It seems quite natural to process a RabbitMQ queue with a streaming app. StreamingRabbitMQConsumer provides you an fs2.Stream through which you can easily process incoming messages in a streaming way.
Notice: Using this functionality requires you to know the basics of the FS2 library. Please see its official guide first if you're not familiar with it.
// skipping imports and common things, they are the same as in general example above
val consumerConfig = StreamingConsumerConfig( // notice: StreamingConsumerConfig vs. ConsumerConfig
  name = "MyConsumer",
  queueName = "QueueWithMyEvents",
  bindings = List(
    AutoBindQueueConfig(exchange = AutoBindExchangeConfig(name = "OtherAppExchange"), routingKeys = List("TheEvent"))
  )
)

val processMyStream: fs2.Pipe[Task, StreamedDelivery[Task, Bytes], Unit] = { in =>
  in.evalMap(_.handleWith(d => Task.now(DeliveryResult.Ack))) // TODO you probably want to do some real stuff here
}
val deliveryStream: Resource[Task, fs2.Stream[Task, Unit]] = {
  for {
    connection <- RabbitMQConnection.make[Task](connectionConfig, blockingExecutor, Some(sslContext))
    streamingConsumer <- connection.newStreamingConsumer[Bytes](consumerConfig, monitor)
  } yield {
    val stream: fs2.Stream[Task, Unit] = streamingConsumer.deliveryStream.through(processMyStream)

    // create a resilient (self-restarting) stream; see more information below
    lazy val resilientStream: fs2.Stream[Task, Unit] = stream.handleErrorWith { e =>
      // TODO log the error - something is going wrong!
      resilientStream
    }

    resilientStream
  }
}
While you should never let the stream fail (handle all your possible errors; see the Error handling section in the official docs for how a stream can fail), it's important to be able to recover the stream when it accidentally happens. You can do that by simply requesting a new stream from the client:
val stream = streamingConsumer.deliveryStream // get stream from client
  .through(processMyStream)                   // "run" the stream through your processing logic

val failureCounter: Ref[Task, Int] = ??? // TODO: initialize to max recover count!

lazy val resilientStream: fs2.Stream[Task, Unit] = stream.handleErrorWith { err =>
  // handle the error in stream: recover by calling itself
  // TODO don't forget to add some logging/metrics here!
  fs2.Stream.eval(failureCounter.modify(a => (a - 1, a - 1))).flatMap { attemptsRest =>
    if (attemptsRest < 0) fs2.Stream.raiseError[Task](err) else resilientStream
  }
}

resilientStream
or use a prepared extension method:
import com.avast.clients.rabbitmq._

streamingConsumer.deliveryStream // get stream from client
  .through(processMyStream)      // "run" the stream through your processing logic
  .makeResilient(maxErrors = 3) { err =>
    Task.delay {
      // TODO don't forget to add some logging/metrics here!
      ()
    }
  }
Please refer to the official guide to understand more deeply how the recovery of fs2.Stream works.
While everyone wants RabbitMQ to "just work", in reality it may not be that easy. Servers are restarted, deliveries are processed for too long, etc. For such occasions, there exist listeners in this client - connection, channel and consumer kinds.
The listeners are passed to the connection factory method. You are not required to implement/provide them; however, it's strongly recommended to do so. The default implementations only log the events, while you may want to react differently - increase some counter, mark the app as unhealthy etc. (as some events are not easy to recover from).
Both the producer and consumer require a type argument when created from the connection:
- connection.newConsumer[MyClass], which requires an implicit DeliveryConverter[MyClass]
- connection.newProducer[MyClass], which requires an implicit ProductConverter[MyClass]

There are multiple options where to get the converter (it's the same for DeliveryConverter as for ProductConverter):
- Implement your own implicit converter for the type (see the sketch below)
- Modules extras-circe and extras-scalapb provide support for JSON and GPB conversion.
- Use the identity converter by specifying the Bytes type argument. No further action is needed in that case.
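For the first option, a hand-written converter might look roughly like this (a sketch only; it assumes DeliveryConverter exposes a single convert method from Bytes, analogous to the CheckedDeliveryConverter shown at the end of this document):

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.DeliveryConverter
import com.avast.clients.rabbitmq.api.ConversionException

case class MyClass(value: String)

// an illustrative converter parsing the message body as an UTF-8 string
implicit val myClassConverter: DeliveryConverter[MyClass] = new DeliveryConverter[MyClass] {
  override def convert(body: Bytes): Either[ConversionException, MyClass] =
    Right(MyClass(body.toStringUtf8)) // put real parsing and error handling here
}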
It's quite a common use-case that we want to republish a failed message but want to avoid it being republished forever. You can use the PoisonedMessageHandler (PMH) to solve this issue. It counts the number of attempts and won't let the message be republished again and again (above the limit you set).
Note: it works ONLY for Republish and not for Retry!
The PoisonedMessageHandler is built into both "normal" and streaming consumers. After the execution of the poisoned-message action, the delivery is REJECTed (so it's not in the original queue anymore).
All types (except no-op) of the poisoned message handler have a maxAttempts configuration option which determines how many times the message can be delivered to the consumer. What it means in practice is that if maxAttempts == 3 and you choose to republish it for the third time, the PMH takes its action - as the next delivery of the message would already be the fourth, which is over the configured limit.
Internally, the attempts counting is done via incrementing (or adding, the first time) the X-Republish-Count header in the message. Feel free to use its value for your own logging or whatever. You can even set it to your own value - just bear in mind that you might affect the PMH's functionality (of course, that might be your intention).
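As an illustration, reading the header (e.g. for logging) might look like this (a sketch; header values are untyped, so the conversion is up to you):

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.Delivery

// a sketch: peek at the republish counter (if present) of a delivery
def republishCount(delivery: Delivery[Bytes]): Option[Int] =
  delivery.properties.headers.get("X-Republish-Count").map(_.toString.toInt)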
While the republishing is meant to not let the other messages starve (it puts the message at the end of the queue, meaning it'll be the last message to be processed at the moment), it may still happen that its retry is immediate - in other words, during periods with low traffic, it behaves very similarly to Retry. To prevent this, there is support (since 9.2.0) for republish delaying - meaning the republishing is in-memory delayed before being applied. Since it happens in the client itself and not in your application, the consumer timeout is not involved at that point.
However, it's important to note that if there are too many messages being republished and delayed, it might theoretically stop the consuming completely! This is due to the prefetchCount configuration value - the server just won't give the consumer any more messages. While this behavior might be desired (it may be considered a way of self-throttling for the consumer), it doesn't have to be, and you should set up your consumer and republishing delay properly. There is a gauge in metrics showing how many messages are currently being delayed.
It can happen that you know you have the PMH configured, and you need to republish the message and "not count the attempt" (the typical scenario is that the message processing has failed and it's not the fault of your app but of some 3rd party system which you count on to be recovered later). There exists the countAsPoisoned parameter (defaults to true) determining whether the PMH (if configured) should count the attempt or not. This is an easy and clean way to influence the PMH behavior.
On the other hand, you may want to use the DirectlyPoison result (since 9.3.0) in cases where you know there's no chance a retry will succeed (e.g. the message is not parseable) but you still want to keep the message (move it to the poisoned queue if configured; that is the difference between DirectlyPoison and Reject, where the message would be thrown away completely). See the sketch below.
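Putting the two options together, a read action might look like this (a sketch; thirdPartyIsDown is a hypothetical check standing in for your own logic, and it assumes Republish accepts the countAsPoisoned flag described above):

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.{Delivery, DeliveryResult}
import monix.eval.Task

// a sketch of a read action using countAsPoisoned and DirectlyPoison
def readAction(thirdPartyIsDown: Boolean)(delivery: Delivery[Bytes]): Task[DeliveryResult] = delivery match {
  case _: Delivery.MalformedContent =>
    // no retry can ever succeed but we still want the message in the poisoned queue (if configured)
    Task.now(DeliveryResult.DirectlyPoison)

  case _: Delivery.Ok[Bytes] if thirdPartyIsDown =>
    // not the fault of our app - republish without counting the attempt in the PMH
    Task.now(DeliveryResult.Republish(countAsPoisoned = false))

  case _: Delivery.Ok[Bytes] =>
    Task.now(DeliveryResult.Ack)
}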
The most common and useful type, which will take all "poisoned" messages and publish them to a queue of your choice.
In its configuration, you basically configure a producer which is used to send the message to the dead-queue. While the producer will create the exchange it publishes to, you are responsible for creating the queue and binding it to the producer's exchange. You can use the additional declarations/bindings functionality of the client. If you forget to do so, your messages will be lost completely.
As the name suggests, this PMH only logs the poisoned message before it's thrown away (and lost forever).
This PMH does nothing.
Please mind that the PMH is only responsible for publishing the poisoned messages, not for declaring/binding the queue where they'll end up!
myConsumer {
  name = "MyVeryImportantConsumer"
  // ...
  // the usual stuff for consumer - timeout, bindings, ...
  // ...

  poisonedMessageHandling {
    type = "deadQueue" // deadQueue, logging, noop (default noop)

    maxAttempts = 2 // <-- required for deadQueue and logging types

    // required only for deadQueue type:
    deadQueueProducer {
      routingKey = "dead"
      name = "DeadQueueProducer"
      exchange = "EXCHANGE3"
      declare {
        enabled = true
        type = "direct"
      }
    }
  }
}
By default, the client logs the received delivery (on the TRACE level, unless a timeout or something similar happens - then it's logged on a higher level) for a better debugging experience. However, if you transfer some sensitive data and you don't want the delivery to be logged, you can easily turn it off by using the redactPayload = true parameter in consumer configs (note: the producer doesn't log the delivery at all, just its metadata like routing key and properties).
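For example (a sketch of the relevant part of a consumer configuration in HOCON):

myConsumer {
  name = "MyConsumer"
  // ...
  redactPayload = true // don't log the (possibly sensitive) payload of deliveries
}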
null instead of converter instance
It may happen you run into this problem:

scala> import io.circe.generic.auto._
import io.circe.generic.auto._

scala> import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter

scala> import com.avast.clients.rabbitmq.DeliveryConverter
import com.avast.clients.rabbitmq.DeliveryConverter

scala> case class Event(name: String)
defined class Event

scala> implicit val deliveryConverter: JsonDeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = null

scala> implicit val deliveryConverter: DeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
deliveryConverter: com.avast.clients.rabbitmq.DeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@5b977aaa

scala> implicit val deliveryConverter = JsonDeliveryConverter.derive[Event]()
deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@4b024fb2

Notice the results of the last three calls differ even though they are supposed to be the same (non-null, respectively)! A very similar issue is discussed on StackOverflow and so is the solution:
- Remove the explicit type completely (not recommended)
- Make the explicit type more general (DeliveryConverter instead of JsonDeliveryConverter in this case)
There is a module with some optional functionality called extras.
The library offers configurable network recovery, with the functionality itself backed by the RabbitMQ Java client's own recovery (available in version 5+).
You can either disable the recovery or select (and configure) one of the following types:
- Linear - The client will wait initialDelay for the first recovery attempt and, if it fails, will try again every period until it succeeds.
- Exponential - The client will wait initialDelay for the first recovery attempt and, if it fails, will try again until it succeeds, prolonging the delay between attempts exponentially (based on period, factor and the attempt number), up to maxLength.
  Example: for initialDelay = 3s, period = 2s, factor = 2.0, maxLength = 1 minute, the produced delays will be 3, 2, 4, 8, 16, 32, 60 seconds (and it will never go higher).

Do not set too short custom recovery delay intervals (less than 2 seconds), as this is not recommended by the official RabbitMQ API Guide.
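A configuration sketch might look roughly like this (illustrative only - the exact key names and nesting may differ between versions, so check the reference configuration of your version):

rabbitConnection {
  // ...
  networkRecovery {
    enabled = true
    handler {
      type = "exponential" // or "linear"
      initialDelay = 10s
      period = 2s
      factor = 2.0
      maxLength = 32s
    }
  }
}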
The consumer's readAction returns a DeliveryResult wrapped in your effect type F (e.g. Task in the examples above).
The DeliveryResult has 5 possible values (descriptions of usual use-cases):
- Ack - the message was processed; it will be removed from the queue
- Reject - the message is corrupted or for some other reason we don't want to see it again; it will be removed from the queue
- Retry - the message couldn't be processed at this moment (unreachable 3rd party services?); it will be requeued (inserted on the top of the queue)
- Republish - the message may be corrupted, but we're not sure; it will be re-published to the bottom of the queue (as a new message, and the original one will be removed). It's usually wise to prevent infinite republishing of the message - see Poisoned message handler.
- DirectlyPoison - the message should be thrown away without any retry effort. The Poisoned message handler takes care of it - it either moves the message to the poisoned queue (if configured), as if it had reached the configured limit of republishes, or it just throws the message away (if the no-op PMH is configured).
When using Retry, the message can effectively cause starvation of other messages in the queue until the message itself can be processed; on the other hand, Republish inserts the message into the original queue as a new message and lets the consumer handle other messages (if they can be processed).
Republishing is solved at the application level by publishing a new message (with the original content, headers, messageId, etc.) to the original queue and acknowledging the old one. This can be done via:
- Default exchange - Every virtual host in RabbitMQ has a default exchange which has implicit bindings to all queues and can be easily used for publishing to basically any queue. This is very handy for functionality such as republishing, however it's also quite dangerous and you may not have permissions to use it. In case you do have them, use this option instead of the custom exchange.
  This is the default option (in other words, the client will use the default exchange unless you tell it not to).
- Custom exchange - In case you're unable to use the default exchange, you have to create your own exchange to replace the functionality. The RabbitMQ client will create it for you together with all necessary bindings; all you have to do is configure a name of the exchange, e.g. as below.
  The exchange is created as direct, durable and without the auto-delete flag.
rabbitConnection {
  hosts = ["localhost:5672"]
  virtualHost = "/"
  ...

  republishStrategy {
    type = CustomExchange

    exchangeName = "ExchangeForRepublishing"

    exchangeDeclare = true // default
    exchangeAutoBind = true // default
  }

  ...
}
There is an option to specify bind/declare arguments for queues/exchanges, as you may read about in the RabbitMQ docs.
Example of configuration with HOCON:
producer {
  name = "Testing" // this is used for logging etc.

  exchange = "myclient"

  // should the producer declare the exchange it wants to send to?
  declare {
    enabled = true // disabled by default

    type = "direct" // fanout, topic

    arguments = { "x-max-length": 10000 }
  }
}
Sometimes it's necessary to declare an additional queue or exchange which is not directly related to the consumers or producers you have in your application (e.g. a dead-letter queue).
The library makes it possible to do such a thing. Here is an example of such configuration with HOCON:
val rabbitConnection: ConfigRabbitMQConnection[F] = ???
rabbitConnection.bindExchange("backupExchangeBinding") // : F[Unit]
where the "backupExchangeBinding" is link to the configuration (use relative path to the declarations
block in configuration):
declarations {
  backupExchangeBinding {
    sourceExchangeName = "mainExchange"
    destExchangeName = "backupExchange"
    routingKeys = ["myMessage"]
    arguments {}
  }
}
Equivalent code using the case classes configuration:
val rabbitConnection: RabbitMQConnection[F] = ???

rabbitConnection.bindExchange(
  BindExchangeConfig(
    sourceExchangeName = "mainExchange",
    destExchangeName = "backupExchange",
    routingKeys = List("myMessage")
  )
) // : F[Unit]
The library supports CorrelationId (sometimes also called TraceId or TracingId) handling out of the box.
The producer takes an implicit cidStrategy: CorrelationIdStrategy parameter which enables you to configure how the CorrelationId should be derived/generated. You can implement your own strategy to suit your needs. That means there'll always be "some" CorrelationId going in the message (since v9).
If you don't specify the strategy yourself, CorrelationIdStrategy.FromPropertiesOrRandomNew is used - it will try to locate the CID in the properties (or headers) and generate a new one if it doesn't succeed. Either way, the CID will be part of both the logs and the resulting (outgoing) RabbitMQ message.
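For illustration only, a custom strategy could look roughly like this (a sketch; it assumes the CorrelationIdStrategy trait exposes a single method producing the CID value - check the sources of your version for the exact member name):

import com.avast.clients.rabbitmq._

// a hypothetical strategy deriving the CID from an id of the processed entity
def cidFromEntityId(entityId: String): CorrelationIdStrategy = new CorrelationIdStrategy {
  override def toCIDValue: String = s"entity-$entityId"
}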
By using the following configuration,
producer {
  properties {
    confirms {
      enabled = true
      sendAttempts = 2
    }
  }
}
clients can enable publisher confirms. Each send call will wait for an ack/nack from the broker. This wait is of course non-blocking. sendAttempts is the number of all attempts including the initial one. If sendAttempts is greater than 1, the client will try to resend the message right after it obtains a nack from the broker.
From the implementation point of view, it uses asynchronous acks/nacks combined with Deferred from the cats-effect library.
You can also get the CorrelationId from the message properties on the consumer side. The CID is taken from either the AMQP properties or the X-Correlation-Id header (where the property has precedence and the header is just a fallback).
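On the consumer side it's then available via the delivery properties (a sketch):

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api.Delivery

// a sketch: the CorrelationId resolved by the client, taken from the AMQP properties
def correlationIdOf(delivery: Delivery[Bytes]): Option[String] =
  delivery.properties.correlationId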
Sometimes your use-case just doesn't fit the normal consumer scenario. Here you can use the pull consumer, which gives you much more control over the received messages. You pull a new message from the queue and acknowledge (or reject, ...) it somewhere in the future.
The pull consumer uses PullResult as its return type:
- Ok - contains a DeliveryWithHandle instance
- EmptyQueue - there was no message available in the queue

Additionally, you can call the .toOption method on the PullResult.
A simplified example, using configuration from HOCON:
import cats.effect.Resource
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.pureconfig._
import com.avast.clients.rabbitmq.api._
import monix.eval.Task
import monix.execution.Scheduler
implicit val sch: Scheduler = ???
val consumer: Resource[Task, RabbitMQPullConsumer[Task, Bytes]] = {
  for {
    connection <- RabbitMQConnection.fromConfig[Task](???, ???)
    consumer <- connection.newPullConsumer[Bytes](??? : String, ???)
  } yield {
    consumer
  }
}

val program: Task[Unit] = consumer.use { consumer =>
  Task
    .sequence {
      (1 to 100).map(_ => consumer.pull())
    } // receive "up to" 100 deliveries
    .flatMap { ds =>
      // do your stuff!
      Task.unit
    }
}
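Handling a single pulled result might then look like this (a sketch; it assumes DeliveryWithHandle exposes the delivery itself and a handle method accepting a DeliveryResult, as outlined above):

import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.api._
import monix.eval.Task

// a sketch of processing one PullResult
def handleOne(result: PullResult[Task, Bytes]): Task[Unit] = result match {
  case PullResult.Ok(deliveryWithHandle) =>
    // do your stuff with deliveryWithHandle.delivery, then settle it
    deliveryWithHandle.handle(DeliveryResult.Ack)

  case PullResult.EmptyQueue =>
    Task.unit // nothing to do right now
}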
Quite often you receive a single type of message but want to support multiple encoding formats (Protobuf, JSON, ...). This is where MultiFormatConsumer could be used.
Modules extras-circe and extras-scalapb provide support for JSON and GPB conversion. They are both used in the example below.
The MultiFormatConsumer is Scala only.
Usage example:
import com.avast.bytes.Bytes
import com.avast.cactus.bytes._ // Cactus support for Bytes, see https://github.com/avast/cactus#bytes
import com.avast.clients.rabbitmq.test.ExampleEvents.{NewFileSourceAdded => NewFileSourceAddedGpb}
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.extras.format._
import io.circe.Decoder
import io.circe.generic.auto._ // to auto derive `io.circe.Decoder[A]` with https://circe.github.io/circe/codec.html#fully-automatic-derivation
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
private implicit val d: Decoder[Bytes] = Decoder.decodeString.map(???)
case class FileSource(fileId: Bytes, source: String)
case class NewFileSourceAdded(fileSources: Seq[FileSource])
val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](
  JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]`
  GpbDeliveryConverter[NewFileSourceAddedGpb].derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]`
)(_ => ???)
(see unit test for full example)
The CheckedDeliveryConverter usually reacts to the Content-Type (like in the example below), but it's not required - it could e.g. analyze the payload (or its first bytes) too.
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.CheckedDeliveryConverter
import com.avast.clients.rabbitmq.api.{ConversionException, Delivery}
val StringDeliveryConverter: CheckedDeliveryConverter[String] = new CheckedDeliveryConverter[String] {
  override def canConvert(d: Delivery[Bytes]): Boolean = d.properties.contentType.contains("text/plain")

  override def convert(b: Bytes): Either[ConversionException, String] = Right(b.toStringUtf8)
}