Sparkflow

Distributed workflow orchestrator with native Apache Spark and Hadoop integration. A lightweight Go alternative to Apache Airflow.

Architecture

Client (CLI/SDK/HTTP) --> API Gateway (gRPC + REST)
  --> Scheduler Cluster (Raft consensus, 3 nodes)
    --> Worker Pool (gRPC dispatch, heartbeat)
      --> Executors: Spark | Hadoop MR | HDFS | Shell | Python | Docker
  --> PostgreSQL (metadata, DAG state, run history)
  --> OpenTelemetry (traces, metrics, logs)

Features

  • DAG-based workflows - Define tasks and dependencies in YAML or Python
  • 6 built-in executors - Shell, Python, Docker, Spark, Hadoop MapReduce, HDFS
  • Distributed scheduling - Raft-based leader election with worker pool
  • Retry policies - Fixed, exponential, or linear backoff, with a dead letter queue (see the sketch after this list)
  • Cron scheduling - Periodic DAG execution with cron expressions
  • Exactly-once semantics - Idempotency keys for task execution
  • Observability - OpenTelemetry tracing, Prometheus metrics, structured logging
  • Python SDK - Decorator-based DAG definitions
  • Production-ready - Health checks, graceful shutdown, Helm charts
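
A task-level retry policy might look like the sketch below. The task schema matches the Quick Start example further down; the retry keys themselves (strategy, max_retries, initial_delay) are assumptions, since this README names the backoff strategies but not their YAML syntax:

tasks:
  - id: flaky-fetch
    executor: shell
    config:
      command: "curl -fsS https://api.example.com/data"
    # Assumed syntax: exponential backoff, at most 3 retries;
    # a run that exhausts its retries goes to the dead letter queue.
    retry:
      strategy: exponential
      max_retries: 3
      initial_delay: "10s"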

Quick Start

Install
go install github.com/ilya-shevelev/sparkflow/cmd/sparkflowctl@latest

Define a DAG

Create workflow.yaml:

id: my-etl
name: My ETL Pipeline
schedule:
  expression: "0 2 * * *"
config:
  max_concurrency: 4
  timeout: "1h"
tasks:
  - id: extract
    name: Extract Data
    executor: shell
    config:
      command: "curl -o /tmp/data.csv https://api.example.com/data"
    timeout: "5m"

  - id: transform
    name: Transform Data
    executor: python
    config:
      script: "transform.py"
    dependencies: [extract]

  - id: load
    name: Load to Warehouse
    executor: shell
    config:
      command: "psql -c \"COPY warehouse FROM '/tmp/output.csv'\""
    dependencies: [transform]

Validate and Run
# Validate the DAG
sparkflowctl validate -f workflow.yaml

# Run locally
sparkflowctl run -f workflow.yaml

Start the Server
# Start with in-memory store
sparkflow-server --http-addr=:8080 --grpc-addr=:9090

# Start with PostgreSQL
sparkflow-server --store-dsn="postgres://user:pass@localhost:5432/sparkflow"

# Start a worker
sparkflow-worker --server-addr=localhost:9090 --concurrency=8

Docker Compose
docker compose -f deploy/docker/docker-compose.yaml up

Executors

Executor  Description               Config Keys
shell     Run shell commands        command, shell, env, working_dir
python    Run Python scripts        script, code, args, python_path
docker    Run in Docker containers  image, command, env, volumes
spark     Submit Spark jobs         application, class, master, spark_conf
hadoop    Submit MapReduce jobs     jar, class, args, memory_mb
hdfs      HDFS file operations      operation, path, content, recursive
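
For instance, a Docker task followed by an HDFS cleanup step could be configured with the keys above. The key names come from the table; the values, and the delete operation name, are illustrative assumptions:

tasks:
  - id: score
    executor: docker
    config:
      image: "python:3.12-slim"          # illustrative image and command
      command: "python /app/score.py"
      env:
        DATA_DIR: "/data"
      volumes: ["/tmp/data:/data"]

  - id: cleanup
    executor: hdfs
    dependencies: [score]
    config:
      operation: delete                  # assumed operation value
      path: "/tmp/staging"
      recursive: true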

Spark ETL Example

id: spark-etl
name: Spark ETL Pipeline
tasks:
  - id: transform
    executor: spark
    config:
      mode: submit
      master: yarn
      deploy_mode: cluster
      application: "hdfs:///apps/etl.jar"
      class: com.example.ETL
      driver_memory: "4g"
      executor_memory: "8g"
      num_executors: 10
      spark_conf:
        spark.sql.adaptive.enabled: "true"
      app_args: ["--date", "{{date}}"]

Python SDK

from sparkflow import dag, task, RetryPolicy

@dag("my-pipeline", "My Pipeline", schedule="0 * * * *")
def pipeline():
    e = extract()
    t = transform(depends_on=[e])
    load(depends_on=[t])

@task(executor="shell", timeout="5m")
def extract():
    return {"command": "extract.sh"}

@task(executor="python", retry=RetryPolicy(max_retries=3))
def transform():
    return {"script": "transform.py"}

@task(executor="shell")
def load():
    return {"command": "load.sh"}

# Generate YAML
dag_builder = pipeline()
dag_builder.save("workflow.yaml")
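
Running the script writes a workflow.yaml in the same format as the hand-written Quick Start example. The sketch below shows the expected shape; the exact output of save, and the YAML spelling of RetryPolicy, are assumptions:

id: my-pipeline
name: My Pipeline
schedule:
  expression: "0 * * * *"
tasks:
  - id: extract
    executor: shell
    config:
      command: "extract.sh"
    timeout: "5m"
  - id: transform
    executor: python
    config:
      script: "transform.py"
    retry:              # assumed serialization of RetryPolicy(max_retries=3)
      max_retries: 3
    dependencies: [extract]
  - id: load
    executor: shell
    config:
      command: "load.sh"
    dependencies: [transform]

The file can then be validated and run with sparkflowctl exactly as in Quick Start.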

Deployment

Helm
helm install sparkflow deploy/helm/sparkflow/ \
  --set postgresql.auth.password=secret \
  --set replicaCount.worker=3
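
The same overrides can live in a values file passed with -f instead. The nesting below is inferred from the --set paths above; everything else about the chart's values schema is an assumption:

# my-values.yaml (hypothetical file name)
replicaCount:
  worker: 3
postgresql:
  auth:
    password: secret

Then install with helm install sparkflow deploy/helm/sparkflow/ -f my-values.yaml.
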
Kubernetes Architecture
                    +-----------------+
                    |    Ingress      |
                    +--------+--------+
                             |
                    +--------v--------+
                    | sparkflow-server|  (gRPC + HTTP)
                    | (1-3 replicas)  |
                    +--------+--------+
                             |
              +--------------+--------------+
              |                             |
    +---------v---------+         +---------v---------+
    | sparkflow-worker  |         | sparkflow-worker  |
    | (N replicas)      |         | (N replicas)      |
    +-------------------+         +-------------------+
              |
    +---------v---------+
    |    PostgreSQL     |
    +-------------------+

Development

# Build
make build

# Test
make test

# Lint
make lint

# Run locally
make run-server   # terminal 1
make run-worker   # terminal 2

# Docker dev environment
make dev

Configuration

Flag            Default             Description
--grpc-addr     :9090               gRPC listen address
--http-addr     :8080               HTTP listen address
--metrics-addr  :9091               Metrics listen address
--store-dsn     (empty)             PostgreSQL DSN (empty = in-memory)
--data-dir      /var/lib/sparkflow  Data directory
--log-level     info                Log level (debug/info/warn/error)

License

Apache License 2.0 - see LICENSE for details.

Directories

Path                  Synopsis
cmd/sparkflow-server  Command sparkflow-server starts the Sparkflow orchestrator server.
cmd/sparkflow-worker  Command sparkflow-worker starts a Sparkflow worker node.
cmd/sparkflowctl      Command sparkflowctl is the CLI tool for interacting with a Sparkflow server.
internal/hadoop       Package hadoop provides HDFS and YARN client implementations using the WebHDFS and YARN Resource Manager REST APIs.
internal/server       Package server provides the main gRPC and HTTP server for the Sparkflow orchestrator.
internal/spark        Package spark provides Spark Connect client and spark-submit wrapper for submitting and monitoring Spark jobs.
internal/worker       Package worker provides the worker node implementation that receives and executes tasks dispatched by the scheduler.
pkg/consensus         Package consensus provides Raft-based distributed consensus for leader election and state replication using hashicorp/raft.
pkg/dag               Package dag provides directed acyclic graph structures for workflow definitions.
pkg/executor          Package executor defines the Executor interface and provides implementations for running tasks via shell, Python, Docker, Spark, Hadoop, and HDFS.
pkg/observability     Package observability provides OpenTelemetry tracing, Prometheus metrics, and structured logging setup for Sparkflow components.
pkg/retry             Package retry provides retry policies with various backoff strategies and a dead letter queue for tasks that exhaust their retries.
pkg/scheduler         Package scheduler provides task scheduling strategies including FIFO, priority-based, and distributed scheduling with cron support.
pkg/store             Package store defines the storage interface and provides in-memory and PostgreSQL implementations for DAG and task run persistence.
