execution

package
v1.0.0-beta2
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package execution implements the SDK coordinator-protocol runtime (msgpack-over-IPC). It is the second mode of bundlev1server.Serve: when the bundle binary is launched with --comm/--logs by the Airflow supervisor (Python ExecutableCoordinator), bundlev1server.Serve dispatches here.

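The first inbound frame on the comm socket is a StartupDetails message, which starts the task. The task then makes further request/response round trips over the same socket (for connections, variables, and XComs) until its terminal frame is sent.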

See go-sdk/adr/0003-coordinator-protocol-msgpack-ipc.md.

Constants

const MaxFrameSize = 1<<32 - 1

MaxFrameSize is the maximum payload length of a single frame, in bytes. The 4-byte length prefix bounds it to 2^32 - 1, which matches the cap in Python's task-sdk comms.py:_FrameMixin.as_bytes (n >= 2**32 raises OverflowError).

const SupervisorSchemaVersion = "2026-06-16"

SupervisorSchemaVersion is the dated AIP-72 supervisor wire-schema version that this SDK's coordinator protocol is compiled against, in YYYY-MM-DD form. It is reported in a bundle's airflow-metadata manifest as sdk.supervisor_schema_version, so the supervisor can translate between its own schema and the bundle's: it downgrades the messages it sends to a shape the bundle understands and upgrades the messages it receives from the bundle.
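The version comparison itself happens on the (Python) supervisor side and is not described here. The Go sketch below only illustrates that dated YYYY-MM-DD versions order chronologically, so "older than the supervisor" is a well-defined test. compareSchemaVersions is a hypothetical helper, and the newer supervisor version in main is made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

// bundleSchemaVersion copies the documented SupervisorSchemaVersion value.
const bundleSchemaVersion = "2026-06-16"

// compareSchemaVersions returns -1, 0 or +1 as a is older than, the same as,
// or newer than b. Both must be valid YYYY-MM-DD dates.
func compareSchemaVersions(a, b string) (int, error) {
	ta, err := time.Parse("2006-01-02", a)
	if err != nil {
		return 0, fmt.Errorf("bad schema version %q: %w", a, err)
	}
	tb, err := time.Parse("2006-01-02", b)
	if err != nil {
		return 0, fmt.Errorf("bad schema version %q: %w", b, err)
	}
	switch {
	case ta.Before(tb):
		return -1, nil
	case ta.After(tb):
		return 1, nil
	}
	return 0, nil
}

func main() {
	// Hypothetical supervisor schema that is newer than the bundle's.
	c, err := compareSchemaVersions(bundleSchemaVersion, "2026-09-01")
	if err != nil {
		panic(err)
	}
	if c < 0 {
		fmt.Println("bundle is older: the supervisor must downgrade what it sends")
	}
}
```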

Variables

var ErrDispatcherClosed = errors.New("coordinator comm: dispatcher closed")

ErrDispatcherClosed is wrapped into the error Communicate returns once the background reader goroutine has exited, typically because the supervisor closed the comm socket. Use errors.Is to detect it.

Functions

func DumpAirflowMetadata

func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format MetadataFormat) error

DumpAirflowMetadata writes the bundle's airflow-metadata manifest to stdout (YAML by default, JSON when format is MetadataFormatJSON). It runs RegisterDags against an in-memory recorder only: no gRPC server is started and no external services are contacted. airflow-go-pack execs the binary with --airflow-metadata and decodes this output to build the embedded manifest.

func RunTask

func RunTask(
	ctx context.Context,
	bundle bundlev1.Bundle,
	details *StartupDetails,
	comm *CoordinatorComm,
	logger *slog.Logger,
) map[string]any

RunTask executes a task based on StartupDetails received from the supervisor.

It looks up the task in the bundle, creates a CoordinatorClient for SDK calls, executes the task, and returns a terminal message body (SucceedTaskMsg or TaskStateMsg) ready to ship as the final response frame.

The supervisor owns the Execution-API state transitions in coordinator mode, so we deliberately bypass worker.ExecuteTaskWorkload (which drives Run / UpdateState itself) and only invoke the user's task function.

ctx is the task's root context; Serve derives it from SIGINT/SIGTERM, so a cooperative task that honors ctx returns promptly on a supervisor shutdown.

func Serve

func Serve(provider bundlev1.BundleProvider, commAddr, logsAddr string) error

Serve runs the bundle binary in coordinator mode. It dials the supervisor's comm and logs sockets, installs an slog handler that writes JSON-line records to the logs connection, and dispatches on the first frame.

Serve returns nil on a clean shutdown: the task ran and its terminal TaskState/SucceedTask frame was delivered, and the caller should exit 0. A non-nil error indicates a protocol-level failure (connection loss, malformed frames, unknown first message type) that happens before or instead of delivering a terminal frame.

Failure-signaling contract: the caller (main) must turn a non-nil error into a non-zero process exit. The supervisor derives the task's final state primarily from the child's exit code: a non-zero exit is recorded as FAILED (or UP_FOR_RETRY when retries are configured), and a structured TaskState frame is honored only when the process exits 0 (see ActivitySubprocess.final_state in the Python supervisor). An early error return from Serve therefore fails closed without needing to send a frame. Failures that occur after the logs socket is connected are logged at Error level before Serve returns, so the reason still reaches the supervisor's log stream.

Types

type ApiError

type ApiError struct {
	Err    string
	Detail any
}

ApiError represents an error returned by the supervisor over the comm socket.

func (*ApiError) Error

func (e *ApiError) Error() string

type BundleInfoMsg

type BundleInfoMsg struct {
	Name    string
	Version string
}

BundleInfoMsg holds bundle identification from StartupDetails.

type ConnectionResult

type ConnectionResult struct {
	ConnID   string
	ConnType string
	Host     string
	Schema   string
	Login    *string
	Password *string
	Port     int
	Extra    string
}

ConnectionResult is the response to GetConnection. Login and Password are nullable in the supervisor schema (None vs "" are distinct), so they are decoded as *string to preserve that distinction.
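The None-versus-empty-string distinction can be sketched with a helper that decodes a nullable string field from a generic decoded body. optString is hypothetical, and the "login"/"password" key names are assumed.

```go
package main

import "fmt"

// optString returns nil when key is absent or explicitly nil (Python None),
// and a pointer to the string otherwise, so "" stays distinct from None.
func optString(m map[string]any, key string) (*string, error) {
	v, ok := m[key]
	if !ok || v == nil {
		return nil, nil
	}
	s, ok := v.(string)
	if !ok {
		return nil, fmt.Errorf("%s: expected string, got %T", key, v)
	}
	return &s, nil
}

func main() {
	// A decoded ConnectionResult body (assumed key names).
	body := map[string]any{"conn_id": "my_db", "login": nil, "password": ""}
	login, _ := optString(body, "login")
	password, _ := optString(body, "password")
	fmt.Println(login == nil, password != nil && *password == "")
}
```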

type CoordinatorClient

type CoordinatorClient struct {
	// contains filtered or unexported fields
}

CoordinatorClient implements sdk.Client by communicating with the Airflow supervisor over the comm socket using msgpack-framed IPC instead of HTTP.

func NewCoordinatorClient

func NewCoordinatorClient(comm *CoordinatorComm) *CoordinatorClient

NewCoordinatorClient creates a new client backed by the comm socket.

func (*CoordinatorClient) GetConnection

func (c *CoordinatorClient) GetConnection(
	ctx context.Context,
	connID string,
) (sdk.Connection, error)

GetConnection requests a connection from the supervisor.

func (*CoordinatorClient) GetVariable

func (c *CoordinatorClient) GetVariable(ctx context.Context, key string) (string, error)

GetVariable requests a variable value from the supervisor.

func (*CoordinatorClient) GetXCom

func (c *CoordinatorClient) GetXCom(
	ctx context.Context,
	dagId, runId, taskId string,
	mapIndex *int,
	key string,
	_ any,
) (any, error)

GetXCom requests an XCom value from the supervisor.

func (*CoordinatorClient) PushXCom

func (c *CoordinatorClient) PushXCom(
	ctx context.Context,
	ti api.TaskInstance,
	key string,
	value any,
) error

PushXCom sends an XCom value to the supervisor.

func (*CoordinatorClient) UnmarshalJSONVariable

func (c *CoordinatorClient) UnmarshalJSONVariable(
	ctx context.Context,
	key string,
	pointer any,
) error

UnmarshalJSONVariable gets a variable and unmarshals its JSON value.

type CoordinatorComm

type CoordinatorComm struct {
	// contains filtered or unexported fields
}

CoordinatorComm manages bidirectional communication with the Airflow supervisor over a length-prefixed msgpack socket connection.

Reads are multiplexed by frame ID. A single background reader goroutine, lazily started on the first Communicate call, consumes inbound frames and dispatches each one to the Communicate caller whose request used that ID. This lets multiple goroutines call Communicate concurrently without serialising the full send-then-read round trip behind a single mutex, and guarantees the response a caller receives matches the request it sent.

The supervisor's initial StartupDetails frame arrives unsolicited, before any client request is in flight, and is read synchronously via ReadMessage; ReadMessage must not be called after the dispatcher has been started.

func NewCoordinatorComm

func NewCoordinatorComm(reader io.Reader, writer io.Writer, logger *slog.Logger) *CoordinatorComm

NewCoordinatorComm creates a new communication channel.

func (*CoordinatorComm) Communicate

func (c *CoordinatorComm) Communicate(
	ctx context.Context,
	body map[string]any,
) (map[string]any, error)

Communicate sends a request and blocks until the supervisor's response with the matching frame ID is delivered by the dispatcher, ctx is cancelled, or its deadline expires. Safe to call concurrently from multiple goroutines.

If the response carries an error, either as the third element of a 3-element frame or as a body whose "type" is "ErrorResponse", it is returned as an *ApiError. If the dispatcher's read loop has terminated, the returned error wraps ErrDispatcherClosed and carries the underlying read error.
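The two error shapes can be sketched as one classification step. apiError mirrors ApiError's fields, but its Error format is made up. responseError is hypothetical. The "error"/"detail" key names are assumed from the Python ErrorResponse model.

```go
package main

import "fmt"

// apiError mirrors the fields of execution.ApiError.
type apiError struct {
	Err    string
	Detail any
}

func (e *apiError) Error() string { return fmt.Sprintf("%s: %v", e.Err, e.Detail) }

// responseError reports an error carried either in a response frame's third
// element (errBody) or in a body whose "type" is "ErrorResponse".
func responseError(body, errBody map[string]any) error {
	src := errBody
	if len(src) == 0 {
		if t, _ := body["type"].(string); t != "ErrorResponse" {
			return nil
		}
		src = body
	}
	msg, _ := src["error"].(string)
	return &apiError{Err: msg, Detail: src["detail"]}
}

func main() {
	body := map[string]any{"type": "ErrorResponse", "error": "VARIABLE_NOT_FOUND", "detail": map[string]any{"key": "missing"}}
	fmt.Println(responseError(body, nil))
}
```

Returning a literal nil (rather than a nil *apiError) avoids Go's non-nil-interface pitfall for callers that check err != nil.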

func (*CoordinatorComm) ReadMessage

func (c *CoordinatorComm) ReadMessage() (IncomingFrame, error)

ReadMessage reads and decodes one frame directly from the comm socket. It is used to read the supervisor's initial frame before any request/response traffic begins. Calling it after the dispatcher has started would race the reader goroutine for input bytes, so it returns an error in that case.

func (*CoordinatorComm) SendRequest

func (c *CoordinatorComm) SendRequest(id int64, body map[string]any) error

SendRequest writes a request frame (2-element [id, body]) to the supervisor. Concurrent calls are serialised so frames are never interleaved on the wire.

type ErrorResponse

type ErrorResponse struct {
	Error  string
	Detail any
}

ErrorResponse represents an error returned by the supervisor.

type GetConnectionMsg

type GetConnectionMsg struct {
	ConnID string
}

GetConnectionMsg is sent to request a connection from the supervisor.

type GetVariableMsg

type GetVariableMsg struct {
	Key string
}

GetVariableMsg is sent to request a variable from the supervisor.

type GetXComMsg

type GetXComMsg struct {
	Key               string
	DagID             string
	TaskID            string
	RunID             string
	MapIndex          *int
	IncludePriorDates bool
}

GetXComMsg is sent to request an XCom value from the supervisor.

type IncomingFrame

type IncomingFrame struct {
	ID   int64
	Body map[string]any
	Err  map[string]any // non-nil only for response frames (3-element arrays)
}

IncomingFrame represents a decoded frame received from the comm socket. ID is int64 to match the wire encoding and CoordinatorComm.nextID; narrowing to int would reintroduce wraparound on 32-bit GOARCH.

type MetadataFormat

type MetadataFormat string

MetadataFormat selects the encoding DumpAirflowMetadata writes to stdout for the bundle binary's --airflow-metadata flag.

const (
	// MetadataFormatYAML is the default; it matches the airflow-metadata.yaml a
	// bundle embeds, so `mybundle --airflow-metadata > airflow-metadata.yaml`
	// yields a ready-to-use file.
	MetadataFormatYAML MetadataFormat = "yaml"
	// MetadataFormatJSON is opt-in via --format json.
	MetadataFormatJSON MetadataFormat = "json"
)

func ParseMetadataFormat

func ParseMetadataFormat(s string) (MetadataFormat, error)

ParseMetadataFormat validates a --format value and returns the matching MetadataFormat. An empty value defaults to YAML.
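A hypothetical main showing how the --airflow-metadata and --format flags might be wired with the standard flag package, which accepts both -format and --format. parseMetadataFormat reproduces only the documented behaviour: empty means YAML and unknown values are rejected. Whether the real function is case-insensitive is not stated.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

type metadataFormat string

const (
	formatYAML metadataFormat = "yaml"
	formatJSON metadataFormat = "json"
)

// parseMetadataFormat mirrors ParseMetadataFormat's documented behaviour.
func parseMetadataFormat(s string) (metadataFormat, error) {
	switch s {
	case "", string(formatYAML):
		return formatYAML, nil
	case string(formatJSON):
		return formatJSON, nil
	}
	return "", fmt.Errorf("unknown --format %q (want yaml or json)", s)
}

func main() {
	dump := flag.Bool("airflow-metadata", false, "print the airflow-metadata manifest and exit")
	format := flag.String("format", "", "manifest encoding: yaml (default) or json")
	flag.Parse()
	if !*dump {
		return
	}
	f, err := parseMetadataFormat(*format)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	fmt.Println("would dump manifest as", f) // real code would call DumpAirflowMetadata
}
```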

type SetXComMsg

type SetXComMsg struct {
	Key          string
	Value        any
	DagID        string
	TaskID       string
	RunID        string
	MapIndex     *int
	MappedLength *int
}

SetXComMsg is sent to set an XCom value. MapIndex mirrors Python's SetXCom.map_index (int | None): nil means "unmapped task", and is omitted from the wire payload rather than encoded as a -1 sentinel.
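A sketch of the omit-when-nil rule for the wire body. setXComBody is hypothetical. The key names are assumed to follow the Python SetXCom model. Treating a nil MappedLength the same way is an assumption, because only MapIndex is documented.

```go
package main

import "fmt"

// setXComBody builds a SetXCom request body. A nil mapIndex (unmapped task)
// is left out instead of being encoded as a -1 sentinel.
func setXComBody(dagID, runID, taskID, key string, value any, mapIndex, mappedLength *int) map[string]any {
	body := map[string]any{
		"type":    "SetXCom",
		"dag_id":  dagID,
		"run_id":  runID,
		"task_id": taskID,
		"key":     key,
		"value":   value,
	}
	if mapIndex != nil {
		body["map_index"] = *mapIndex
	}
	if mappedLength != nil { // assumption: same omit-when-nil rule
		body["mapped_length"] = *mappedLength
	}
	return body
}

func main() {
	idx := 2
	fmt.Println(setXComBody("etl", "run_1", "extract", "rows", 42, nil, nil))
	fmt.Println(setXComBody("etl", "run_1", "extract", "rows", 42, &idx, nil)["map_index"])
}
```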

type SocketLogHandler

type SocketLogHandler struct {
	// contains filtered or unexported fields
}

SocketLogHandler is an slog.Handler that streams structured JSON log lines to the logs TCP socket. Each log entry is a single JSON object followed by a newline, matching the Airflow log streaming format.

Key mapping:

  • "event" for the log message (not "msg")
  • "level" in lowercase (not "INFO"/"ERROR")
  • "timestamp" in RFC3339Nano format (not "time")
  • Additional attributes are included as top-level fields

Groups are encoded as dotted key prefixes on a flat JSON object (`{"grp.key": "val"}`), not as nested objects. The Airflow supervisor's log-streaming format consumes flat top-level fields, so emitting nested objects here would not be parsed correctly. Do not change this without updating the supervisor side in lockstep.

func NewSocketLogHandler

func NewSocketLogHandler(writer io.Writer, level slog.Level) *SocketLogHandler

NewSocketLogHandler creates a new handler. If writer is nil, messages are buffered until Connect() is called.

func (*SocketLogHandler) Connect

func (h *SocketLogHandler) Connect(w io.Writer)

Connect sets the writer and flushes any buffered log messages.

func (*SocketLogHandler) Enabled

func (h *SocketLogHandler) Enabled(_ context.Context, level slog.Level) bool

func (*SocketLogHandler) Handle

func (h *SocketLogHandler) Handle(_ context.Context, r slog.Record) error

func (*SocketLogHandler) WithAttrs

func (h *SocketLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler

func (*SocketLogHandler) WithGroup

func (h *SocketLogHandler) WithGroup(name string) slog.Handler

type StartupDetails

type StartupDetails struct {
	TI                TaskInstanceInfo
	DagRelPath        string
	BundleInfo        BundleInfoMsg
	StartDate         time.Time
	TIContext         TIRunContext
	SentryIntegration string
}

StartupDetails is sent by the supervisor to initiate task execution.

type SucceedTaskMsg

type SucceedTaskMsg struct {
	EndDate      time.Time
	TaskOutlets  []any
	OutletEvents []any
}

SucceedTaskMsg is sent as a terminal message when a task succeeds.

type TIRunContext

type TIRunContext struct {
	LogicalDate       *time.Time
	DataIntervalStart *time.Time
	DataIntervalEnd   *time.Time
}

TIRunContext holds the runtime context for a task instance.

type TaskInstanceInfo

type TaskInstanceInfo struct {
	ID             string
	TaskID         string
	DagID          string
	RunID          string
	TryNumber      int
	DagVersionID   string
	MapIndex       int
	ContextCarrier map[string]any
}

TaskInstanceInfo holds task instance details from StartupDetails.

type TaskState

type TaskState string

TaskState is the terminal non-success state reported via TaskStateMsg. The wire values match Python's TaskInstanceState enum (and the generated api.TerminalStateNonSuccess); we define a local typed string so call sites get compile-time checking and don't have to import pkg/api just for the constants.

const (
	TaskStateFailed  TaskState = "failed"
	TaskStateRemoved TaskState = "removed"
	TaskStateSkipped TaskState = "skipped"
)

type TaskStateMsg

type TaskStateMsg struct {
	State   TaskState
	EndDate time.Time
}

TaskStateMsg is sent as a terminal message for failed/removed/skipped tasks.

type VariableResult

type VariableResult struct {
	Key   string
	Value any
}

VariableResult is the response to GetVariable.

type XComResult

type XComResult struct {
	Key   string
	Value any
}

XComResult is the response to GetXCom.
