FlinkRunner 4 is available on Maven Central, built against Flink 1.19 with Scala 2.12 and JDK 11. You can add it to your sbt project:

```scala
libraryDependencies += "io.epiphanous" %% "flinkrunner" % <flinkrunner-version>
```

replacing `<flinkrunner-version>` with the currently released version of FlinkRunner on Maven.
FlinkRunner is built to support many common data sources and sinks, including:
| Connector | Dependency | Source | Sink |
| --- | --- | --- | --- |
| file system | `flink-connector-files` | yes | yes |
| kafka | `flink-connector-kafka` | yes | yes |
| kinesis streams source | `flink-connector-kinesis` | yes | no |
| kinesis streams sink | `flink-connector-aws-kinesis-streams` | no | yes |
| kinesis firehose | `flink-connector-aws-kinesis-firehose` | no | yes |
| iceberg | `iceberg-flink-runtime-<flink.minor.version>` | yes | yes |
| cassandra | `flink-connector-cassandra` | no | yes |
| elasticsearch | `flink-connector-elasticsearch7` | no | yes |
| jdbc | `flink-connector-jdbc` | no | yes |
| rabbit mq | `flink-connector-rabbitmq` | yes | yes |
You can add a connector by dropping its library into Flink's `lib` directory when you deploy your jobs. Make sure the library's version matches the Flink and Scala versions FlinkRunner is compiled against.
To run tests locally in your IDE, add the connector library to your dependencies in `Provided` scope, like this:

```scala
"org.apache.flink" % "flink-connector-kafka" % <flink-version> % Provided
```

replacing `<flink-version>` with the version of Flink used by FlinkRunner.
S3 configuration is important for most Flink usage scenarios. Flink has two different implementations to support S3: `flink-s3-fs-presto` and `flink-s3-fs-hadoop`.

- `flink-s3-fs-presto` is registered under the schemes `s3://` and `s3p://` and is preferred for checkpointing to S3.
- `flink-s3-fs-hadoop` is registered under the schemes `s3://` and `s3a://` and is required for using the streaming file sink.
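As a minimal sketch of how that scheme split plays out, assuming you set checkpoint storage programmatically rather than in `flink-conf.yaml` (the bucket names here are hypothetical):

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpoints go through the presto-backed filesystem (s3p://)
env.getCheckpointConfig.setCheckpointStorage("s3p://my-checkpoint-bucket/checkpoints")
// streaming file sinks should use the hadoop-backed filesystem (s3a://),
// e.g. a FlinkRunner file sink configured with path = "s3a://my-output-bucket/events"
```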
During deployment, you should copy both S3 dependencies from Flink's `opt` directory into the `plugins` directory:

```bash
cd $FLINK_DIR
mkdir -p ./plugins/s3-fs-presto
cp ./opt/flink-s3-fs-presto-<flink-version>.jar ./plugins/s3-fs-presto
mkdir -p ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-<flink-version>.jar ./plugins/s3-fs-hadoop
```

replacing `<flink-version>` with the version of Flink used by FlinkRunner.
NOTE: Do not copy them into Flink's `lib` directory, as this will not work! They need to be in their own, individual subdirectories of Flink's deployed `plugins` directory.
NOTE: You will also need to configure access from your job to AWS S3. That is outside the scope of this readme.
FlinkRunner supports reading and writing avro messages with kafka and file systems. For kafka sources and sinks, FlinkRunner uses binary encoding with support for either Confluent or AWS Glue schema registry.
For file sources and sinks, you can select either standard or parquet avro encoding.
Add the following dependencies if you need Avro and schema registry support:
"org.apache.flink" % "flink-avro" % <flink-version>,
"org.apache.flink" % "flink-avro-confluent-registry" % <flink-version>,
"io.confluent" % "kafka-avro-serializer" % <confluent-version>
"software.amazon.glue" % "schema-registry-flink-serde" % <aws-glue-serde-version>,
Note that `kafka-avro-serializer` requires you to add an sbt resolver to your project's `build.sbt` file pointing at Confluent's maven repository:

```scala
resolvers += "Confluent Repository" at "https://packages.confluent.io/maven/"
```
FlinkRunner provides automatic support for serializing and deserializing in and out of kafka using either Confluent Avro schema registry or AWS Glue Avro schema registry. To make use of this support, you need to do the following:

- Include the appropriate serde library (`kafka-avro-serializer` for confluent or `schema-registry-flink-serde` for glue) in your project, as described above.

- Configure your kafka sources and sinks with a `schema.registry` block, like so:

  ```hocon
  sinks {
    kafka-sink {
      bootstrap.servers = "kafka:9092"
      topic = my-topic
      schema-registry {
        // for confluent
        url = "http://schema-registry:8082" // required
        // other settings are possible
        //cache.capacity = 500 // (1000 by default)
        //use.logical.type.converters = false // (true by default)
        //specific.avro.reader = false // (true by default)

        // for glue (one or both of region or endpoint is required)
        aws.region = "us-east-1"
        //aws.endpoint = "localhost:4566"
        //other configs in AWSSchemaRegistryConstants
      }
    }
  }
  ```

- Mix in the `EmbeddedAvroRecord` and `EmbeddedAvroRecordFactory` traits for your event types that need to decode/encode Confluent schema registry compatible avro records in Kafka (see the sketch after this list).

- If your output event type is an avro record, use the `AvroStreamJob` base class for your flink jobs instead of `StreamJob`. Note that both `AvroStreamJob` and `StreamJob` can read from avro source streams, but `AvroStreamJob` uses an avro sink. This is described in more detail below.
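For the `EmbeddedAvroRecord` / `EmbeddedAvroRecordFactory` step, here is a minimal sketch of what such an event type might look like. It assumes a hypothetical Avro-generated class `MyRecordA` (with `id`, `key` and `timestamp` fields) and an ADT like the `MyEventADT` example later in this readme (the subtype would live in the same source file as the sealed trait); the trait definitions themselves are shown further below.

```scala
// package path for these traits is assumed here; adjust to your FlinkRunner version
import io.epiphanous.flinkrunner.model.{
  EmbeddedAvroRecord,
  EmbeddedAvroRecordFactory,
  EmbeddedAvroRecordInfo
}

// MyRecordA is a hypothetical Avro-generated class implementing GenericRecord
final case class MyAvroEventA($record: MyRecordA)
    extends MyEventADT
    with EmbeddedAvroRecord[MyRecordA] {
  // satisfy FlinkEvent by delegating to fields of the embedded avro record
  override val $id: String      = $record.getId
  override val $key: String     = $record.getKey
  override val $timestamp: Long = $record.getTimestamp
}

object MyAvroEventA
    extends EmbeddedAvroRecordFactory[MyAvroEventA, MyRecordA] {
  // how FlinkRunner rebuilds your event from a consumed avro record
  override implicit def fromKV(
      info: EmbeddedAvroRecordInfo[MyRecordA]): MyAvroEventA =
    MyAvroEventA(info.record)
}
```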
To enable file sinks to write in parquet format, add the following dependency to your build:

```scala
"org.apache.parquet" % "parquet-avro" % "1.14.2"
```

and use the `format = parquet` directive in your file sink configuration (more details below). The iceberg source and sink also support parquet output by default.
FlinkRunner uses scala-logging for logging on top of slf4j. In your implementation, you must provide a logging backend compatible with slf4j, such as logback:

```scala
libraryDependencies += "ch.qos.logback" % "logback-classic" % <logback-version>
```
If you want to use the complex event processing library, add this dependency:

```scala
"org.apache.flink" % "flink-cep" % <flink-version>
```
FlinkRunner helps you think about your DataStream API flink jobs at a high level, so you can focus on the event pipeline, not the plumbing. It is not as high-level as Flink SQL, but when you need to write multiple related data stream API jobs, FlinkRunner helps you avoid repetitive boilerplate code, simplifies configuration, and simplifies usage of many common flink streaming patterns.
You have a set of related flink jobs that deal in a related set of data event types. FlinkRunner helps you build one application to run those related jobs and coordinate the types. It also simplifies setting up common sources and sinks, so you can control them purely with configuration, not code. FlinkRunner supports a variety of sources and sinks out of the box, including `kafka`, `kinesis`, `jdbc`, `elasticsearch 7+` (sink only), `cassandra` (sink only), `filesystems` (including `s3`, using `parquet` as well as `delimited` and `json` text files) and `sockets`. It also has many common operators to help you write your own transformation logic. Finally, FlinkRunner makes it easy to test your transformation logic with property-based testing.
First, you need to define an algebraic data type (ADT) containing the types that will be used in your flink jobs. Your top level type should inherit the `FlinkEvent` trait. This requires your types to implement the following members:

- `$id: String` - a unique id for the event
- `$timestamp: Long` - the event time (millis since the unix epoch)
- `$key: String` - a key to group events

A `FlinkEvent` also has three members that you can optionally override to enable or configure various features:

- `$bucketId: String` - for bucketing files written to a file sink
- `$dedupId: String` - for use with FlinkRunner's deduplication filter
- `$active: Boolean` - for use with FlinkRunner's `filterByControl` source stream algorithm
```scala
sealed trait MyEventADT extends FlinkEvent

final case class MyEventA(a: Int, $id: String, $key: String, $timestamp: Long)
    extends MyEventADT

final case class MyEventB(b: Int, $id: String, $key: String, $timestamp: Long)
    extends MyEventADT
```
Next, you should create your own runner subclass of the abstract `FlinkRunner` base class.

```scala
import io.epiphanous.flinkrunner.FlinkRunner
import io.epiphanous.flinkrunner.model.FlinkConfig

class MyRunner(config: FlinkConfig) extends FlinkRunner[MyEventADT](config) {

  override def invoke(jobName: String): Unit = jobName match {
    case "MyJob1" => new MyJob1(this).run()
    case "MyJob2" => new MyJob2(this).run()
    case _        => throw new RuntimeException(s"Unknown job $jobName")
  }
}
```
Next, write some jobs! This is the fun part.
```scala
class MyJob1(runner: FlinkRunner[MyEventADT])
    extends StreamJob[MyEventA, MyEventADT](runner) {
  override def transform: DataStream[MyEventA] =
    singleSource[MyEventA]()
}
```
This job takes `MyEventA` input events and passes them through unchanged to the sink. While this sounds useless at first blush, and your `transform` method will usually do much more interesting things, identity transforms like this can be useful to copy data from one storage system to another.

You could do something a little more exciting, say transform a stream of `MyEventA` events into a stream of `MyEventB` events:
```scala
class MyJob2(runner: FlinkRunner[MyEventADT])
    extends StreamJob[MyEventB, MyEventADT](runner) {
  override def transform: DataStream[MyEventB] =
    singleSource[MyEventA]().map { a: MyEventA =>
      MyEventB(b = a.a * 2, $id = a.$id, $key = a.$key, $timestamp = a.$timestamp)
    }
}
```
Next, you need to configure your job. The following configuration defines a file source that reads csv files from one s3 bucket and a file sink that writes json files to a different s3 bucket.
```hocon
jobs {
  MyJob1 {
    sources {
      csv-file-source {
        path = "s3://my-events-csv-files"
        format = csv
      }
    }
    sinks {
      json-file-sink {
        path = "s3://my-events-json-files"
        format = json
      }
    }
  }
}
```
Note that this configuration would work for either `MyJob1` or `MyJob2`.
Next, wire up your runner to a main method.

```scala
object Main {
  def main(args: Array[String]): Unit =
    new MyRunner(new FlinkConfig(args))
}
```
Finally, assemble an uber jar with all your dependencies, deploy it to your flink cluster, and run a job:

```bash
flink run myRunner.jar MyJob1
```
FlinkRunner provides a `StreamJob` base class from which you can build and run your flink jobs. If your output event types require avro support, you should instead use the `AvroStreamJob` base class to build and run your flink job.
```scala
class StreamJob[
    OUT <: ADT,
    ADT <: FlinkEvent](runner: FlinkRunner[ADT])
```
A `StreamJob` must specify the output stream event type in its definition. That output stream event type must be a subclass of your FlinkRunner algebraic data type (ADT). Your job class will be passed an instance of your FlinkRunner type, from which you can access:

- `runner.config`: FlinkRunner configuration (instance of `FlinkConfig`).
- `runner.env`: Flink's stream execution environment (instance of `StreamExecutionEnvironment`), for interfacing with the DataStream API.
- `runner.tableEnv`: Flink's stream table environment (instance of `StreamTableEnvironment`), for interfacing with the Table API (see the sketch after this list).
- `runner.mockEdges`: A boolean indicating if your runner instance will mock interaction with sources and sinks and redirect transformed output to your specified `CheckResults.checkOutputEvents` method.
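For example, here is a minimal sketch (not from the FlinkRunner docs) of a job that uses `runner.tableEnv` to expose its source stream to the Table API; the source name `csv-file-source` and the view name `events` are hypothetical:

```scala
class MyTableViewJob(runner: FlinkRunner[MyEventADT])
    extends StreamJob[MyEventA, MyEventADT](runner) {

  override def transform: DataStream[MyEventA] = {
    val events = singleSource[MyEventA]("csv-file-source")
    // register the stream as a temporary view for ad hoc Table API / SQL use
    runner.tableEnv.createTemporaryView("events", events)
    events
  }
}
```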
Your `StreamJob` must provide a `transform` method that defines and transforms your sources into an output event stream. `StreamJob` provides several factory methods to create source streams from your configuration:
- `singleSource[IN <: ADT](name: String): DataStream[IN]`: produces a single input source stream, configured under the provided name, of type `DataStream[IN]`, where `IN` is an event type within your runner's `ADT`. The `name` parameter defaults to the first configured source for convenience.

- `connectedSource[IN1 <: ADT, IN2 <: ADT, KEY](source1: DataStream[IN1], source2: DataStream[IN2], key1: IN1 => KEY, key2: IN2 => KEY): ConnectedStreams[IN1, IN2]`: connects the two provided source streams, producing a single stream of type `ConnectedStreams[IN1, IN2]`. A connected stream combines the two streams. An event on the connected stream is either of type `IN1` or `IN2`. The key functions are used for any `keyBy` operations done on the connected stream. It is important to realize that a connected stream is NOT a join between the two streams, and the keys are not used to perform a join. The use case for a connected stream is where the data on one stream dynamically impacts how you want to process the other stream.

- `filterByControlSource[CONTROL <: ADT, DATA <: ADT, KEY](control: DataStream[CONTROL], data: DataStream[DATA], key1: CONTROL => KEY, key2: DATA => KEY): DataStream[DATA]`: a special instance of a connected source, where the first source is a control stream and the second source is a data stream. The control events indicate when the data event stream should be considered active, meaning any data events seen should be emitted. When the control type's `$active` method returns `true`, the data stream is considered active, and any data events seen on the connected stream are emitted to the output. Conversely, when the control type's `$active` method returns `false`, the data stream is considered inactive, and any data events seen on the connected stream are not emitted to the output. (See the sketch after this list.)

- `broadcastConnectedSource[IN <: ADT: TypeInformation, BC <: ADT: TypeInformation, KEY: TypeInformation](keyedSource: DataStream[IN], broadcastSource: DataStream[BC], keyedSourceGetKeyFunc: IN => KEY): BroadcastConnectedStream[IN, BC]`: another special connected source that implements Flink's [Broadcast State Pattern](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/). This `StreamJob` method keys and connects a regular data stream with a so-called broadcast data stream. A broadcast stream sends copies of all of its elements to all downstream tasks. So in this case, we key the source data stream and effectively send a connected stream of the data and broadcast elements to each keyed task. A common use case broadcasts a stream of rule changes that impact how the data stream should be processed. The `BroadcastConnectedStream[IN, BC]` should be processed with a special type of `CoProcessFunction` called a `KeyedBroadcastProcessFunction[KEY, IN, BC, OUT]`, which produces your transformed output data stream of type `DataStream[OUT]`.
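As an illustration, here is a minimal sketch (not from the FlinkRunner docs) of a job built on `filterByControlSource`. It assumes a hypothetical control event type `MyControlEvent` in your ADT (whose `$active` member signals whether data should flow) and two hypothetical configured sources named `control-source` and `data-source`:

```scala
class MyGatedJob(runner: FlinkRunner[MyEventADT])
    extends StreamJob[MyEventA, MyEventADT](runner) {

  override def transform: DataStream[MyEventA] =
    filterByControlSource[MyControlEvent, MyEventA, String](
      singleSource[MyControlEvent]("control-source"),
      singleSource[MyEventA]("data-source"),
      // key both streams the same way so control events gate matching data events
      _.$key,
      _.$key
    )
}
```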
`StreamJob` also provides avro versions of all these source factory methods. If your sources are Confluent schema-registry aware avro encoded Kafka streams, you should use the avro aware versions of these factory methods. For instance, the `singleAvroSource()` method can be used to produce such an input data stream. The signatures of these methods are more complicated and rely on you to use FlinkRunner's `EmbeddedAvroRecord` and `EmbeddedAvroRecordFactory` traits when implementing your event types.
```scala
singleAvroSource[
    IN <: ADT with EmbeddedAvroRecord[INA],
    INA <: GenericRecord](
    name: String)(implicit
    fromKV: EmbeddedAvroRecordInfo[INA] => IN): DataStream[IN]
```
Besides source factory methods, `StreamJob` also provides a method to easily perform [windowed aggregations](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/).
```scala
def windowedAggregation[
    E <: ADT: TypeInformation,
    KEY: TypeInformation,
    WINDOW <: Window: TypeInformation,
    AGG <: Aggregate: TypeInformation,
    QUANTITY <: Quantity[QUANTITY]: TypeInformation,
    PWF_OUT <: ADT: TypeInformation](
    source: KeyedStream[E, KEY],
    initializer: WindowedAggregationInitializer[
      E,
      KEY,
      WINDOW,
      AGG,
      QUANTITY,
      PWF_OUT,
      ADT
    ]): DataStream[PWF_OUT]
```
Finally, `StreamJob` also provides a `run()` method that builds and executes the flink job graph defined by your `transform` method.
```scala
class AvroStreamJob[
    OUT <: ADT with EmbeddedAvroRecord[A],
    A <: GenericRecord,
    ADT <: FlinkEvent](runner: FlinkRunner[ADT])
```
An `AvroStreamJob` is a specialized `StreamJob` class to support outputting to an avro encoded sink (kafka or parquet-avro files). It relies on the supporting types below; a usage sketch follows their definitions.
```scala
trait EmbeddedAvroRecord[A <: GenericRecord] {

  def $recordKey: Option[String] = None

  def $record: A

  def $recordHeaders: Map[String, String] = Map.empty

  def toKV: EmbeddedAvroRecordInfo[A] =
    EmbeddedAvroRecordInfo($record, $recordKey, $recordHeaders)
}
```
```scala
trait EmbeddedAvroRecordFactory[
    E <: FlinkEvent with EmbeddedAvroRecord[A],
    A <: GenericRecord] {

  implicit def fromKV(recordInfo: EmbeddedAvroRecordInfo[A]): E

  implicit def avroParquetRecordFormat: StreamFormat[A] = ???
}
```
```scala
case class EmbeddedAvroRecordInfo[A <: GenericRecord](
    record: A,
    keyOpt: Option[String] = None,
    headers: Map[String, String] = Map.empty)
```
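Putting these together, here is a minimal sketch (not from the FlinkRunner docs) of an avro output job. It reuses the hypothetical `MyAvroEventA` type from the schema registry sketch earlier, whose companion object extends `EmbeddedAvroRecordFactory` and therefore supplies the implicit `fromKV` required by `singleAvroSource`; the source name `kafka-source` is hypothetical:

```scala
class MyAvroJob(runner: FlinkRunner[MyEventADT])
    extends AvroStreamJob[MyAvroEventA, MyRecordA, MyEventADT](runner) {

  override def transform: DataStream[MyAvroEventA] =
    // reads schema-registry encoded avro records wrapped in MyAvroEventA events;
    // AvroStreamJob then writes the embedded records to an avro-encoded sink
    singleAvroSource[MyAvroEventA, MyRecordA]("kafka-source")
}
```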
FlinkRunner uses lightbend config for its configuration, integrating environment variables and command line arguments to provide easy, environment specific, 12-factor style configuration. FlinkRunner makes this configuration accessible through a `FlinkConfig` object that you construct and pass to your runner.
```scala
trait SourceConfig[ADT <: FlinkEvent]

trait SinkConfig[ADT <: FlinkEvent]
```
```scala
trait Aggregate {

  def name: String

  // the kind of measurement being aggregated
  def dimension: String

  // the preferred unit of measurements being aggregated
  def unit: String

  // the current aggregated value
  def value: Double

  // the current count of measurements included in this aggregate
  def count: BigInt

  // the timestamp of the most recent aggregated event
  def aggregatedLastUpdated: Instant

  // the timestamp when this aggregate was last updated
  def lastUpdated: Instant

  // other aggregations this aggregation depends on
  def dependentAggregations: Map[String, Aggregate]

  // configuration parameters
  def params: Map[String, String]

  // a method to update the aggregate with a new value
  def update(value: Double, unit: String, aggLU: Instant): Try[Aggregate]
}
```
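To make the contract concrete, here is a minimal sketch of a trivial `Aggregate` implementation (a running sum) against the members shown above. FlinkRunner ships its own aggregate implementations, so treat this as illustrative only; the class name and default values are hypothetical.

```scala
import java.time.Instant
import scala.util.{Success, Try}

case class RunningSum(
    dimension: String = "Quantity",
    unit: String = "unit",
    value: Double = 0d,
    count: BigInt = BigInt(0),
    aggregatedLastUpdated: Instant = Instant.EPOCH,
    lastUpdated: Instant = Instant.now(),
    dependentAggregations: Map[String, Aggregate] = Map.empty,
    params: Map[String, String] = Map.empty)
    extends Aggregate {

  override def name: String = "RunningSum"

  // naively adds the incoming value, ignoring unit conversion
  override def update(value: Double, unit: String, aggLU: Instant): Try[Aggregate] =
    Success(
      copy(
        value = this.value + value,
        count = this.count + 1,
        aggregatedLastUpdated = aggLU,
        lastUpdated = Instant.now()
      )
    )
}
```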
Some useful utilities.