autoflow/

directory
v19.0.0-rc43 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2026 License: MIT

README

Autocore Load Test

Stability and scalability test harness for the autocore workflow engine. Deploys worker and injector replicas to GKE against a shared Cloud SQL Postgres instance and Memorystore Redis. Workers run the real autocore engine (shard manager, task processing). Injectors create workflows at a configurable rate. The two roles scale independently so injection rate is never bottlenecked by processing capacity.

Prerequisites

  • gcloud CLI authenticated with access to the target GCP project
  • tofu (OpenTofu) >= 1.6
  • kubectl
  • docker with buildx (for building and pushing the image)
  • pgcli (for database analysis)
  • glab

Layout

scripts/loadtests/autoflow/
  cmd/                Go binary entrypoint (autoflow-loadtest)
  modules/loadtest/   Reusable TF module (GKE, Cloud SQL, Redis, obs stack, k8s manifests)
  envs/<name>/        Thin TF root that instantiates the module for one environment
  scripts/            Wrapper scripts (loadtest.sh, analyze.sh, trace.sh)
  manifests/          ConfigMap consumed by the TF module
  dashboards/         Grafana dashboards consumed by the TF module
  Dockerfile          Multi-stage build for the autoflow-loadtest binary

A single baseline env ships by default. To add another, copy envs/baseline/ to envs/<new-name>/ and set env_name = "<new-name>" in its main.tf. Each env has its own TF state, resource names (autoflow-lt-<env>-<rand>), and can be applied/destroyed independently.

Infrastructure Setup

cd scripts/loadtests/autoflow/envs/baseline

glab tf init -R gitlab-org/cluster-integration/gitlab-agent autoflow-loadtest-baseline
tofu apply

This provisions:

  • GKE Standard cluster (n2d-standard-32 nodes)
  • Cloud SQL Postgres 17 (Enterprise Plus, C4A Axion)
  • Memorystore Redis 7.2
  • Artifact Registry repo
  • IAM + workload identity bindings
  • Docker image build and push
  • Observability stack (Prometheus, Tempo, Grafana) with autocore dashboard

Deploy the Load Test

# Configure kubectl
$(tofu output -raw kubeconfig_command)

# Apply all manifests (ServiceAccount, Secret, ConfigMap, Deployment)
tofu output -raw kubernetes_manifests | kubectl apply -f -
Observability

Access Grafana (pre-configured with Prometheus and Tempo datasources + autocore dashboard):

kubectl port-forward svc/grafana 3000:3000
# Open http://localhost:3000 (anonymous admin, no login needed)

Access Prometheus targets:

kubectl port-forward svc/prometheus 9090:9090
# Open http://localhost:9090/targets
Workloads

This deploys two Deployments:

  • autoflow-loadtest-worker (default: 8 replicas) -- runs autocore shard manager + workers
  • autoflow-loadtest-inject (default: 8 replicas) -- injects workflows at injection_rate wf/sec each

The --mode flag controls which role a replica runs:

  • --mode=combined (default) -- both inject and process (original behavior)
  • --mode=worker -- process only, no injection, idles until terminated
  • --mode=inject -- inject only, no shard claiming or task processing, exits when done
Scaling
# Scale workers and injectors independently
kubectl scale deployment autoflow-loadtest-worker --replicas=16
kubectl scale deployment autoflow-loadtest-inject --replicas=4

# Watch logs by role
kubectl logs -f -l role=worker --max-log-requests=10
kubectl logs -f -l role=inject --max-log-requests=10

# Watch all
kubectl logs -f -l app=autoflow-loadtest --max-log-requests=20

Running a Loadtest

Use scripts/loadtest.sh to kick off a fresh run. It handles cleanup, database reset, image rebuild, and deployment. The --env flag selects the target env (default baseline):

cd scripts/loadtests/autoflow
./scripts/loadtest.sh [--env baseline] [--skip-build]

This will:

  1. Scale workers to 0 and delete inject jobs
  2. Run migrations down + up to reset the database
  3. Rebuild and push the Docker image (unless --skip-build)
  4. Deploy all manifests
  5. Print the start timestamp for use with analyze.sh
Post-Loadtest Analysis
cd scripts/loadtests/autoflow
./scripts/analyze.sh --env baseline --start '<start-time>' --end '<end-time>' --grafana-port 3000
Inspect a Single Trace
cd scripts/loadtests/autoflow
./scripts/trace.sh <trace-id> --grafana-port 3000
Manual Reset (alternative to loadtest.sh)

If you need to reset without rerunning the full script:

ENV_DIR=scripts/loadtests/autoflow/envs/baseline

