
EtlFlow


EtlFlow is an ecosystem of functional libraries in Scala, based on ZIO, for running complex, auditable jobs/workflows that can interact with Google Cloud Platform, AWS, Kubernetes, databases, SFTP servers, on-prem systems, and more.

Below are some important features of this library, some of which come from ZIO.

  • Universal. It provides a consistent way to interact with different services/products across cloud platforms like GCP, AWS, Azure, and On-Premises systems.
  • Functional. Rapidly compose complex jobs/workflows from simple tasks.
  • Auditable. Build jobs/workflows that provide auditability by default, with support for multiple backends.
  • Resource-safe. Build jobs/workflows that never leak resources (including threads!), even when they fail.
  • Compile-time DI. Build jobs/workflows whose dependencies are resolved at compile time.
  • Fibers. Built on non-blocking fibers that never waste or leak resources, which lets you build scalable, resilient, and reactive applications.
  • Concurrent and Asynchronous. Easily build concurrent, asynchronous, or synchronous jobs/workflows without deadlocks, race conditions, or complexity (see the sketch after this list).
  • Type-safe. Use the full power of the Scala compiler to catch bugs at compile time.
  • Testable. Inject test services into your job/workflow for fast, deterministic, and type-safe testing.
  • Resilient. Build jobs/workflows that never lose errors, and which respond to failure locally and flexibly.
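
To make the "Functional" and "Concurrent" points above concrete, here is a minimal sketch (not taken from the examples below; the task names are illustrative) that composes two GenericTasks and runs them in parallel under the no-op audit backend:

import etlflow.audit.Audit
import etlflow.task.GenericTask
import zio._

// Two illustrative tasks (hypothetical names)
val extract = GenericTask(name = "Extract", task = ZIO.logInfo("Extracting"))
val load    = GenericTask(name = "Load", task = ZIO.logInfo("Loading"))

// Compose them in parallel; the combined effect still only requires an Audit service
val pipeline: RIO[Audit, Unit] = extract.toZIO.zipPar(load.toZIO).unit

// Discard audit records by providing the no-op backend
val runnable: Task[Unit] = pipeline.provide(etlflow.audit.noop)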

Examples

Modules Dependency Graph


| Module | Artifact | Java Version |
| ------ | -------- | ------------ |
| Core | etlflow-core | Java 8+ |
| GCP | etlflow-gcp | Java 8+ |
| JDBC | etlflow-jdbc | Java 8+ |
| Http | etlflow-http | Java 11+ |
| K8S | etlflow-k8s | Java 8+ |
| Email | etlflow-email | Java 8+ |
| AWS | etlflow-aws | Java 8+ |
| FTP | etlflow-ftp | Java 8+ |
| Redis | etlflow-redis | Java 8+ |
| Spark | etlflow-spark | Java 8+ |

Requirements and Installation

This project is compiled with Scala versions 2.12.17, 2.13.10, and 3.3.0.

Available via Maven Central. Add the latest release as a dependency to your project:


SBT

libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-gcp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-spark" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-k8s" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-http" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-ftp" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-redis" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-aws" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-email" % "1.7.3"

Maven

<dependency>
    <groupId>com.github.tharwaninitin</groupId>
    <artifactId>etlflow-core_2.12</artifactId>
    <version>1.7.3</version>
</dependency>

EtlFlow Modules

Core

The core module provides Task and Audit APIs, which are used by all tasks in different modules. It also provides a Job API that facilitates grouping multiple tasks together to leverage auditing and logging capabilities at the job/workflow level.

Task API

Below is the simplest example of creating a Task and running it using EtlFlow. This example uses the noop audit backend, which does nothing. This is useful when you want to test a task that requires an audit backend to be passed in.

libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
import etlflow.audit.Audit
import etlflow.task.GenericTask
import zio._

object Job1 extends ZIOAppDefault {

  def executeTask(): Task[Unit] = ZIO.logInfo(s"Hello EtlFlow Task")

  val genericTask1: GenericTask[Any, Unit] = GenericTask(
    name = "Generic Task",
    task = executeTask()
  )

  val task1: RIO[Audit, Unit] = genericTask1.toZIO

  override def run: Task[Unit] = task1.provide(etlflow.audit.noop)
}
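
Note that genericTask1.toZIO lifts the task into a ZIO effect whose environment requires an Audit service, so the same task definition can be run against any audit backend simply by providing a different layer; the Audit API section below does exactly that with a JDBC-backed backend.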

Audit API

EtlFlow provides an auditing interface that can be used to track the execution of tasks and jobs (collections of tasks). The auditing interface is integrated with the Task Interface. Each task uses this interface to maintain the state of all tasks in the job/workflow in the backend of choice for end-to-end auditability. Currently, there are audit backend implementations available for BigQuery, MySQL, and Postgres. Audit has a simple and concise interface, which makes it quite easy to add any new backend.

libraryDependencies += "com.github.tharwaninitin" %% "etlflow-core" % "1.7.3"
libraryDependencies += "com.github.tharwaninitin" %% "etlflow-jdbc" % "1.7.3"
import etlflow.task.GenericTask
import etlflow.model.Credential.JDBC
import zio._

object Job2 extends ZIOAppDefault {
  
  private val task1 = GenericTask(
    name = "Generic Task 1",
    task = ZIO.logInfo(s"Task 1")
  )

  private val task2 = GenericTask(
    name = "Generic Task 2",
    task = ZIO.logInfo(s"Task 2")
  )

  val job = for {
    _ <- task1.toZIO
    _ <- task2.toZIO
  } yield ()

  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))

  override def run: Task[Unit] = job.provide(etlflow.audit.DB(cred))
}

Here's a snapshot of data for the task_run table after this job has run:

| task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
| ----------- | ---------- | --------- | --------- | -------- | ------ | ---------- | ----------- |
| 1 | 100 | Task 1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |
| 2 | 100 | Task 2 | GenericTask | {} | RUNNING | 2023-07-13 12:00:00 UTC | 2023-07-13 13:00:00 UTC |

Job API

The Job API enables grouping multiple tasks together for auditing capabilities at the job level. Below is an example of creating a JobApp and running it using EtlFlow. By default it uses the noop audit layer, but here we use the JDBC audit layer to persist auditing information in a database.

import etlflow._
import etlflow.audit.Audit
import etlflow.model.Credential.JDBC
import etlflow.task._
import zio._

object MyJobApp extends JobApp {

  private val cred = JDBC(sys.env("DB_URL"), sys.env("DB_USER"), sys.env("DB_PWD"), sys.env("DB_DRIVER"))
  
  override val auditLayer: Layer[Throwable, Audit] = etlflow.audit.DB(cred)

  private val task1 = GenericTask(
    name = "Task_1",
    task = ZIO.logInfo(s"Hello EtlFlow Task")
  )

  def job(args: Chunk[String]): RIO[audit.Audit, Unit] = task1.toZIO
}

Here's a snapshot of data for the job_run and task_run tables after this job has run:

| job_run_id | job_name | metadata | status | created_at | modified_at |
| ---------- | -------- | -------- | ------ | ---------- | ----------- |
| 1 | MyJobApp | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |

| task_run_id | job_run_id | task_name | task_type | metadata | status | created_at | modified_at |
| ----------- | ---------- | --------- | --------- | -------- | ------ | ---------- | ----------- |
| 1 | 1 | Task_1 | GenericTask | {} | SUCCESS | 2023-07-13 10:00:00 UTC | 2023-07-13 11:00:00 UTC |

GCP

# To run the GCP examples below, set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the location of the service account JSON key.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json

Dataproc

import etlflow.task._
import gcp4zio.dp._
import etlflow.audit
import etlflow.audit.Audit
import zio._

val gcpProject: String = "GCP_PROJECT"
val gcpRegion: String  = "GCP_REGION"
val dpCluster: String  = "DP_CLUSTER"
val dpEndpoint: String = "DP_ENDPOINT"
val dpBucket: String   = "DP_BUCKET"

val createCluster = DPCreateTask("DPCreateTask", dpCluster, ClusterProps(dpBucket))
val deleteCluster = DPDeleteTask("DPDeleteTask", dpCluster)

val args      = List("1000")
val mainClass = "org.apache.spark.examples.SparkPi"
val libs      = List("file:///usr/lib/spark/examples/jars/spark-examples.jar")
val conf      = Map("spark.executor.memory" -> "1g", "spark.driver.memory" -> "1g")

val sparkJob = DPSparkJobTask("DPSparkJobTask", args, mainClass, libs, conf)

val programGCP: RIO[DPJob with DPCluster with Audit, Unit] = for {
  _ <- createCluster.toZIO
  _ <- sparkJob.toZIO
  _ <- deleteCluster.toZIO
} yield ()

val dpJobLayer = DPJob.live(dpCluster, gcpProject, gcpRegion, dpEndpoint)
val dpClusterLayer = DPCluster.live(gcpProject, gcpRegion, dpEndpoint)

programGCP.provide(dpJobLayer, dpClusterLayer, audit.noop)
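
Because the Dataproc cluster should be deleted even when the Spark job fails, one resource-safe variation of the program above (a sketch, not part of the original example) attaches the delete task as a ZIO finalizer; orDie is used because ZIO finalizers cannot fail:

// Sketch: delete the cluster even if the Spark job fails or is interrupted
val programGCPSafe: RIO[DPJob with DPCluster with Audit, Unit] =
  createCluster.toZIO *>
    sparkJob.toZIO.ensuring(deleteCluster.toZIO.orDie).unit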

Check this for a complete example.

JDBC

// Todo

K8S

This module depends on the official Kubernetes Java client library, version 18.0.1.

import etlflow.task._
import etlflow.k8s._
import etlflow.audit
import etlflow.audit.Audit
import zio._

val jobName: String = "hello"

val programK8S: RIO[K8S with Audit, Unit] = for {
  _ <- K8SJobTask(
    name = "CreateKubeJobTask",
    jobName = jobName,
    image = "busybox:1.28",
    command = Some(List("/bin/sh", "-c", "sleep 5; ls /etc/key; date; echo Hello from the Kubernetes cluster"))
  ).toZIO
  _ <- K8STrackJobTask("TrackKubeJobTask", jobName).toZIO
  _ <- K8SJobLogTask("GetKubeJobLogTask", jobName).toZIO
  _ <- K8SDeleteJobTask("DeleteKubeJobTask", jobName).toZIO
} yield ()

programK8S.provide(K8S.live(), audit.noop)
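
To run programK8S as a standalone application, it can be wrapped in a ZIOAppDefault in the same way as the earlier Job1 example (a sketch; K8SApp is a hypothetical object name):

object K8SApp extends ZIOAppDefault {
  // Provide the Kubernetes client layer and the no-op audit backend
  override def run: Task[Unit] = programK8S.provide(K8S.live(), audit.noop)
}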

Check this for a complete example.

Http

// Todo

Email

// Todo

AWS

// Todo

Redis

// Todo

Spark

// Todo

Contributions

Please feel free to open an issue to report any bugs or to propose new features.