WordCount
object WordCount extends SequenceFileJob[InputAndOutput] {
  override def execute(argument: InputAndOutput)(implicit ec: ElectricSession) = {
    val session = ec.getSparkSession
    import session.implicits._

    // Read the input as a Dataset[String], one element per line.
    val file = ec.text(argument.input)

    // Lowercase, strip punctuation, and split on whitespace; drop the empty
    // tokens that leading or trailing whitespace would otherwise produce.
    val words = file
      .flatMap(_.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+"))
      .filter(_.nonEmpty)

    // Count occurrences per word and write the result as tab-delimited text.
    words
      .groupByKey(identity)
      .count()
      .write
      .option("delimiter", "\t")
      .csv(argument.output)
  }
}
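
The InputAndOutput argument type is not shown above. A minimal sketch, assuming it is simply a case class carrying the two paths as plain strings (the real definition in the job framework may differ):

case class InputAndOutput(
  input: String,  // assumed: path to the text file to read
  output: String  // assumed: directory the tab-delimited counts are written to
)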
WordCountTest
class WordCountTest extends ElectricJobTest {
  test("wordcount test with spark") {
    // Create a small input file with three lines.
    val input = createFile {
      """hello world
        |Zero world
        |Some world
        |""".stripMargin
    }
    val output = createTempPath()

    launch(WordCount, InputAndOutput(input, output))

    // Collect the lines from the part-* files Spark wrote to the output directory.
    val lines = readFilesInDirectory(output, "part")
    lines should contain("hello\t1")
    lines should contain("world\t3")
  }
}
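
Given the input above, the part files under the output directory should contain one tab-separated word/count pair per line, in no particular order, along the lines of:

hello	1
world	3
zero	1
some	1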