scala-flow is a lightweight library intended to make developing Google Dataflow jobs in Scala easier. The core Dataflow classes are enriched to allow more idiomatic and concise Scala usage while preserving full access to the underlying Java SDK.

Coders for Scala primitives and collection classes have been implemented so that you can conveniently return these types from your `PTransform`s. In addition, you can easily create coders for your own case classes.
Caveat: This library is still evolving rapidly as we improve our knowledge and understanding of Dataflow, so there will be some flux in the API as we discover and refine what works well and what doesn't.
As a preview of what's possible, here's the eponymous `MinimalWordCount` example:
```scala
Pipeline.create(...)
  .apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
  .flatMap(_.split("\\W+").filter(_.nonEmpty).toIterable)
  .apply(Count.perElement[String])
  .map(kv => kv.getKey + ": " + kv.getValue)
  .apply(TextIO.Write.to("results.text"))
  .run()
```
`Pipeline` has been enriched with a handful of methods. To create a `PCollection` from in-memory data, use the `transform` method instead of `apply`; this method ensures that the coder is set correctly on the input data. In addition, a `run` method has been added to the `POutput` type, so that you can fluently chain transforms and then run your pipeline. For example:
```scala
val result = Pipeline.create(...)
  .transform(Create.of("foo", "bar"))
  .apply(...transforms...)
  .run()
```
`PCollection` now has `map`, `flatMap`, `filter` and `collect` methods that each behave as you would expect. A simple example:
```scala
val result = Pipeline.create(...)
  .transform(Create.of("123", "456", "789"))
  .flatMap(_.split(""))
  .map(_.toInt)
  .filter(_ < 5)
  .collect {
    case x if x % 3 == 0 => if (x % 5 == 0) "FizzBuzz" else "Fizz"
    case x if x % 5 == 0 => "Buzz"
  }
  .run()
```
A side-effecting method, `foreach`, has been added to allow handy debug logging. This method supplies each element of the `PCollection` to its argument and then passes the element on unchanged. For example:
```scala
val result = Pipeline.create(...)
  .transform(Create.of("123", "456", "789"))
  .foreach(println)
  .apply(...continue as normal...)
```
`extractTimestamp` converts each element in the `PCollection` into a tuple of the element and its corresponding timestamp. For example:
```scala
val collection: PCollection[(String, Instant)] = Pipeline.create(...)
  .transform(Create.of("foo", "bar"))
  .extractTimestamp
```
The `withKey` method provides a drop-in replacement for the `WithKeys` transform.
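For example, a minimal sketch (the exact signature is an assumption; `withKey` is shown taking a function from element to key):

```scala
val words: PCollection[String] = ...
// Key each word by its length, much as WithKeys.of(...) would in the Java SDK.
val keyed: PCollection[KV[Int, String]] = words.withKey(_.length)
```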
The `flattenWith` method is the equivalent of the `Flatten` transform, allowing collections of the same type to be merged together. For example:
```scala
val first: PCollection[String] = ...
val second: PCollection[String] = ...
val third: PCollection[String] = ...
val combined: PCollection[String] = first.flattenWith(second, third)
```
To provide better visualization of the pipeline graph and to allow updating of running jobs, you can name blocks of transforms using the `transformWith` method. For example:
```scala
val result = Pipeline.create(...)
  .transformWith("Load Resources") { _
    .apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
  }
  .transformWith("Split and count Words") { _
    .flatMap(_.split("\\W+").filter(_.nonEmpty).toIterable)
    .apply(Count.perElement[String])
  }
  .transformWith("Output Results") { _
    .map(kv => kv.getKey + ": " + kv.getValue)
    .apply(TextIO.Write.to("results.text"))
  }
  .run()
```
Under the hood, this method simply converts each nested block of methods into a `PTransform`.
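For illustration, here is a minimal sketch of that conversion (not the library's actual code), assuming the Dataflow Java SDK 1.x `PTransform` API and a hypothetical helper name:

```scala
import com.google.cloud.dataflow.sdk.transforms.PTransform
import com.google.cloud.dataflow.sdk.values.PCollection

// Wrap a block of chained operations in a named PTransform so that the whole
// block appears as a single named node in the pipeline graph.
def asNamedTransform[A, B](name: String)(block: PCollection[A] => PCollection[B]) =
  new PTransform[PCollection[A], PCollection[B]](name) {
    override def apply(input: PCollection[A]): PCollection[B] = block(input)
  }
```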
The `parDo` method provides an escape hatch in case none of the existing methods does what you want. Pass any arbitrary function with a `DoFn` signature to this method and it will be converted to a `ParDo` transform. For example:
```scala
Pipeline.create(...)
  .apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
  .parDo { (c: DoFn[String, String]#ProcessContext) =>
    /* Do anything whatsoever here */
    c.output(...)
  }
```
Several methods have been added specifically for `KV` collections.

The `mapValue` and `flatMapValue` methods allow you to change the value of a `KV` pair without affecting the key. For example, using `mapValue`:
```scala
val result = Pipeline.create(...)
  .transform(Create.of("123", "456", "789"))
  .withKey(_.toInt)
  .mapValue(_.split(""))
  .mapValue(_.mkString("."))
/* Result contains KV(123, "1.2.3"), KV(456, "4.5.6"), KV(789, "7.8.9") */
```
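`flatMapValue` works the same way but can produce zero or more values per element; a small sketch, assuming it repeats the key for each produced value:

```scala
val digits = Pipeline.create(...)
  .transform(Create.of("123", "456"))
  .withKey(_.toInt)
  .flatMapValue(_.split("").toIterable)
/* digits contains KV(123, "1"), KV(123, "2"), KV(123, "3"),
   KV(456, "4"), KV(456, "5"), KV(456, "6") */
```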
In addition there are `combinePerKey`, `topPerKey` and `groupPerKey` methods that work the same way as their Dataflow transform equivalents.
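A brief sketch of how these might be used; the signatures shown here are assumptions, mirroring `Combine.perKey`, `Top.perKey` and `GroupByKey` from the Java SDK:

```scala
val scores: PCollection[KV[String, Int]] = ...

val totals  = scores.combinePerKey(_ + _) // like Combine.perKey with a sum
val top3    = scores.topPerKey(3)         // like Top.perKey(3), given an ordering
val grouped = scores.groupPerKey          // like GroupByKey over String/Int pairs
```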
In order to join two or more collections of `KV` values by key, you can use `coGroupByKey`, a type-safe wrapper around Dataflow's `CoGroupByKey` transform. For example:
```scala
val buyOrders: PCollection[KV[CustomerId, BuyOrder]] = ...
val sellOrders: PCollection[KV[CustomerId, SellOrder]] = ...
val allOrders: PCollection[KV[CustomerId, (Iterable[BuyOrder], Iterable[SellOrder])]] =
  buyOrders.coGroupByKey(sellOrders)
```
Implicit coders for the following types have been added:

- `Int`, `Long`, `Double`
- `Option`, `Try`, `Either`
- `Tuple2` to `Tuple22`
- `Iterable`, `List`, `Set`, `Map`, `Array`
Every method mentioned above requires a coder for its output type to be implicitly available. This happens by default for any of the types listed above (and also for any arbitrary combination, e.g. `List[Option[(Either[String, Int], Array[Double])]]`).
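For instance, because these coders compose implicitly, a transform can return a nested Scala type directly; a small sketch:

```scala
import scala.util.Try

// Option[Int] is encodable out of the box: the Option coder composes with the Int coder.
val maybeInts: PCollection[Option[Int]] = Pipeline.create(...)
  .transform(Create.of("1", "two", "3"))
  .map(s => Try(s.toInt).toOption)
```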
If you create coders for any other types then you'll need to ensure that they are available in implicit scope.
You can create a custom coder for any case class containing up to 22 members using the `caseClassCoder` method. For example:
```scala
case class Foo(name: String)
case class Bar(name: String, age: Int)
case class Qux[T](value: T)

implicit val fooCoder = caseClassCoder(Foo)
implicit val barCoder = caseClassCoder(Bar)
// The generic case needs a type parameter with an implicit coder for the element type.
implicit def quxCoder[T: Coder] = caseClassCoder(Qux.apply[T] _)
```
The last line demonstrates how to create a coder for a generic type; this is essentially a much simpler replacement for a `CoderFactory`.
By default Dataflow will always try to create a `SerializableCoder` if no other suitable coder can be found. scala-flow provides an equivalent with the `serializableCoder` method. For example:
```scala
class Foo(val name: String) extends Serializable

implicit val fooCoder = serializableCoder[Foo]
```
There are some existing libraries for working with Dataflow:
- Apache Beam: Supports not only Dataflow, but also Spark, Apex and Flink.
- Scio: An excellent and extensive Scala library developed by Spotify.
We initially used Beam directly but quickly found that the complex nature of the Java API (particularly around type erasure) made Scala interop tricky. We then evaluated Scio, but while we were learning the complex Dataflow concepts we wanted something very lightweight that kept us close to the API. Hence this library, which we feel fits into a niche between the two libraries above.
Planned improvements:

- Create a version of each method that accepts a name, to better support updating pipelines
- Switch the underlying implementation to Apache Beam
The case class coder approach was heavily inspired by spray-json, a really nice, lightweight JSON parser.
Bug reports and pull requests are welcome on GitHub at https://github.com/zendesk/scala-flow/
Copyright 2017 Zendesk, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.