A tiny Scala library to send events and entities to Apache Flume.
This library differentiates between events and entities:
- Events describe something that happened at a certain time, with attached metadata (the most common use case).
  - At EyeEm we send events to Flume when, for example, a user uploads a photo.
- Entities are considered static, like the deterministic output of a function with version X on some data Y (the less common use case).
  - At EyeEm we use this to store the results of expensive deep learning model computations on photos. For example, photoId=123 contains ["tree", "mountain", "lake"] according to keyword function version 3.0. This allows us to run further analysis later on the keywords stored in Hadoop.
Add the following lines to your build.sbt:
resolvers += Resolver.bintrayRepo("eyeem", "maven")
libraryDependencies += "com.eyeem" %% "flume-client" % "0.2.0"
Override these defaults in your application.conf if needed:
flume {
applicationName = "override_your_application_name"
host = "override_your_hostname"
portEvent = 9091 # events are sent to this port
portEntity = 9092 # entities are sent to this port
enabled = true
threadPoolSize = 10
}
- This project only supports Scala 2.11.x. If you need Scala 2.10 support please open an issue.
- This project was built with Java 8. If you wish to use this library in a Java 7 project let us know by opening an issue.
module | dependsOn | version |
---|---|---|
flume-client | play-json | 2.5.x |
 | pureconfig | 0.7.2 |
 | libthrift | 0.9.3 |
There is a hard dependency on play-json. If you would like to have support for other json libraries, please open an issue or pull request.
You may initialize a FlumeReporter in two ways:
import com.eyeem.flume.client.FlumeReporter
val flumeReporter = new FlumeReporter() // make sure to override at a minimum flume.applicationName and flume.host in your configuration file
or
import com.eyeem.flume.client.FlumeReporter
val flumeConfig = FlumeConfig(applicationName = "myAppName", host = "myserver.mydomain.overrideMe")
val flumeReporter = new FlumeReporter(flumeConfig)
import com.eyeem.flume.client.FlumeReporter
import play.api.libs.json.Json
import scala.concurrent.Future
case class MyEvent(someAttribute: String, otherAttribute: Boolean)
object MyEvent {
implicit val myEventFormat = Json.format[MyEvent]
}
class SomeClass {
// initialize your flumeReporter
def sendMyEvent(): Future[Unit] = {
// create an event
val myEvent = MyEvent(someAttribute = "something happened", otherAttribute = true)
val jsonData = Json.toJson(myEvent)
// send the event to Flume
flumeReporter.postEvent("eventName", jsonData)
}
}
The above will send the following JSON payload to localhost:9091 with the header application = "myAppName":
{
"event_name": "eventName",
"timestamp": "2016-07-25T18:18:08.180+02:00",
"user_id": "myAppName",
"salt": 873375028,
"data": {
"someAttribute": "something happened",
"otherAttribute": true
}
}
import com.eyeem.flume.client.models.FlumeEntity
import com.eyeem.flume.client.FlumeReporter
import play.api.libs.json.Json
import scala.concurrent.Future
case class MyEntity(someAttribute: String, otherAttribute: Boolean)
object MyEntity {
implicit val myEntityFormat = Json.format[MyEntity]
}
class SomeClass {
// initialize your flumeReporter
def sendMyEntity(): Future[Unit] = {
// create a static entity
val entity = MyEntity(someAttribute = "something happened", otherAttribute = true)
val entityJson = Json.toJson(entity)
// send an entity
flumeReporter.postEntity(FlumeEntity("entityName", "v2", entityJson))
}
}
The above will send the following to localhost:9092:
Headers:
application = "myAppName"
entity = "entityName"
version = "v2"
Body:
{
"someAttribute": "something happened",
"otherAttribute": true
}
Both postEvent and postEntity execute asynchronously and return a Scala Future[Unit]. This Future will fail if an exception occurs (e.g. an incorrect hostname is set), so keep this in mind when composing futures.
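One way to keep a failed reporting call from failing the futures it is composed with is to recover from it explicitly. The following is a minimal sketch that uses only the Scala standard library; `postEventStub` and `reportSafely` are hypothetical names introduced for illustration, and the stub merely imitates a failing `postEvent` call rather than using the library itself.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object FailureHandlingSketch {
  // Hypothetical stand-in for flumeReporter.postEvent: it always fails,
  // roughly as a call against an unreachable host might.
  def postEventStub(name: String): Future[Unit] =
    Future.failed(new java.io.IOException(s"could not deliver $name"))

  // Turn a failed Future[Unit] into a successful Future[Boolean] so that
  // a reporting failure does not fail the surrounding composition.
  def reportSafely(name: String): Future[Boolean] =
    postEventStub(name)
      .map(_ => true)
      .recover { case NonFatal(e) =>
        // A real application would log here; this sketch only prints.
        println(s"Flume reporting failed: ${e.getMessage}")
        false
      }

  def main(args: Array[String]): Unit = {
    val delivered = Await.result(reportSafely("eventName"), 5.seconds)
    println(s"delivered = $delivered")
  }
}
```

Whether to swallow the failure like this or let it propagate depends on how important delivery is to the calling code.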
Since the underlying calls to Thrift are blocking, the library runs them on a separate execution context (a fixed thread pool), configurable through your application.conf, to avoid introducing blocking code on your default execution context.
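The general pattern of isolating blocking calls on a dedicated fixed thread pool can be sketched with the standard library alone. This is an illustration of the technique, not the library's actual implementation: `blockingSend`, `sendAsync` and `BlockingPoolSketch` are hypothetical names, and the pool size of 10 is only assumed to correspond to the threadPoolSize default shown earlier.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingPoolSketch {
  // Assumed to mirror the threadPoolSize setting; the real wiring may differ.
  val threadPoolSize = 10

  private val executor = Executors.newFixedThreadPool(threadPoolSize)

  // Dedicated execution context used only for blocking work.
  val blockingContext: ExecutionContext =
    ExecutionContext.fromExecutorService(executor)

  // Hypothetical blocking call standing in for a Thrift send.
  def blockingSend(payload: String): Int = {
    Thread.sleep(50)
    payload.length
  }

  // Run the blocking call on the dedicated pool, not the default context.
  def sendAsync(payload: String): Future[Int] =
    Future(blockingSend(payload))(blockingContext)

  // Release the pool's threads when the application stops.
  def shutdown(): Unit = executor.shutdown()

  def main(args: Array[String]): Unit = {
    try {
      val sent = Await.result(sendAsync("hello"), 5.seconds)
      println(s"sent $sent characters")
    } finally shutdown()
  }
}
```

Passing the execution context explicitly, rather than importing it as an implicit, makes it harder to run blocking work on the default context by accident.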
This code is open source software licensed under the Apache 2.0 License.
Pull requests and issues welcome.
- Fork it
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request
- sbt test runs the regular tests.
- sbt it:test runs the integration tests. They require a running Flume ingestionAgent; configuration for that is not part of this repo. Tested with apache-flume-1.6.0.
Java code under src/main/java was generated from the Thrift file using the Thrift compiler:
thrift --gen java --out src/main/java src/main/resources/flume.thrift
To publish a new version see Maintainers