Documentation ¶
Overview ¶
Package sdk gives task functions access to the Airflow "model" (Variables, Connections, and XCom) at run time.
A task function does not construct a client itself. The runtime inspects the function's parameters and injects one by type, so you declare the narrowest interface you need and use it:
func mytask(ctx context.Context, client sdk.Client, log *slog.Logger) error {
	val, err := client.GetVariable(ctx, "my_variable")
	if err != nil {
		return err
	}
	log.Info("got variable", "value", val)
	return nil
}
Ask for Client for full access, or a narrower interface such as VariableClient or ConnectionClient when the task only reads one kind of object. The narrower type documents what the task touches and makes it easy to pass a fake in unit tests.
To publish a result, return a value from the task function: the runtime pushes it as the task's return-value XCom, so most tasks never call XComClient directly. Lookups that miss return a wrapped sentinel error (VariableNotFound, ConnectionNotFound, XComNotFound) you can test for with errors.Is.
Constants ¶
const (
	// VariableEnvPrefix is the environment-variable prefix used as a local
	// fallback for Variable lookups. GetVariable first checks the process
	// environment for VariableEnvPrefix plus the uppercased key (so key
	// "my_var" is read from AIRFLOW_VAR_MY_VAR) before asking the API server,
	// mirroring the Python SDK and making local development and tests easy.
	VariableEnvPrefix = "AIRFLOW_VAR_"

	// ConnectionEnvPrefix is the matching prefix for Connections. The
	// connection env fallback is not wired up yet, so it is currently unused.
	ConnectionEnvPrefix = "AIRFLOW_CONN_"
)
Variables ¶
var ConnectionNotFound = errors.New("connection not found")
ConnectionNotFound is an error value used to signal that a connection could not be found (and that there were no communication issues with the API server).
See the GetConnection method of ConnectionClient for an example.
var VariableNotFound = errors.New("variable not found")
VariableNotFound is an error value used to signal that a variable could not be found (and that there were no communication issues with the API server).
See the GetVariable method of VariableClient for an example.
var XComNotFound = errors.New("xcom not found")
XComNotFound is an error value used to signal that an XCom value could not be found (and that there were no communication issues with the API server).
See the GetXCom method of XComClient for an example.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface {
	VariableClient
	ConnectionClient
	XComClient
}
Client is the full task-facing API: read Variables and Connections, and read/write XCom. A task that declares an sdk.Client parameter is handed one by the runtime. If a task needs only one capability, ask for the narrower VariableClient, ConnectionClient, or XComClient instead.
func NewClient ¶
func NewClient() Client
NewClient returns the default Client, which serves Variable, Connection, and XCom calls from the Execution API (reading the per-task API client from the context). Task functions are handed a Client by the runtime and rarely need this; it is exported mainly for tests and advanced setups.
type Connection ¶
type Connection struct {
	ID string
	// The Connection type, as defined in Airflow.
	Type string
	Host string
	Port int
	// Login (username) of the connection. Optional: nil means no login was
	// set, which is distinct from an empty login.
	Login *string
	// Password of the connection. Optional, can be nil.
	Password *string
	// The path. Called `Schema` in Airflow's Python code. For database
	// connections this often contains the database name.
	Path  string
	Extra map[string]any
}
Connection is an Airflow Connection resolved for a task: the endpoint and credentials registered in Airflow under a conn_id. Obtain one with ConnectionClient.GetConnection. Optional fields are pointers so an absent value (nil) is distinct from an empty one.
func ConnFromAPIResponse ¶
func ConnFromAPIResponse(resp *api.ConnectionResponse) (Connection, error)
ConnFromAPIResponse converts an Execution-API ConnectionResponse into the SDK's Connection type. It is exported so other internal SDK packages (for example, the coordinator-mode runtime in bundlev1server/impl/coord) can reuse the same conversion.
func (Connection) GetURI ¶
func (c Connection) GetURI() *url.URL
GetURI renders the connection as a URL of the form scheme://login:password@host:port/path?extra. It is the Go equivalent of Python's Connection.get_uri and is handy when a client library wants a single connection string rather than individual fields.
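To make the URL shape concrete, here is a rough sketch of how such a rendering could be built with net/url. It is not the SDK's implementation. The connection struct is a local copy of a few fields, and three choices are assumptions of this example: the scheme is taken from Type, a zero Port is omitted, and Extra values are stringified with fmt.Sprint into query parameters.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// connection is a local copy of the fields needed for this sketch.
type connection struct {
	Type, Host      string
	Port            int
	Login, Password *string
	Path            string
	Extra           map[string]any
}

// toURL renders c as scheme://login:password@host:port/path?extra.
func toURL(c connection) *url.URL {
	u := &url.URL{Scheme: c.Type, Host: c.Host}
	if c.Port != 0 {
		// JoinHostPort also brackets IPv6 literals correctly.
		u.Host = net.JoinHostPort(c.Host, strconv.Itoa(c.Port))
	}
	if c.Path != "" {
		u.Path = "/" + c.Path
	}
	switch {
	case c.Login != nil && c.Password != nil:
		u.User = url.UserPassword(*c.Login, *c.Password)
	case c.Login != nil:
		u.User = url.User(*c.Login)
	}
	// Assumption: extras become query parameters; a password without a
	// login is ignored in this sketch.
	q := url.Values{}
	for k, v := range c.Extra {
		q.Set(k, fmt.Sprint(v))
	}
	u.RawQuery = q.Encode()
	return u
}

func main() {
	login, password := "admin", "s3cret"
	c := connection{
		Type: "postgres", Host: "db.internal", Port: 5432,
		Login: &login, Password: &password, Path: "sales",
		Extra: map[string]any{"sslmode": "require"},
	}
	fmt.Println(toURL(c))
}
```

Building the value through url.URL rather than string concatenation leaves the escaping of reserved characters in credentials to the standard library.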
type ConnectionClient ¶
type ConnectionClient interface {
	// GetConnection returns the Airflow Connection registered under connID.
	//
	// If the connection is not found, the error wraps ConnectionNotFound:
	//
	//	conn, err := client.GetConnection(ctx, "my-db")
	//	if errors.Is(err, ConnectionNotFound) {
	//		// Handle not found: set a default, return a custom error, etc.
	//	} else if err != nil {
	//		// Other errors, such as HTTP or network timeouts.
	//	}
	GetConnection(ctx context.Context, connID string) (Connection, error)
}
ConnectionClient reads Airflow Connections.
type DagRun ¶
type DagRun struct {
	DagID             string
	RunID             string
	LogicalDate       *time.Time
	DataIntervalStart *time.Time
	DataIntervalEnd   *time.Time
}
DagRun identifies the Dag run the current task instance belongs to and carries its scheduling timestamps. The *time.Time fields are nil when the supervisor did not provide a value (for example, a manually triggered run without a logical date).
type TIRunContext ¶
type TIRunContext interface {
	context.Context
	// TaskInstance identifies the task instance that is executing.
	TaskInstance() TaskInstance
	// DagRun identifies the Dag run the task instance belongs to.
	DagRun() DagRun
}
TIRunContext is the execution context handed to a task. It behaves as the standard context.Context (cancellation, deadline, request-scoped values) and additionally exposes the identifiers and scheduling timestamps of the task instance that is executing, along with the Dag run it belongs to. It is the Go equivalent of the execution context the Python and Java SDKs expose to task authors.
The runtime injects it into a task function by parameter type, so declare it as the task's context argument:
func myTask(ctx sdk.TIRunContext, log *slog.Logger) error {
	log.Info("running",
		"task_id", ctx.TaskInstance().TaskID,
		"run_id", ctx.DagRun().RunID,
	)
	return nil
}
Because it embeds context.Context it is usable wherever one is expected: pass it straight to client calls, select on ctx.Done(), or hand it to downstream helpers that take a context.Context.
It is an interface, not a struct with a context.Context field, because the context package advises against storing contexts in structs (https://pkg.go.dev/context#hdr-Contexts_and_structs). The runtime constructs a fresh value around the live task context for each invocation, and task code cannot end up with a half-initialised value. Build one in tests with NewTIRunContext.
func NewTIRunContext ¶
func NewTIRunContext(ctx context.Context, ti TaskInstance, dagRun DagRun) TIRunContext
NewTIRunContext returns a TIRunContext that delegates context behaviour to ctx and exposes ti and dagRun. It panics on a nil ctx, mirroring the context package's own constructors. The runtime calls it when binding a task's TIRunContext parameter; in unit tests, use it to hand-build the argument:
ctx := sdk.NewTIRunContext(context.Background(), sdk.TaskInstance{TaskID: "t1"}, sdk.DagRun{})
type TaskInstance ¶
type TaskInstance struct {
	DagID  string
	RunID  string
	TaskID string
	// MapIndex is the index within a dynamically mapped task, or nil for an
	// unmapped (regular) task instance.
	MapIndex  *int
	TryNumber int
}
TaskInstance identifies the currently executing task instance.
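Because MapIndex is a pointer, code that formats or compares task instances has to handle the nil case explicitly. The sketch below uses a local copy of three fields; taskInstance and describe are hypothetical names for this example.

```go
package main

import "fmt"

// taskInstance is a local copy of the fields used in this sketch.
type taskInstance struct {
	TaskID    string
	MapIndex  *int
	TryNumber int
}

// describe renders a short label, adding the map index only for mapped
// task instances (MapIndex != nil).
func describe(ti taskInstance) string {
	if ti.MapIndex == nil {
		return fmt.Sprintf("%s (try %d)", ti.TaskID, ti.TryNumber)
	}
	return fmt.Sprintf("%s[%d] (try %d)", ti.TaskID, *ti.MapIndex, ti.TryNumber)
}

func main() {
	idx := 3
	fmt.Println(describe(taskInstance{TaskID: "load", TryNumber: 1}))
	fmt.Println(describe(taskInstance{TaskID: "load", MapIndex: &idx, TryNumber: 2}))
}
```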
type VariableClient ¶
type VariableClient interface {
	// GetVariable returns the value of an Airflow Variable.
	//
	// It first looks in the process environment for the appropriately named
	// variable and, if it is not found there, falls back to asking the API
	// server.
	//
	// If the variable is not found, the error wraps VariableNotFound:
	//
	//	val, err := client.GetVariable(ctx, "my-var")
	//	if errors.Is(err, VariableNotFound) {
	//		// Handle not found: set a default, return a custom error, etc.
	//	} else if err != nil {
	//		// Other errors, such as HTTP or network timeouts.
	//	}
	GetVariable(ctx context.Context, key string) (string, error)
	// UnmarshalJSONVariable fetches a variable and unmarshals its value into
	// pointer via json.Unmarshal. Use this when the variable was stored as a
	// JSON object, array, or number; for plain string variables call
	// GetVariable directly.
	//
	// pointer must be a non-nil pointer, as required by encoding/json.
	UnmarshalJSONVariable(ctx context.Context, key string, pointer any) error
}
VariableClient reads Airflow Variables.
Go has no function overloading, so the "give me the raw string" and "give me a decoded struct" cases are split into two methods rather than one polymorphic call: GetVariable returns the raw string, and UnmarshalJSONVariable decodes a JSON-encoded variable into a caller-supplied pointer. This mirrors the standard library's split between os.LookupEnv and json.Unmarshal: each method has one job, and the caller picks based on how the variable was stored.
type XComClient ¶
type XComClient interface {
	// GetXCom returns the value stored under key by the task identified by
	// dagId/runId/taskId. For a mapped task instance pass its mapIndex,
	// otherwise pass nil. If no value exists the error wraps XComNotFound.
	//
	// value is reserved for future typed decoding and is currently ignored; the
	// stored value is returned as the first result instead.
	GetXCom(
		ctx context.Context,
		dagId, runId, taskId string,
		mapIndex *int,
		key string,
		value any,
	) (any, error)
	// PushXCom stores value under key for the given task instance ti.
	PushXCom(ctx context.Context, ti api.TaskInstance, key string, value any) error
}
XComClient reads and writes XCom values. Most tasks never need this: to publish a result, return a value from the task function and the runtime pushes it as the return-value XCom. Reach for these methods only to read another task's XCom, or to push under a custom key.