Library for controlling the size of partitions when writing with Spark.

Partitions written by Spark are sometimes quite unequal (data skew), which can make subsequent reading problematic and further processing inefficient. spark-partition-sizing aims to reduce this problem by providing a number of utilities for achieving a more balanced partitioning.
This library is built in a Spark-specific manner, so the Spark version is part of the artifact name:

|            | spark-partition-sizing-spark2.4 | spark-partition-sizing-spark3.2 | spark-partition-sizing-spark3.3 |
|------------|---------------------------------|---------------------------------|---------------------------------|
| Scala 2.11 | ✅                              |                                 |                                 |
| Scala 2.12 | ✅                              | ✅                              | ✅                              |
libraryDependencies += "za.co.absa" %% "spark-partition-sizing-spark${sparkVersion}" % "X.Y.Z"
so, e.g.
libraryDependencies += "za.co.absa" %% "spark-partition-sizing-spark2.4" % "X.Y.Z"
libraryDependencies += "za.co.absa" %% "spark-partition-sizing-spark3.2" % "X.Y.Z"
libraryDependencies += "za.co.absa" %% "spark-partition-sizing-spark3.3" % "X.Y.Z"
Maven:

```xml
<dependency>
  <groupId>za.co.absa</groupId>
  <artifactId>spark-partition-sizing-spark${sparkVersion}_${scalaVersion}</artifactId>
  <version>${latest_version}</version>
</dependency>
```
To build and test the package locally, run:

```shell
sbt clean test
```

To measure code coverage, run:

```shell
sbt ++{scala_version} jacoco
```

The code coverage report will be generated at:

```
{project-root}/target/scala-{scala_version}/jacoco/report/html
```
The goal of the DataFramePartitioner class is to offer new partitioning possibilities and other helper functions.
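All the snippets below assume an existing SparkSession and an input DataFrame named df; a minimal setup (with an illustrative application name and input path, not part of the library) could look like this:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .appName("partition-sizing-example") // hypothetical application name
  .getOrCreate()

// any DataFrame works; reading Parquet from an illustrative path is just an example
val df: DataFrame = spark.read.parquet("/path/to/input")
```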
repartitionByPlanSize repartitions the DataFrame so that the partition size lies between the provided min and max parameters, as judged by the execution plan: if the current partition size is not within the specified range, the DataFrame is repartitioned so that the block size falls within it.
```scala
import za.co.absa.spark.partition.sizing.DataFramePartitioner.DataFrameFunctions

val minPartitionSizeInBytes = Some(1048576) // 1 MB
val maxPartitionSizeInBytes = Some(2097152) // 2 MB

// specifying a range of values
val repartitionedDfWithRange = df.repartitionByPlanSize(minPartitionSizeInBytes, maxPartitionSizeInBytes)
// specifying only a maximum partition size
val repartitionedDfWithMax = df.repartitionByPlanSize(None, maxPartitionSizeInBytes)
// specifying only a minimum partition size
val repartitionedDfWithMin = df.repartitionByPlanSize(minPartitionSizeInBytes, None)
```
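The repartitioned DataFrame is then written out with plain Spark; for example (output format and path are illustrative):

```scala
// write the rebalanced DataFrame; parquet and the target path are only an example
repartitionedDfWithRange.write
  .mode("overwrite")
  .parquet("/path/to/output")
```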
repartitionByRecordCount repartitions the DataFrame so that each partition contains roughly the provided number of records.
```scala
import za.co.absa.spark.partition.sizing.DataFramePartitioner.DataFrameFunctions

val targetNrOfRecordsPerPartition = 100
val repartitionedDf = df.repartitionByRecordCount(targetNrOfRecordsPerPartition)
```
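The effect of either method can be checked with standard Spark APIs, for example by comparing partition counts before and after:

```scala
// compare partition counts before and after repartitioning (standard Spark API)
println(s"before: ${df.rdd.getNumPartitions}, after: ${repartitionedDf.rdd.getNumPartitions}")
```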
Similarly to repartitionByPlanSize, repartitionByDesiredSize estimates the total size of the DataFrame and checks whether that estimate falls within the specified min and max range. The difference is that other options for computing the estimated dataset size are available, as opposed to the plan size, which may not always give accurate results.
The estimation of the total size is provided through Sizers. These are the available sizers:
The FromDataframeSizer estimates the size of each row and sums the values, giving a better estimate of the data size without needing an extra parameter. Its main drawback is that it can be slow, since all the data is processed, and it may fail on deeply nested rows if the computing resources are limited.
```scala
import za.co.absa.spark.partition.sizing.DataFramePartitioner.DataFrameFunctions
import za.co.absa.spark.partition.sizing.sizer.FromDataframeSizer

val minPartitionSizeInBytes = Some(1048576) // 1 MB
val maxPartitionSizeInBytes = Some(2097152) // 2 MB

val sizer = new FromDataframeSizer()
// specifying a range of values
val repartitionedDfWithRange = df.repartitionByDesiredSize(sizer)(minPartitionSizeInBytes, maxPartitionSizeInBytes)
```
The FromSchemaSizer estimates the row size from the DataFrame schema and the expected/typical field sizes. Its main advantage is speed: the data itself is not read and no action runs over it. The accuracy of the estimation, however, is not likely to be high, since nullability and complex structures may not be estimated well. It needs an implicit parameter for computing the data type sizes, which can be a custom, user-provided one or the default DefaultDataTypeSizes.
```scala
import za.co.absa.spark.partition.sizing.DataFramePartitioner.DataFrameFunctions
import za.co.absa.spark.partition.sizing.sizer.FromSchemaSizer
import za.co.absa.spark.partition.sizing.types.DataTypeSizes
import za.co.absa.spark.partition.sizing.types.DataTypeSizes.DefaultDataTypeSizes

val minPartitionSizeInBytes = Some(1048576) // 1 MB
val maxPartitionSizeInBytes = Some(2097152) // 2 MB

implicit val defaultSizes: DataTypeSizes = DefaultDataTypeSizes
val sizer = new FromSchemaSizer()
// specifying a range of values
val repartitionedDfWithRange = df.repartitionByDesiredSize(sizer)(minPartitionSizeInBytes, maxPartitionSizeInBytes)
```
The FromSchemaWithSummariesSizer, similarly to FromSchemaSizer, uses the schema, but also takes nullability into account: it uses the dataset summaries to compute what percentage of each column is null and applies that weight (the percentage of non-null values) to each column's estimate. This approach is still fairly quick, though somewhat slower than FromSchemaSizer. Its main limitation is that it can only be used when all the columns of the dataset hold primitive, non-nested values, a restriction stemming from the Spark summary statistics.
```scala
import za.co.absa.spark.partition.sizing.DataFramePartitioner.DataFrameFunctions
import za.co.absa.spark.partition.sizing.sizer.FromSchemaWithSummariesSizer
import za.co.absa.spark.partition.sizing.types.DataTypeSizes
import za.co.absa.spark.partition.sizing.types.DataTypeSizes.DefaultDataTypeSizes

val minPartitionSizeInBytes = Some(1048576) // 1 MB
val maxPartitionSizeInBytes = Some(2097152) // 2 MB

implicit val defaultSizes: DataTypeSizes = DefaultDataTypeSizes
val sizer = new FromSchemaWithSummariesSizer()
// specifying a range of values
val repartitionedDfWithRange = df.repartitionByDesiredSize(sizer)(minPartitionSizeInBytes, maxPartitionSizeInBytes)
```
The FromDataframeSampleSizer estimates the data size from a few random sample rows taken from the whole dataset. The number of samples can be specified by the user; otherwise it defaults to 1. This approach has the advantage of being quick, since taking samples is not as costly as going through all the data, but the estimate depends on the random samples: a higher number of samples is likely to give a better estimate at a higher cost.
```scala
import za.co.absa.spark.partition.sizing.DataFramePartitioner.DataFrameFunctions
import za.co.absa.spark.partition.sizing.sizer.FromDataframeSampleSizer

val numberOfSamples = 10
val minPartitionSizeInBytes = Some(1048576) // 1 MB
val maxPartitionSizeInBytes = Some(2097152) // 2 MB

val sizer = new FromDataframeSampleSizer(numberOfSamples)
// specifying a range of values
val repartitionedDfWithRange = df.repartitionByDesiredSize(sizer)(minPartitionSizeInBytes, maxPartitionSizeInBytes)
```
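Since the sample count defaults to 1 (as noted above), the sizer can presumably also be constructed without arguments; a minimal single-sample sketch, assuming the sample-count parameter is indeed optional:

```scala
// single random sample (the documented default): fastest, least accurate;
// assumes the sample-count constructor parameter is optional, as the default of 1 suggests
val quickSizer = new FromDataframeSampleSizer()
val roughlyRepartitionedDf = df.repartitionByDesiredSize(quickSizer)(minPartitionSizeInBytes, maxPartitionSizeInBytes)
```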
Please see this file for more details.