workflow-plugin-data-engineering

v0.3.0
Published: Mar 29, 2026 License: MIT


Data engineering plugin for the GoCodeAlone/workflow engine. Provides CDC, lakehouse (Iceberg), time-series (InfluxDB, TimescaleDB, ClickHouse, QuestDB, Druid), graph (Neo4j), data quality, schema migrations, multi-tenancy, and data catalog (DataHub, OpenMetadata) capabilities.

Install

Download from Releases or install via wfctl:

wfctl plugin install data-engineering

Place the binary + plugin.json in your workflow server's plugins directory.

Module Types

CDC
cdc.source

Change Data Capture from PostgreSQL, MySQL, or DynamoDB via Bento, Debezium, or AWS DMS.

modules:
  - name: aurora_cdc
    type: cdc.source
    config:
      provider: bento          # bento | debezium | dms
      source_id: aurora-users
      source_type: postgres
      connection: "${ config(\"aurora_dsn\") }"
      tables:
        - public.users
        - public.orders
      options:
        slot_name: workflow_cdc
        publication: workflow_pub
| Provider | Backend | Use Case |
|---|---|---|
| bento | Bento/RedPanda Connect (postgres_cdc, mysql binlog, dynamodb_streams) | Default. Single binary, no Kafka required |
| debezium | Kafka Connect REST API | Enterprise. Requires Kafka Connect cluster |
| dms | AWS DMS via SDK v2 | AWS-native. Aurora/RDS to Kinesis/Kafka |
data.tenancy

Multi-tenant data isolation with three strategies.

modules:
  - name: tenancy
    type: data.tenancy
    config:
      strategy: schema_per_tenant  # schema_per_tenant | db_per_tenant | row_level
      tenant_key: ctx.tenant_id
      schema_prefix: "t_"
      # db_per_tenant: connection_template: "postgres://localhost/tenant_{{tenant}}"
      # row_level: tenant_column: org_id, tables: [users, orders]
Lakehouse
catalog.iceberg

Apache Iceberg REST Catalog connection.

modules:
  - name: analytics_catalog
    type: catalog.iceberg
    config:
      endpoint: "${ config(\"iceberg_catalog_url\") }"
      warehouse: "s3://data-lake/warehouse"
      credential: "${ config(\"iceberg_token\") }"
      httpTimeout: 30s
lakehouse.table

Iceberg table lifecycle management.

modules:
  - name: users_table
    type: lakehouse.table
    config:
      catalog: analytics_catalog
      namespace: [analytics, raw]
      table: users
      schema:
        fields:
          - { name: id, type: long, required: true }
          - { name: email, type: string, required: true }
          - { name: created_at, type: timestamptz }
Time-Series

All time-series modules implement a shared TimeSeriesWriter interface (WritePoint, WriteBatch, Query), so the common steps (step.ts_write, step.ts_query, etc.) work with any backend.

timeseries.influxdb
modules:
  - name: metrics
    type: timeseries.influxdb
    config:
      url: "${ config(\"influx_url\") }"
      token: "${ config(\"influx_token\") }"
      org: my-org
      bucket: default
      batchSize: 1000
      flushInterval: 1s
      precision: ns
timeseries.timescaledb
modules:
  - name: tsdb
    type: timeseries.timescaledb
    config:
      connection: "${ config(\"timescale_dsn\") }"
      maxOpenConns: 25
      hypertables:
        - { table: metrics, timeColumn: time, chunkInterval: 1d }
timeseries.clickhouse
modules:
  - name: analytics_ch
    type: timeseries.clickhouse
    config:
      endpoints: ["clickhouse1:9000", "clickhouse2:9000"]
      database: analytics
      username: "${ config(\"ch_user\") }"
      password: "${ config(\"ch_pass\") }"
      compression: lz4
      secure: true
timeseries.questdb
modules:
  - name: questdb
    type: timeseries.questdb
    config:
      ilpEndpoint: "localhost:9009"
      httpEndpoint: "http://localhost:9000"
      authToken: "${ config(\"questdb_token\") }"
timeseries.druid
modules:
  - name: druid
    type: timeseries.druid
    config:
      routerUrl: "http://druid-router:8888"
      username: "${ config(\"druid_user\") }"
      password: "${ config(\"druid_pass\") }"
      httpTimeout: 60s
Graph
graph.neo4j
modules:
  - name: knowledge_graph
    type: graph.neo4j
    config:
      uri: "bolt://neo4j:7687"
      database: neo4j
      username: "${ config(\"neo4j_user\") }"
      password: "${ config(\"neo4j_pass\") }"
      maxConnectionPoolSize: 50
Data Catalog
catalog.schema_registry

Confluent Schema Registry.

modules:
  - name: schema_reg
    type: catalog.schema_registry
    config:
      endpoint: "http://schema-registry:8081"
      username: "${ config(\"sr_user\") }"
      password: "${ config(\"sr_pass\") }"
      defaultCompatibility: BACKWARD
catalog.datahub
modules:
  - name: catalog
    type: catalog.datahub
    config:
      endpoint: "http://datahub-gms:8080"
      token: "${ config(\"datahub_token\") }"
catalog.openmetadata
modules:
  - name: catalog
    type: catalog.openmetadata
    config:
      endpoint: "http://openmetadata:8585"
      token: "${ config(\"om_token\") }"
Data Quality
quality.checks
modules:
  - name: data_quality
    type: quality.checks
    config:
      provider: builtin     # builtin | dbt | soda | great_expectations
      contractsDir: ./contracts/
      database: my-database
Migrations
migrate.schema
modules:
  - name: app_migrations
    type: migrate.schema
    config:
      strategy: both          # declarative | scripted | both
      target: my-database
      schemas:
        - path: schemas/users.yaml
      migrationsDir: ./migrations/
      lockTable: schema_migrations
      onBreakingChange: block  # block | warn | blue_green

Step Types

CDC Steps
| Step | Config | Output |
|---|---|---|
| step.cdc_start | source_id | {action, source_id} |
| step.cdc_stop | source_id | {action: "stopped"} |
| step.cdc_status | source_id | {state, provider, tables, lag} |
| step.cdc_snapshot | source_id, tables | {action: "snapshot_triggered"} |
| step.cdc_schema_history | source_id, table | {versions: [...], count} |
Tenancy Steps
| Step | Config | Output |
|---|---|---|
| step.tenant_provision | tenantId | {status: "provisioned", schema} |
| step.tenant_deprovision | tenantId, mode | {status: "deprovisioned"} |
| step.tenant_migrate | tenantIds, parallelism, failureThreshold | {status, tenants, count, failed} |
Lakehouse Steps
| Step | Config | Output |
|---|---|---|
| step.lakehouse_create_table | catalog, namespace, table, schema | {status, tableUUID} |
| step.lakehouse_evolve_schema | catalog, namespace, table, changes | {status, schemaId} |
| step.lakehouse_write | catalog, namespace, table, data, mode | {status, recordCount} |
| step.lakehouse_compact | catalog, namespace, table | {status} |
| step.lakehouse_snapshot | catalog, namespace, table, action | varies by action |
| step.lakehouse_query | catalog, namespace, table | {snapshots, schema} |
| step.lakehouse_expire_snapshots | catalog, namespace, table, olderThan | {status, expiredCount} |
Time-Series Steps (shared)
| Step | Config | Output |
|---|---|---|
| step.ts_write | module, measurement, tags, fields, timestamp | {status: "written"} |
| step.ts_write_batch | module, points | {status: "written", count} |
| step.ts_query | module, query | {rows: [...], count} |
| step.ts_downsample | module, source, target, aggregation, interval | {status, query} |
| step.ts_retention | module, duration | {status: "updated"} |
| step.ts_continuous_query | module, viewName, query, refreshInterval, action | {status, viewName} |
Druid Steps
| Step | Config | Output |
|---|---|---|
| step.ts_druid_ingest | module, spec | {status, supervisorId} |
| step.ts_druid_query | module, query, queryType | {rows: [...], count} |
| step.ts_druid_datasource | module, datasource, action | varies by action |
| step.ts_druid_compact | module, datasource | {status} |
Schema Registry Steps
| Step | Config | Output |
|---|---|---|
| step.schema_register | registry, subject, schema, schemaType | {schemaId, version} |
| step.schema_validate | registry, subject, data | {valid, errors} |
Graph Steps
| Step | Config | Output |
|---|---|---|
| step.graph_query | module, cypher, params | {rows: [...], count} |
| step.graph_write | module, nodes, relationships | {nodesCreated, relationshipsCreated} |
| step.graph_import | module, source, mapping | {imported, nodeLabel} |
| step.graph_extract_entities | text, types | {entities: [...], count} |
| step.graph_link | module, from, to, type | {linked} |
Catalog Steps
| Step | Config | Output |
|---|---|---|
| step.catalog_register | catalog, dataset, schema, owner | {status: "registered"} |
| step.catalog_search | catalog, query, limit | {results: [...], total} |
| step.contract_validate | contract, database | {passed, schemaOk, qualityOk} |
Data Quality Steps
| Step | Config | Output |
|---|---|---|
| step.quality_check | module, table, checks | {passed, results: [...]} |
| step.quality_schema_validate | module, contract, data | {valid, errors} |
| step.quality_profile | module, table, columns | {rowCount, columns: {...}} |
| step.quality_compare | module, baseline, current, tolerances | {passed, diffs} |
| step.quality_anomaly | module, table, columns, method, threshold | {results: [...]} |
| step.quality_dbt_test | project, select | {passed, results} |
| step.quality_soda_check | config, checksFile | {passed, results} |
| step.quality_ge_validate | checkpoint | {passed, results} |
Migration Steps
| Step | Config | Output |
|---|---|---|
| step.migrate_plan | module | {plan: [...], safe, changeCount} |
| step.migrate_apply | module, plan, mode | {status: "applied", changesApplied} |
| step.migrate_run | module, version | {status: "migrated", version} |
| step.migrate_rollback | module, steps | {status, fromVersion, toVersion} |
| step.migrate_status | module | {version, pending, applied} |
Expand-Contract Migration Steps

Zero-downtime DDL via pgroll-style dual-version column serving.

| Step | Config | Output |
|---|---|---|
| step.migrate_expand | table, changes (see below), _executor | {status, table, triggerNames, newColumns} |
| step.migrate_contract | table, changes, _executor | {status, droppedColumns, droppedTriggers} |
| step.migrate_expand_status | table, triggerName, _executor | {expanded, triggerActive, safeToContract} |

Change types: rename_column (fields: oldColumn, newColumn, newType, transform), add_column (fields: newColumn, newType), change_type (fields: oldColumn, newColumn, newType, transform).
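Putting the step table and change types together, a column rename via expand-contract might look like the sketch below. The step and field names come from the tables above; the concrete values and the omission of _executor wiring are illustrative assumptions:

```yaml
steps:
  - name: expand                       # phase 1: add new column, sync via triggers
    type: step.migrate_expand
    config:
      table: users
      changes:
        - type: rename_column
          oldColumn: email
          newColumn: email_address
          newType: "varchar(320)"
  - name: check                        # gate on safeToContract before cleanup
    type: step.migrate_expand_status
    config:
      table: users
  - name: contract                     # phase 2: drop old column and sync triggers
    type: step.migrate_contract
    config:
      table: users
      changes:
        - type: rename_column
          oldColumn: email
          newColumn: email_address
```

Old and new application versions can both run against the table between the expand and contract phases, which is what makes the DDL zero-downtime.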

Schema Evolution Pipeline Steps

Coordinated schema changes across CDC → Kafka schema registry → Iceberg lakehouse → source DB.

| Step | Config | Output |
|---|---|---|
| step.schema_evolve_pipeline | table, namespace, change, source_db, cdc_source, schema_registry, lakehouse_catalog | {plan, safe, executed, rollbackAvailable} |
| step.schema_evolve_verify | table, subject, source_db, schema_registry | {consistent, layers: [{name, schemaVersion, fields}], diffs} |
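A coordinated add-column run might be configured as follows. This is a sketch built from the config fields listed above; the shape of the change object is an assumption not confirmed by this README:

```yaml
steps:
  - name: evolve
    type: step.schema_evolve_pipeline
    config:
      table: users
      namespace: [analytics, raw]
      change: { type: add_column, name: phone, dataType: string }  # shape illustrative
      source_db: my-database
      cdc_source: aurora-users
      schema_registry: schema_reg
      lakehouse_catalog: analytics_catalog
  - name: verify                        # confirm all layers agree post-change
    type: step.schema_evolve_verify
    config:
      table: users
      subject: raw-users-value
      source_db: my-database
      schema_registry: schema_reg
```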
Catalog Lineage Steps
| Step | Config | Output |
|---|---|---|
| step.catalog_lineage | catalog, pipeline, upstream (list of {dataset, platform}), downstream | {status: "recorded", upstreamCount, downstreamCount} |
| step.catalog_lineage_query | catalog, dataset, direction (upstream \| downstream \| both), depth | {nodes: [...], edges: [...]} |
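For example, recording that an Iceberg table is derived from a Postgres source, then tracing back from it (dataset and platform values are illustrative):

```yaml
steps:
  - name: record_lineage
    type: step.catalog_lineage
    config:
      catalog: datahub
      pipeline: cdc_to_iceberg
      upstream:
        - { dataset: public.users, platform: postgres }
      downstream:
        - { dataset: raw.users, platform: iceberg }
  - name: trace                        # walk lineage back toward sources
    type: step.catalog_lineage_query
    config:
      catalog: datahub
      dataset: raw.users
      direction: upstream
      depth: 3
```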
Time-Series Tier Management Steps
| Step | Config | Output |
|---|---|---|
| step.ts_archive | module, olderThan (e.g. 30d), destination, format (parquet \| csv), deleteAfterArchive | {status, rowsArchived, bytesWritten, destination, deletedFromHot} |
| step.ts_tier_status | module, measurement | {hotRows, coldFiles, totalBytes, hotOldestTimestamp} |
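A nightly archival step moving aged rows to Parquet on S3 could look like this sketch (the S3 path and measurement name are illustrative):

```yaml
steps:
  - name: archive_cold
    type: step.ts_archive
    config:
      module: tsdb
      olderThan: 30d
      destination: "s3://data-lake/archive/sensor_data/"  # illustrative path
      format: parquet
      deleteAfterArchive: true       # reclaim hot-tier storage after archiving
  - name: tiers
    type: step.ts_tier_status
    config:
      module: tsdb
      measurement: sensor_data
```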
ClickHouse Materialized View Steps
| Step | Config | Output |
|---|---|---|
| step.ts_clickhouse_view | module, viewName, action (create \| drop \| status), query, engine, orderBy, partitionBy | {status/isActive, viewName, engine} |
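Creating an hourly rollup view might be configured as below. Field names follow the table above; the engine choice and the list shape of orderBy are assumptions:

```yaml
steps:
  - name: hourly_view
    type: step.ts_clickhouse_view
    config:
      module: analytics_ch
      viewName: events_hourly
      action: create
      query: >
        SELECT toStartOfHour(time) AS hour, host,
               avg(latency_ms) AS avg_latency
        FROM events GROUP BY hour, host
      engine: AggregatingMergeTree     # illustrative engine choice
      orderBy: [hour, host]
```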
Graph LLM Extraction Steps
| Step | Config | Output |
|---|---|---|
| step.graph_extract_entities_llm | text, provider (claude \| openai), apiKey, model, types | {entities: [...], count, provider, fallback} |
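A minimal usage sketch, with the config key for the API key being an assumed name:

```yaml
steps:
  - name: extract
    type: step.graph_extract_entities_llm
    config:
      text: "${ body.content }"
      provider: claude
      apiKey: "${ config(\"anthropic_api_key\") }"   # config key name illustrative
      types: [person, org, location]
```

If the LLM call fails, the fallback field in the output indicates the regex-based extractor was used instead.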
Tenant Tier Promotion Steps
| Step | Config | Output |
|---|---|---|
| step.tenant_evaluate_promotion | tenant_id, current_tier, metrics, thresholds | {tenant_id, current_tier, recommended_tier, should_promote, reason, metrics} |
| step.tenant_promote | tenant_id, target_strategy, tables, schema_prefix, delete_from_shared | {status: "promoted", tenant_id, from_tier, to_tier, rows_migrated, tables} |
| step.tenant_demote | tenant_id, from_tier, tables, schema_prefix | {status: "demoted", tenant_id, from_tier, to_tier, rows_migrated} |
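Evaluating and then executing a promotion from shared row-level tables to a dedicated schema might look like this (the threshold key names are assumptions; only the top-level fields are documented above):

```yaml
steps:
  - name: evaluate
    type: step.tenant_evaluate_promotion
    config:
      tenant_id: "${ body.tenant_id }"
      current_tier: row_level
      thresholds: { max_rows: 1000000 }   # key name illustrative
  - name: promote                          # move tenant rows into tenant_<id> schema
    type: step.tenant_promote
    config:
      tenant_id: "${ body.tenant_id }"
      target_strategy: schema_per_tenant
      tables: [users, orders]
      schema_prefix: "tenant_"
      delete_from_shared: true
```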
CDC Backpressure Steps
| Step | Config | Output |
|---|---|---|
| step.cdc_backpressure | source_id, action (check \| throttle \| resume), thresholds | {source_id, lag_bytes, lag_seconds, status, thresholds} |
| step.cdc_monitor | source_id, thresholds | {source_id, lag_bytes, lag_seconds, status, alerts_sent, auto_throttled} |
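A scheduled health-check pipeline using step.cdc_monitor could be sketched as below; the threshold key names inside thresholds are assumptions, since only the top-level field is documented:

```yaml
pipelines:
  - name: cdc_health
    trigger:
      type: scheduler
      config:
        cron: "*/5 * * * *"      # check WAL lag every 5 minutes
    steps:
      - name: monitor
        type: step.cdc_monitor
        config:
          source_id: aurora-users
          thresholds:            # key names illustrative
            warning_lag_bytes: 104857600     # 100 MiB
            critical_lag_bytes: 1073741824   # 1 GiB
```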

Trigger Types

trigger.cdc

Fires a workflow when a CDC change event is captured.

pipelines:
  - name: on_user_change
    trigger:
      type: cdc
      config:
        source_id: aurora-users
        tables: [public.users]
        actions: [INSERT, UPDATE]
    steps:
      - name: process
        type: step.set
        config:
          operation: "${ trigger.action }"
          user_id: "${ trigger.data.id }"

Example: CDC to Lakehouse Pipeline

modules:
  - name: cdc_source
    type: cdc.source
    config:
      provider: bento
      source_id: users-cdc
      source_type: postgres
      connection: "${ config(\"aurora_dsn\") }"
      tables: [public.users]

  - name: catalog
    type: catalog.iceberg
    config:
      endpoint: "${ config(\"iceberg_url\") }"
      warehouse: "s3://lake/warehouse"
      credential: "${ config(\"iceberg_token\") }"

  - name: quality
    type: quality.checks
    config:
      provider: builtin

pipelines:
  - name: cdc_to_lakehouse
    trigger:
      type: cdc
      config:
        source_id: users-cdc
        tables: [public.users]
    steps:
      - name: validate
        type: step.quality_check
        config:
          checks:
            - { type: not_null, columns: [id, email] }

      - name: write
        type: step.lakehouse_write
        config:
          catalog: catalog
          namespace: [raw]
          table: users
          mode: upsert

Data Contracts

Define quality expectations in YAML:

# contracts/users.yaml
dataset: raw.users
owner: data-team
schema:
  columns:
    - { name: id, type: bigint, nullable: false }
    - { name: email, type: varchar, nullable: false, pattern: "^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$" }
quality:
  - { type: freshness, maxAge: 1h }
  - { type: row_count, min: 1000 }
  - { type: not_null, columns: [id, email] }
  - { type: unique, columns: [id] }

Validate with step.contract_validate:

- name: check_contract
  type: step.contract_validate
  config:
    contract: contracts/users.yaml
    database: my-database

Schema Migration

Declarative (desired-state diffing)
# schemas/users.yaml
table: users
columns:
  - { name: id, type: bigint, primaryKey: true }
  - { name: email, type: "varchar(255)", nullable: false, unique: true }
  - { name: created_at, type: timestamptz, default: "now()" }
indexes:
  - { columns: [email], unique: true }

Non-breaking changes (add column, add index, widen type) apply automatically. Breaking changes respect onBreakingChange policy.
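For example, appending a nullable column to the desired-state file is non-breaking and applies on the next run, while dropping or narrowing a column would hit the onBreakingChange policy (the new column here is illustrative):

```yaml
# schemas/users.yaml (revised desired state)
table: users
columns:
  - { name: id, type: bigint, primaryKey: true }
  - { name: email, type: "varchar(255)", nullable: false, unique: true }
  - { name: created_at, type: timestamptz, default: "now()" }
  - { name: display_name, type: "varchar(100)", nullable: true }  # new, non-breaking
indexes:
  - { columns: [email], unique: true }
```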

Scripted (numbered migrations)
migrations/
  001_create_users.up.sql
  001_create_users.down.sql
  002_add_email.up.sql
  002_add_email.down.sql
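The numbered files are plain SQL up/down pairs. A minimal sketch of what the 001_create_users pair might contain (illustrative DDL, not shipped with the plugin):

```sql
-- 001_create_users.up.sql
CREATE TABLE users (
    id         bigint PRIMARY KEY,
    email      varchar(255) NOT NULL UNIQUE,
    created_at timestamptz DEFAULT now()
);

-- 001_create_users.down.sql
DROP TABLE users;
```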

Real-World Use Cases

CDC from Aurora/RDS to Iceberg Data Lake

Problem: Companies like BeeHero need real-time mirrors of operational databases in queryable data lakes. Traditional ETL batch jobs create stale data; manually wiring Debezium→Kafka→Flink→Iceberg requires an SRE squad.

Solution with this plugin:

modules:
  - name: aurora_cdc
    type: cdc.source
    config:
      provider: bento
      source_id: aurora-users
      source_type: postgres
      connection: "${ config(\"aurora_dsn\") }"
      tables: [public.users, public.orders]
      options:
        slot_name: workflow_cdc
        publication: workflow_pub

  - name: lake_catalog
    type: catalog.iceberg
    config:
      endpoint: "${ config(\"iceberg_rest_url\") }"
      warehouse: "s3://data-lake/warehouse"
      credential: "${ config(\"iceberg_token\") }"

  - name: quality
    type: quality.checks
    config:
      provider: builtin

pipelines:
  - name: cdc_to_iceberg
    trigger:
      type: cdc
      config:
        source_id: aurora-users
        tables: [public.users]
    steps:
      - name: validate
        type: step.quality_check
        config:
          checks:
            - { type: not_null, columns: [id, email] }
            - { type: unique, columns: [id] }
      - name: write_lake
        type: step.lakehouse_write
        config:
          catalog: lake_catalog
          namespace: [raw]
          table: users
          mode: upsert
      - name: register
        type: step.schema_register
        config:
          registry: schema_reg
          subject: raw-users-value
          schemaType: JSON

  - name: lake_maintenance
    trigger:
      type: scheduler
      config:
        cron: "0 2 * * *"
    steps:
      - name: compact
        type: step.lakehouse_compact
        config:
          catalog: lake_catalog
          namespace: [raw]
          table: users
      - name: expire
        type: step.lakehouse_expire_snapshots
        config:
          catalog: lake_catalog
          namespace: [raw]
          table: users
          olderThan: 7d

Swap provider: bento for provider: debezium (enterprise Kafka Connect) or provider: dms (AWS-native) without changing the rest of the pipeline.

Real-Time Analytics with ClickHouse + Druid

Problem: Companies like Mux process billions of events for real-time dashboards. They need sub-second query latency on streaming data, with dual writes serving different query patterns (OLAP vs. real-time aggregation).

Solution:

modules:
  - name: clickhouse
    type: timeseries.clickhouse
    config:
      endpoints: ["ch1:9000", "ch2:9000"]
      database: analytics
      username: "${ config(\"ch_user\") }"
      password: "${ config(\"ch_pass\") }"
      compression: lz4

  - name: druid
    type: timeseries.druid
    config:
      routerUrl: "http://druid-router:8888"

  - name: quality
    type: quality.checks
    config:
      provider: builtin

pipelines:
  - name: ingest_events
    trigger:
      type: http
      config:
        path: /api/v1/events
        method: POST
    steps:
      - name: write_clickhouse
        type: step.ts_write_batch
        config:
          module: clickhouse
          points: "${ body.events }"
      - name: ingest_druid
        type: step.ts_druid_ingest
        config:
          module: druid
          spec:
            type: kafka
            dataSchema:
              dataSource: events
              timestampSpec: { column: __time, format: auto }
              dimensionsSpec: { useSchemaDiscovery: true }
            ioConfig:
              topic: events-stream
              consumerProperties:
                bootstrap.servers: "kafka:9092"

  - name: anomaly_scan
    trigger:
      type: scheduler
      config:
        cron: "*/15 * * * *"
    steps:
      - name: query_recent
        type: step.ts_query
        config:
          module: clickhouse
          query: "SELECT host, avg(latency_ms) as avg_latency FROM events WHERE time > now() - INTERVAL 1 HOUR GROUP BY host"
      - name: detect
        type: step.quality_anomaly
        config:
          method: zscore
          threshold: 3.0
Multi-Tenant SaaS Data Platform

Problem: SaaS companies need tenant-isolated data infrastructure that scales from free-tier shared tables to enterprise-grade dedicated schemas, with zero-downtime tenant onboarding and safe per-tenant migrations.

Solution:

modules:
  - name: tenancy
    type: data.tenancy
    config:
      strategy: schema_per_tenant
      tenant_key: ctx.tenant_id
      schema_prefix: "tenant_"

  - name: migrations
    type: migrate.schema
    config:
      strategy: both
      schemas:
        - path: schemas/users.yaml
        - path: schemas/orders.yaml
      migrationsDir: ./migrations/
      lockTable: schema_migrations
      onBreakingChange: block

pipelines:
  - name: tenant_onboard
    trigger:
      type: http
      config:
        path: /api/v1/tenants
        method: POST
    steps:
      - name: provision
        type: step.tenant_provision
        config:
          tenantId: "${ body.tenant_id }"
      - name: migrate
        type: step.migrate_apply
        config:
          module: migrations
          mode: online
      - name: register_catalog
        type: step.catalog_register
        config:
          catalog: datahub
          dataset: "${ \"tenant_\" + body.tenant_id + \".users\" }"
          owner: "${ body.owner_email }"

  - name: tenant_schema_upgrade
    trigger:
      type: http
      config:
        path: /api/v1/tenants/migrate
        method: POST
    steps:
      - name: plan
        type: step.migrate_plan
        config:
          module: migrations
      - name: apply
        type: step.tenant_migrate
        config:
          tenantIds: "${ body.tenant_ids }"
          parallelism: 5
          failureThreshold: 3
Knowledge Graph for AI/RAG

Problem: Companies like Precina Health and Cedars-Sinai build knowledge graphs from relational data for clinical decision support and research. GraphRAG improves answer accuracy by 35% over vector-only RAG by providing structured context.

Solution:

modules:
  - name: kg
    type: graph.neo4j
    config:
      uri: "bolt://neo4j:7687"
      database: knowledge
      username: "${ config(\"neo4j_user\") }"
      password: "${ config(\"neo4j_pass\") }"

  - name: catalog
    type: catalog.datahub
    config:
      endpoint: "http://datahub-gms:8080"
      token: "${ config(\"datahub_token\") }"

pipelines:
  - name: build_knowledge_graph
    trigger:
      type: http
      config:
        path: /api/v1/kg/ingest
        method: POST
    steps:
      - name: extract
        type: step.graph_extract_entities
        config:
          text: "${ body.content }"
          types: [person, org, location, email, date]
      - name: create_nodes
        type: step.graph_write
        config:
          module: kg
          nodes: "${ steps.extract.entities }"
      - name: link_entities
        type: step.graph_link
        config:
          module: kg
          from: { label: Person, key: name }
          to: { label: Organization, key: name }
          type: WORKS_AT

  - name: import_from_db
    trigger:
      type: http
      config:
        path: /api/v1/kg/import
        method: POST
    steps:
      - name: import
        type: step.graph_import
        config:
          module: kg
          source: "${ body.records }"
          mapping:
            nodeLabel: Patient
            properties:
              id: patient_id
              name: full_name
              diagnosis: primary_dx
Data Contracts at Scale

Problem: GoCardless and similar companies need formal agreements between data producers and consumers, with tiered enforcement (block for critical data, warn for non-critical).

Solution:

# contracts/payments.yaml
dataset: raw.payments
owner: payments-team
schema:
  columns:
    - { name: id, type: bigint, nullable: false }
    - { name: amount, type: "numeric(12,2)", nullable: false }
    - { name: currency, type: "varchar(3)", nullable: false, pattern: "^[A-Z]{3}$" }
    - { name: created_at, type: timestamptz, nullable: false }
quality:
  - { type: freshness, maxAge: 15m }
  - { type: row_count, min: 100 }
  - { type: not_null, columns: [id, amount, currency] }
  - { type: unique, columns: [id] }
  - { type: referential, column: customer_id, refTable: customers, refColumn: id }
pipelines:
  - name: validate_payments
    trigger:
      type: scheduler
      config:
        cron: "*/30 * * * *"
    steps:
      - name: check
        type: step.contract_validate
        config:
          contract: contracts/payments.yaml
          database: payments-db
      - name: profile
        type: step.quality_profile
        config:
          table: raw.payments
          columns: [amount, currency]
      - name: alert_on_failure
        type: step.conditional
        config:
          field: steps.check.passed
          routes:
            "true": done
            "false": notify
      - name: notify
        type: step.publish
        config:
          topic: data-quality-alerts
          message: "${ json(steps.check) }"
      - name: done
        type: step.set
        config:
          status: healthy
IoT Time-Series at Scale

Problem: Industrial IoT deployments generate millions of data points per second from thousands of sensors. Companies need fast ingestion, automatic downsampling, and anomaly detection — while legacy data historians can't keep up.

Solution:

modules:
  - name: influx_hot
    type: timeseries.influxdb
    config:
      url: "${ config(\"influx_url\") }"
      token: "${ config(\"influx_token\") }"
      org: factory
      bucket: sensors-hot
      batchSize: 5000
      flushInterval: 100ms

  - name: tsdb
    type: timeseries.timescaledb
    config:
      connection: "${ config(\"timescale_dsn\") }"
      hypertables:
        - { table: sensor_data, timeColumn: ts, chunkInterval: 1h }

  - name: quality
    type: quality.checks
    config:
      provider: builtin

pipelines:
  - name: ingest_sensor_data
    trigger:
      type: http
      config:
        path: /api/v1/sensors/ingest
        method: POST
    steps:
      - name: write_hot
        type: step.ts_write_batch
        config:
          module: influx_hot
          points: "${ body.readings }"
      - name: write_tsdb
        type: step.ts_write_batch
        config:
          module: tsdb
          points: "${ body.readings }"

  - name: downsample_hourly
    trigger:
      type: scheduler
      config:
        cron: "5 * * * *"
    steps:
      - name: aggregate
        type: step.ts_downsample
        config:
          module: influx_hot
          source: sensor_data
          target: sensor_hourly
          aggregation: mean
          interval: 1h
      - name: continuous_agg
        type: step.ts_continuous_query
        config:
          module: tsdb
          viewName: sensor_hourly
          query: |
            SELECT time_bucket('1 hour', ts) AS bucket,
                   device_id, AVG(value) AS avg_val, MAX(value) AS max_val
            FROM sensor_data
            GROUP BY bucket, device_id
          refreshInterval: 30m
          action: create

  - name: anomaly_detection
    trigger:
      type: scheduler
      config:
        cron: "*/5 * * * *"
    steps:
      - name: query
        type: step.ts_query
        config:
          module: influx_hot
          query: |
            from(bucket: "sensors-hot")
              |> range(start: -1h)
              |> filter(fn: (r) => r._measurement == "sensor_data")
              |> mean()
      - name: detect
        type: step.quality_anomaly
        config:
          method: iqr
          threshold: 1.5

  - name: retention
    trigger:
      type: scheduler
      config:
        cron: "0 3 * * *"
    steps:
      - name: influx_retention
        type: step.ts_retention
        config:
          module: influx_hot
          duration: 7d
Data Mesh with DataHub

Problem: Large organizations need domain-oriented data ownership with centralized discovery, lineage tracking, and governance — without requiring every team to build their own catalog infrastructure.

Solution:

modules:
  - name: datahub
    type: catalog.datahub
    config:
      endpoint: "${ config(\"datahub_gms_url\") }"
      token: "${ config(\"datahub_token\") }"

  - name: schema_reg
    type: catalog.schema_registry
    config:
      endpoint: "${ config(\"sr_url\") }"
      defaultCompatibility: BACKWARD

pipelines:
  - name: register_data_product
    trigger:
      type: http
      config:
        path: /api/v1/catalog/register
        method: POST
    steps:
      - name: validate_contract
        type: step.contract_validate
        config:
          contract: "${ body.contract_path }"
          database: "${ body.database }"
      - name: register_schema
        type: step.schema_register
        config:
          registry: schema_reg
          subject: "${ body.dataset + \"-value\" }"
          schema: "${ body.schema }"
          schemaType: JSON
      - name: register_catalog
        type: step.catalog_register
        config:
          catalog: datahub
          dataset: "${ body.dataset }"
          schema: "${ body.schema }"
          owner: "${ body.owner }"

  - name: discover_datasets
    trigger:
      type: http
      config:
        path: /api/v1/catalog/search
        method: GET
    steps:
      - name: search
        type: step.catalog_search
        config:
          catalog: datahub
          query: "${ query.q }"
          limit: 20

Changelog

v0.3.0
  • End-to-end schema evolution (step.schema_evolve_pipeline, step.schema_evolve_verify): coordinated DDL across CDC, Kafka schema registry, Iceberg lakehouse, and source DB in a single orchestrated plan
  • Expand-contract migrations (step.migrate_expand, step.migrate_contract, step.migrate_expand_status): pgroll-style dual-version column serving with PostgreSQL trigger sync for zero-downtime DDL
  • Catalog lineage (step.catalog_lineage, step.catalog_lineage_query): record and query upstream/downstream dataset lineage in DataHub and OpenMetadata
  • Hot/cold tier management (step.ts_archive, step.ts_tier_status): archive aged time-series data to Parquet/S3, with delete-after option
  • LLM entity extraction (step.graph_extract_entities_llm): Claude/OpenAI-powered knowledge graph entity extraction with regex fallback
  • ClickHouse materialized views (step.ts_clickhouse_view): create, drop, and inspect ClickHouse MergeTree materialized views
  • Tenant tier promotion (step.tenant_evaluate_promotion, step.tenant_promote, step.tenant_demote): auto-evaluate and execute tier changes based on row count, query rate, and storage metrics
  • CDC backpressure (step.cdc_backpressure, step.cdc_monitor): WAL lag monitoring with healthy/warning/critical thresholds and auto-throttle on critical lag

Build

go build -o workflow-plugin-data-engineering ./cmd/workflow-plugin-data-engineering
go test ./... -race

License

MIT - see LICENSE

Directories

| Path | Synopsis |
|---|---|
| cmd/workflow-plugin-data-engineering | Command workflow-plugin-data-engineering is a workflow engine external plugin providing CDC, lakehouse, time-series, graph, data quality, and multi-tenancy capabilities. |
| internal | Package internal implements the workflow-plugin-data-engineering plugin. |
| catalog | Package catalog provides schema catalog modules (Schema Registry, etc.). |
| cdc | Package cdc implements Change Data Capture modules, steps, and triggers. |
| graph | Package graph implements the graph.neo4j module and associated steps. |
| httpclient | Package httpclient provides a shared HTTP client for REST API integrations. |
| ident | Package ident provides SQL and Cypher identifier validation. |
| lakehouse | Package lakehouse implements Iceberg lakehouse modules and steps. |
| migrate | Package migrate provides declarative schema diffing and scripted migration running. |
| quality | Package quality implements Go-native data quality checks, profiling, and anomaly detection. |
| registry | Package registry provides a generic thread-safe key-value registry. |
| tenancy | Package tenancy implements multi-tenant data isolation modules and steps. |
| testhelpers | Package testhelpers provides test utilities for workflow-plugin-data-engineering. |
