consul-akka-stream
is a small utility that can listen to a Consul key in its key/value store and provide an Akka stream that emits a value every time the key is updated.
Add these lines you to your SBT project:
libraryDependencies += "com.supersonic" %% "consul-akka-stream" % "1.1.2"
To create a stream we first need a Consul client, we use the Consul Client for Java library:
val consul =
Consul.builder()
.withReadTimeoutMillis(0L)
.withHostAndPort(HostAndPort.fromParts("localhost", 8500))
.build()
With this in hand, we can initialize an Akka-streams source based on the client using the ConsulStream.consulKeySource
function:
val source: Source[Map[String, Option[String]], CancellationToken] =
ConsulStream.consulKeySource(key = "foo/baz/bar", consul, blockingTime = 1.seconds)
This creates an Akka source that listens to the foo/baz/bar
key in Consul's key/value store. On every change to the key (including recursive changes) a new map is produced. The keys in the map are the Consul keys and the values are the (optional) strings under them.
An example output can be something like:
Map(
"foo/baz/bar/qux" -> Some("a"),
"foo/baz/bar/goo" -> Some("b"),
"foo/baz/bar/bla" -> None)
The Source materializes into a CancellationToken
token that can be triggered when the user wants to stop polling Consul.
See the tests for further examples.
The library provides a trait that facilitates testing the Consul stream (with ScalaTest) using the Embedded Consul library.
To obtain the trait, add the following to your SBT project:
libraryDependencies += "com.supersonic" %% "consul-akka-stream-integration-tests" % "1.1.2" % "it"
And mixin the ConsulIntegrationSpec
trait into your test.