# Scale down
kubectl scale deployment autoflow-loadtest-worker --replicas=0
kubectl delete job autoflow-loadtest-inject

# Reset database via migrations
DSN="$(cd "$ENV_DIR" && tofu output -raw workload_dsn)"
make run-migrations autocore down MIGRATION_DSN="$DSN"
make run-migrations autocore up   MIGRATION_DSN="$DSN"

# Redeploy
(cd "$ENV_DIR" && tofu output -raw kubernetes_manifests) | kubectl apply -f -

Analysis Queries

Connect to the database interactively:

pgcli "$(tofu output -raw workload_dsn)"

For non-interactive (scripted) queries, pipe through stdin:

echo "SELECT count(*) FROM workflow_execution;" | pgcli "$(tofu output -raw workload_dsn)" --less-chatty

State values: 1=running, 2=completed, 3=failed.

Overall Summary
SELECT
    count(*) as total_workflows,
    count(*) FILTER (WHERE state = 2) as completed,
    count(*) FILTER (WHERE state = 1) as still_running,
    count(*) FILTER (WHERE state = 3) as failed
FROM workflow_execution;
Task State Distribution
SELECT state, count(*) as count
FROM task GROUP BY state ORDER BY count DESC;
History Event Types

Event type IDs: 1=WORKFLOW_CREATED, 2=WORKFLOW_COMPLETED, 3=WORKFLOW_FAILED, 4=ACTIVITY_SCHEDULED, 5=ACTIVITY_COMPLETED, 6=ACTIVITY_FAILED, 7=TIMER_STARTED, 8=TIMER_FIRED, 9=SIGNAL_RECEIVED, 10=CANCELLATION_REQUESTED, 11=WORKFLOW_CANCELED

SELECT event_type, count(*) as count
FROM history_event GROUP BY event_type ORDER BY count DESC;
Workflow Creation Throughput (10s buckets)
SELECT
    date_trunc('second', created_at)
        - (EXTRACT(SECOND FROM created_at)::int % 10) * interval '1 second' as bucket,
    count(*) as workflows_created
FROM workflow
GROUP BY 1 ORDER BY 1;
Workflow Completion Throughput (10s buckets)
SELECT
    date_trunc('second', created_at)
        - (EXTRACT(SECOND FROM created_at)::int % 10) * interval '1 second' as bucket,
    count(*) as completions
FROM history_event
WHERE event_type = 2
GROUP BY 1 ORDER BY 1;
Workflow Latency (creation to completion)
SELECT
    percentile_cont(0.50) WITHIN GROUP (ORDER BY dur) as p50,
    percentile_cont(0.90) WITHIN GROUP (ORDER BY dur) as p90,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY dur) as p99,
    min(dur) as min_s,
    max(dur) as max_s,
    avg(dur) as avg_s
FROM (
    SELECT EXTRACT(EPOCH FROM he.created_at - we.created_at) as dur
    FROM history_event he
    JOIN workflow_execution we ON he.shard_id = we.shard_id AND he.workflow_id = we.workflow_id
    WHERE he.event_type = 2
) sub;
Workflow Duration Histogram
SELECT
    CASE
        WHEN dur < 2 THEN '0-2s'
        WHEN dur < 5 THEN '2-5s'
        WHEN dur < 10 THEN '5-10s'
        WHEN dur < 20 THEN '10-20s'
        WHEN dur < 30 THEN '20-30s'
        WHEN dur < 60 THEN '30-60s'
        ELSE '60s+'
    END as bucket,
    count(*) as workflows
FROM (
    SELECT EXTRACT(EPOCH FROM he.created_at - we.created_at) as dur
    FROM history_event he
    JOIN workflow_execution we ON he.shard_id = we.shard_id AND he.workflow_id = we.workflow_id
    WHERE he.event_type = 2
) sub
GROUP BY 1 ORDER BY min(dur);
Overall Throughput

NOTE: uses subqueries to avoid an expensive cross-join on large tables.

SELECT
    round(((SELECT count(*) FROM workflow_execution WHERE state = 2)::float
    / EXTRACT(EPOCH FROM he_end.created_at - he_start.created_at))::numeric, 1)
    as completed_wf_per_sec
FROM
    (SELECT min(created_at) as created_at FROM history_event WHERE event_type = 1) he_start,
    (SELECT max(created_at) as created_at FROM history_event WHERE event_type = 2) he_end;
Test Duration

NOTE: uses subqueries to avoid an expensive cross-join on large tables.

SELECT
    min(he_start.created_at) as first_workflow,
    max(he_end.created_at) as last_completion,
    round(EXTRACT(EPOCH FROM max(he_end.created_at) - min(he_start.created_at))::numeric, 1)
    as total_seconds
