This library provides a Scala-based Apache Spark (a fast and general engine for large-scale data processing) integration with the NATS messaging system (a highly performant cloud-native messaging system) as well as NATS Streaming (a data streaming system powered by NATS).
It provides a wrapper over the (Java-based) NATS / Spark Connectors that makes them easier to use from Scala (the de facto language of Spark).
Please refer to the (Java) NATS / Spark Connectors page for additional information.
- Based on nats-connector-spark version 1.0.0
- Based on Java NATS Streaming 2.1.2, which includes NATS - Java Client version 2.3.0
- Based on Spark version 2.3.1
- Published as a Spark Package
- Based on nats-connector-spark version 0.4.0
- Spark version 2.2.1
- Spark version 2.0.1 + Scala version 2.11.8
- .asStreamOf(ssc) introduced
- storedAsKeyValue() introduced
- Message Data can be any Java Object (not limited to String), serialized as byte[] (the native NATS payload format)
Include this package in your Spark applications (spark-shell, pyspark, or spark-submit) using:
> $SPARK_HOME/bin/spark-shell --packages com.logimethods:nats-connector-spark-scala_2.11:1.0.0
In your build.sbt, add:
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
resolvers += "Sonatype OSS Release" at "https://oss.sonatype.org/content/groups/public/"
libraryDependencies += "com.logimethods" % "nats-connector-spark-scala_2.11" % "1.0.0"
In your pom.xml, add:
<dependencies>
<!-- list of dependencies -->
<dependency>
<groupId>com.logimethods</groupId>
<artifactId>nats-connector-spark-scala_2.11</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
See the Java code documentation for the list of available options (properties, subjects, encoder/decoder, etc.).
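import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}
import com.logimethods.connector.nats.to_spark._
import com.logimethods.scala.connector.spark.to_nats._
// `sc` is the SparkContext (predefined in spark-shell); batches every 2 seconds
val ssc = new StreamingContext(sc, new Duration(2000))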
NATS messages are received as a Spark Stream through the NatsToSparkConnector.receiveFromNats(classOf[Class], ...) method, where [Class] is the Java class of the objects to deserialize.
Those objects must first be serialized as byte[] with the appropriate protocol before being stored in the NATS message payload.
By default, (Java) number types are then decoded automatically by the connector.
For more complex types, provide your own decoder through the withDataDecoder(scala.Function1<byte[], V> dataDecoder) method.
Suppose the payload has been encoded this way:
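import java.nio.ByteBuffer
import java.time.{LocalDateTime, ZoneId}
// Assumption: `zoneId` is not defined in the original snippet; use the producer's actual zone
val zoneId: ZoneId = ZoneId.systemDefault()
def encodePayload(date: LocalDateTime, value: Float): Array[Byte] = {
  // https://docs.oracle.com/javase/8/docs/api/java/nio/ByteBuffer.html
  // 8 bytes for the epoch seconds (Long) + 4 bytes for the value (Float)
  val buffer = ByteBuffer.allocate(8 + 4)
  buffer.putLong(date.atZone(zoneId).toEpochSecond())
  buffer.putFloat(value)
  buffer.array()
}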
You have to provide your own decoder:
def dataDecoder: Array[Byte] => Tuple2[Long, Float] = bytes => {
  import java.nio.ByteBuffer
  val buffer = ByteBuffer.wrap(bytes)
  // Read the fields back in the same order they were written
  val epoch = buffer.getLong()
  val voltage = buffer.getFloat()
  (epoch, voltage)
}
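A quick way to check that a custom decoder matches the producer's encoding is to round-trip a value outside Spark before wiring it into `withDataDecoder`. The sketch below reuses the two functions above with only the JDK; the zone is fixed to UTC (an assumption, for a reproducible result) and the sample date and value are made up.

```scala
import java.nio.ByteBuffer
import java.time.{LocalDateTime, ZoneId}

// Assumption: UTC stands in for the producer's zone, which the snippet above leaves open.
val zoneId: ZoneId = ZoneId.of("UTC")

// Producer side: 8-byte epoch seconds followed by a 4-byte float (big-endian, ByteBuffer's default).
def encodePayload(date: LocalDateTime, value: Float): Array[Byte] = {
  val buffer = ByteBuffer.allocate(8 + 4)
  buffer.putLong(date.atZone(zoneId).toEpochSecond())
  buffer.putFloat(value)
  buffer.array()
}

// Consumer side: read the fields back in the same order they were written.
def dataDecoder: Array[Byte] => (Long, Float) = bytes => {
  val buffer = ByteBuffer.wrap(bytes)
  val epoch = buffer.getLong()
  val voltage = buffer.getFloat()
  (epoch, voltage)
}

// Hypothetical sample measurement.
val date = LocalDateTime.of(2019, 1, 1, 0, 0)
val payload = encodePayload(date, 231.5f)
val decoded = dataDecoder(payload)
println(s"${payload.length} bytes -> $decoded")
```

If the two sides ever disagree on field order or sizes, this check fails immediately instead of producing silently wrong values inside a running stream.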
import org.apache.spark.streaming.dstream._
val messages: ReceiverInputDStream[(String, (Long, Float))] =
  NatsToSparkConnector
    .receiveFromNatsStreaming(classOf[Tuple2[Long, Float]], StorageLevel.MEMORY_ONLY, clusterId)
    .withNatsURL(natsUrl)
    .withSubjects(inputSubject)
    .withDataDecoder(dataDecoder)
    .asStreamOfKeyValue(ssc)
val stream = NatsToSparkConnector.receiveFromNatsStreaming(classOf[String], StorageLevel.MEMORY_ONLY, clusterId)
  .withNatsURL(natsUrl)
  .withSubjects(inputSubject)
  .asStreamOf(ssc)
import java.util.Properties
val properties = new Properties()
val natsUrl = System.getenv("NATS_URI")
val stream = NatsToSparkConnector.receiveFromNats(classOf[Integer], StorageLevel.MEMORY_ONLY)
  .withProperties(properties)
  .withSubjects(inputSubject)
  .asStreamOf(ssc)
Here, the Spark Stream is made of key/value pairs, where the key is the subject and the value is the payload of the NATS messages.
val stream = NatsToSparkConnector.receiveFromNats[Streaming](...)
...
.withSubjects("main-subject.>")
.asStreamOfKeyValue(ssc)
stream.groupByKey().print()
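Because the key is the concrete subject each message arrived on, `groupByKey()` groups by subjects such as `main-subject.a`, not by the wildcard pattern. The JDK-only sketch below mimics this on a local collection. `subjectMatches` is a hypothetical helper (not a connector API) that follows the NATS wildcard rules: `*` matches exactly one token and a trailing `>` matches one or more tokens. The sample messages are made up.

```scala
// Hypothetical helper, not part of the connector: does a concrete subject match a pattern?
def subjectMatches(pattern: String, subject: String): Boolean = {
  val p = pattern.split('.')
  val s = subject.split('.')
  def loop(i: Int): Boolean =
    if (i == p.length) i == s.length                        // pattern exhausted: subject must be too
    else if (p(i) == ">") i == p.length - 1 && s.length > i  // '>' is last and needs >= 1 token
    else i < s.length && (p(i) == "*" || p(i) == s(i)) && loop(i + 1)
  loop(0)
}

// Local stand-in for one key/value micro-batch: (subject, payload) pairs.
val received = Seq(
  "main-subject.a" -> "1",
  "main-subject.b.x" -> "2",
  "main-subject.a" -> "3",
  "other.a" -> "4"
)

// Keep what a "main-subject.>" subscription would deliver, then group by subject,
// the local equivalent of groupByKey() on the DStream.
val grouped: Map[String, Seq[String]] =
  received
    .filter { case (subject, _) => subjectMatches("main-subject.>", subject) }
    .groupBy(_._1)
    .map { case (subject, pairs) => subject -> pairs.map(_._2) }

grouped.toSeq.sortBy(_._1).foreach(println)
```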
The Spark elements are first serialized as byte[] before being sent to NATS. By default, (Java) number types are encoded through the com.logimethods.connector.nats_spark.NatsSparkUtilities.encodeData(Object obj) method.
Custom serialization can be performed by a (java.util.function.Function<[Class], byte[]> & Serializable) function provided through the .publishToNats(...) method, like:
val stream: DStream[(String, (Long, Float))] = .../...
import java.nio.ByteBuffer
def longFloatTupleEncoder: Tuple2[Long, Float] => Array[Byte] = tuple => {
  // 8 bytes for the Long + 4 bytes for the Float
  val buffer = ByteBuffer.allocate(8 + 4)
  buffer.putLong(tuple._1)
  buffer.putFloat(tuple._2)
  buffer.array()
}
SparkToNatsConnectorPool.newStreamingPool(clusterId)
  .withNatsURL(natsUrl)
  .withSubjects(outputSubject)
  .publishToNatsAsKeyValue(stream, longFloatTupleEncoder)
SparkToNatsConnectorPool.newStreamingPool(clusterId)
  .withNatsURL(natsUrl)
  .withSubjects(outputSubject)
  .publishToNats(stream)
SparkToNatsConnectorPool.newPool()
  .withProperties(properties)
  .withSubjects(outputSubject)
  .publishToNats(stream)
Here, the Spark Stream should be made of key/value pairs. .storedAsKeyValue() will publish NATS messages whose subject is the composition of the (optional) global subject(s) and the key of each pair, while the NATS payload will be the pair's value.
stream.groupByKey().print()
SparkToNatsConnectorPool.new[Streaming]Pool(...)
...
.withSubjects("A1.", "A2.")
.publishToNatsAsKeyValue(stream)
will send the following [subject:payload] messages to NATS:
[A1.key1:string1]
[A2.key1:string1]
[A1.key2:string2]
[A2.key2:string2]
...
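The subject composition described above can be sketched on plain collections. `composeMessages` is a hypothetical illustration, not connector code: for each pair, it emits one message per global subject, using the prefix followed by the pair's key as the subject and the pair's value as the payload. The behaviour when no global subject is given (the key alone becomes the subject) is an assumption drawn from the word "(optional)".

```scala
// Hypothetical illustration of key/value publishing: (global subject + key, value) per prefix.
def composeMessages(
    globalSubjects: Seq[String],
    pairs: Seq[(String, String)]
): Seq[(String, String)] = {
  // Assumption: without any global subject, the key alone is used as the subject.
  val prefixes = if (globalSubjects.isEmpty) Seq("") else globalSubjects
  for {
    (key, value) <- pairs   // outer loop: one group of messages per pair
    prefix <- prefixes      // inner loop: one message per global subject
  } yield (prefix + key, value)
}

val messages = composeMessages(Seq("A1.", "A2."), Seq("key1" -> "string1", "key2" -> "string2"))
messages.foreach { case (subject, payload) => println(s"[$subject:$payload]") }
```

With the subjects and pairs of the example above, it prints the same four `[subject:payload]` lines in the same order.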
- The 'docker-nats-connector-spark' Docker Based Project that makes use of Gatling, Spark & NATS.
- The 'smart-meter' Docker Swarm Based Project that makes use of Gatling, Cassandra, Golang, Spark & NATS.
(The MIT License)
Copyright (c) 2016-2019 Logimethods.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.