Deriving Spark DataFrame schemas from case classes.
struct-type-encoder is available on maven central with the following coordinates:
"com.github.benfradet" %% "struct-type-encoder" % "0.5.0"
When reading a DataFrame/Dataset from a data source the schema of the data has to be inferred. In practice, this translates into looking at every record of all the files and coming up with a schema that can satisfy every one of these records, as shown here for JSON.
As anyone can guess, this can be a very time-consuming task, especially if you know in advance the schema of your data. A common pattern is to do the following:
case class MyCaseClass(a: Int, b: String, c: Double)
val inferred = spark
.read
.json("/some/dir/*.json")
.as[MyCaseClass]
In this case, there is no need to spend time inferring the schema as the DataFrame is directly converted to a Dataset of MyCaseClass. However, bypassing the inference by specifying your own schema can be a lot of boilerplate:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("a", IntegerType) ::
StructField("b", StringType) ::
StructField("c", DoubleType) :: Nil
)
val specified = spark
.read
.schema(schema)
.json("/some/dir/*.json")
.as[MyCaseClass]
struct-type-encoder derives instances of StructType (how Spark represents a schema) from your case class automatically:
import ste.StructTypeEncoder
import ste.StructTypeEncoder._
val derived = spark
.read
.schema(StructTypeEncoder[MyCaseClass].encode)
.json("/some/dir/*.json")
.as[MyCaseClass]
No inference, no boilerplate!
It is possible to add Metadata information to StructFields with the Meta annotation:
import org.apache.spark.sql.types._
import ste._
val metadata = new MetadataBuilder()
.putLong("foo", 10)
.putString("bar", "baz")
.build()
case class Foo(a: String, @Meta(metadata) b: Int)
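Encoding Foo then carries the metadata on the annotated field; a sketch of the expected result (nullability follows the same rules as in the flattening example below):
StructTypeEncoder[Foo].encode
// StructType(
//   StructField("a", StringType, false) ::
//   StructField("b", IntegerType, false, metadata) :: Nil
// )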
Using the ste.Flatten annotation, we can eliminate repetitions from case class definitions. Take the following example:
import ste._
case class Foo(a: String, b: Int)
case class Bar(@Flatten(2) a: Seq[Foo], @Flatten(1, Seq("x", "y")) b: Map[String, Foo], @Flatten c: Foo)
StructTypeEncoder[Bar].encode
The derived schema is the following:
StructType(
StructField("a.0.a", StringType, false) ::
StructField("a.0.b", IntegerType, false) ::
StructField("a.1.a", StringType, false) ::
StructField("a.1.b", IntegerType, false) ::
StructField("b.x.a", StringType, false) ::
StructField("b.x.b", IntegerType, false) ::
StructField("b.y.a", StringType, false) ::
StructField("b.y.b", IntegerType, false) ::
StructField("c.a", StringType, false) ::
StructField("c.b", IntegerType, false) :: Nil
)
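A CSV file matching this flat schema would therefore contain ten columns per row, e.g. (a made-up sample row, values chosen purely for illustration):
foo,1,bar,2,baz,3,qux,4,quux,5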
Now we want to read our data source with a flat schema:
import ste.StructTypeEncoder
import ste.StructTypeEncoder._
val df = spark
.read
.schema(StructTypeEncoder[Bar].encode)
.csv("/some/dir/*.csv")
struct-type-encoder can derive the nested projection of a DataFrame and convert it to a Dataset by providing the class:
import ste.StructTypeSelector._
val ds: Dataset[Bar] = df.asNested[Bar]
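Once nested, the Dataset can be manipulated through the case class fields directly; a minimal usage sketch using standard Dataset operations:
// keep only the rows whose nested field c.b is positive
ds.filter(bar => bar.c.b > 0).show()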
This project includes JMH benchmarks showing how expensive it is to infer a schema satisfying all records. The benchmarks compare the average time spent parsing a thousand files, each containing a hundred rows, when the schema is inferred by Spark and when it is derived by struct-type-encoder.
| | derived | inferred |
|---|---|---|
| CSV | 5.936 ± 0.035 s | 6.494 ± 0.209 s |
| JSON | 5.092 ± 0.048 s | 6.019 ± 0.049 s |
We see that when deriving the schema we spend 16.7% less time reading JSON data and 8.98% less for CSV.