A Spark Datasource implementation for MessagePack.
We publish all our releases to the Maven Central repository as `spark-msgpack-datasource-3.5_2.12`:
```xml
<dependency>
    <groupId>io.github.cybercentrecanada</groupId>
    <artifactId>spark-msgpack-datasource-3.5_2.12</artifactId>
</dependency>
```
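If you build with sbt instead of Maven, the equivalent coordinate would be the following sketch; the version string is a placeholder, so substitute the latest release published on Maven Central:

```scala
// sbt equivalent of the Maven dependency above.
// "x.y.z" is a placeholder; use the latest version from Maven Central.
libraryDependencies += "io.github.cybercentrecanada" % "spark-msgpack-datasource-3.5_2.12" % "x.y.z"
```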
```scala
// read
val df = spark.read.format("messagepack").load("path/to/messagepack")

// write
df.write.format("messagepack").save("path/to/output/raw/messagepack")
```
Option | Type | Default | Description |
---|---|---|---|
deserializer.lenient | Boolean | false | When enabled, deserialization errors caused by incompatibilities between the raw data and the schema return null instead of failing the read. |
deserializer.trace_path | Boolean | false | When enabled, if an exception occurs while deserializing values against the specified schema, the error message indicates the exact location in the data where the error occurred. |
schema.max_sample_files | Number | 10 | The maximum number of files read during schema inference. Set to 0 for no limit. |
schema.max_sample_rows | Number | 10000 | The maximum number of rows sampled in each file during schema inference. Set to 0 for no limit. |
```scala
// If a deserialization error occurs, simply return null instead of throwing an error.
val df = spark.read.format("messagepack").option("deserializer.lenient", true).load("path/to/messagepack")

// If a deserialization error occurs, the error message will include the path within the raw data where the problem occurred.
val df = spark.read.format("messagepack").option("deserializer.trace_path", true).load("path/to/messagepack")

// Process all files and all rows during schema inference.
val df = spark.read.format("messagepack").option("schema.max_sample_files", 0).option("schema.max_sample_rows", 0).load("path/to/messagepack")
```
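Since schema inference requires sampling the input files, you can also supply an explicit schema through Spark's standard reader API, which should skip the sampling pass entirely. A minimal sketch, assuming a hypothetical single string field named f1:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Providing an explicit schema avoids the inference sampling pass.
val schema = StructType(Seq(StructField("f1", StringType, nullable = true)))
val df = spark.read.format("messagepack").schema(schema).load("path/to/messagepack")
```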
We expose our Spark SQL expressions through Spark's native extensions API.
Spark extension property | MessagePack extension implementation |
---|---|
spark.sql.extensions | org.apache.spark.sql.msgpack.MessagePackExtensions |
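For example, you can enable the extensions when building the session (or equivalently via `--conf spark.sql.extensions=...` on spark-submit):

```scala
import org.apache.spark.sql.SparkSession

// Register the MessagePack expressions (e.g. from_msgpack) with this session.
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.spark.sql.msgpack.MessagePackExtensions")
  .getOrCreate()
```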
Similarly to Spark's native from_json expression, you can convert raw MessagePack map data into a Spark row.
As an example, assume you have a DataFrame or a table named 'my_table' with the following structure:
```
+--------------------+
|         msgpack_raw|
+--------------------+
|[87 A2 6B 31 A2 7...|
+--------------------+
```
And given that the schema of the raw data is:
```
root
 |-- f1: string (nullable = true)
```
You can convert the raw data like this:

```scala
val spark = SparkSession.builder.getOrCreate()

// The schema is passed as a JSON-serialized Spark StructType.
val schemaStr = """{"type":"struct","fields":[{"name":"f1","type":"string","nullable":true,"metadata":{}}]}"""
val df = spark.sql(s"select from_msgpack(msgpack_raw, '${schemaStr}') as decoded from my_table")
df.select("decoded.*").show()
```
This yields the decoded output:

```
+---+
| f1|
+---+
| v1|
+---+
```
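Rather than hand-writing the JSON schema string, you can build it from a StructType; Spark's StructType.json produces exactly this serialized form:

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// StructType.json serializes the schema to the JSON format used above.
val schemaStr = StructType(Seq(StructField("f1", StringType, nullable = true))).json
```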
You can provide custom deserializers for each of your msgpack extension types:

- For each extension type that requires a custom deserializer, implement the ExtensionDeserializer trait (a complete example sketch follows the default implementation below):
```scala
trait ExtensionDeserializer extends Serializable {
  // The MessagePack extension type id this deserializer handles.
  def extensionType(): Int
  // The Spark SQL type the deserialized value maps to.
  def sqlType(): DataType
  // Convert the raw extension payload into a value of sqlType().
  def deserialize(data: Array[Byte]): Any
}
```
- Then implement a deserializer provider:
```scala
trait MessagePackExtensionDeserializerProvider {
  def get(): ExtensionDeserializers
}
```
- Finally, register your application's deserializer provider by listing your provider implementation in:

```
src/main/resources/META-INF/services/org.apache.spark.sql.sources.MessagePackExtensionDeserializerProvider
```
The default deserializer provider implementation is shown below:

```scala
class ExtensionDeserializerProvider extends MessagePackExtensionDeserializerProvider {
  override def get(): ExtensionDeserializers = {
    val deserializers = new ExtensionDeserializers()
    // Register the default deserializer for every application-specific extension type (0 to 127).
    for (i <- 0 to 127) {
      deserializers.set(new DefaultDeserializer(i))
    }
    deserializers
  }
}
```
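As a concrete illustration, here is a minimal, hypothetical sketch that decodes extension type 1 payloads as UTF-8 strings; the class names and the type id are invented for the example, and how the returned value must map to Spark's internal representation depends on the connector:

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical deserializer: treat extension type 1 payloads as UTF-8 text.
class Utf8ExtensionDeserializer extends ExtensionDeserializer {
  override def extensionType(): Int = 1
  override def sqlType(): DataType = StringType
  override def deserialize(data: Array[Byte]): Any =
    new String(data, StandardCharsets.UTF_8)
}

// Provider that registers the custom deserializer; list this class's
// fully qualified name in the META-INF/services file shown above.
class Utf8ExtensionDeserializerProvider extends MessagePackExtensionDeserializerProvider {
  override def get(): ExtensionDeserializers = {
    val deserializers = new ExtensionDeserializers()
    deserializers.set(new Utf8ExtensionDeserializer())
    deserializers
  }
}
```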
We accept, encourage, and appreciate contributions to this project. Please send us a pull request; we will review it and get in touch with you.