Apache Airflow Go Task SDK
The Go SDK is a Go implementation of the Airflow Task SDK. It lets you write task functions in Go that
have native access to the Airflow "model" (Variables, Connections, and XCom), instead of writing them in
Python.
It is built on the Task Execution Interface (TEI, a.k.a. the Task API) introduced by AIP-72 in Airflow
3.0.0. AIP-72 standardised how a task runtime talks to Airflow over an HTTP Execution API, which decoupled
the language a task is written in from the Airflow core. The Go SDK is one such runtime; the Java SDK is
another.
[!WARNING]
This is an experimental feature. The SDK is under active development and its APIs, wire protocols,
and tooling may change between releases without notice.
The compiled-language constraint
Python tasks are imported and run in-process. Go is compiled, so the model is different.
A single binary that bundles one or more Dags' task functions is called a bundle. You build one with
the SDK's packer, airflow-go-pack, which compiles your code and appends a metadata footer (the manifest
of dag_ids and task_ids, plus the Dag source) to the executable. The result is a self-contained
executable bundle: a single runnable file that is the bundle, with no separate manifest or archive to
ship alongside it.
You still need a Python stub Dag (for now)
The Task API does not yet carry Dag structure for non-Python languages, so the Dag's shape and task
dependencies are still declared in a small Python file using @task.stub:
from airflow.sdk import dag, task
@task.stub(queue="golang")
def extract(): ...
@task.stub(queue="golang")
def transform(): ...
@dag()
def simple_dag():
extract() >> transform()
simple_dag()
@task.stub tells the Dag parser the "shape" of the Go tasks (their names and dependencies) without any
Python implementation. The queue= value routes the task to the Go runtime. This Python requirement is a
known limitation.
Authoring a bundle
Implement bundlev1.BundleProvider, register your Dags and tasks, and main is one line. From
example/bundle/main.go:
type myBundle struct{}
var _ v1.BundleProvider = (*myBundle)(nil)
func (m *myBundle) GetBundleVersion() v1.BundleInfo {
return v1.BundleInfo{Name: bundleName, Version: &bundleVersion}
}
func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
simpleDag := dagbag.AddDag("simple_dag")
simpleDag.AddTask(extract)
simpleDag.AddTask(transform)
return nil
}
func main() {
if err := bundlev1server.Serve(&myBundle{}); err != nil {
log.Fatal(err)
}
}
A task is an ordinary Go function. The runtime inspects its signature and injects arguments by type:
sdk.TIRunContext, *slog.Logger, and an sdk.Client (or a narrower interface such as
sdk.VariableClient). An optional (any, error) return becomes the task's XCom; an error return marks
the task failed.
func extract(ctx sdk.TIRunContext, client sdk.Client, log *slog.Logger) (any, error) {
conn, err := client.GetConnection(ctx, "test_http")
// ... do work, honour ctx cancellation ...
return map[string]any{"go_version": runtime.Version()}, nil
}
func transform(ctx sdk.TIRunContext, client sdk.VariableClient, log *slog.Logger) error {
val, err := client.GetVariable(ctx, "my_variable")
if err != nil {
return err
}
log.Info("Obtained variable", "my_variable", val)
return nil
}
Asking for the narrowest interface a task needs (e.g. sdk.VariableClient instead of sdk.Client) makes
unit testing easier and documents which Airflow features the task touches. RegisterDags is the single
source of truth for which dag_ids and task_ids a bundle can run.
Reading the task runtime context
Declare an sdk.TIRunContext parameter on a task to read the identifiers and scheduling timestamps of the
running task instance and its Dag run -- the Go equivalent of the execution context the Python and Java SDKs
expose. It is an interface that embeds context.Context, so the same ctx drives cancellation and client
calls. The runtime binds it by type, just like the other injected parameters:
func extract(ctx sdk.TIRunContext, log *slog.Logger) (any, error) {
ti := ctx.TaskInstance()
log.Info("running",
"dag_id", ti.DagID,
"run_id", ti.RunID,
"task_id", ti.TaskID,
"try_number", ti.TryNumber,
"logical_date", ctx.DagRun().LogicalDate,
)
return nil, nil
}
ctx.TaskInstance() returns DagID, RunID, TaskID, MapIndex (nil for an unmapped task), and
TryNumber; ctx.DagRun() returns DagID, RunID, and the *time.Time fields LogicalDate,
DataIntervalStart, and DataIntervalEnd (nil when the run has no such value, e.g. a manual trigger).
Deployment modes
A bundle can run in two ways. The same bundle binary works in both; you pick one per deployment:
- Coordinator (recommended)
- Edge Worker
For the protocol details behind each, see How it works.
Coordinator (recommended)
A Python task runner executes the Go task directly, with no separate Go worker process to run on the host.
This is the same coordinator mechanism the Java SDK uses.
Why this is recommended: the mature Python supervisor handles the Airflow-facing concerns, so this path
inherits its capabilities (remote task logs to S3/GCS, the full range of task states, and alternate XCom
backends) rather than reimplementing them in Go. These are exactly the features the Edge Worker path is
still missing (see Known limitations).
Quickstart
-
Build and pack your bundle with airflow-go-pack. The packer compiles the bundle and appends an
embedded metadata footer so the coordinator can read its dag_ids without executing the binary,
producing a single runnable file:
go tool airflow-go-pack ./example/bundle -- -trimpath -tags=prod
Use --output <path> to write the packed bundle straight into a directory the coordinator scans
(executables_root), and pass extra go build flags after --.
For cross-compiling (e.g. deploy to a Linux host from an Apple-silicon (darwin/arm64) machine), pass --goos/--goarch and the
packer cross-builds for you:
go tool airflow-go-pack --goos linux --goarch amd64 \
--output ~/airflow/executable-bundles/sample-dag-bundle \
./example/bundle
Alternatively, use --executable/--source. The packer normally execs the binary to read
its metadata; a cross-compiled binary cannot run on the host, so generate the metadata on a machine that
can run it and pass the file with --airflow-metadata:
# on linux/amd64 machine:
go build -o my-bundle ./example/bundle
./my-bundle --airflow-metadata > airflow-metadata.yaml
# on darwin/arm64 machine:
go tool airflow-go-pack --executable ./my-bundle --source main.go --airflow-metadata airflow-metadata.yaml
[!NOTE]
The packer ships via the Go 1.24 tool directive, so there is no global install: add
tool github.com/apache/airflow/go-sdk/cmd/airflow-go-pack to your bundle module's go.mod and run
it with go tool airflow-go-pack. This pins the packer version per project.
-
Register the coordinator and route the queue to it, under [sdk] in airflow.cfg (or the equivalent
AIRFLOW__SDK__* env vars):
[sdk]
coordinators = {"go": {"classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs": {"executables_root": ["~/airflow/executable-bundles"]}}}
queue_to_coordinator = {"golang": "go"}
executables_root is one or more directories the coordinator scans for bundles; queue_to_coordinator
routes stub tasks with queue="golang" to this Go coordinator.
[!IMPORTANT]
The coordinator is part of the Airflow worker, so the [sdk] config (and the bundle files in
executables_root) only need to be present wherever tasks actually execute. With CeleryExecutor,
setting it on the Celery workers is sufficient. With LocalExecutor, tasks run inside the scheduler
process, so it must be set where the scheduler can read it. The API server and Dag processor do not
need it.
-
Deploy the matching Python stub Dag (above) into Airflow. There is no separate Go worker to run: the
Airflow worker forks the bundle binary once per task instance.
Edge Worker (go-plugin)
A long-running Go worker process (airflow-go-edge-worker) polls Airflow for work and runs your bundle,
with no Python in the data path. This path runs end-to-end today, but is missing the features listed under
Known limitations.
Quickstart
-
See example/bundle/main.go for an example Dag bundle.
-
Compile it into a binary:
go build -o ./bin/sample-dag-bundle ./example/bundle
(or see the Justfile for how to build it and set the bundle version at
build time.)
-
Configure the Go edge worker by editing $AIRFLOW_HOME/go-sdk.yaml. The ports below are the defaults
assuming Airflow runs locally via airflow standalone; tweak the ports and secrets to match your setup:
edge:
api_url: "http://0.0.0.0:8080/"
execution:
api_url: "http://0.0.0.0:8080/execution/"
api_auth:
# This needs to match the value from the same setting in your API server for Edge API to function
secret_key: "hPDU4Yi/wf5COaWiqeI3g=="
bundles:
# Which folder to look in for pre-compiled bundle binaries
folder: "./bin"
logging:
# Where to write task logs to
base_log_folder: "./logs"
# Secret key matching airflow API server config, to only allow log requests from there.
secret_key: "u0ZDb2ccINAbhzNmvYzclw=="
You can also set these options via environment variables of AIRFLOW__${SECTION}__${KEY}, for example
AIRFLOW__API_AUTH__SECRET_KEY.
-
Install the worker:
go install github.com/apache/airflow/go-sdk/cmd/airflow-go-edge-worker@latest
-
Run it:
airflow-go-edge-worker run --queues golang
-
Deploy the matching Python stub Dag (above) into Airflow.
Known limitations
A non-exhaustive list of features the Edge Worker (go-plugin) path has yet to implement. These are the
main reason the coordinator-based path is recommended: in that mode the Python supervisor handles these
concerns, so they are not limitations there.
- Putting tasks into states other than success or failed/up-for-retry (deferred,
failed-without-retries, etc.).
- Remote task logs (i.e. S3/GCS etc.).
- XCom reading/writing through non-default XCom backends.
How it works
The same bundle binary speaks two different protocols; which one it uses is decided at launch by the CLI
flags it was invoked with. User code (func main) is identical either way.
Coordinator protocol
Python supervisor / task runner
│ finds + validates the bundle, then forks it:
▼
<bundle binary> --comm=127.0.0.1:P1 --logs=127.0.0.1:P2
│ binary dials BACK over TCP loopback (msgpack-over-IPC)
▼
GetConnection / GetVariable / GetXCom / SetXCom ... → SucceedTask / TaskState
- The Python
ExecutableCoordinator forks the bundle binary with --comm/--logs addresses it is already
listening on. The binary dials back (it never listens) and speaks a length-prefixed msgpack-over-IPC wire
protocol on the comm socket, with structured JSON-line logs on the logs socket.
- The Python runtime is the worker. It proxies every
GetConnection / GetVariable / GetXCom /
SetXCom call through to the Execution API. The Go binary just runs the task function.
The Go side of the protocol is implemented in pkg/execution/. On the Python side it is the
ExecutableCoordinator in task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py.
Edge Worker protocol
Airflow scheduler ──Edge Executor API──► airflow-go-edge-worker ──go-plugin/gRPC──► bundle binary
(ExecuteTaskWorkload) (long-running Go process) (child process)
airflow-go-edge-worker is a long-running Go process. It registers with the scheduler, polls the Edge
Executor API for ExecuteTaskWorkloads, and heartbeats.
- For each workload it execs the bundle binary as a child and connects over HashiCorp
go-plugin (gRPC over a handshake-gated socket).
- The Task API itself has no way to deliver an
ExecuteTaskWorkload to a Go worker, so the Edge Executor
API fills that gap. Longer term that API will likely need stabilising and versioning.
Architectural decisions
The adr/ directory records the design decisions behind the SDK:
- ADR 0001: bundle-packing options.
- ADR 0002: deliver the bundle packer via the
Go 1.24
tool directive.
- ADR 0003: dual-mode coordinator protocol, where one
binary speaks both go-plugin gRPC (Edge Worker) and msgpack-over-IPC (Python coordinator).
- ADR 0004: the self-contained executable bundle, where
the executable is the bundle.
The normative, language-agnostic on-disk bundle format (the footer layout, manifest fields, and what the
ExecutableCoordinator reads) is specified in
executable-bundle-spec.rst.
airflow-go-pack produces bundles conforming to that spec.
Future Direction
This is more of an "it would be nice to have" than any plan or commitment, and a place to record ideas.
- Defining the whole Dag in the Go SDK, so the Python stub Dag is no longer required and a bundle's
structure and task dependencies can be declared natively in Go.
- The ability to run Airflow tasks "in" an existing code base, i.e. defining an Airflow task function that
runs (in a goroutine) inside an existing app.
- Doing the task function reflection ahead of time, rather than for each Execute call.