FROM
    (SELECT min(created_at) as created_at FROM history_event WHERE event_type = 1) he_start,
    (SELECT max(created_at) as created_at FROM history_event WHERE event_type = 2) he_end;
Shard Distribution
SELECT shard_id,
    count(*) as workflows,
    count(*) FILTER (WHERE state = 2) as completed,
    count(*) FILTER (WHERE state = 1) as running,
    count(*) FILTER (WHERE state = 3) as failed
FROM workflow_execution
GROUP BY shard_id ORDER BY workflows DESC
LIMIT 20;
Shard Ownership
SELECT owner_id,
    count(*) as shards_owned,
    count(*) FILTER (WHERE lease_expires_at > now()) as alive,
    count(*) FILTER (WHERE lease_expires_at <= now()) as expired
FROM shard
GROUP BY owner_id ORDER BY shards_owned DESC;
Connection and Lock Pressure
-- Active connections by state
SELECT state, count(*) as count
FROM pg_stat_activity
WHERE datname = 'loadtest'
GROUP BY state ORDER BY count DESC;

-- Connection count per client
SELECT application_name, client_addr, count(*) as connections
FROM pg_stat_activity
WHERE datname = 'loadtest'
GROUP BY application_name, client_addr
ORDER BY connections DESC;

-- Lock contention (active waiters)
SELECT wait_event_type, wait_event, count(*) as count
FROM pg_stat_activity
WHERE datname = 'loadtest' AND state = 'active' AND wait_event IS NOT NULL
GROUP BY wait_event_type, wait_event
ORDER BY count DESC;

-- Lock types held
SELECT mode, count(*) as count
FROM pg_locks
WHERE database = (SELECT oid FROM pg_database WHERE datname = 'loadtest')
GROUP BY mode ORDER BY count DESC;

-- Max connections
SELECT setting as max_connections
FROM pg_settings WHERE name = 'max_connections';
Table I/O Stats
SELECT relname, seq_scan, seq_tup_read, idx_scan, idx_tup_fetch,
       n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
Table Sizes
SELECT relname,
    pg_size_pretty(pg_total_relation_size(relid)) as total_size,
    pg_size_pretty(pg_relation_size(relid)) as table_size,
    pg_size_pretty(pg_indexes_size(relid)) as index_size
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(relid) DESC;
Index Usage
SELECT indexrelname, idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC
LIMIT 20;
History Events Per Workflow
SELECT avg(event_count) as avg_events,
       min(event_count) as min_events,
       max(event_count) as max_events
FROM (
    SELECT workflow_id, count(*) as event_count
    FROM history_event GROUP BY workflow_id
) sub;
Planner Behavior (why seq scans?)

Small tables (task_queue, scheduled_task, shard) will always use sequential scans because Postgres correctly determines that reading 1-4 heap pages is cheaper than an index lookup. Verify with:

SELECT relname, relpages, reltuples::bigint as est_rows
FROM pg_class
WHERE relname IN ('task_queue', 'scheduled_task', 'shard',
                  'task', 'workflow_execution', 'history_event')
ORDER BY relpages DESC;

Metrics Analysis (Prometheus/Grafana)

Grafana is available at kubectl port-forward svc/grafana <local-port>:3000. The autocore dashboard shows all panels interactively. For scripted analysis, query Prometheus directly via the Grafana proxy.

All timestamps must be Unix epoch. Convert local time with:

START=$(python3 -c "from datetime import datetime; print(int(datetime(2026,4,15,17,46,0).timestamp()))")
END=$(python3 -c "from datetime import datetime; print(int(datetime(2026,4,15,17,55,0).timestamp()))")
Query pattern
curl -sG 'http://localhost:<grafana-port>/api/datasources/proxy/uid/prometheus/api/v1/query_range' \
  --data-urlencode 'query=<promql>' \
  --data-urlencode "start=$START" \
  --data-urlencode "end=$END" \
  --data-urlencode 'step=30'

NOTE: $__rate_interval is a Grafana variable and does not work in direct API queries. Use a fixed interval like 1m instead.

Useful queries

Worker pool goroutines (active + suspended over time):

sum by (state) (autocore_workflow_goroutines)

Activity tasks in flight:

sum(autocore_activity_tasks_in_flight)

Workflow tasks in flight:

sum(autocore_workflow_tasks_in_flight)

Cold vs warm resume rate:

sum by (resume_type) (rate(autocore_workflow_tasks_claimed_total[1m]))

Workflow completion rate:

sum(rate(autocore_workflow_task_processing_duration_seconds_count{result="completed"}[1m]))

Activity completion rate by name:

sum by (activity_name) (rate(autocore_activity_execution_duration_seconds_count[1m]))
Database connection pool queries

