# Pit

Lightweight data orchestration for small-to-medium data teams. Go orchestrator, language-agnostic tasks.

Tasks are just executables — Python scripts, shell scripts, SQL files, or anything with an exit code. No decorators, no magic, no framework lock-in. The orchestrator is a single binary.
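For example, a task can be an ordinary script; a hypothetical `tasks/extract.py` might look like this (illustrative only, not generated by `pit init`):

```python
# Hypothetical tasks/extract.py: Pit runs it as a plain process and reads the
# exit code. A clean finish is success; an uncaught exception (and the
# resulting non-zero exit) marks the task failed.
rows = [{"id": 1, "amount": 100}]      # stand-in for real extraction work
assert rows, "nothing extracted"       # raising here -> non-zero exit -> task failed
print(f"extracted {len(rows)} rows")   # stdout is captured into the run's log
```

No Pit-specific imports are required; the orchestrator observes the process from the outside.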
## Installation

```shell
go install github.com/druarnfield/pit/cmd/pit@latest
```

Or build from source:

```shell
git clone https://github.com/druarnfield/pit.git
cd pit
go build ./cmd/pit
```
## Quick Start

```shell
# Scaffold a new project
pit init my_pipeline                  # Python project (full package layout)
pit init --type sql my_transforms     # SQL-only project (minimal)
pit init --type shell my_jobs         # Shell-only project
pit init --type dbt my_dbt_project    # dbt project (uvx-managed)

# Validate all project configs
pit validate

# Run a DAG
pit run my_pipeline                   # run entire DAG
pit run my_pipeline/extract           # run a single task
pit run my_pipeline --verbose         # stream task output to stdout

# Start the scheduler (cron, FTP watch, and webhook triggers)
pit serve                             # runs until SIGINT/SIGTERM
pit serve --verbose                   # with live task output
pit serve --port 8080                 # webhook listener on custom port (default 9090)

# View logs from past runs
pit logs my_pipeline                  # latest run, all tasks
pit logs my_pipeline/extract          # latest run, single task
pit logs my_pipeline --list           # list available runs
pit logs my_pipeline --run-id <id>    # specific run

# Query the outputs registry
pit outputs                           # list all declared outputs
pit outputs --project my_pipeline     # filter by project
pit outputs --type table              # filter by output type
pit outputs --location "warehouse.*"  # filter by location (glob)
```
## Project Structure

Projects live under `projects/<name>/` with a `pit.toml` defining the DAG:

```
projects/
├── claims_pipeline/           # Python project
│   ├── pit.toml               # DAG definition
│   ├── pyproject.toml         # Python deps
│   ├── src/claims/            # shared package
│   └── tasks/
│       ├── extract.py
│       ├── validate.py
│       └── load.py
├── warehouse_transforms/      # SQL-only project
│   ├── pit.toml
│   └── tasks/
│       ├── staging_claims.sql
│       └── dim_providers.sql
├── nightly_maintenance/       # Shell-only project
│   ├── pit.toml
│   └── tasks/
│       ├── vacuum_db.sh
│       └── archive_logs.sh
└── analytics_dbt/             # dbt project
    ├── pit.toml               # [dag.dbt] config
    └── dbt_repo/              # dbt project root
        ├── dbt_project.yml
        ├── models/
        └── tests/
```
## DAG Configuration

Each project's `pit.toml` declares tasks, dependencies, and outputs:

```toml
[dag]
name = "claims_pipeline"
schedule = "0 6 * * *"
overlap = "skip"
timeout = "45m"

[[tasks]]
name = "extract"
script = "tasks/extract.py"
timeout = "15m"
retries = 2
retry_delay = "30s"

[[tasks]]
name = "validate"
script = "tasks/validate.py"
depends_on = ["extract"]

[[tasks]]
name = "load"
script = "tasks/load.py"
depends_on = ["validate"]

[[outputs]]
name = "claims_staging"
type = "table"
location = "warehouse.staging.claims"
```
## Git-backed Projects

A DAG can pull its source from a remote git repository instead of a local directory. Add `git_url` and `git_ref` to `[dag]`:

```toml
[dag]
name = "claims_pipeline"
git_url = "git@github.com:company/claims.git"
git_ref = "main"                # branch, tag, or commit SHA
schedule = "0 6 * * *"

[[tasks]]
name = "extract"
script = "tasks/extract.py"     # path relative to repo root
```

Both fields must be set together (or both omitted). The `pit.toml` itself stays local in `projects/<name>/`; only the task source files come from the remote repo.

How it works:

- Before each run, Pit clones the repo (or fetches updates if already cloned) into `repo_cache/<dag_name>/`.
- The cached clone is snapshotted into the run directory exactly like a local project.
- The Python venv (`uv`) is resolved against the persistent cache, so dependencies are only reinstalled when the lockfile changes.

Pit shells out to the system git binary, inheriting your SSH agent and credential helper configuration automatically.

Directory layout with git-backed projects:

```
<root>/
├── projects/<name>/pit.toml   ← DAG definition (local)
├── repo_cache/<name>/         ← persistent clone (managed by pit)
└── runs/<run-id>/             ← snapshot + logs (unchanged)
```

Validation skips local filesystem checks (script existence, `dbt.project_dir`) for git-backed projects, since the source is not on disk until run time. For git-backed DAGs, `dbt.project_dir` is also optional — if omitted, it defaults to the repo root.
## Task Runners

The runner is determined by file extension, with an optional override:

| Extension | Default Runner | Execution |
|---|---|---|
| `.py` | `python` | `uv run --project {project_dir} {script}` |
| `.sh` | `bash` | `bash {script}` |
| `.sql` | `sql` | Go SQL operator against the `[dag.sql]` connection |
| n/a | `dbt` | `uvx dbt {command}` via `[dag.dbt]` config |

Custom runners use the `$` prefix syntax:

```toml
[[tasks]]
name = "transform"
script = "tasks/transform.js"
runner = "$ node"               # runs: node tasks/transform.js
```
## CLI Commands

### Implemented

| Command | Description |
|---|---|
| `pit validate` | Validate all `pit.toml` files (cycles, missing deps, script paths) |
| `pit init <name>` | Scaffold a new project (`--type python\|sql\|shell\|dbt`) |
| `pit run <dag>[/<task>]` | Execute a DAG or single task (`--verbose` for live output) |
| `pit serve [--port N]` | Run the scheduler with cron, FTP watch, and webhook triggers (webhook port default: 9090) |
| `pit logs <dag>[/<task>]` | View task logs (`--list` for runs, `--run-id` for a specific run) |
| `pit outputs` | List declared outputs (`--project`, `--type`, `--location` filters) |

### Global Flags

| Flag | Description |
|---|---|
| `--project-dir` | Root project directory (default: `.`) |
| `--verbose` | Enable verbose output |
| `--secrets` | Path to secrets TOML file (enables SDK socket and SQL connections) |
## Run Snapshots

When a run begins, Pit copies the project directory to a snapshot. Tasks execute from the snapshot, not the source — so git pulls or edits during a run can't affect in-flight tasks.

```
runs/
└── 20240115_143022.123_claims_pipeline/
    ├── project/               # frozen copy of the project
    ├── logs/                  # per-task log files
    │   ├── extract.log
    │   ├── validate.log
    │   └── load.log
    └── data/                  # inter-task Parquet files
        └── extracted_claims.parquet
```

The `data/` directory is used for inter-task data passing. Tasks discover it via the `PIT_DATA_DIR` environment variable.
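For instance, a task can hand a file to downstream tasks by writing into that directory. A minimal sketch (the tempdir fallback is only so the snippet runs standalone; under Pit the variable is already set):

```python
import os
import tempfile
from pathlib import Path

# A real task inherits PIT_DATA_DIR from the orchestrator; fall back to a
# tempdir here so the sketch is self-contained.
os.environ.setdefault("PIT_DATA_DIR", tempfile.mkdtemp())

data_dir = Path(os.environ["PIT_DATA_DIR"])
out_file = data_dir / "extracted_claims.csv"   # CSV for brevity; Pit tasks normally write Parquet
out_file.write_text("id,amount\n1,100\n")
```

Downstream tasks read the same directory by name; the SDK's `write_output`/`read_input` helpers wrap this pattern for Parquet.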
## Execution Model

- Tasks execute in topological order, parallelising independent branches
- Per-task retries with configurable delay
- Per-task and per-DAG timeouts via context cancellation
- Failed tasks mark all downstream tasks as `upstream_failed`
- Task states: `pending` → `running` → `success` | `failed` | `skipped` | `upstream_failed`
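The ordering for the example DAG can be sketched with Python's stdlib `graphlib` (an illustration of the scheduling idea, not Pit's actual Go scheduler):

```python
from graphlib import TopologicalSorter

# Dependencies from the claims_pipeline example: validate needs extract,
# load needs validate. Tasks in the same "ready" batch have no unmet
# dependencies and could run in parallel.
deps = {"validate": {"extract"}, "load": {"validate"}}
ts = TopologicalSorter(deps)
ts.prepare()

order = []
while ts.is_active():
    batch = sorted(ts.get_ready())   # all tasks whose dependencies have succeeded
    order.append(batch)
    for task in batch:
        ts.done(task)

print(order)   # [['extract'], ['validate'], ['load']]
```

A DAG with independent branches would surface multiple tasks in one batch.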
## Automated Scheduling

`pit serve` runs as a long-lived process, monitoring all projects for scheduled triggers and FTP file watches.

### Cron Triggers

Set `schedule` in `pit.toml` using standard cron syntax (5-field):

```toml
[dag]
name = "nightly_etl"
schedule = "0 6 * * *"          # 6 AM daily
overlap = "skip"                # skip if previous run still active
```

### FTP Watch Triggers

Monitor an FTP server for incoming files. When files matching the pattern are stable (unchanged size for `stable_seconds`), a DAG run is triggered with the files seeded into the run's `data/` directory.

```toml
[dag]
name = "sales_ingest"
overlap = "skip"

[dag.ftp_watch]
secret = "ftp_creds"            # structured secret for host, user, password
port = 21
tls = true
directory = "/incoming/sales"
pattern = "sales_*.csv"
archive_dir = "/archive/sales"  # move files here after success
poll_interval = "30s"
stable_seconds = 30             # wait for file to stop growing
```

The `secret` field references a structured secret containing `host`, `user`, and `password` fields:

```toml
[global.ftp_creds]
host = "ftp.example.com"
user = "data_user"
password = "secret123"
```

Legacy configuration using `host`, `user`, and `password_secret` as separate fields is still supported for backward compatibility.

Both trigger types can be combined on the same DAG.
### Webhook Triggers

Trigger a DAG run via an inbound HTTP POST request. Useful for CI/CD pipelines, GitHub Actions, or any system that can send a webhook.

```toml
[dag]
name = "deploy_pipeline"
overlap = "skip"

[dag.webhook]
token_secret = "deploy_webhook_token"   # plain secret name for auth token
```

Add the token to your secrets file:

```toml
[global]
deploy_webhook_token = "supersecret123"
```

Start the server with `--secrets` (required for webhook token resolution):

```shell
pit serve --secrets secrets.toml --port 9090
```

Trigger a run:

```shell
curl -X POST http://localhost:9090/webhook/deploy_pipeline \
  -H "Authorization: Bearer supersecret123"
# → 202 Accepted
```

The webhook listener only starts if at least one DAG has `[dag.webhook]` configured. All DAGs with a webhook share the same port; the URL path routes by DAG name.
## Workspace Configuration

Create a `pit_config.toml` in the project root to set workspace-level defaults:

```toml
secrets_dir = "secrets/secrets.toml"
runs_dir = "runs"
repo_cache_dir = "repo_cache"
dbt_driver = "ODBC Driver 17 for SQL Server"
keep_artifacts = ["logs", "project", "data"]
```

| Field | Default | Description |
|---|---|---|
| `secrets_dir` | (none) | Path to secrets TOML file |
| `runs_dir` | `"runs"` | Directory for run snapshots |
| `repo_cache_dir` | `"repo_cache"` | Directory for persistent git repository clones |
| `dbt_driver` | `"ODBC Driver 17 for SQL Server"` | ODBC driver for dbt profiles |
| `keep_artifacts` | `["logs", "project", "data"]` | Which run subdirectories to keep after completion |

All fields are optional. Relative paths are resolved from the project root. CLI flags take precedence if both are set.
## Artifact Retention

By default, Pit keeps all run artifacts (project snapshot, logs, and data). To save disk space, configure `keep_artifacts` to retain only what you need:

```toml
# Workspace level — applies to all projects
keep_artifacts = ["logs"]
```

Per-project overrides are supported in `pit.toml`:

```toml
[dag]
name = "my_pipeline"
keep_artifacts = ["logs", "data"]   # override workspace default
```

Resolution order: per-project (if set) > workspace (if set) > default (keep all). Valid values: `logs`, `project`, `data`.
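The precedence can be sketched as follows (illustrative names, not Pit's internals):

```python
# Illustrative only: the first explicitly-set level wins; otherwise keep everything.
DEFAULT_KEEP = ["logs", "project", "data"]

def effective_keep(project_setting, workspace_setting):
    for setting in (project_setting, workspace_setting):
        if setting is not None:
            return setting
    return DEFAULT_KEEP

assert effective_keep(["logs", "data"], ["logs"]) == ["logs", "data"]  # per-project wins
assert effective_keep(None, ["logs"]) == ["logs"]                      # workspace applies
assert effective_keep(None, None) == DEFAULT_KEEP                      # keep all
```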
## Development

```shell
# Run all tests
go test -race ./...

# Run with verbose output
go test -v ./internal/engine

# Integration tests (requires bash, uv, etc.)
go test -tags integration ./...

# Vet
go vet ./...
```
## Secrets

Secrets are stored in a TOML file outside the repo (e.g. `/etc/pit/secrets.toml` in production). Pass the path via `--secrets`:

```shell
pit run my_pipeline --secrets ./secrets.toml
```

Secrets are organised by project scope with a `[global]` fallback. A secret can be either a plain value or a structured secret with multiple fields:

```toml
[global]
smtp_password = "plain_secret"

# Structured secret — one secret with many fields
[global.warehouse_db]
host = "sql-server.example.com"
port = "1433"
database = "warehouse"
schema = "dbo"
user = "admin"
password = "secret"

[claims_pipeline]
claims_db = "sqlserver://user:pass@host/db"

# FTP credentials as a structured secret
[claims_pipeline.ftp_creds]
host = "ftp.example.com"
user = "ftpuser"
password = "secret123"
```

Resolution order: the project-scoped section first, then `[global]`.

Plain secrets are resolved with `Resolve(project, key)`. Structured secrets support field-level access with `ResolveField(project, secret, field)`. When `Resolve` is called on a structured secret, it returns a JSON object of all fields.
## SDK Socket

Pit starts a JSON-over-socket server for every run (Unix domain socket on Linux/macOS, TCP localhost on Windows). Tasks connect via the `PIT_SOCKET` environment variable. When `--secrets` is provided, the server can resolve secrets and load data into databases.

Python tasks use the bundled SDK client:

```python
from pit_sdk import (
    get_secret,
    get_secret_field,
    read_sql,
    output_sql,
    write_output,
    load_data,
)
import json

# Read a plain secret
conn_str = get_secret("claims_db")

# Read a structured secret (returns JSON)
db_creds = json.loads(get_secret("warehouse_db"))
host = db_creds["host"]

# Read a single field from a structured secret
host = get_secret_field("warehouse_db", "host")

# Query straight to Parquet on disk (no table held in memory)
output_sql(conn_str, "SELECT * FROM staging.claims", "claims")

# Or read into memory when you need to transform first
table = read_sql(conn_str, "SELECT * FROM staging.claims")
write_output("claims", table)

# Bulk-load Parquet into the database (Go-side, no ODBC)
load_data("claims.parquet", "target_table", "claims_db")
```
## Environment Variables

| Variable | Description |
|---|---|
| `PIT_RUN_ID` | Current run identifier |
| `PIT_TASK_NAME` | Current task name |
| `PIT_DAG_NAME` | Current DAG name |
| `PIT_SOCKET` | SDK server address |
| `PIT_DATA_DIR` | Path to the run's data directory for Parquet files |
## SQL Execution

SQL tasks (`.sql` files) execute in-process via Go's `database/sql`. Configure the connection name in `pit.toml`:

```toml
[dag]
name = "warehouse_transforms"

[dag.sql]
connection = "warehouse_db"
```

The connection name is resolved from the secrets file. Supported drivers:

| Connection string prefix | Driver |
|---|---|
| `sqlserver://`, `mssql://` | Microsoft SQL Server |

Without `--secrets`, SQL tasks fall back to stub mode (log file contents without executing).
## dbt Projects

Pit can orchestrate dbt projects managed by other teams. dbt is executed via `uvx` (no global install needed), and `profiles.yml` is generated at runtime from Pit secrets.

### dbt Configuration

```toml
[dag]
name = "analytics_dbt"
schedule = "0 7 * * *"
overlap = "skip"
timeout = "2h"

[dag.dbt]
version = "1.9.1"               # dbt-core version
adapter = "dbt-sqlserver"       # pip package name for the adapter
extra_deps = ["dbt-utils"]      # additional pip packages (optional)
project_dir = "dbt_repo"        # relative path to dbt project root (optional for git-backed; defaults to repo root)
profile = "analytics"           # profile name in profiles.yml (default: dag name)
target = "prod"                 # target name (default: "prod")
connection = "analytics_db"     # structured secret name for db credentials

[[tasks]]
name = "staging"
script = "run --select staging" # dbt command (not a file path)
runner = "dbt"

[[tasks]]
name = "marts"
script = "run --select marts"
runner = "dbt"
depends_on = ["staging"]

[[tasks]]
name = "test"
script = "test"
runner = "dbt"
depends_on = ["marts"]
```

For dbt tasks, the `script` field contains the dbt subcommand and arguments (e.g. `"run --select staging"`), not a file path. The `runner` field must be set to `"dbt"`.
### dbt Secrets

dbt connection details are resolved from a structured secret named by the `connection` field in `[dag.dbt]`. The secret must have these fields:

```toml
[analytics_dbt.analytics_db]
host = "sql-server.example.com"
port = "1433"
database = "analytics"
schema = "dbo"
user = "dbt_user"
password = "secret123"
```

Pit generates a `profiles.yml` in a temporary directory before each run and sets `DBT_PROFILES_DIR` so dbt picks it up automatically.
### dbt JSON Log Parsing

dbt is invoked with `--log-format json`. Pit parses the JSON output and displays key events:

- Model results: `OK stg_orders (2.5s, 1500 rows)`
- Test results: `PASS not_null_orders_id (0.3s)`
- Freshness results: `FRESH raw_orders`
- Completion: `Completed in 15.7s`
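Each line of dbt's JSON log output is a standalone JSON object. A minimal sketch of extracting the display message from one line (the event shape below follows dbt's structured-logging format as an assumption; real events carry many more fields):

```python
import json

# An abridged line in the shape of dbt's structured JSON logs (assumed format).
line = '{"info": {"name": "LogModelResult", "level": "info", "msg": "1 of 3 OK created table model stg_orders"}}'

event = json.loads(line)
msg = event.get("info", {}).get("msg", "")   # human-readable message for display
print(msg)   # 1 of 3 OK created table model stg_orders
```

Parsing line-by-line like this is what lets Pit surface model, test, and freshness results without scraping free-form text.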
## Python SDK

The Python SDK (`sdk/python/`) provides helpers for tasks running under Pit:

| Function | Description |
|---|---|
| `get_secret(key)` | Retrieve a secret (plain string, or JSON for structured secrets) |
| `get_secret_field(secret, field)` | Retrieve a single field from a structured secret |
| `read_sql(conn, query)` | Read from a database via ConnectorX (returns an Arrow Table) |
| `output_sql(conn, query, name)` | Query straight to Parquet on disk — no table held in Python memory |
| `write_output(name, data)` | Write Arrow/pandas/polars data to Parquet in the data directory |
| `read_input(name)` | Read a named Parquet file from the data directory |
| `load_data(file, table, conn)` | Trigger a Go-side bulk load of Parquet into a database |
| `ftp_list(secret, directory, pattern)` | List files on an FTP server matching a glob pattern |
| `ftp_download(secret, path, *, pattern)` | Download file(s) from FTP to the data directory |
| `ftp_upload(secret, local_name, remote_path)` | Upload a file from the data directory to FTP |
| `ftp_move(secret, src, dst)` | Move or rename a file on an FTP server |
The `load_data` function accepts optional `schema` (default `"dbo"`) and `mode` parameters. Supported modes:

| Mode | Behaviour |
|---|---|
| `append` (default) | Insert rows into the existing table |
| `truncate_and_load` | Truncate the table, then insert rows |
| `create_or_replace` | Drop the table if it exists, recreate it from the Parquet schema, then insert rows |
Database reads use ConnectorX (Rust-native, no ODBC drivers needed). Database writes go through the Go orchestrator's bulk loader via RPC (also no ODBC).
### FTP Operations

The FTP functions communicate with the Go FTP client through the SDK socket. Credentials are resolved from structured secrets — Python never sees passwords.

```python
from pit_sdk import ftp_list, ftp_download, ftp_upload, ftp_move

# List files matching a pattern
files = ftp_list("ftp_creds", "/incoming/sales", "sales_*.csv")

# Download a single file
ftp_download("ftp_creds", "/incoming/sales/report.csv")

# Download all matching files from a directory
downloaded = ftp_download("ftp_creds", "/incoming/sales", pattern="*.csv")

# Upload a file from the data directory
ftp_upload("ftp_creds", "results.parquet", "/outgoing/results.parquet")

# Move/archive a file on the server
ftp_move("ftp_creds", "/incoming/sales/report.csv", "/archive/sales/report.csv")
```

The `secret` parameter references a structured secret with FTP connection details:

```toml
[global.ftp_creds]
host = "ftp.example.com"
user = "data_user"
password = "secret123"
port = "21"                     # optional, default 21
tls = "true"                    # optional, default false
```

Downloaded files are saved to the run's `data/` directory (`PIT_DATA_DIR`). Uploaded files are read from the same directory.
## Roadmap

The following features are planned but not yet implemented. See `pit-architecture.md` for full design details.

### Near-term

- `pit sync` — Sync Python environments. Hash `uv.lock` files, only run `uv sync` for changed lockfiles. Parallel sync across projects.
- `pit status` — Show DAG and task status from run history. Recent runs, success/failure counts, last run timestamps.
- Cross-project requirements — Temporal dependencies between DAGs (`requires = { max_age = "24h" }`). Check SQLite run history at DAG start.

### Mid-term

- SQLite metadata store — Persistent run history, task instance tracking, environment hashes. WAL mode for concurrent access.
- Notifications — Email on DAG failure via an SMTP connector, outbound webhook support for Slack/Teams.
- Additional Go connectors — SMTP, HTTP, MinIO/S3 — exposed via the SDK socket.

### Long-term

- REST API — Full API for DAG status, run history, task logs, outputs registry, manual triggers.
- Status dashboard — Read-only web UI for monitoring DAGs, viewing logs, browsing outputs.
- SDK socket authentication — Per-connection auth, secret access audit logging, per-project ACLs.
- Log archival — Archive completed run logs for historical queries.
- Encrypted secrets — At-rest encryption, key rotation, connection pooling across tasks.