Documentation ¶
Overview ¶
Package execution implements the SDK coordinator-protocol runtime (msgpack-over-IPC). It is the second mode of bundlev1server.Serve: when the bundle binary is launched with --comm/--logs by the Airflow supervisor (Python ExecutableCoordinator), bundlev1server.Serve dispatches here.
The first inbound frame on the comm socket is a StartupDetails message that drives multi-round task execution.
See go-sdk/adr/0003-coordinator-protocol-msgpack-ipc.md.
Index ¶
- Constants
- Variables
- func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format MetadataFormat) error
- func RunTask(ctx context.Context, bundle bundlev1.Bundle, details *StartupDetails, ...) map[string]any
- func Serve(provider bundlev1.BundleProvider, commAddr, logsAddr string) error
- type ApiError
- type BundleInfoMsg
- type ConnectionResult
- type CoordinatorClient
- func (c *CoordinatorClient) GetConnection(ctx context.Context, connID string) (sdk.Connection, error)
- func (c *CoordinatorClient) GetVariable(ctx context.Context, key string) (string, error)
- func (c *CoordinatorClient) GetXCom(ctx context.Context, dagId, runId, taskId string, mapIndex *int, key string, ...) (any, error)
- func (c *CoordinatorClient) PushXCom(ctx context.Context, ti api.TaskInstance, key string, value any) error
- func (c *CoordinatorClient) UnmarshalJSONVariable(ctx context.Context, key string, pointer any) error
- type CoordinatorComm
- type ErrorResponse
- type GetConnectionMsg
- type GetVariableMsg
- type GetXComMsg
- type IncomingFrame
- type MetadataFormat
- type SetXComMsg
- type SocketLogHandler
- func (h *SocketLogHandler) Connect(w io.Writer)
- func (h *SocketLogHandler) Enabled(_ context.Context, level slog.Level) bool
- func (h *SocketLogHandler) Handle(_ context.Context, r slog.Record) error
- func (h *SocketLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler
- func (h *SocketLogHandler) WithGroup(name string) slog.Handler
- type StartupDetails
- type SucceedTaskMsg
- type TIRunContext
- type TaskInstanceInfo
- type TaskState
- type TaskStateMsg
- type VariableResult
- type XComResult
Constants ¶
const MaxFrameSize = 1<<32 - 1
MaxFrameSize is the maximum payload length of a single frame, in bytes. The 4-byte length prefix bounds this to 2^32 - 1; matches Python's cap in task-sdk comms.py:_FrameMixin.as_bytes (n >= 2**32 raises OverflowError).
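The framing described above can be sketched as follows. This is a minimal illustration, not the package's code: the helper names (`writeFrame`, `readFrame`, `roundTrip`) are hypothetical, and big-endian byte order for the length prefix is an assumption. The payload is treated as opaque bytes, so no msgpack library is needed.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// maxFrameSize mirrors MaxFrameSize: the largest length a 4-byte prefix can carry.
const maxFrameSize = 1<<32 - 1

var errFrameTooLarge = errors.New("frame exceeds 4-byte length prefix")

// writeFrame writes a 4-byte length prefix followed by the payload.
// Big-endian byte order is an assumption of this sketch.
func writeFrame(w io.Writer, payload []byte) error {
	if uint64(len(payload)) > maxFrameSize {
		return errFrameTooLarge
	}
	var prefix [4]byte
	binary.BigEndian.PutUint32(prefix[:], uint32(len(payload)))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads the 4-byte prefix, then exactly that many payload bytes.
// A production reader would likely bound the allocation; this sketch does not.
func readFrame(r io.Reader) ([]byte, error) {
	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(prefix[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip writes one frame into a buffer and reads it back.
func roundTrip(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	if err := writeFrame(&buf, payload); err != nil {
		return nil, err
	}
	return readFrame(&buf)
}

func main() {
	got, err := roundTrip([]byte("hello"))
	fmt.Println(string(got), err)
}
```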
const SupervisorSchemaVersion = "2026-06-16"
SupervisorSchemaVersion is the dated AIP-72 supervisor wire-schema version this SDK's coordinator protocol is compiled against, in YYYY-MM-DD form. It is reported in a bundle's airflow-metadata manifest as sdk.supervisor_schema_version so the supervisor can downgrade outbound messages / upgrade inbound messages to a shape the bundle understands.
Variables ¶
var ErrDispatcherClosed = errors.New("coordinator comm: dispatcher closed")
ErrDispatcherClosed is wrapped into the error Communicate returns once the background reader goroutine has exited — typically because the supervisor closed the comm socket.
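Because the sentinel is wrapped rather than returned bare, callers would normally match it with `errors.Is`. The sketch below uses local stand-ins (`errDispatcherClosed`, `errSocketClosed`, `wrapReadErr`), all hypothetical; the exact wrapping the package performs is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errDispatcherClosed stands in for the package's ErrDispatcherClosed sentinel.
var errDispatcherClosed = errors.New("coordinator comm: dispatcher closed")

// errSocketClosed is a hypothetical underlying read error.
var errSocketClosed = errors.New("socket closed by peer")

// wrapReadErr shows one way a read error could be wrapped so that callers
// can still match the sentinel with errors.Is.
func wrapReadErr(readErr error) error {
	return fmt.Errorf("%w: %v", errDispatcherClosed, readErr)
}

// isDispatcherClosed reports whether err indicates a terminated dispatcher.
func isDispatcherClosed(err error) bool {
	return errors.Is(err, errDispatcherClosed)
}

func main() {
	fmt.Println(isDispatcherClosed(wrapReadErr(errSocketClosed)))
	fmt.Println(isDispatcherClosed(errSocketClosed))
}
```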
Functions ¶
func DumpAirflowMetadata ¶
func DumpAirflowMetadata(bundle bundlev1.BundleProvider, format MetadataFormat) error
DumpAirflowMetadata writes the bundle's airflow-metadata manifest to stdout (YAML by default, JSON when format is MetadataFormatJSON). It runs RegisterDags against an in-memory recorder only — no gRPC server, no external services. airflow-go-pack execs the binary with --airflow-metadata and decodes this output to build the embedded manifest.
func RunTask ¶
func RunTask(ctx context.Context, bundle bundlev1.Bundle, details *StartupDetails, comm *CoordinatorComm, logger *slog.Logger) map[string]any
RunTask executes a task based on StartupDetails received from the supervisor.
It looks up the task in the bundle, creates a CoordinatorClient for SDK calls, executes the task, and returns a terminal message body (SucceedTaskMsg or TaskStateMsg) ready to ship as the final response frame.
The supervisor owns the Execution-API state transitions in coordinator mode, so we deliberately bypass worker.ExecuteTaskWorkload (which drives Run / UpdateState itself) and only invoke the user's task function.
ctx is the task's root context; Serve derives it from SIGINT/SIGTERM, so a cooperative task that honors ctx returns promptly on a supervisor shutdown.
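A task that honors ctx might look like the sketch below. `cooperativeTask` and `cancelledPromptly` are hypothetical names; the `signal.NotifyContext` call in `main` shows one standard-library way to derive a context from SIGINT/SIGTERM, and is an assumption about how such a root context could be built rather than a copy of what Serve does.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// cooperativeTask does its work in small steps and checks ctx between them,
// so it returns soon after the context is cancelled.
func cooperativeTask(ctx context.Context, steps int) error {
	for i := 0; i < steps; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Millisecond):
			// One unit of work would go here.
		}
	}
	return nil
}

// cancelledPromptly runs the task under an already-cancelled context and
// reports whether it stopped with context.Canceled.
func cancelledPromptly() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return errors.Is(cooperativeTask(ctx, 1000), context.Canceled)
}

func main() {
	// The context is cancelled when the process receives SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	fmt.Println(cooperativeTask(ctx, 3))
}
```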
func Serve ¶
func Serve(provider bundlev1.BundleProvider, commAddr, logsAddr string) error
Serve runs the bundle binary in coordinator mode. It dials the supervisor's comm and logs sockets, installs an slog handler that writes JSON-line records to the logs connection, and dispatches on the first frame.
Serve returns nil on a clean shutdown: the task ran and its terminal TaskState/SucceedTask frame was delivered, and the caller should exit 0. A non-nil error indicates a protocol-level failure (connection loss, malformed frames, unknown first message type) that happens before or instead of delivering a terminal frame.
Failure-signaling contract: the caller (main) must turn a non-nil error into a non-zero process exit. The supervisor derives the task's final state primarily from the child's exit code: a non-zero exit is recorded as FAILED (or UP_FOR_RETRY when retries are configured), and a structured TaskState frame is only honored when the process exits 0 (see the Python supervisor's ActivitySubprocess.final_state). An early error return from Serve therefore fails closed without needing to send a frame. Error paths reached after the sockets are connected log the reason at Error level first, so it still reaches the supervisor's log stream over the already-connected logs socket.
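On the caller's side, the contract amounts to mapping the returned error to an exit status. The sketch below is illustrative only: `serveStub` is a hypothetical stand-in for Serve (which takes a bundle provider and two socket addresses), and `exitCode` is a helper invented for this example.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// serveStub stands in for Serve in this sketch.
func serveStub(fail bool) error {
	if fail {
		return errors.New("unknown first message type")
	}
	return nil
}

// exitCode maps Serve's result to a process exit status: nil means the
// terminal frame was delivered (exit 0); anything else must exit non-zero.
func exitCode(err error) int {
	if err != nil {
		return 1
	}
	return 0
}

func main() {
	err := serveStub(false)
	if err != nil {
		fmt.Fprintln(os.Stderr, "coordinator mode failed:", err)
	}
	if code := exitCode(err); code != 0 {
		os.Exit(code)
	}
}
```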
Types ¶
type BundleInfoMsg ¶
BundleInfoMsg holds bundle identification from StartupDetails.
type ConnectionResult ¶
type ConnectionResult struct {
ConnID string
ConnType string
Host string
Schema string
Login *string
Password *string
Port int
Extra string
}
ConnectionResult is the response to GetConnection. Login and Password are nullable in the supervisor schema (None vs "" are distinct), so they are decoded as *string to preserve that distinction.
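The None versus "" distinction can be illustrated with pointer fields. The sketch below uses `encoding/json` purely as a stand-in for the msgpack decoding the package performs, and the field tags and the `connResult` type are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connResult is a trimmed, hypothetical mirror of ConnectionResult.
type connResult struct {
	ConnID   string  `json:"conn_id"`
	Login    *string `json:"login"`
	Password *string `json:"password"`
}

// decodeConn decodes a JSON document; a null field leaves the pointer nil.
func decodeConn(raw string) (connResult, error) {
	var c connResult
	err := json.Unmarshal([]byte(raw), &c)
	return c, err
}

// describe renders a nullable field so nil and "" are distinguishable.
func describe(s *string) string {
	if s == nil {
		return "<unset>"
	}
	return fmt.Sprintf("%q", *s)
}

func main() {
	c, err := decodeConn(`{"conn_id":"db","login":null,"password":""}`)
	fmt.Println(describe(c.Login), describe(c.Password), err)
}
```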
type CoordinatorClient ¶
type CoordinatorClient struct {
// contains filtered or unexported fields
}
CoordinatorClient implements sdk.Client by communicating with the Airflow supervisor over the comm socket using msgpack-framed IPC instead of HTTP.
func NewCoordinatorClient ¶
func NewCoordinatorClient(comm *CoordinatorComm) *CoordinatorClient
NewCoordinatorClient creates a new client backed by the comm socket.
func (*CoordinatorClient) GetConnection ¶
func (c *CoordinatorClient) GetConnection(ctx context.Context, connID string) (sdk.Connection, error)
GetConnection requests a connection from the supervisor.
func (*CoordinatorClient) GetVariable ¶
func (c *CoordinatorClient) GetVariable(ctx context.Context, key string) (string, error)
GetVariable requests a variable value from the supervisor.
func (*CoordinatorClient) GetXCom ¶
func (c *CoordinatorClient) GetXCom(ctx context.Context, dagId, runId, taskId string, mapIndex *int, key string, _ any) (any, error)
GetXCom requests an XCom value from the supervisor.
func (*CoordinatorClient) PushXCom ¶
func (c *CoordinatorClient) PushXCom(ctx context.Context, ti api.TaskInstance, key string, value any) error
PushXCom sends an XCom value to the supervisor.
func (*CoordinatorClient) UnmarshalJSONVariable ¶
func (c *CoordinatorClient) UnmarshalJSONVariable(ctx context.Context, key string, pointer any) error
UnmarshalJSONVariable gets a variable and unmarshals its JSON value.
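The get-then-unmarshal pattern can be sketched without the real client. Everything below is hypothetical scaffolding: `variableGetter` captures only the GetVariable method, `fakeVariables` replaces the supervisor round trip with a map lookup, and `retryConfig` is an invented target type.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// variableGetter is the single method this sketch needs from a client.
type variableGetter interface {
	GetVariable(ctx context.Context, key string) (string, error)
}

// fakeVariables answers lookups from memory instead of the comm socket.
type fakeVariables map[string]string

func (f fakeVariables) GetVariable(_ context.Context, key string) (string, error) {
	v, ok := f[key]
	if !ok {
		return "", errors.New("variable not found: " + key)
	}
	return v, nil
}

// unmarshalJSONVariable fetches the raw string and decodes it into pointer.
func unmarshalJSONVariable(ctx context.Context, g variableGetter, key string, pointer any) error {
	raw, err := g.GetVariable(ctx, key)
	if err != nil {
		return err
	}
	return json.Unmarshal([]byte(raw), pointer)
}

type retryConfig struct {
	Retries int `json:"retries"`
}

// loadRetryConfig decodes the named variable into a retryConfig.
func loadRetryConfig(key string) (retryConfig, error) {
	vars := fakeVariables{"retry_config": `{"retries": 3}`}
	var cfg retryConfig
	err := unmarshalJSONVariable(context.Background(), vars, key, &cfg)
	return cfg, err
}

func main() {
	cfg, err := loadRetryConfig("retry_config")
	fmt.Println(cfg.Retries, err)
}
```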
type CoordinatorComm ¶
type CoordinatorComm struct {
// contains filtered or unexported fields
}
CoordinatorComm manages bidirectional communication with the Airflow supervisor over a length-prefixed msgpack socket connection.
Reads are multiplexed by frame ID. A single background reader goroutine, lazily started on the first Communicate call, consumes inbound frames and dispatches each one to the Communicate caller whose request used that ID. This lets multiple goroutines call Communicate concurrently without serialising the full send-then-read round trip behind a single mutex, and guarantees the response a caller receives matches the request it sent.
The supervisor's initial StartupDetails frame arrives unsolicited, before any client request is in flight, and is read synchronously via ReadMessage; ReadMessage must not be called after the dispatcher has been started.
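Multiplexing by frame ID can be sketched with a map of per-request channels. This is a simplified model under stated assumptions, not the package's implementation: the `mux` type, its methods, and the string body are hypothetical, and the socket reader is replaced by a goroutine that delivers responses in reverse order.

```go
package main

import (
	"fmt"
	"sync"
)

type response struct {
	id   int64
	body string
}

// mux routes each inbound response to the caller that owns its frame ID.
type mux struct {
	mu      sync.Mutex
	nextID  int64
	pending map[int64]chan response
}

func newMux() *mux { return &mux{pending: make(map[int64]chan response)} }

// register allocates a frame ID and the channel its response will arrive on.
func (m *mux) register() (int64, <-chan response) {
	m.mu.Lock()
	defer m.mu.Unlock()
	id := m.nextID
	m.nextID++
	ch := make(chan response, 1) // buffered so dispatch never blocks
	m.pending[id] = ch
	return id, ch
}

// dispatch delivers one response to its waiter. It reports false when no
// request with that ID is outstanding.
func (m *mux) dispatch(r response) bool {
	m.mu.Lock()
	ch, ok := m.pending[r.id]
	delete(m.pending, r.id)
	m.mu.Unlock()
	if !ok {
		return false
	}
	ch <- r
	return true
}

// demo registers two requests and answers them out of order.
func demo() (string, string) {
	m := newMux()
	idA, chA := m.register()
	idB, chB := m.register()
	go func() { // plays the role of the background reader goroutine
		m.dispatch(response{id: idB, body: "for B"})
		m.dispatch(response{id: idA, body: "for A"})
	}()
	a, b := <-chA, <-chB
	return a.body, b.body
}

func main() {
	fmt.Println(demo())
}
```

Each waiter receives the response carrying its own ID even though the responses arrive in the opposite order to the requests.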
func NewCoordinatorComm ¶
NewCoordinatorComm creates a new communication channel.
func (*CoordinatorComm) Communicate ¶
func (c *CoordinatorComm) Communicate(ctx context.Context, body map[string]any) (map[string]any, error)
Communicate sends a request and blocks until the supervisor's response with the matching frame ID is delivered by the dispatcher, ctx is cancelled, or its deadline expires. Safe to call concurrently from multiple goroutines.
If the response carries an error (either as the third element of a 3-tuple frame or as a body whose "type" is "ErrorResponse") it is returned as an *ApiError. If the dispatcher's read loop has terminated, the underlying read error is returned wrapped in ErrDispatcherClosed.
func (*CoordinatorComm) ReadMessage ¶
func (c *CoordinatorComm) ReadMessage() (IncomingFrame, error)
ReadMessage reads and decodes one frame directly from the comm socket. It is used to read the supervisor's initial frame before any request/response traffic begins. Calling it after the dispatcher has started would race the reader goroutine for input bytes, so it returns an error in that case.
func (*CoordinatorComm) SendRequest ¶
func (c *CoordinatorComm) SendRequest(id int64, body map[string]any) error
SendRequest writes a request frame (2-element [id, body]) to the supervisor. Concurrent calls are serialised so frames are never interleaved on the wire.
type ErrorResponse ¶
ErrorResponse represents an error returned by the supervisor.
type GetConnectionMsg ¶
type GetConnectionMsg struct {
ConnID string
}
GetConnectionMsg is sent to request a connection from the supervisor.
type GetVariableMsg ¶
type GetVariableMsg struct {
Key string
}
GetVariableMsg is sent to request a variable from the supervisor.
type GetXComMsg ¶
type GetXComMsg struct {
Key string
DagID string
TaskID string
RunID string
MapIndex *int
IncludePriorDates bool
}
GetXComMsg is sent to request an XCom value from the supervisor.
type IncomingFrame ¶
type IncomingFrame struct {
ID int64
Body map[string]any
Err map[string]any // non-nil only for response frames (3-element arrays)
}
IncomingFrame represents a decoded frame received from the comm socket. ID is int64 to match the wire encoding and CoordinatorComm.nextID; narrowing to int would reintroduce wraparound on 32-bit GOARCH.
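The 2-element versus 3-element shapes can be illustrated on an already-decoded array. The sketch assumes the msgpack decoder yields `[]any` with the ID as `int64` and maps as `map[string]any`; real decoders may produce narrower integer types. `frameFromArray` and the `"error"` key are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// incomingFrame is a local mirror of IncomingFrame for this sketch.
type incomingFrame struct {
	ID   int64
	Body map[string]any
	Err  map[string]any
}

// frameFromArray interprets a decoded array as [id, body] or [id, body, err].
func frameFromArray(arr []any) (incomingFrame, error) {
	if len(arr) != 2 && len(arr) != 3 {
		return incomingFrame{}, errors.New("frame must have 2 or 3 elements")
	}
	id, ok := arr[0].(int64)
	if !ok {
		return incomingFrame{}, errors.New("frame ID is not an int64")
	}
	f := incomingFrame{ID: id}
	// A nil element fails the assertion and leaves the field nil.
	f.Body, _ = arr[1].(map[string]any)
	if len(arr) == 3 {
		f.Err, _ = arr[2].(map[string]any)
	}
	return f, nil
}

func main() {
	f, err := frameFromArray([]any{int64(1), nil, map[string]any{"error": "not found"}})
	fmt.Println(f.ID, f.Body == nil, f.Err != nil, err)
}
```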
type MetadataFormat ¶
type MetadataFormat string
MetadataFormat selects the encoding DumpAirflowMetadata writes to stdout for the bundle binary's --airflow-metadata flag.
const (
	// MetadataFormatYAML is the default; it matches the airflow-metadata.yaml a
	// bundle embeds, so `mybundle --airflow-metadata > airflow-metadata.yaml`
	// yields a ready-to-use file.
	MetadataFormatYAML MetadataFormat = "yaml"
	// MetadataFormatJSON is opt-in via --format json.
	MetadataFormatJSON MetadataFormat = "json"
)
func ParseMetadataFormat ¶
func ParseMetadataFormat(s string) (MetadataFormat, error)
ParseMetadataFormat validates a --format value and returns the matching MetadataFormat. An empty value defaults to YAML.
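The validation rule can be sketched as a small switch. This is a hypothetical reimplementation for illustration: the names are local stand-ins, and whether the real function accepts other spellings (for example upper case) is not stated in this documentation.

```go
package main

import "fmt"

type metadataFormat string

const (
	formatYAML metadataFormat = "yaml"
	formatJSON metadataFormat = "json"
)

// parseFormat validates a --format value; an empty value defaults to YAML.
func parseFormat(s string) (metadataFormat, error) {
	switch s {
	case "", "yaml":
		return formatYAML, nil
	case "json":
		return formatJSON, nil
	default:
		return "", fmt.Errorf("unsupported metadata format %q", s)
	}
}

func main() {
	f, err := parseFormat("")
	fmt.Println(f, err)
}
```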
type SetXComMsg ¶
type SetXComMsg struct {
Key string
Value any
DagID string
TaskID string
RunID string
MapIndex *int
MappedLength *int
}
SetXComMsg is sent to set an XCom value. MapIndex mirrors Python's SetXCom.map_index (int | None): nil means "unmapped task", and is omitted from the wire payload rather than encoded as a -1 sentinel.
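Omitting a nil MapIndex from the payload, instead of sending a sentinel, could look like the sketch below. The `setXCom` type, the `payload` method, and the snake_case wire keys are assumptions made for the example; MappedLength is left out to keep it short.

```go
package main

import "fmt"

// setXCom is a trimmed, hypothetical mirror of SetXComMsg.
type setXCom struct {
	Key, DagID, TaskID, RunID string
	Value                     any
	MapIndex                  *int
}

// payload builds the request body. map_index is only present when the task
// is mapped; a nil MapIndex is omitted, never encoded as -1.
func (m setXCom) payload() map[string]any {
	p := map[string]any{
		"type":    "SetXCom",
		"key":     m.Key,
		"value":   m.Value,
		"dag_id":  m.DagID,
		"task_id": m.TaskID,
		"run_id":  m.RunID,
	}
	if m.MapIndex != nil {
		p["map_index"] = *m.MapIndex
	}
	return p
}

func main() {
	unmapped := setXCom{Key: "return_value", Value: 42}
	_, present := unmapped.payload()["map_index"]
	fmt.Println(present)
}
```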
type SocketLogHandler ¶
type SocketLogHandler struct {
// contains filtered or unexported fields
}
SocketLogHandler is an slog.Handler that streams structured JSON log lines to the logs TCP socket. Each log entry is a single JSON object followed by a newline, matching the Airflow log streaming format.
Key mapping:
- "event" for the log message (not "msg")
- "level" in lowercase (not "INFO"/"ERROR")
- "timestamp" in RFC3339Nano format (not "time")
- Additional attributes are included as top-level fields
Groups are encoded as dotted key prefixes on a flat JSON object (`{"grp.key": "val"}`), not as nested objects. The Airflow supervisor's log-streaming format consumes flat top-level fields, so emitting nested objects here would not be parsed correctly. Do not change this without updating the supervisor side in lockstep.
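The key mapping and the flat group encoding can be sketched as a pure formatting function. `formatLine` and `sampleLine` are hypothetical helpers, not part of the handler; the sketch handles a single group prefix and relies on `encoding/json` sorting map keys, which is a property of this example rather than of the wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"
)

// formatLine renders a record as one flat JSON object plus a newline.
// Attributes recorded under a group get a dotted key prefix.
func formatLine(r slog.Record, group string) (string, error) {
	fields := map[string]any{
		"event":     r.Message,
		"level":     strings.ToLower(r.Level.String()),
		"timestamp": r.Time.Format(time.RFC3339Nano),
	}
	r.Attrs(func(a slog.Attr) bool {
		key := a.Key
		if group != "" {
			key = group + "." + key
		}
		fields[key] = a.Value.Any()
		return true
	})
	b, err := json.Marshal(fields)
	if err != nil {
		return "", err
	}
	return string(b) + "\n", nil
}

// sampleLine formats a fixed record so the output is reproducible.
func sampleLine() (string, error) {
	ts := time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)
	r := slog.NewRecord(ts, slog.LevelInfo, "task started", 0)
	r.AddAttrs(slog.String("key", "val"))
	return formatLine(r, "grp")
}

func main() {
	line, err := sampleLine()
	fmt.Print(line)
	if err != nil {
		fmt.Println(err)
	}
}
```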
func NewSocketLogHandler ¶
func NewSocketLogHandler(writer io.Writer, level slog.Level) *SocketLogHandler
NewSocketLogHandler creates a new handler. If writer is nil, messages are buffered until Connect() is called.
func (*SocketLogHandler) Connect ¶
func (h *SocketLogHandler) Connect(w io.Writer)
Connect sets the writer and flushes any buffered log messages.
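The buffer-until-connected behavior can be sketched with a small sink type. `bufferedSink` and its methods are hypothetical; unlike the real Connect, the sketch's `connect` returns an error so that flush failures are visible in the example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// bufferedSink holds log lines in memory until a writer is attached.
type bufferedSink struct {
	mu      sync.Mutex
	w       io.Writer
	pending [][]byte
}

// write forwards the line, or buffers a copy when no writer is set yet.
func (s *bufferedSink) write(line []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.w == nil {
		s.pending = append(s.pending, append([]byte(nil), line...))
		return nil
	}
	_, err := s.w.Write(line)
	return err
}

// connect sets the writer and flushes buffered lines in arrival order.
func (s *bufferedSink) connect(w io.Writer) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.w = w
	for _, line := range s.pending {
		if _, err := w.Write(line); err != nil {
			return err
		}
	}
	s.pending = nil
	return nil
}

// demo logs once before and once after connecting.
func demo() string {
	var sink bufferedSink
	var out bytes.Buffer
	_ = sink.write([]byte("early\n"))
	_ = sink.connect(&out)
	_ = sink.write([]byte("late\n"))
	return out.String()
}

func main() {
	fmt.Print(demo())
}
```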
type StartupDetails ¶
type StartupDetails struct {
TI TaskInstanceInfo
DagRelPath string
BundleInfo BundleInfoMsg
StartDate time.Time
TIContext TIRunContext
SentryIntegration string
}
StartupDetails is sent by the supervisor to initiate task execution.
type SucceedTaskMsg ¶
SucceedTaskMsg is sent as a terminal message when a task succeeds.
type TIRunContext ¶
type TIRunContext struct {
LogicalDate *time.Time
DataIntervalStart *time.Time
DataIntervalEnd *time.Time
}
TIRunContext holds the runtime context for a task instance.
type TaskInstanceInfo ¶
type TaskInstanceInfo struct {
ID string
TaskID string
DagID string
RunID string
TryNumber int
DagVersionID string
MapIndex int
ContextCarrier map[string]any
}
TaskInstanceInfo holds task instance details from StartupDetails.
type TaskState ¶
type TaskState string
TaskState is the terminal non-success state reported via TaskStateMsg. The wire values match Python's TaskInstanceState enum (and the generated api.TerminalStateNonSuccess); we define a local typed string so call sites get compile-time checking and don't have to import pkg/api just for the constants.
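A local typed string gives call sites some type checking, as in the sketch below. The constant names, the three values shown, and the `"type"`/`"state"` body keys are assumptions for illustration; the package's actual constants are not listed in this documentation.

```go
package main

import "fmt"

// taskState is a local stand-in for the package's TaskState type.
type taskState string

const (
	stateFailed  taskState = "failed"
	stateSkipped taskState = "skipped"
	stateRemoved taskState = "removed"
)

// terminalBody builds a terminal message body for a non-success state.
// Passing a plain string variable here does not compile without an explicit
// conversion, which is the check the typed string provides.
func terminalBody(s taskState) map[string]any {
	return map[string]any{"type": "TaskState", "state": string(s)}
}

func main() {
	for _, s := range []taskState{stateFailed, stateSkipped, stateRemoved} {
		fmt.Println(terminalBody(s)["state"])
	}
}
```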
type TaskStateMsg ¶
TaskStateMsg is sent as a terminal message for failed/removed/skipped tasks.
type VariableResult ¶
VariableResult is the response to GetVariable.
type XComResult ¶
XComResult is the response to GetXCom.