Pool utilization (acquired / max as percentage):

sum(pgxpool_acquired_connections) / sum(pgxpool_max_connections) * 100

Empty acquires/s (callers waiting for a free connection):

sum(rate(pgxpool_empty_acquire_total[1m]))

Canceled acquires/s (operations failed due to pool starvation):

sum(rate(pgxpool_canceled_acquires_total[1m]))

Query duration (p95):

histogram_quantile(0.95, sum by (le) (rate(db_client_operation_duration_seconds_bucket{pgx_operation_type="query"}[1m])))

Connection acquire duration (p95):

histogram_quantile(0.95, sum by (le) (rate(db_client_operation_duration_seconds_bucket{pgx_operation_type="acquire"}[1m])))
Pod resource queries

CPU usage (total cores across all worker pods):

sum(rate(container_cpu_usage_seconds_total{pod=~"autoflow-loadtest-worker-.*",container="worker"}[1m]))

CPU throttling (percentage of CFS periods where workers hit their CPU limit):

sum(rate(container_cpu_cfs_throttled_periods_total{pod=~"autoflow-loadtest-worker-.*",container="worker"}[1m])) / sum(rate(container_cpu_cfs_periods_total{pod=~"autoflow-loadtest-worker-.*",container="worker"}[1m])) * 100

Memory working set (total across all worker pods):

sum(container_memory_working_set_bytes{pod=~"autoflow-loadtest-worker-.*",container="worker"})
Parsing JSON output

Pipe curl output through a python script to format it:

curl -sG '...' | python3 -c "
import sys, json
data = json.load(sys.stdin)
for r in data.get('data', {}).get('result', []):
    labels = r['metric']
    label_str = ' '.join(f'{k}={v}' for k, v in labels.items() if k != '__name__')
    print(f'--- {label_str} ---')
    for ts, val in r['values']:
        from datetime import datetime
        t = datetime.fromtimestamp(ts).strftime('%H:%M:%S')
        print(f'{t}  {val}')
"

Trace Analysis (Tempo)

Search for error traces:

curl -sG 'http://localhost:<grafana-port>/api/datasources/proxy/uid/tempo/api/search' \
  --data-urlencode 'q={resource.service.name="gitlab-kas" && status=error}' \
  --data-urlencode "start=$START" \
  --data-urlencode "end=$END" \
  --data-urlencode 'limit=5'

Fetch a full trace by ID and list all spans with duration:

curl -s 'http://localhost:<grafana-port>/api/datasources/proxy/uid/tempo/api/traces/<trace-id>' \
  | python3 -c "
import sys, json
data = json.load(sys.stdin)
spans = []
for batch in data.get('batches', []):
    for scope_span in batch.get('scopeSpans', []):
        for span in scope_span.get('spans', []):
            name = span.get('name', '?')
            start = int(span.get('startTimeUnixNano', 0))
            end = int(span.get('endTimeUnixNano', 0))
            dur_ms = (end - start) / 1_000_000
            status = 'ERROR' if span.get('status', {}).get('code', 0) == 2 else 'ok'
            spans.append((start, name, dur_ms, status))
spans.sort()
for start, name, dur_ms, status in spans:
    if dur_ms >= 1:
        print(f'{name:<60} {dur_ms:>10.1f}ms  {status}')
    else:
        print(f'{name:<60} {dur_ms*1000:>10.1f}us  {status}')
"

CPU Profiling (pprof)

The worker pods expose pprof at /debug/pprof/ on the metrics port. Capture a 30-second CPU profile during load:

kubectl port-forward deploy/autoflow-loadtest-worker 9090:9090
curl -o profile.pb.gz 'http://localhost:9090/debug/pprof/profile?seconds=30'
go tool pprof -http=:8080 profile.pb.gz

Heap profile (current allocations):

curl -o heap.pb.gz 'http://localhost:9090/debug/pprof/heap'
go tool pprof -http=:8080 heap.pb.gz

Goroutine dump (useful for checking suspended goroutine count):

curl -s 'http://localhost:9090/debug/pprof/goroutine?debug=1' | head -5

Rebuild and Redeploy

After code changes, rebuild the image and restart pods:

# Rebuild and push (run from an env root)
tofu apply -replace=module.loadtest.null_resource.docker_build_push

# Restart workers (pulls new image)
kubectl rollout restart deployment autoflow-loadtest-worker

# Restart injectors (Jobs are immutable, must delete first)
kubectl delete job autoflow-loadtest-inject
tofu output -raw kubernetes_manifests | kubectl apply -f -

Tear Down

cd scripts/loadtests/autoflow/envs/baseline
tofu destroy

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL