API that enables switching between the Spark execution engine and a local implementation based on Scala collections.
- Goals
- Getting started
- Examples
- IO operations
- Aggregations
- Supported Spark versions
- Supported Spark operations
- Benchmarks
- Contributing
- Speed up unit testing for Spark applications
- Enable switching between the Spark execution engine and Scala collections depending on the use case, especially data size, without changing the implementation
For Spark 2.1.1, include the dependency:
"com.github.piotr-kalanski" % "spark-local_2.1.1_2.11" % "0.6.0"
or
<dependency>
<groupId>com.github.piotr-kalanski</groupId>
<artifactId>spark-local_2.1.1_2.11</artifactId>
<version>0.6.0</version>
</dependency>
For Spark 2.1.0, include the dependency:
"com.github.piotr-kalanski" % "spark-local_2.1.0_2.11" % "0.6.0"
or
<dependency>
<groupId>com.github.piotr-kalanski</groupId>
<artifactId>spark-local_2.1.0_2.11</artifactId>
<version>0.6.0</version>
</dependency>
The entry point for the library is a session object, similar to the SparkSession object from Apache Spark.
The process of creating a session is also similar to Apache Spark.
When creating a session object you can choose between different execution engines. Currently supported:
- Spark - wrapper around Spark, which can be used at production data volumes
- ScalaEager - implementation based on Scala collections with eager transformations, which makes it fast for unit testing
- ScalaLazy - implementation based on Scala collections with lazy transformations, intended for small to mid-size data
- ScalaParallel - implementation based on Scala parallel collections with eager transformations
- ScalaParallelLazy - implementation based on Scala parallel collections with lazy transformations
import com.datawizards.sparklocal.session.ExecutionEngine.ExecutionEngine
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
// change this value to use a different execution engine
val engine = ExecutionEngine.ScalaEager
val session = SparkSessionAPI
  .builder(engine)
  .master("local")
  .getOrCreate()
val ds = session.read[Person](CSVDataStore(file))
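The difference between the eager and lazy engines listed above can be illustrated with plain Scala collections. The sketch below does not use spark-local. The object and method names are hypothetical, and it only assumes that "eager" means a transformation runs as soon as it is called, while "lazy" means it runs when a result is requested.

```scala
object EagerVsLazySketch {
  // Eager: List.map applies the function immediately to every element.
  def eagerCallCount(data: List[Int]): Int = {
    var calls = 0
    data.map { x => calls += 1; x * x }
    calls
  }

  // Lazy: a view only records the transformation; the function runs
  // when the result is materialized with toList.
  // Returns (calls before materializing, calls after materializing).
  def lazyCallCounts(data: List[Int]): (Int, Int) = {
    var calls = 0
    val pending = data.view.map { x => calls += 1; x * x }
    val before = calls
    val squares = pending.toList
    assert(squares == data.map(x => x * x))
    (before, calls)
  }

  def main(args: Array[String]): Unit = {
    println(eagerCallCount(List(1, 2, 3)))
    println(lazyCallCounts(List(1, 2, 3)))
  }
}
```

With a lazy pipeline, no work is done for transformations whose result is never requested, which is one possible reason to prefer it for larger local data.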
import com.datawizards.sparklocal.rdd.RDDAPI
import org.apache.spark.sql.SparkSession
object ExampleRDD1 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    val data = Seq(1, 2, 3)
    val rdd = spark.sparkContext.parallelize(data)
    assertEquals(
      calculateSum(RDDAPI(data)),
      calculateSum(RDDAPI(rdd))
    )
    assertEquals(
      calculateSumOfSquares(RDDAPI(data)),
      calculateSumOfSquares(RDDAPI(rdd))
    )
  }

  def assertEquals[T](r1: T, r2: T): Unit = {
    println(r1)
    assert(r1 == r2)
  }

  def calculateSum(ds: RDDAPI[Int]): Int = ds.reduce(_ + _)

  def calculateSumOfSquares(ds: RDDAPI[Int]): Int = ds.map(x => x * x).reduce(_ + _)
}
import com.datawizards.sparklocal.dataset.DataSetAPI
import org.apache.spark.sql.SparkSession
object ExampleDataset1 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    val data = Seq(1, 2, 3)
    val ds = data.toDS()
    assertEquals(
      calculateSum(DataSetAPI(data)),
      calculateSum(DataSetAPI(ds))
    )
    assertEquals(
      calculateSumOfSquares(DataSetAPI(data)),
      calculateSumOfSquares(DataSetAPI(ds))
    )
  }

  def assertEquals[T](r1: T, r2: T): Unit = {
    println(r1)
    assert(r1 == r2)
  }

  def calculateSum(ds: DataSetAPI[Int]): Int = ds.reduce(_ + _)

  def calculateSumOfSquares(ds: DataSetAPI[Int]): Int = ds.map(x => x * x).reduce(_ + _)
}
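Both examples above pass the same function a wrapper around a local collection and a wrapper around a Spark structure. A minimal sketch of how such a common interface could be organized is shown below. It is not the library's implementation: `MiniData`, `LocalData` and `PartitionedData` are hypothetical names, and the partitioned backend only stands in for a distributed engine.

```scala
// Hypothetical minimal interface; the real library exposes RDDAPI and DataSetAPI.
trait MiniData[T] {
  def map[U](f: T => U): MiniData[U]
  def reduce(f: (T, T) => T): T
}

// Backend 1: a single in-memory Scala collection.
final case class LocalData[T](items: Seq[T]) extends MiniData[T] {
  def map[U](f: T => U): MiniData[U] = LocalData(items.map(f))
  def reduce(f: (T, T) => T): T = items.reduce(f)
}

// Backend 2: data split into partitions, standing in for a distributed engine.
final case class PartitionedData[T](partitions: Seq[Seq[T]]) extends MiniData[T] {
  def map[U](f: T => U): MiniData[U] = PartitionedData(partitions.map(_.map(f)))
  // Reduce each non-empty partition first, then combine the partial results.
  def reduce(f: (T, T) => T): T =
    partitions.filter(_.nonEmpty).map(_.reduce(f)).reduce(f)
}

object BackendSwitchSketch {
  // Business logic is written once against the interface.
  def sumOfSquares(data: MiniData[Int]): Int = data.map(x => x * x).reduce(_ + _)

  def main(args: Array[String]): Unit = {
    val local = LocalData(Seq(1, 2, 3, 4))
    val partitioned = PartitionedData(Seq(Seq(1, 2), Seq(3, 4)))
    assert(sumOfSquares(local) == sumOfSquares(partitioned))
    println(sumOfSquares(local))
  }
}
```

Because `sumOfSquares` depends only on the trait, the backend can be swapped at the call site without touching the logic, which is the property the examples above rely on.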
case class Person(id: Int, name: String, gender: String)
case class WorkExperience(personId: Int, year: Int, title: String)
case class HRReport(year: Int, title: String, gender: String, count: Int)
object ExampleHRReport {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._
    val people = SampleData.people
    val peopleDs = people.toDS()
    val experience = SampleData.experience
    val experienceDs = experience.toDS()
    calculateReport(DataSetAPI(people), DataSetAPI(experience))
    calculateReport(DataSetAPI(peopleDs), DataSetAPI(experienceDs))
  }

  def calculateReport(people: DataSetAPI[Person], workExperience: DataSetAPI[WorkExperience]): DataSetAPI[HRReport] = {
    workExperience
      .join(people)(_.personId, _.id)
      .groupByKey(wp => (wp._1.year, wp._1.title, wp._2.gender))
      .mapGroups { case ((year, title, gender), vals) => HRReport(year, title, gender, vals.size) }
  }
}
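To make the result of `calculateReport` easier to follow, here is a sketch of the same join, group and count logic written against plain Scala collections only. It does not use spark-local, and the sample rows are invented for illustration because `SampleData` is not shown here.

```scala
case class Person(id: Int, name: String, gender: String)
case class WorkExperience(personId: Int, year: Int, title: String)
case class HRReport(year: Int, title: String, gender: String, count: Int)

object HRReportLocalSketch {

  def calculateReport(people: Seq[Person], workExperience: Seq[WorkExperience]): Seq[HRReport] = {
    // Inner join: keep every (experience, person) pair with matching ids.
    val joined = for {
      w <- workExperience
      p <- people
      if w.personId == p.id
    } yield (w, p)

    // Group by (year, title, gender) and count the rows in each group.
    joined
      .groupBy { case (w, p) => (w.year, w.title, p.gender) }
      .map { case ((year, title, gender), rows) => HRReport(year, title, gender, rows.size) }
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    // Invented sample data, not the library's SampleData.
    val people = Seq(Person(1, "Ann", "F"), Person(2, "Bob", "M"))
    val experience = Seq(
      WorkExperience(1, 2017, "Engineer"),
      WorkExperience(2, 2017, "Engineer"),
      WorkExperience(2, 2016, "Analyst")
    )
    calculateReport(people, experience).foreach(println)
  }
}
```

The order of the returned rows is not defined, so compare results as sets when testing.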
The library provides a dedicated API for input/output operations, with implementations for Spark and Scala collections.
Supported formats:
- CSV
- JSON
- Parquet
- Avro
- Hive
- JDBC
  - H2
  - MySQL
- Elasticsearch
val reader: Reader = ReaderScalaImpl // Scala implementation
//val reader: Reader = ReaderSparkImpl // Spark implementation
reader.read[Person](
  CSVDataStore(
    path = "people.csv",
    delimiter = ';',
    header = false,
    columns = Seq("name", "age")
  )
)
ds.write(CSVDataStore(file), SaveMode.Overwrite)
reader.read[Person](JsonDataStore("people.json"))
ds.write(JsonDataStore("people.json"), SaveMode.Overwrite)
The current implementation produces different binary files for Spark and Scala. Spark compresses files with snappy by default, while the spark-local implementation is based on https://github.com/sksamuel/avro4s, which saves data without compression.
reader.read[Person](AvroDataStore("people.avro"))
ds.write(AvroDataStore("people.avro"), SaveMode.Overwrite)
reader.read[Person](ParquetDataStore("people.parquet"))
ds.write(ParquetDataStore("people.parquet"), SaveMode.Overwrite)
reader.read[Person](HiveDataStore("db", "table"))
ds.write(HiveDataStore("db", "table"), SaveMode.Overwrite)
val database = "public"
val table = "people"
val properties = new java.util.Properties()
reader.read[Person](H2DataStore(connectionString, database, table, properties))
ds.write(H2DataStore(connectionString, database, table, properties), SaveMode.Append)
val indexName = "people"
val typeName = "person"
ds.write(ElasticsearchSimpleIndexDataStore("localhost", indexName, typeName), SaveMode.Append)
The library supports reading data using both older and newer versions of a data model.
Reading with an older version of the data model is straightforward: only the fields that exist in that version are read.
When reading with a newer version of the data model, the new columns should be of type Option, and the value None is assigned to those fields.
Example
import com.datawizards.sparklocal.datastore.CSVDataStore
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
import org.apache.spark.sql.SaveMode
case class Person(name: String, age: Int)
case class PersonV2(name: String, age: Int, title: Option[String])
case class PersonV3(name: String, age: Int, title: Option[String], salary: Option[Long])
val session = SparkSessionAPI
  .builder(ExecutionEngine.ScalaEager)
  .master("local")
  .getOrCreate()
import session.implicits._
val peopleV2 = session.createDataset(Seq(
  PersonV2("p1", 10, Some("Mr")),
  PersonV2("p2", 20, Some("Ms")),
  PersonV2("p3", 30, None),
  PersonV2("p4", 40, Some("Mr"))
))
val dataStore = CSVDataStore("people_v2.csv")
peopleV2.write(dataStore, SaveMode.Overwrite)
// Read old version:
val peopleV1 = session.read[Person](dataStore)
// Read new version:
val peopleV3 = session.read[PersonV3](dataStore)
peopleV1.show()
peopleV3.show()
+----+---+
|name|age|
+----+---+
|p1 |10 |
|p2 |20 |
|p3 |30 |
|p4 |40 |
+----+---+
+----+---+-----+------+
|name|age|title|salary|
+----+---+-----+------+
|p1 |10 |Mr | |
|p2 |20 |Ms | |
|p3 |30 | | |
|p4 |40 |Mr | |
+----+---+-----+------+
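The rule that missing columns become None can be sketched with plain Scala. This is not how spark-local reads files internally. It is a hypothetical illustration that treats each row as a map from column name to raw text, with `toRecord` and `toPersonV3` as invented helper names.

```scala
case class PersonV3(name: String, age: Int, title: Option[String], salary: Option[Long])

object SchemaEvolutionSketch {

  // Pair each header name with the value at the same position in a data row.
  def toRecord(header: Seq[String], row: Seq[String]): Map[String, String] =
    header.zip(row).toMap

  // Required fields are looked up directly. Newer optional fields fall back
  // to None when the column is absent or the value is empty.
  def toPersonV3(record: Map[String, String]): PersonV3 =
    PersonV3(
      name = record("name"),
      age = record("age").toInt,
      title = record.get("title").filter(_.nonEmpty),
      salary = record.get("salary").filter(_.nonEmpty).map(_.toLong)
    )

  def main(args: Array[String]): Unit = {
    // The stored data has no "salary" column, as in the PersonV2 file above.
    val header = Seq("name", "age", "title")
    val rows = Seq(Seq("p1", "10", "Mr"), Seq("p3", "30", ""))
    rows.map(r => toPersonV3(toRecord(header, r))).foreach(println)
  }
}
```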
The library supports renaming fields when writing and reading data.
The mapping is provided using the column annotation from the project https://github.com/piotr-kalanski/data-model-generator.
Example
import com.datawizards.dmg.annotations.column
import com.datawizards.sparklocal.dataset.io.ModelDialects
import com.datawizards.sparklocal.datastore.JsonDataStore
import com.datawizards.sparklocal.session.{ExecutionEngine, SparkSessionAPI}
import org.apache.spark.sql.SaveMode
case class PersonWithMapping(
  @column("personName", dialect = ModelDialects.JSON)
  name: String,
  @column("personAge", dialect = ModelDialects.JSON)
  age: Int
)
val session = SparkSessionAPI
  .builder(ExecutionEngine.ScalaEager)
  .master("local")
  .getOrCreate()
import session.implicits._
val people = session.createDataset(Seq(
  PersonWithMapping("p1", 10),
  PersonWithMapping("p2", 20),
  PersonWithMapping("p3", 30),
  PersonWithMapping("p4", 40)
))
val dataStore = JsonDataStore("people_mapping.json")
people.write(dataStore, SaveMode.Overwrite)
Field names in the JSON file match the column names provided in the annotations:
{"personName":"p1","personAge":10}
{"personName":"p2","personAge":20}
{"personName":"p3","personAge":30}
{"personName":"p4","personAge":40}
Reading data:
val people2 = session.read[PersonWithMapping](dataStore)
people2.show()
Column names match the case class field names:
+----+---+
|name|age|
+----+---+
|p1 |10 |
|p2 |20 |
|p3 |30 |
|p4 |40 |
+----+---+
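The effect of the mapping can be sketched without the annotation machinery. The code below is a hypothetical illustration only: it replaces the `column` annotation with an explicit `Map` from field name to JSON name, and builds the JSON text by hand without escaping special characters.

```scala
object ColumnMappingSketch {
  final case class Person(name: String, age: Int)

  // Stand-in for the annotation metadata: case class field -> JSON field.
  val jsonNames: Map[String, String] = Map("name" -> "personName", "age" -> "personAge")

  // Writes one record as a JSON line, using the mapped name for each field.
  // String values are quoted but not escaped, which is enough for this sketch.
  def toJsonLine(p: Person): String = {
    val fields = Seq(
      "name" -> ("\"" + p.name + "\""),
      "age" -> p.age.toString
    )
    fields
      .map { case (field, value) => "\"" + jsonNames.getOrElse(field, field) + "\":" + value }
      .mkString("{", ",", "}")
  }

  def main(args: Array[String]): Unit =
    println(toJsonLine(Person("p1", 10)))
}
```

Reading applies the same mapping in the opposite direction, so code keeps working with the case class field names.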
The library provides a custom type-safe API for aggregations.
Example operations:
import com.datawizards.sparklocal.dataset.agg._
ds
  .groupByKey(_.name)
  .agg(sum(_.age), count(), mean(_.age), max(_.age))
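For reference, the values such an aggregation is expected to produce can be computed with plain Scala collections. The sketch below does not call the spark-local aggregation API. `Stats` and `aggregate` are hypothetical names, and the result shape (one record per key) is an assumption made for readability.

```scala
case class Person(name: String, age: Int)
case class Stats(sum: Int, count: Long, mean: Double, max: Int)

object AggregationSketch {

  // Group people by name, then compute sum, count, mean and max of age per group.
  def aggregate(people: Seq[Person]): Map[String, Stats] =
    people.groupBy(_.name).map { case (name, group) =>
      val ages = group.map(_.age)
      name -> Stats(ages.sum, ages.size.toLong, ages.sum.toDouble / ages.size, ages.max)
    }

  def main(args: Array[String]): Unit = {
    val people = Seq(Person("a", 10), Person("a", 30), Person("b", 5))
    aggregate(people).foreach(println)
  }
}
```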
spark-local | Spark version |
---|---|
0.7 | 2.1.1, 2.1.0 |
0.6 | 2.1.1, 2.1.0 |
0.5 | 2.1.0 |
0.4 | 2.1.0 |
0.3 | 2.1.0 |
0.2 | 2.1.0 |
0.1 | 2.1.0 |
Please report any bugs to the spark-local GitHub issue tracker.