Spark "Helpers for Array Transformations"
This library extends Spark DataFrame API with helpers for transforming fields inside nested structures and arrays of arbitrary levels of nesting.
Reference the library
Please, use the table below to determine what version of spark-hats to use for Spark compatibility.
spark-hats version | Scala version | Spark version |
---|---|---|
0.1.x | 2.11, 2.12 | 2.4.3+ |
0.2.x | 2.11, 2.12 | 2.4.3+ |
0.2.x | 2.12 | 3.0.0+ |
0.3.x | 2.11 | 2.4.3+ |
0.3.x | 2.12, 2.13 | 3.2.1+ |
To use the extensions you need to add this import to your Spark application or shell:
import za.co.absa.spark.hats.Extensions._
sbt ++{matrix.scala} jacoco -DSPARK_VERSION={matrix.spark}
Code coverage will be generated on path:
{project-root}/spark-hats/target/scala-{scala_version}/jacoco/report/html
Here is a small example we will use to show you how spark-hats
work. The important thing is that the dataframe
contains an array of struct fields.
scala> df.printSchema()
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
scala> df.show(false)
+---+------------------------------+
|id |my_array |
+---+------------------------------+
|1 |[[1, foo]] |
|2 |[[1, bar], [2, baz], [3, foz]]|
+---+------------------------------+
Now, say, we want to add a field c
as part of the struct alongside a
and b
from the example above. The
expression for c
is c = a + 1
.
Here is the code you can use in Spark:
val dfOut = df.select(col("id"), transform(col("my_array"), c => {
struct(c.getField("a").as("a"),
c.getField("b").as("b"),
(c.getField("a") + 1).as("c"))
}).as("my_array"))
(to use transform()
in Scala API you need to add spark-hofs as a dependency).
Here is how it looks when using spark-hats
library.
val dfOut = df.nestedMapColumn("my_array.a","c", a => a + 1)
Both produce the following results:
scala> dfOut.printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: long (nullable = true)
scala> dfOut.show(false)
+---+---------------------------------------+
|id |my_array |
+---+---------------------------------------+
|1 |[[1, foo, 2]] |
|2 |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+
Imagine how the code will look like for more levels of array nesting.
The nestedWithColumn
method allows adding new fields inside nested structures and arrays.
The addition of a column API is provided in two flavors: the basic and the extended API. The basic API is simpler to use, but the expressions it expects can only reference columns at the root of the schema. Here is an example of the basic add column API:
scala> df.nestedWithColumn("my_array.c", lit("hello")).printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = false)
scala> df.nestedWithColumn("my_array.c", lit("hello")).show(false)
+---+---------------------------------------------------+
|id |my_array |
+---+---------------------------------------------------+
|1 |[[1, foo, hello]] |
|2 |[[1, bar, hello], [2, baz, hello], [3, foz, hello]]|
+---+---------------------------------------------------+
The extended API method nestedWithColumnExtended
works similarly to the basic one but allows the caller to reference
other array elements, possibly on different levels of nesting. The way it allows this is a little tricky.
The second parameter is changed from being a column to a function that returns a column. Moreover, this function has
an argument which is a function itself, the getField()
function. The getField()
function can be used in the
transformation to reference other columns in the dataframe by their fully qualified name.
In the following example, a transformation adds a new field my_array.c
to the dataframe by concatenating a root
level column id
with a nested field my_array.b
:
scala> val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
concat(getField("id").cast("string"), getField("my_array.b"))
)
scala> dfOut.printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: string (nullable = true)
scala> dfOut.show(false)
+---+------------------------------------------------+
|id |my_array |
+---+------------------------------------------------+
|1 |[[1, foo, 1foo]] |
|2 |[[1, bar, 2bar], [2, baz, 2baz], [3, foz, 2foz]]|
+---+------------------------------------------------+
-
Note. You can still use
col
to reference root level columns. But if a column is inside an array (likemy_array.b
), invokingcol("my_array.b")
will reference the whole array, not an individual element. ThegetField()
function that is passed to the transformation solves this by adding a generic way of addressing array elements on arbitrary levels of nesting. -
Advanced Note. If there are several arrays in the schema,
getField()
allows to reference elements of an array if it is one of the parents of the output column.
The nestedDropColumn
method allows dropping fields inside nested structures and arrays.
scala> df.nestedDropColumn("my_array.b").printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
scala> df.nestedDropColumn("my_array.b").show(false)
+---+---------------+
|id |my_array |
+---+---------------+
|1 |[[1]] |
|2 |[[1], [2], [3]]|
+---+---------------+
The nestedMapColumn
method applies a transformation on a nested field. If the input column is a primitive field the
method will add outputColumnName
at the same level of nesting. If a struct column is expected you can use
.getField(...)
method to operate on its children.
The output column name can omit the full path as the field will be created at the same level of nesting as the input column.
scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: long (nullable = true)
scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).show(false)
+---+---------------------------------------+
|id |my_array |
+---+---------------------------------------+
|1 |[[1, foo, 2]] |
|2 |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
+---+---------------------------------------+
Syntax: df.nestedUnstruct("NestedStructColumnName")
.
Flattens one level of nesting when a struct is nested in another struct. For example,
scala> df.printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- c: struct (containsNull = true)
| | | |--nestedField1: string (nullable = true)
| | | |--nestedField2: long (nullable = true)
scala> df.nestedUnstruct("my_array.c").printSchema
root
|-- id: long (nullable = true)
|-- my_array: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: long (nullable = true)
| | |-- b: string (nullable = true)
| | |-- nestedField1: string (nullable = true)
| | |-- nestedField2: long (nullable = true)
Note that the output schema doesn't have the c
struct. All fields of c
are now part of the parent struct.