Published: Mar 5, 2026 License: Apache-2.0


Pit

Lightweight data orchestration for small-to-medium data teams. Go orchestrator, language-agnostic tasks.

Tasks are just executables — Python scripts, shell scripts, SQL files, or anything with an exit code. No decorators, no magic, no framework lock-in. The orchestrator is a single binary.

Installation

go install github.com/druarnfield/pit/cmd/pit@latest

Or build from source:

git clone https://github.com/druarnfield/pit.git
cd pit
go build ./cmd/pit

Quick Start

# Scaffold a new project
pit init my_pipeline                 # Python project (full package layout)
pit init --type sql my_transforms    # SQL-only project (minimal)
pit init --type shell my_jobs        # Shell-only project
pit init --type dbt my_dbt_project   # dbt project (uvx-managed)

# Validate all project configs
pit validate

# Run a DAG
pit run my_pipeline                  # run entire DAG
pit run my_pipeline/extract          # run a single task
pit run my_pipeline --verbose        # stream task output to stdout

# Start the scheduler (cron, FTP watch, and webhook triggers)
pit serve                            # runs until SIGINT/SIGTERM
pit serve --verbose                  # with live task output
pit serve --port 8080                # webhook listener on custom port (default 9090)

# View logs from past runs
pit logs my_pipeline                 # latest run, all tasks
pit logs my_pipeline/extract         # latest run, single task
pit logs my_pipeline --list          # list available runs
pit logs my_pipeline --run-id <id>   # specific run

# Query the outputs registry
pit outputs                          # list all declared outputs
pit outputs --project my_pipeline    # filter by project
pit outputs --type table             # filter by output type
pit outputs --location "warehouse.*" # filter by location (glob)

Project Structure

Projects live under projects/<name>/ with a pit.toml defining the DAG:

projects/
├── claims_pipeline/           # Python project
│   ├── pit.toml               # DAG definition
│   ├── pyproject.toml         # Python deps
│   ├── src/claims/            # shared package
│   └── tasks/
│       ├── extract.py
│       ├── validate.py
│       └── load.py
├── warehouse_transforms/      # SQL-only project
│   ├── pit.toml
│   └── tasks/
│       ├── staging_claims.sql
│       └── dim_providers.sql
├── nightly_maintenance/       # Shell-only project
│   ├── pit.toml
│   └── tasks/
│       ├── vacuum_db.sh
│       └── archive_logs.sh
└── analytics_dbt/             # dbt project
    ├── pit.toml               # [dag.dbt] config
    └── dbt_repo/              # dbt project root
        ├── dbt_project.yml
        ├── models/
        └── tests/

DAG Configuration

Each project's pit.toml declares tasks, dependencies, and outputs:

[dag]
name = "claims_pipeline"
schedule = "0 6 * * *"
overlap = "skip"
timeout = "45m"

[[tasks]]
name = "extract"
script = "tasks/extract.py"
timeout = "15m"
retries = 2
retry_delay = "30s"

[[tasks]]
name = "validate"
script = "tasks/validate.py"
depends_on = ["extract"]

[[tasks]]
name = "load"
script = "tasks/load.py"
depends_on = ["validate"]

[[outputs]]
name = "claims_staging"
type = "table"
location = "warehouse.staging.claims"

Git-backed Projects

A DAG can pull its source from a remote git repository instead of a local directory. Add git_url and git_ref to [dag]:

[dag]
name = "claims_pipeline"
git_url = "git@github.com:company/claims.git"
git_ref = "main"            # branch, tag, or commit SHA
schedule = "0 6 * * *"

[[tasks]]
name = "extract"
script = "tasks/extract.py"   # path relative to repo root

Both fields must be set together (or both omitted). The pit.toml itself stays local in projects/<name>/; only the task source files come from the remote repo.

How it works:

  1. Before each run Pit clones the repo (or fetches updates if already cloned) into repo_cache/<dag_name>/.
  2. The cached clone is snapshotted into the run directory exactly like a local project.
  3. Python venv (uv) is resolved against the persistent cache, so dependencies are only reinstalled when the lockfile changes.

Pit shells out to the system git binary, inheriting your SSH agent and credential helper configuration automatically.
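
The clone-or-fetch step can be sketched with a few subprocess calls around the system git binary (a simplified illustration, not Pit's actual Go implementation; the function and argument names are hypothetical):

```python
import pathlib
import subprocess

def ensure_repo(cache_dir: str, url: str, ref: str) -> None:
    """Clone the repo into cache_dir, or fetch updates if already cloned,
    then check out the requested ref (branch, tag, or commit SHA)."""
    repo = pathlib.Path(cache_dir)
    if (repo / ".git").exists():
        # Existing clone: refresh remote branches and tags.
        subprocess.run(["git", "-C", str(repo), "fetch", "--all", "--tags"],
                       check=True)
    else:
        subprocess.run(["git", "clone", url, str(repo)], check=True)
    # Detach at the requested ref so the snapshot is reproducible.
    subprocess.run(["git", "-C", str(repo), "checkout", "--detach", ref],
                   check=True)
```

Because this shells out to `git` rather than reimplementing the protocol, SSH agents and credential helpers work without extra configuration.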

Directory layout with git-backed projects:

<root>/
├── projects/<name>/pit.toml    ← DAG definition (local)
├── repo_cache/<name>/          ← persistent clone (managed by pit)
└── runs/<run-id>/              ← snapshot + logs (unchanged)

Validation skips local filesystem checks (script existence, dbt.project_dir) for git-backed projects since the source is not on disk until run time. For git-backed DAGs, dbt.project_dir is also optional — if omitted it defaults to the repo root.

Task Runners

Runner is determined by file extension, with an optional override:

| Extension | Default Runner | Execution |
| --- | --- | --- |
| `.py` | python | `uv run --project {project_dir} {script}` |
| `.sh` | bash | `bash {script}` |
| `.sql` | sql | Go SQL operator against `[dag.sql]` connection |
| n/a | dbt | `uvx dbt {command}` via `[dag.dbt]` config |

Custom runners use the $ prefix syntax:

[[tasks]]
name = "transform"
script = "tasks/transform.js"
runner = "$ node"              # runs: node tasks/transform.js

CLI Commands

Implemented

| Command | Description |
| --- | --- |
| `pit validate` | Validate all `pit.toml` files (cycles, missing deps, script paths) |
| `pit init <name>` | Scaffold a new project (`--type python\|sql\|shell\|dbt`) |
| `pit run <dag>[/<task>]` | Execute a DAG or single task (`--verbose` for live output) |
| `pit serve [--port N]` | Run the scheduler with cron, FTP watch, and webhook triggers (webhook port default: 9090) |
| `pit logs <dag>[/<task>]` | View task logs (`--list` for runs, `--run-id` for specific run) |
| `pit outputs` | List declared outputs (`--project`, `--type`, `--location` filters) |

Global Flags

| Flag | Description |
| --- | --- |
| `--project-dir` | Root project directory (default: `.`) |
| `--verbose` | Enable verbose output |
| `--secrets` | Path to secrets TOML file (enables SDK socket and SQL connections) |

Run Snapshots

When a run begins, Pit copies the project directory to a snapshot. Tasks execute from the snapshot, not the source — so git pulls or edits during a run can't affect in-flight tasks.

runs/
└── 20240115_143022.123_claims_pipeline/
    ├── project/     # frozen copy of the project
    ├── logs/        # per-task log files
    │   ├── extract.log
    │   ├── validate.log
    │   └── load.log
    └── data/        # inter-task Parquet files
        └── extracted_claims.parquet

The data/ directory is used for inter-task data passing. Tasks discover it via the PIT_DATA_DIR environment variable.
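
A pair of tasks passing data only needs that one environment variable (a plain-Python sketch without the SDK; the CSV file name is illustrative, and the temp-dir fallback exists only so the sketch runs standalone):

```python
import os
import pathlib
import tempfile

# Under Pit the orchestrator sets PIT_DATA_DIR; fall back to a temp dir
# so the sketch also runs outside a Pit run.
DATA_DIR = pathlib.Path(os.environ.get("PIT_DATA_DIR", tempfile.mkdtemp()))

def write_artifact(name: str, text: str) -> pathlib.Path:
    """Write a named artifact into the run's shared data directory."""
    path = DATA_DIR / name
    path.write_text(text)
    return path

def read_artifact(name: str) -> str:
    """Read a named artifact produced by an upstream task."""
    return (DATA_DIR / name).read_text()

# Upstream task writes, downstream task reads the same name.
write_artifact("extracted_claims.csv", "id,amount\n1,100\n")
print(read_artifact("extracted_claims.csv").splitlines()[0])  # → id,amount
```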

Execution Model

  • Tasks execute in topological order, parallelising independent branches
  • Per-task retries with configurable delay
  • Per-task and per-DAG timeouts via context cancellation
  • Failed tasks mark all downstream tasks as upstream_failed
  • Task states: pending → running → success | failed | skipped | upstream_failed
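
The loop described above is Kahn-style topological execution with failure propagation. A simplified sequential sketch (real Pit runs independent branches in parallel and adds retries and timeouts):

```python
from collections import deque

def run_dag(deps: dict[str, list[str]], run_task) -> dict[str, str]:
    """deps maps task -> upstream tasks; run_task(name) returns True on success."""
    state = {t: "pending" for t in deps}
    indegree = {t: len(up) for t, up in deps.items()}
    downstream: dict[str, list[str]] = {t: [] for t in deps}
    for task, ups in deps.items():
        for up in ups:
            downstream[up].append(task)
    ready = deque(t for t, d in indegree.items() if d == 0)
    while ready:
        task = ready.popleft()
        if state[task] == "pending":
            state[task] = "running"
            state[task] = "success" if run_task(task) else "failed"
        if state[task] in ("failed", "upstream_failed"):
            # Mark the whole downstream cone as upstream_failed.
            for child in downstream[task]:
                if state[child] == "pending":
                    state[child] = "upstream_failed"
                    ready.append(child)
            continue
        for child in downstream[task]:
            indegree[child] -= 1
            if indegree[child] == 0 and state[child] == "pending":
                ready.append(child)
    return state
```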

Automated Scheduling

pit serve runs as a long-lived process, monitoring all projects for scheduled triggers and FTP file watches.

Cron Triggers

Set schedule in pit.toml using standard cron syntax (5-field):

[dag]
name = "nightly_etl"
schedule = "0 6 * * *"    # 6 AM daily
overlap = "skip"           # skip if previous run still active

FTP Watch Triggers

Monitor an FTP server for incoming files. When files matching the pattern are stable (unchanged size for stable_seconds), a DAG run is triggered with the files seeded into the run's data/ directory.

[dag]
name = "sales_ingest"
overlap = "skip"

[dag.ftp_watch]
secret = "ftp_creds"               # structured secret for host, user, password
port = 21
tls = true
directory = "/incoming/sales"
pattern = "sales_*.csv"
archive_dir = "/archive/sales"      # move files here after success
poll_interval = "30s"
stable_seconds = 30                  # wait for file to stop growing

The secret field references a structured secret containing host, user, and password fields:

[global.ftp_creds]
host = "ftp.example.com"
user = "data_user"
password = "secret123"

Legacy configuration using host, user, and password_secret as separate fields is still supported for backward compatibility.
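
The "stable file" rule reduces to: trigger only once a file's reported size has stopped changing for `stable_seconds`. A simplified polling sketch (the actual watcher also handles archiving, TLS, and deduplication; the injectable clock exists only to make the sketch testable):

```python
import time

def wait_until_stable(get_size, stable_seconds: float,
                      poll_interval: float = 1.0,
                      clock=time.monotonic) -> int:
    """Poll get_size() until it returns the same value for stable_seconds.

    get_size is any callable, e.g. one issuing an FTP SIZE command for
    the watched file. Returns the final, stable size.
    """
    last_size = get_size()
    stable_since = clock()
    while clock() - stable_since < stable_seconds:
        time.sleep(poll_interval)
        size = get_size()
        if size != last_size:
            last_size = size          # still growing: restart the clock
            stable_since = clock()
    return last_size
```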

Both trigger types can be combined on the same DAG.

Webhook Triggers

Trigger a DAG run via an inbound HTTP POST request. Useful for CI/CD pipelines, GitHub Actions, or any system that can send a webhook.

[dag]
name = "deploy_pipeline"
overlap = "skip"

[dag.webhook]
token_secret = "deploy_webhook_token"   # plain secret name for auth token

Add the token to your secrets file:

[global]
deploy_webhook_token = "supersecret123"

Start the server with --secrets (required for webhook token resolution):

pit serve --secrets secrets.toml --port 9090

Trigger a run:

curl -X POST http://localhost:9090/webhook/deploy_pipeline \
  -H "Authorization: Bearer supersecret123"
# → 202 Accepted

The webhook listener only starts if at least one DAG has [dag.webhook] configured. All DAGs with a webhook share the same port; the URL path routes by DAG name.

Workspace Configuration

Create a pit_config.toml in the project root to set workspace-level defaults:

secrets_dir = "secrets/secrets.toml"
runs_dir = "runs"
repo_cache_dir = "repo_cache"
dbt_driver = "ODBC Driver 17 for SQL Server"
keep_artifacts = ["logs", "project", "data"]

| Field | Default | Description |
| --- | --- | --- |
| `secrets_dir` | (none) | Path to secrets TOML file |
| `runs_dir` | `"runs"` | Directory for run snapshots |
| `repo_cache_dir` | `"repo_cache"` | Directory for persistent git repository clones |
| `dbt_driver` | `"ODBC Driver 17 for SQL Server"` | ODBC driver for dbt profiles |
| `keep_artifacts` | `["logs", "project", "data"]` | Which run subdirs to keep after completion |

All fields are optional. Relative paths are resolved from the project root. CLI flags take precedence if both are set.

Artifact Retention

By default, Pit keeps all run artifacts (project snapshot, logs, and data). To save disk space, configure keep_artifacts to retain only what you need:

# Workspace level — applies to all projects
keep_artifacts = ["logs"]

Per-project overrides are supported in pit.toml:

[dag]
name = "my_pipeline"
keep_artifacts = ["logs", "data"]   # override workspace default

Resolution order: per-project (if set) > workspace (if set) > default (keep all). Valid values: logs, project, data.
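
The resolution order is a first-non-None lookup (illustrative sketch, not the Go implementation):

```python
DEFAULT_ARTIFACTS = ["logs", "project", "data"]  # keep everything

def effective_artifacts(project_setting, workspace_setting):
    """Per-project beats workspace, which beats the keep-all default."""
    for setting in (project_setting, workspace_setting, DEFAULT_ARTIFACTS):
        if setting is not None:
            return setting
```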

Development

# Run all tests
go test -race ./...

# Run with verbose output
go test -v ./internal/engine

# Integration tests (requires bash, uv, etc.)
go test -tags integration ./...

# Vet
go vet ./...

Secrets

Secrets are stored in a TOML file outside the repo (e.g. /etc/pit/secrets.toml in production). Pass the path via --secrets:

pit run my_pipeline --secrets ./secrets.toml

Secrets are organised by project scope with a [global] fallback. A secret can be either a plain value or a structured secret with multiple fields:

[global]
smtp_password = "plain_secret"

# Structured secret — one secret with many fields
[global.warehouse_db]
host = "sql-server.example.com"
port = "1433"
database = "warehouse"
schema = "dbo"
user = "admin"
password = "secret"

[claims_pipeline]
claims_db = "sqlserver://user:pass@host/db"

# FTP credentials as a structured secret
[claims_pipeline.ftp_creds]
host = "ftp.example.com"
user = "ftpuser"
password = "secret123"

Resolution order: project-scoped section first, then [global].

Plain secrets are resolved with Resolve(project, key). Structured secrets support field-level access with ResolveField(project, secret, field). When Resolve is called on a structured secret, it returns a JSON object of all fields.
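
That behaviour can be sketched as a two-level lookup with a JSON fallback for structured secrets (an illustration of the semantics, not the Go implementation):

```python
import json

def resolve(secrets: dict, project: str, key: str) -> str:
    """Project-scoped section first, then [global]; structured secrets
    (tables of fields) come back as a JSON object."""
    for scope in (project, "global"):
        value = secrets.get(scope, {}).get(key)
        if isinstance(value, dict):       # structured secret
            return json.dumps(value)
        if value is not None:             # plain secret
            return value
    raise KeyError(f"secret {key!r} not found for project {project!r}")

def resolve_field(secrets: dict, project: str, key: str, field: str) -> str:
    """Field-level access into a structured secret."""
    return json.loads(resolve(secrets, project, key))[field]
```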

SDK Socket

Pit starts a JSON-over-socket server for every run (Unix domain socket on Linux/macOS, TCP localhost on Windows). Tasks connect via the PIT_SOCKET environment variable. When --secrets is provided, the server can resolve secrets and load data into databases.
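
A client only needs the address from the environment. The framing shown here (one JSON object per line) is an assumption for illustration, not Pit's documented wire format:

```python
import json
import os
import socket

def pit_request(payload: dict) -> dict:
    """Send one JSON request to the run's SDK server and read one reply.

    Newline-delimited JSON framing is assumed, not documented.
    """
    addr = os.environ["PIT_SOCKET"]
    if addr.startswith("/"):
        # Unix domain socket path (Linux/macOS).
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(addr)
    else:
        # "host:port" TCP address (Windows fallback).
        host, port = addr.rsplit(":", 1)
        sock = socket.create_connection((host, int(port)))
    with sock, sock.makefile("rw", encoding="utf-8") as stream:
        stream.write(json.dumps(payload) + "\n")
        stream.flush()
        return json.loads(stream.readline())
```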

Python tasks use the bundled SDK client:

from pit_sdk import get_secret, get_secret_field, read_sql, output_sql, write_output, load_data
import json

# Read a plain secret
conn_str = get_secret("claims_db")

# Read a structured secret (returns JSON)
db_creds = json.loads(get_secret("warehouse_db"))
host = db_creds["host"]

# Read a single field from a structured secret
host = get_secret_field("warehouse_db", "host")

# Query straight to Parquet on disk (no table held in memory)
output_sql(conn_str, "SELECT * FROM staging.claims", "claims")

# Or read into memory when you need to transform first
table = read_sql(conn_str, "SELECT * FROM staging.claims")
write_output("claims", table)

# Bulk-load Parquet into database (Go-side, no ODBC)
load_data("claims.parquet", "target_table", "claims_db")

Environment Variables

| Variable | Description |
| --- | --- |
| `PIT_RUN_ID` | Current run identifier |
| `PIT_TASK_NAME` | Current task name |
| `PIT_DAG_NAME` | Current DAG name |
| `PIT_SOCKET` | SDK server address |
| `PIT_DATA_DIR` | Path to run's data directory for Parquet files |

SQL Execution

SQL tasks (.sql files) execute in-process via Go's database/sql. Configure the connection name in pit.toml:

[dag]
name = "warehouse_transforms"

[dag.sql]
connection = "warehouse_db"

The connection name is resolved from the secrets file. Supported drivers:

| Connection string prefix | Driver |
| --- | --- |
| `sqlserver://`, `mssql://` | Microsoft SQL Server |

Without --secrets, SQL tasks fall back to stub mode (log file contents without executing).

dbt Projects

Pit can orchestrate dbt projects managed by other teams. dbt is executed via uvx (no global install needed), and profiles.yml is generated at runtime from Pit secrets.

dbt Configuration

[dag]
name = "analytics_dbt"
schedule = "0 7 * * *"
overlap = "skip"
timeout = "2h"

[dag.dbt]
version = "1.9.1"              # dbt-core version
adapter = "dbt-sqlserver"       # pip package name for the adapter
extra_deps = ["dbt-utils"]      # additional pip packages (optional)
project_dir = "dbt_repo"        # relative path to dbt project root (optional for git-backed; defaults to repo root)
profile = "analytics"           # profile name in profiles.yml (default: dag name)
target = "prod"                 # target name (default: "prod")
connection = "analytics_db"     # structured secret name for db credentials

[[tasks]]
name = "staging"
script = "run --select staging"  # dbt command (not a file path)
runner = "dbt"

[[tasks]]
name = "marts"
script = "run --select marts"
runner = "dbt"
depends_on = ["staging"]

[[tasks]]
name = "test"
script = "test"
runner = "dbt"
depends_on = ["marts"]

For dbt tasks, the script field contains the dbt subcommand and arguments (e.g. "run --select staging"), not a file path. The runner field must be set to "dbt".
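
One plausible assembly of the uvx invocation from the `[dag.dbt]` config (the exact flags Pit passes to uvx are an assumption here; `--from` pins dbt-core and `--with` injects the adapter and extras):

```python
import shlex

def dbt_argv(cfg: dict, script: str) -> list[str]:
    """Build a uvx command line from [dag.dbt] config and a task's script."""
    argv = ["uvx", "--from", f"dbt-core=={cfg['version']}",
            "--with", cfg["adapter"]]
    for dep in cfg.get("extra_deps", []):
        argv += ["--with", dep]
    # The task's script field is the dbt subcommand, not a file path.
    return argv + ["dbt", "--log-format", "json", *shlex.split(script)]
```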

dbt Secrets

dbt connection details are resolved from a structured secret named by the connection field in [dag.dbt]. The secret must have these fields:

[analytics_dbt.analytics_db]
host = "sql-server.example.com"
port = "1433"
database = "analytics"
schema = "dbo"
user = "dbt_user"
password = "secret123"

Pit generates a profiles.yml in a temporary directory before each run and sets DBT_PROFILES_DIR so dbt picks it up automatically.
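
The generated profile has the usual dbt shape: a profile name keyed to a target whose output holds the connection fields. A sketch of that mapping (exact field names depend on the adapter; dbt-sqlserver-style fields are assumed here):

```python
def build_profile(name: str, target: str, creds: dict,
                  driver: str = "ODBC Driver 17 for SQL Server") -> dict:
    """Map a structured secret onto a dbt profiles.yml mapping.

    Serialise this dict to YAML, write it to a temp dir, and point
    DBT_PROFILES_DIR at that dir so dbt picks it up.
    """
    return {
        name: {
            "target": target,
            "outputs": {
                target: {
                    "type": "sqlserver",
                    "driver": driver,
                    "server": creds["host"],
                    "port": int(creds["port"]),
                    "database": creds["database"],
                    "schema": creds["schema"],
                    "user": creds["user"],
                    "password": creds["password"],
                },
            },
        },
    }
```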

dbt JSON Log Parsing

dbt is invoked with --log-format json. Pit parses the JSON output and displays key events:

  • Model results: OK stg_orders (2.5s, 1500 rows)
  • Test results: PASS not_null_orders_id (0.3s)
  • Freshness results: FRESH raw_orders
  • Completion: Completed in 15.7s
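
Parsing boils down to decoding each stdout line as JSON and pulling out the human-readable message (the `info.msg` field name follows dbt's structured-logging format and is treated here as an assumption):

```python
import json

def summarise_dbt_output(lines) -> list[str]:
    """Extract info.msg from each JSON log line, skipping non-JSON noise."""
    messages = []
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # non-JSON chatter, e.g. from wrapper tooling
        msg = event.get("info", {}).get("msg")
        if msg:
            messages.append(msg)
    return messages
```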

Python SDK

The Python SDK (sdk/python/) provides helpers for tasks running under Pit:

| Function | Description |
| --- | --- |
| `get_secret(key)` | Retrieve a secret (plain string or JSON for structured secrets) |
| `get_secret_field(secret, field)` | Retrieve a single field from a structured secret |
| `read_sql(conn, query)` | Read from a database via ConnectorX (returns Arrow Table) |
| `output_sql(conn, query, name)` | Query straight to Parquet on disk (no table held in Python memory) |
| `write_output(name, data)` | Write Arrow/pandas/polars data to Parquet in the data directory |
| `read_input(name)` | Read a named Parquet file from the data directory |
| `load_data(file, table, conn)` | Trigger Go-side bulk load of Parquet into a database |
| `ftp_list(secret, directory, pattern)` | List files on an FTP server matching a glob pattern |
| `ftp_download(secret, path, *, pattern)` | Download file(s) from FTP to the data directory |
| `ftp_upload(secret, local_name, remote_path)` | Upload a file from the data directory to FTP |
| `ftp_move(secret, src, dst)` | Move or rename a file on an FTP server |

The load_data function accepts optional schema (default "dbo") and mode parameters. Supported modes:

| Mode | Behaviour |
| --- | --- |
| `append` (default) | Insert rows into the existing table |
| `truncate_and_load` | Truncate the table, then insert rows |
| `create_or_replace` | Drop the table if it exists, recreate it from the Parquet schema, then insert rows |

Database reads use ConnectorX (Rust-native, no ODBC drivers needed). Database writes go through the Go orchestrator's bulk loader via RPC (also no ODBC).

FTP Operations

The FTP functions communicate with the Go FTP client through the SDK socket. Credentials are resolved from structured secrets — Python never sees passwords.

from pit_sdk import ftp_list, ftp_download, ftp_upload, ftp_move

# List files matching a pattern
files = ftp_list("ftp_creds", "/incoming/sales", "sales_*.csv")

# Download a single file
ftp_download("ftp_creds", "/incoming/sales/report.csv")

# Download all matching files from a directory
downloaded = ftp_download("ftp_creds", "/incoming/sales", pattern="*.csv")

# Upload a file from the data directory
ftp_upload("ftp_creds", "results.parquet", "/outgoing/results.parquet")

# Move/archive a file on the server
ftp_move("ftp_creds", "/incoming/sales/report.csv", "/archive/sales/report.csv")

The secret parameter references a structured secret with FTP connection details:

[global.ftp_creds]
host = "ftp.example.com"
user = "data_user"
password = "secret123"
port = "21"        # optional, default 21
tls = "true"       # optional, default false

Downloaded files are saved to the run's data/ directory (PIT_DATA_DIR). Uploaded files are read from the same directory.

Roadmap

The following features are planned but not yet implemented. See pit-architecture.md for full design details.

Near-term
  • pit sync — Sync Python environments. Hash uv.lock files, only run uv sync for changed lockfiles. Parallel sync across projects.
  • pit status — Show DAG and task status from run history. Recent runs, success/failure counts, last run timestamps.
  • Cross-project requirements — Temporal dependencies between DAGs (requires = { max_age = "24h" }). Check SQLite run history at DAG start.
Mid-term
  • SQLite metadata store — Persistent run history, task instance tracking, environment hashes. WAL mode for concurrent access.
  • Notifications — Email on DAG failure via SMTP connector, outbound webhook support for Slack/Teams.
  • Additional Go connectors — SMTP, HTTP, Minio/S3 — exposed via SDK socket.
Long-term
  • REST API — Full API for DAG status, run history, task logs, outputs registry, manual triggers.
  • Status dashboard — Read-only web UI for monitoring DAGs, viewing logs, browsing outputs.
  • SDK socket authentication — Per-connection auth, secret access audit logging, per-project ACLs.
  • Log archival — Archive completed run logs for historical queries.
  • Encrypted secrets — At-rest encryption, key rotation, connection pooling across tasks.
