Runs Service
The Runs Service implements the RunService gRPC interface using buf connect. It manages workflow runs, actions, attempts, and cluster events.
Features
- Create and manage runs
- Track actions and their execution state
- Record action attempts and retries
- Store cluster events and phase transitions
- List and filter runs and actions
- Abort runs and individual actions
- Dual database support: SQLite (default) and PostgreSQL
- Health and readiness checks
- Streaming RPCs for real-time updates
Running the Service
Prerequisites
- Go 1.24 or later
- (Optional) PostgreSQL running for production use
Quick Start with SQLite
The service uses SQLite by default, so no database setup is required.
# Build and run
go build -o runs-service cmd/main.go
./runs-service --config config.yaml
Or using make:
make build
make run
The service will:
- Create or connect to the SQLite database (runs.db)
- Run database migrations
- Start HTTP/2 server on port 8090 (configurable)
Setup with PostgreSQL
For production deployments, use PostgreSQL:
# Using Docker
docker run --name flyte-runs-postgres \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=flyte_runs \
-p 5433:5432 \
-d postgres:15
# Or use your existing PostgreSQL instance
createdb flyte_runs
Run with PostgreSQL config:
./runs-service --config config-postgres.yaml
Configuration
SQLite (default) - config.yaml:
runs:
  server:
    host: "0.0.0.0"
    port: 8090
  watchBufferSize: 100
database:
  sqlite:
    file: "./runs.db"
logger:
  level: 4 # Info level
  show-source: true
PostgreSQL - config-postgres.yaml:
runs:
  server:
    host: "0.0.0.0"
    port: 8090
  watchBufferSize: 100
database:
  postgres:
    host: "localhost"
    port: 5433
    dbname: "flyte_runs"
    username: "postgres"
    password: "postgres"
    extraOptions: "sslmode=disable"
  maxIdleConnections: 10
  maxOpenConnections: 100
  connMaxLifeTime: 1h
logger:
  level: 4
  show-source: true
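The PostgreSQL settings above correspond to a libpq-style keyword/value connection string. The following is a minimal sketch of that mapping, not the service's actual code: the PostgresConfig struct and the buildDSN helper are hypothetical names chosen to mirror the YAML keys, and the sketch assumes none of the values contain spaces or quotes.

```go
package main

import (
	"fmt"
	"strings"
)

// PostgresConfig is a hypothetical struct mirroring the YAML keys under
// database.postgres. The service's real config type may differ.
type PostgresConfig struct {
	Host         string
	Port         int
	DBName       string
	Username     string
	Password     string
	ExtraOptions string
}

// buildDSN assembles a keyword/value connection string in the format
// accepted by libpq-compatible drivers. Values are not quoted or escaped,
// so this sketch only works for values without spaces.
func buildDSN(c PostgresConfig) string {
	parts := []string{
		fmt.Sprintf("host=%s", c.Host),
		fmt.Sprintf("port=%d", c.Port),
		fmt.Sprintf("dbname=%s", c.DBName),
		fmt.Sprintf("user=%s", c.Username),
		fmt.Sprintf("password=%s", c.Password),
	}
	// extraOptions is appended verbatim, e.g. "sslmode=disable".
	if c.ExtraOptions != "" {
		parts = append(parts, c.ExtraOptions)
	}
	return strings.Join(parts, " ")
}

func main() {
	cfg := PostgresConfig{
		Host:         "localhost",
		Port:         5433,
		DBName:       "flyte_runs",
		Username:     "postgres",
		Password:     "postgres",
		ExtraOptions: "sslmode=disable",
	}
	fmt.Println(buildDSN(cfg))
}
```

Printing the resulting string is a quick way to check which host, port, and database a given config file points at before starting the service.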
Testing
Run unit tests
go test ./...
# or
make test
Run API tests
Start the service as described in the Quick Start with SQLite section
before running the API tests.
make api-test
Run integration test with client
Using SQLite:
# Terminal 1: Start the service
go run cmd/main.go --config config.yaml
# Terminal 2: Run the test client
go run testclient/main.go
Or use make:
make integration-test
Using PostgreSQL:
make integration-test-postgres
Expected output:
Test 1: Creating a run with task spec...
✓ Run created successfully: test-run-001
Test 2: Creating a run with auto-generated name...
✓ Run with auto-generated name created: run-1759535665
Test 3: Getting run details...
✓ Retrieved run details: details:{}
Test 4: Listing runs...
✓ Found 2 runs
Test 5: Listing actions for the run...
✓ Found 0 actions
Test 6: Aborting an action...
✓ Action aborted successfully
Test 7: Aborting a run...
✓ Run aborted successfully
All tests completed successfully! 🎉
Scripts
Convenience scripts in runs/tests/scripts/ let you interact with the service using buf curl.
Ensure the service is running before executing these scripts.
./runs/tests/scripts/create_task.sh - create a new task
./runs/tests/scripts/list_tasks.sh - list tasks with name filtering
Check service health
# Health check
curl http://localhost:8090/healthz
# Readiness check
curl http://localhost:8090/readyz
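The same checks can be scripted, for example to wait for the service before running tests. This is a minimal sketch using only the Go standard library; the helper names (endpoint, probe, waitReady) are hypothetical, and it assumes both endpoints answer 200 OK when the service is healthy, which this README does not state explicitly.

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// endpoint builds the URL for a health path on the given host and port.
func endpoint(host string, port int, path string) string {
	return fmt.Sprintf("http://%s:%d%s", host, port, path)
}

// probe issues one GET and returns nil only for a 200 OK response
// (assumed here to mean "healthy").
func probe(url string) error {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%s returned %s", url, resp.Status)
	}
	return nil
}

// waitReady retries probe up to attempts times, sleeping delay between tries.
func waitReady(url string, attempts int, delay time.Duration) error {
	err := fmt.Errorf("no attempts made")
	for i := 0; i < attempts; i++ {
		if err = probe(url); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return fmt.Errorf("not ready after %d attempts: %w", attempts, err)
}

func main() {
	for _, path := range []string{"/healthz", "/readyz"} {
		url := endpoint("localhost", 8090, path)
		if err := waitReady(url, 5, 500*time.Millisecond); err != nil {
			fmt.Println("FAIL:", err)
			continue
		}
		fmt.Println("OK:", url)
	}
}
```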
Inspect database
SQLite:
sqlite3 runs.db
-- View runs
SELECT id, org, project, domain, name, root_action_name, created_at
FROM runs
ORDER BY created_at DESC;
-- View actions
SELECT id, org, project, domain, run_name, name, phase, action_type
FROM actions
ORDER BY created_at DESC;
PostgreSQL:
# Connect to PostgreSQL
psql -h localhost -p 5433 -U postgres -d flyte_runs
-- View runs
SELECT id, org, project, domain, name, root_action_name, created_at
FROM runs
ORDER BY created_at DESC;
-- View actions
SELECT id, org, project, domain, run_name, name, phase, action_type
FROM actions
ORDER BY created_at DESC;
API Endpoints
The service exposes the following buf connect endpoints:
Run Management
POST /flyteidl2.workflow.RunService/CreateRun - Create a new run
POST /flyteidl2.workflow.RunService/GetRunDetails - Get run details
POST /flyteidl2.workflow.RunService/ListRuns - List runs with filtering
POST /flyteidl2.workflow.RunService/AbortRun - Abort a run
Action Management
POST /flyteidl2.workflow.RunService/GetActionDetails - Get action details
POST /flyteidl2.workflow.RunService/GetActionData - Get action input/output data
POST /flyteidl2.workflow.RunService/ListActions - List actions for a run
POST /flyteidl2.workflow.RunService/AbortAction - Abort a specific action
Task Management
POST /flyteidl2.workflow.TaskService/CreateTask - Create a new task
POST /flyteidl2.workflow.TaskService/GetTask - Get task details
POST /flyteidl2.workflow.TaskService/ListTasks - List tasks with filtering and sorting
POST /flyteidl2.workflow.TaskService/UpdateTask - Update an existing task
Streaming (Watch) RPCs
POST /flyteidl2.workflow.RunService/WatchRunDetails - Stream run detail updates
POST /flyteidl2.workflow.RunService/WatchActionDetails - Stream action detail updates
POST /flyteidl2.workflow.RunService/WatchRuns - Stream run updates
POST /flyteidl2.workflow.RunService/WatchActions - Stream action updates
POST /flyteidl2.workflow.RunService/WatchClusterEvents - Stream cluster events
Health Endpoints
GET /healthz - Health check
GET /readyz - Readiness check
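Because the Connect protocol accepts unary calls as plain HTTP POST requests with a JSON body, the non-streaming endpoints above can be exercised without generated client code. The sketch below is illustrative only: procedureURL and callUnary are hypothetical helpers, and the request body is an empty JSON object because the request fields are not documented in this README, so the server may well respond with a validation error. The Watch RPCs use a streaming envelope and are not covered.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// procedureURL joins a base URL with a Connect procedure path of the
// form /<package>.<Service>/<Method>.
func procedureURL(base, service, method string) string {
	return strings.TrimRight(base, "/") + "/" + service + "/" + method
}

// callUnary posts a JSON-encoded request to a unary procedure and returns
// the HTTP status code and the raw response body.
func callUnary(url string, jsonBody []byte) (int, []byte, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
	if err != nil {
		return 0, nil, err
	}
	// Connect unary calls with a JSON payload use this content type.
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return 0, nil, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, body, err
}

func main() {
	url := procedureURL("http://localhost:8090", "flyteidl2.workflow.RunService", "ListRuns")
	// "{}" is a placeholder request; real calls need the fields defined
	// in the proto files.
	status, body, err := callUnary(url, []byte("{}"))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Printf("HTTP %d\n%s\n", status, body)
}
```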
Database Schema
runs table
CREATE TABLE runs (
id BIGSERIAL PRIMARY KEY,
org VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
domain VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
root_action_name VARCHAR(255) NOT NULL,
trigger_org VARCHAR(255),
trigger_project VARCHAR(255),
trigger_domain VARCHAR(255),
trigger_name VARCHAR(255),
run_spec JSONB NOT NULL,
created_by_principal VARCHAR(255),
created_by_k8s_service_account VARCHAR(255),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE(org, project, domain, name)
);
actions table
CREATE TABLE actions (
id BIGSERIAL PRIMARY KEY,
org VARCHAR(255) NOT NULL,
project VARCHAR(255) NOT NULL,
domain VARCHAR(255) NOT NULL,
run_name VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
run_id BIGINT NOT NULL REFERENCES runs(id),
parent_action_name VARCHAR(255),
action_type VARCHAR(50) NOT NULL,
phase VARCHAR(50) NOT NULL DEFAULT 'PHASE_QUEUED',
task_spec JSONB,
trace_spec JSONB,
condition_spec JSONB,
input_uri TEXT,
metadata JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE(org, project, domain, run_name, name)
);
action_attempts table
CREATE TABLE action_attempts (
id BIGSERIAL PRIMARY KEY,
action_id BIGINT NOT NULL REFERENCES actions(id),
attempt_number INT NOT NULL,
phase VARCHAR(50) NOT NULL DEFAULT 'PHASE_QUEUED',
start_time TIMESTAMP,
end_time TIMESTAMP,
outputs JSONB,
log_info JSONB,
log_context JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE(action_id, attempt_number)
);
cluster_events table
CREATE TABLE cluster_events (
id BIGSERIAL PRIMARY KEY,
attempt_id BIGINT NOT NULL REFERENCES action_attempts(id),
occurred_at TIMESTAMP NOT NULL,
message TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
phase_transitions table
CREATE TABLE phase_transitions (
id BIGSERIAL PRIMARY KEY,
attempt_id BIGINT NOT NULL REFERENCES action_attempts(id),
phase VARCHAR(50) NOT NULL,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
Project Structure
runs/
├── cmd/
│ └── main.go # Entry point
├── config/
│ └── config.go # Configuration structs
├── repository/
│ ├── interfaces.go # Repository interface
│ ├── models.go # Database models
│ └── postgres.go # PostgreSQL/SQLite implementation
├── service/
│ └── run_service.go # Service implementation
├── migrations/
│ └── migrations.go # Database migrations
├── client/
│ └── main.go # Test client
├── config.yaml # SQLite configuration (default)
├── config-postgres.yaml # PostgreSQL configuration
├── Makefile # Build and test commands
└── README.md # This file
Development
Build commands
# Build service
make build
# Build client
make build-testclient
# Clean artifacts
make clean
Database switching
The service automatically selects the database based on configuration:
- If database.sqlite.file is set → uses SQLite
- If database.postgres is set → uses PostgreSQL
No code changes are needed; just update config.yaml.
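The selection rule can be pictured as a small switch over the parsed configuration. This is a sketch under stated assumptions rather than the service's implementation: the struct and function names are hypothetical, SQLite is assumed to win if both sections are set, and an empty configuration is assumed to be an error. The README does not specify either of those two cases.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical config types mirroring the database section of config.yaml.
type SQLiteConfig struct {
	File string
}

type PostgresConfig struct {
	Host string
	Port int
}

type DatabaseConfig struct {
	SQLite   SQLiteConfig
	Postgres *PostgresConfig // nil when the postgres section is absent
}

// selectBackend returns the backend name implied by the configuration.
// Checking SQLite first and rejecting an empty config are assumptions.
func selectBackend(c DatabaseConfig) (string, error) {
	switch {
	case c.SQLite.File != "":
		return "sqlite", nil
	case c.Postgres != nil:
		return "postgres", nil
	default:
		return "", errors.New("no database configured")
	}
}

func main() {
	sqliteCfg := DatabaseConfig{SQLite: SQLiteConfig{File: "./runs.db"}}
	pgCfg := DatabaseConfig{Postgres: &PostgresConfig{Host: "localhost", Port: 5433}}

	for _, cfg := range []DatabaseConfig{sqliteCfg, pgCfg} {
		backend, err := selectBackend(cfg)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("using", backend)
	}
}
```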
Database migration
Schema changes for the Runs Service are managed through versioned gormigrate migrations.
Migrations run automatically during service startup, so starting the service applies any pending ones:
make run
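To illustrate what "versioned" means here, the sketch below models the general idea with the standard library only: each migration has a unique ID, migrations are applied in order, and applied IDs are recorded so a restart skips them. It does not use gormigrate or a real database, and the Migration type, runMigrations function, and migration IDs are all hypothetical.

```go
package main

import "fmt"

// Migration is a hypothetical stand-in for one versioned schema change.
type Migration struct {
	ID string
	Up func() error
}

// runMigrations applies, in order, every migration whose ID is not yet in
// applied, records it, and returns the IDs that were run. In a real system
// the applied set lives in a database table rather than in memory.
func runMigrations(applied map[string]bool, ms []Migration) ([]string, error) {
	var ran []string
	for _, m := range ms {
		if applied[m.ID] {
			continue // already applied on a previous startup
		}
		if err := m.Up(); err != nil {
			return ran, fmt.Errorf("migration %s: %w", m.ID, err)
		}
		applied[m.ID] = true
		ran = append(ran, m.ID)
	}
	return ran, nil
}

func main() {
	applied := map[string]bool{}
	ms := []Migration{
		{ID: "0001_create_runs", Up: func() error { return nil }},
		{ID: "0002_create_actions", Up: func() error { return nil }},
	}

	first, _ := runMigrations(applied, ms)  // simulates the first startup
	second, _ := runMigrations(applied, ms) // simulates a restart
	fmt.Println(len(first), len(second))    // prints "2 0"
}
```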
Adding new features
- Update proto definitions in /flyteidl2/workflow/
- Regenerate code: buf generate
- Update repository interface in repository/interfaces/
- Update DB models in repository/models/
- Implement in repository/impl/
- Add service handler in service/xxx_service.go
- Add tests and update client
Troubleshooting
Port already in use
If port 8090 is already in use, change the port in config.yaml:
runs:
  server:
    port: 8091 # Use a different port
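To find out which port to use, a small standard-library check can try to bind candidate ports. This is a hedged sketch: portFree and firstFreePort are hypothetical helpers, and a port reported as free can still be taken by another process before the service starts.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// portFree reports whether a TCP listener can be opened on host:port.
func portFree(host string, port int) bool {
	ln, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
	if err != nil {
		return false
	}
	ln.Close()
	return true
}

// firstFreePort scans the range [start, start+tries) and returns the first
// port that can be bound on host.
func firstFreePort(host string, start, tries int) (int, error) {
	for p := start; p < start+tries; p++ {
		if portFree(host, p) {
			return p, nil
		}
	}
	return 0, fmt.Errorf("no free port in [%d, %d)", start, start+tries)
}

func main() {
	// 0.0.0.0 matches the server.host value in config.yaml.
	port, err := firstFreePort("0.0.0.0", 8090, 10)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("free port:", port)
}
```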
Database connection issues
SQLite:
- Check file permissions on ./runs.db
- Ensure directory is writable
PostgreSQL:
- Verify PostgreSQL is running: pg_isready -h localhost -p 5433
- Check credentials in config-postgres.yaml
- Ensure database exists: psql -h localhost -p 5433 -U postgres -l
Service won't start
Check logs for detailed error messages. Common issues:
- Config file not found (use --config flag)
- Database connection failed
- Port already in use