Spark ML transformers, estimators, Spark SQL aggregations, and other pieces that are missing in Apache Spark. This is how we do Audience Modeling at Collective.
Add the Collective Media Bintray repository:

```scala
resolvers += "Collective Media Bintray" at "https://dl.bintray.com/collectivemedia/releases"
```
and use the following library dependencies:

```scala
libraryDependencies += "com.collective.sparkext" %% "sparkext-sql" % "0.0.23"
libraryDependencies += "com.collective.sparkext" %% "sparkext-mllib" % "0.0.23"
```
To run the tests:

```
sbt test
```
Assume an impression log DataFrame with the following schema:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("cookie_id", StringType),
  StructField("site", StringType),
  StructField("impressions", LongType)
))

val impressionLog = sqlContext.createDataFrame(sc.parallelize(Seq(
  Row("cookie_1", "google.com", 10L),
  Row("cookie_2", "cnn.com", 14L),
  ...
)), schema)
```
An aggregation function that collects all values from a column into an array.
```scala
import org.apache.spark.sql.ext.functions._
import org.apache.spark.sql.functions.col

// collect all sites for each cookie (with duplicates)
impressionLog
  .groupBy(col("cookie_id"))
  .agg(collectArray(col("site")))
```
Gets a Google S2 Geometry CellId from decimal latitude and longitude.
```scala
val schema = StructType(Seq(
  StructField("city", StringType),
  StructField("lat", DoubleType),
  StructField("lon", DoubleType)
))

val cities = sqlContext.createDataFrame(sc.parallelize(Seq(
  Row("New York", 40.7142700, -74.0059700),
  Row("London", 51.50722, -0.12750),
  Row("Princeton", 40.3487200, -74.6590500)
)), schema)

// the S2 level controls cell granularity: higher levels mean smaller cells
val s2CellTransformer = new S2CellTransformer().setLevel(6)
s2CellTransformer.transform(cities)
```
Continuous features may need to be transformed into a binary format using binning to account for nonlinearity. In general, binning breaks a set of ordered values into evenly distributed groups, such that each group contains approximately the same number of values from the sample.
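This library's binning API isn't shown in this section; as a rough stand-in, approximately equal-frequency binning can be sketched with Spark ML's `QuantileDiscretizer`, reusing the `impressionLog` example from above (the bucket count and output column name are arbitrary choices):

```scala
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.functions.col

// QuantileDiscretizer expects a double column, so cast first
val input = impressionLog.withColumn("impressions", col("impressions").cast("double"))

// split "impressions" into 5 bins with approximately equal counts;
// bin boundaries are estimated from the data during fit
val binner = new QuantileDiscretizer()
  .setInputCol("impressions")
  .setOutputCol("impressions_bin")
  .setNumBuckets(5)

val binned = binner.fit(input).transform(input)
```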
Inspired by the R tidyr and reshape2 packages. Converts a long DataFrame with values for each key into a wide DataFrame, applying an aggregation function when a single key has multiple values.
| cookie_id | site_id | impressions |
|-----------|---------|-------------|
| cookieAA  | 123     | 10          |
| cookieAA  | 123     | 5           |
| cookieAA  | 456     | 20          |
```scala
val gather = new Gather()
  .setPrimaryKeyCols("cookie_id")
  .setKeyCol("site_id")
  .setValueCol("impressions")
  .setOutputCol("sites")

val gathered = gather.transform(siteLog)
```
| cookie_id | sites |
|-----------|-------|
| cookieAA  | [{ site_id: 123, impressions: 15.0 }, { site_id: 456, impressions: 20.0 }] |
Encodes categorical key-value pairs using dummy variables: each distinct key becomes a position in the output vector, holding that key's value. For example, this gathered DataFrame:
| cookie_id | sites |
|-----------|-------|
| cookieAA  | [{ site_id: 1, impressions: 15.0 }, { site_id: 2, impressions: 20.0 }] |
| cookieBB  | [{ site_id: 2, impressions: 7.0 }, { site_id: 3, impressions: 5.0 }] |
is transformed into:
| cookie_id | site_features |
|-----------|---------------|
| cookieAA  | [ 15.0, 20.0, 0.0 ] |
| cookieBB  | [ 0.0, 7.0, 5.0 ] |
Optionally apply dimensionality reduction using the top transformation (see the encoder sketch after this list):

- Top coverage: select categorical values by computing the count of distinct users for each value, sorting the values in descending order by user count, and choosing the top values from the resulting list such that the sum of the distinct user counts over these values covers c percent of all users; for example, selecting the top sites that cover 99% of users.
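The encoder's exact API isn't shown above; here is a minimal sketch, assuming a `GatherEncoder` transformer whose setters mirror Gather's (the `setCover` parameter for top coverage is an assumption, and the actual setter names may differ):

```scala
// hypothetical encoder configuration; setter names are assumed
// to mirror Gather's and may differ from the actual API
val encoder = new GatherEncoder()
  .setInputCol("sites")
  .setOutputCol("site_features")
  .setKeyCol("site_id")
  .setValueCol("impressions")
  .setCover(0.99) // assumed: keep top keys covering 99% of users

val encoded = encoder.transform(gathered)
```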
If the class ratio between positives and negatives is too high, you might want to downsample the negatives before building a model.
```scala
val downsampling = new Downsampling()
  .setLabelCol("label")
  .setOutputCol("sample_weight")
  .setMaxClassRatio(30.0)
  .setPrimaryClass(1.0) // positive class to keep as-is
```
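Applying it and feeding the resulting weight column into a weighted learner might look like this; a sketch only, assuming Downsampling is a plain Transformer and using `trainingData` as a stand-in name for your labeled DataFrame:

```scala
// assumed usage: transform() drops excess negatives and appends
// a "sample_weight" column compensating for the downsampling
val sampled = downsampling.transform(trainingData)

// Spark ML classifiers can consume the weight column directly
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setLabelCol("label")
  .setWeightCol("sample_weight")
```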