To use monix-circe in an existing SBT project with Scala 2.12 or a later version, add the following dependencies to your
build.sbt
depending on your needs:
libraryDependencies ++= Seq(
"io.monix" %% "monix-circe" % "0.0.3"
)
Monix-Circe provides different pipes to parse your streams of JSONs depending on whether your Observable
is:
- a \n-separated stream of JSON values or value stream:
{ "repo": "monix-circe", "stars": 14 }
{ "repo": "monix-config", "stars": 5 }
- or a JSON array:
[
{ "repo": "monix-circe", "stars": 14 },
{ "repo": "monix-config", "stars": 5 }
]
The appropriate Operator
for the job also depends on your input stream value type (i.e. String
or Byte
).
The following table sums up every Operator
available as a function of the input stream value type as
well as the JSON structure:
String | Byte | |
---|---|---|
Value stream | stringStreamParser |
byteStreamParser |
Array | stringArrayParser |
byteArrayParser |
As an example, let's say we have a stream of strings representing a JSON array, we'll
pick the stringArrayParser
pipe which converts a stream of String
to a stream of Json
, Circe's
representation of JSONs:
import io.circe.Json
import monix.circe._
import monix.reactive.Observable
val stringStream: Observable[String] = ???
val parsedStream: Observable[Json] = stringStream.liftByOperator(stringArrayParser)
Monix-Circe also comes with a decoder
function which, given a Decoder[A]
, produces a
Observable[Json] => Observable[A]
.
For example, using Circe's fully automatic derivation:
import io.circe.generic.auto._
case class Foo(a: Int, b: String)
val parsedStream: Observable[Json] = ???
val decodedStream: Observable[Foo] = parsedStream.liftByOperator(decoder[Foo])
Heavily inspired/based on circe-fs2 and circe-iteratee.
All code in this repository is licensed under the Apache License, Version 2.0. See LICENSE.txt.