workflowrunner

Version: v0.12.1
Warning

This package is not in the latest version of its module.

Published: Mar 24, 2026 License: Apache-2.0 Imports: 24 Imported by: 0

README

Workflow Runner (POC)

The workflow runner is an experimental orchestration layer that lets you execute declarative multi-agent workflows on top of the OpenAI Agents Go SDK. It takes a JSON payload that describes agents, tools, guardrails, and session settings, turns it into live SDK objects, and runs the conversation while streaming back events, state updates, and final results.

Status: Proof of concept / work in progress. The API, wiring, and filesystem layout may change without notice as we refine the feature.

Why it exists

  • Provide a hosted-workflow-style interface for Go services that want to run declarative agent flows without hand-coding orchestration logic.
  • Make multi-agent runs observable: every lifecycle event can be streamed to callbacks, persisted for dashboards, or printed to stdout for local debugging.
  • Enable production patterns such as resumable sessions, tracing, tool approval queues, and safety guardrails using the existing Agents SDK features.

Core capabilities

  • Validates and materializes WorkflowRequest payloads into configured agents, guardrails, tools, and output types (Builder).
  • Runs the workflow asynchronously through RunnerService.Execute, returning an asynctask handle for polling or awaiting.
  • Streams events to HTTP endpoints or stdout printers while keeping an ExecutionStateStore in sync (in-memory by default, pluggable for shared storage).
  • Integrates with OpenAI tracing so each run shows up in traces with workflow metadata.
  • Supports hosted MCP tools and guardrail registries out of the box.
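To make the payload concrete, here is a minimal sketch of decoding a WorkflowRequest-style JSON document with encoding/json. It does not import the package, because its import path is not shown here. Instead it redeclares a trimmed, hypothetical mirror of the documented types that reuses their field names and JSON tags. Credentials, the callback, and most agent fields are omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed, hypothetical mirrors of the documented declarations. Field names
// and JSON tags are copied from the API reference; most fields are omitted.
type AgentDecl struct {
	Name         string   `json:"name"`
	Instructions string   `json:"instructions,omitempty"`
	Handoffs     []string `json:"handoff,omitempty"`
}

type WorkflowDecl struct {
	Name          string      `json:"name"`
	StartingAgent string      `json:"starting_agent"`
	Agents        []AgentDecl `json:"agents"`
}

type SessionDecl struct {
	SessionID   string `json:"session_id"`
	HistorySize int    `json:"history_size,omitempty"`
	MaxTurns    int    `json:"max_turns,omitempty"`
}

type Request struct {
	Query    string       `json:"query"`
	Session  SessionDecl  `json:"session"`
	Workflow WorkflowDecl `json:"workflow"`
}

// payload is a JSON version of the Getting started request (callback omitted).
const payload = `{
  "query": "List three fun facts about Mars.",
  "session": {"session_id": "demo-simple", "history_size": 10, "max_turns": 8},
  "workflow": {
    "name": "simple_assistant",
    "starting_agent": "assistant",
    "agents": [
      {"name": "assistant", "instructions": "You are an enthusiastic planetary science assistant."}
    ]
  }
}`

// decode parses a raw payload into the mirror types.
func decode(data string) (Request, error) {
	var req Request
	if err := json.Unmarshal([]byte(data), &req); err != nil {
		return Request{}, err
	}
	return req, nil
}

func main() {
	req, err := decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Workflow.StartingAgent, req.Session.MaxTurns, len(req.Workflow.Agents))
}
```

Type mismatches such as a numeric "query" are reported by json.Unmarshal, so malformed payloads fail before any agent is built.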

Architecture overview

```mermaid
flowchart TD
    Caller["Client\n(service or CLI)"]
    Request["WorkflowRequest JSON"]
    RunnerService["RunnerService"]
    Builder["Builder\n(agent graph & run config)"]
    AgentsSDK["Agents SDK\n(agents.Runner)"]
    StateStore["ExecutionStateStore"]
    Callback["CallbackPublisher\n(HTTP | Stdout)"]

    Caller -->|constructs| Request
    Request --> RunnerService
    RunnerService --> Builder
    Builder --> AgentsSDK
    RunnerService -->|RunStreamed| AgentsSDK
    RunnerService --> StateStore
    RunnerService --> Callback
    AgentsSDK -->|stream events| RunnerService
    RunnerService -->|final summary| Caller
```

Execution timeline

```mermaid
sequenceDiagram
    participant Client
    participant RunnerService
    participant Builder
    participant AgentsRunner as agents.Runner
    participant Callback as CallbackPublisher
    participant Store as ExecutionStateStore

    Client->>RunnerService: Execute(ctx, WorkflowRequest)
    RunnerService->>Builder: Build(ctx, request)
    Builder-->>RunnerService: BuildResult (agents, session, runner)
    RunnerService->>AgentsRunner: RunStreamed(startingAgent, query)
    RunnerService->>Store: OnRunStarted / Save
    loop streamed events
        AgentsRunner-->>RunnerService: StreamEvent
        RunnerService->>Store: Update state
        RunnerService->>Callback: Publish run.event (async best-effort)
    end
    AgentsRunner-->>RunnerService: FinalOutput + run summary
    RunnerService->>Store: OnRunCompleted / Save
    RunnerService->>Callback: Publish run.completed
    RunnerService-->>Client: Task result (RunSummary)
```

Getting started

```go
ctx := context.Background()

// ModelDeclaration takes optional settings as pointers, so this small local
// helper turns a literal into a *float64.
floatPtr := func(v float64) *float64 { return &v }

builder := workflowrunner.NewDefaultBuilder()
service := workflowrunner.NewRunnerService(builder)

req := workflowrunner.WorkflowRequest{
    Query: "List three fun facts about Mars.",
    Session: workflowrunner.SessionDeclaration{
        SessionID: "demo-simple",
        Credentials: workflowrunner.CredentialDeclaration{
            UserID:    "user-123",
            AccountID: "acct-456",
        },
        HistorySize: 10,
        MaxTurns:    8,
    },
    Callback: workflowrunner.CallbackDeclaration{
        Mode: "stdout", // "http" (the default) POSTs events to Target instead
    },
    Workflow: workflowrunner.WorkflowDeclaration{
        Name:          "simple_assistant",
        StartingAgent: "assistant",
        Agents: []workflowrunner.AgentDeclaration{
            {
                Name:         "assistant",
                Instructions: "You are an enthusiastic planetary science assistant.",
                Model: &workflowrunner.ModelDeclaration{
                    Model:       "gpt-4o-mini",
                    Temperature: floatPtr(0.3),
                },
            },
        },
    },
}

// Execute returns an asynctask handle; the run itself continues in the background.
task, err := service.Execute(ctx, req)
if err != nil {
    log.Fatalf("build or run failed: %v", err)
}

// Await blocks until the run finishes.
summary := task.Await()
if summary.Error != nil {
    log.Printf("run failed: %v", summary.Error)
} else {
    log.Printf("final output: %v", summary.Value.FinalOutput)
}
```

  • See workflowrunner/examples/simple and workflowrunner/examples/complex for runnable end-to-end demos.
  • The runner requires an OpenAI API key (OPENAI_API_KEY) to be present in the environment because agents ultimately call OpenAI models.

Callback modes

  • mode: "http" (default): events are POSTed to the provided target URL as JSON payloads (run.started, run.event, run.completed, run.failed).
  • mode: "stdout" / "stdout_verbose": events are printed to stdout in a human-friendly format for local testing; verbose mode also prints the final output.
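Something has to map these modes to publishers; in the package that role presumably belongs to RunnerService.CallbackFactory. The sketch below shows one way to do it under stated assumptions: an empty mode falls back to http, http requires a target URL, and `httpTarget` is a placeholder that sends nothing. `newPublisher`, `stdoutPublisher`, and `httpTarget` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"time"
)

type CallbackEvent struct {
	Type      string
	Timestamp time.Time
	Payload   any
}

type CallbackPublisher interface {
	Publish(ctx context.Context, ev CallbackEvent) error
}

// stdoutPublisher prints one line per event; verbose mode also prints the
// payload of run.completed (standing in for the final output).
type stdoutPublisher struct{ verbose bool }

func (p stdoutPublisher) Publish(_ context.Context, ev CallbackEvent) error {
	line := fmt.Sprintf("%s %s", ev.Timestamp.Format(time.RFC3339), ev.Type)
	if p.verbose && ev.Type == "run.completed" {
		line += fmt.Sprintf(" final_output=%v", ev.Payload)
	}
	_, err := fmt.Fprintln(os.Stdout, line)
	return err
}

// httpTarget is a placeholder for an HTTP publisher; it only keeps the URL.
type httpTarget struct{ url string }

func (httpTarget) Publish(context.Context, CallbackEvent) error { return nil }

// newPublisher picks a publisher from the callback mode; "" is treated as "http".
func newPublisher(mode, target string) (CallbackPublisher, error) {
	switch mode {
	case "", "http":
		if target == "" {
			return nil, fmt.Errorf("callback mode %q requires a target URL", "http")
		}
		return httpTarget{url: target}, nil
	case "stdout":
		return stdoutPublisher{}, nil
	case "stdout_verbose":
		return stdoutPublisher{verbose: true}, nil
	default:
		return nil, fmt.Errorf("unknown callback mode %q", mode)
	}
}

func main() {
	pub, err := newPublisher("stdout_verbose", "")
	if err != nil {
		panic(err)
	}
	_ = pub.Publish(context.Background(), CallbackEvent{Type: "run.completed", Timestamp: time.Now(), Payload: "Mars has two moons."})
}
```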

State tracking & approvals

  • Every run persists a WorkflowExecutionState entry containing status, last-agent information, last response ID, and optional final output.
  • MCP approval requests automatically push the execution into the waiting_approval status so a UI can pause the run until a response arrives.
  • The default store is in-memory; to make runs resumable across processes, implement ExecutionStateStore against your data layer (e.g., Postgres, Redis, Firestore).

Limitations & roadmap

  • The SQLite-backed session factory targets local experimentation; production deployments may need pluggable stores and rotation policies.
  • Only a subset of tool types and guardrails is registered; expand coverage by adding new factories to the Builder.
  • Error handling is best-effort: callback publishing failures are logged but currently do not short-circuit runs.
  • Tracing and state APIs may evolve; expect breaking changes until this moves out of POC status.

What’s next

  1. Support multiple model providers directly inside declarative workflow definitions so agents can route across backends.
  2. Integrate LLM observability (e.g., LangSmith, custom tracing exporters) into the runner lifecycle.
  3. Verify human-in-the-loop flows where executions halt on approval callbacks and resume from the exact same point once cleared.
  4. Expand workflow examples and automated tests to cover richer scenarios and guard future changes.

Resources

We welcome feedback and issues while this matures; please treat the interface as unstable and pin revisions carefully if you integrate it early.

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func ValidateWorkflowRequest

func ValidateWorkflowRequest(req WorkflowRequest) error

ValidateWorkflowRequest performs structural validation and returns an error describing the first issue encountered.
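As an illustration of first-error validation, the sketch below checks a trimmed workflow declaration and returns on the first problem it finds. The concrete rules (non-empty name, at least one agent, unique agent names, a declared starting agent, known handoff targets) are assumptions chosen for the example, not the package's documented rule set.

```go
package main

import (
	"errors"
	"fmt"
)

type AgentDecl struct {
	Name     string
	Handoffs []string
}

type WorkflowDecl struct {
	Name          string
	StartingAgent string
	Agents        []AgentDecl
}

// validate returns the first structural problem it finds, or nil.
func validate(wf WorkflowDecl) error {
	if wf.Name == "" {
		return errors.New("workflow.name is required")
	}
	if len(wf.Agents) == 0 {
		return errors.New("workflow.agents must not be empty")
	}
	// First pass: collect names so later checks can resolve references.
	seen := make(map[string]bool, len(wf.Agents))
	for i, a := range wf.Agents {
		if a.Name == "" {
			return fmt.Errorf("workflow.agents[%d].name is required", i)
		}
		if seen[a.Name] {
			return fmt.Errorf("duplicate agent name %q", a.Name)
		}
		seen[a.Name] = true
	}
	if !seen[wf.StartingAgent] {
		return fmt.Errorf("starting_agent %q is not declared", wf.StartingAgent)
	}
	// Second pass: every handoff must point at a declared agent.
	for _, a := range wf.Agents {
		for _, h := range a.Handoffs {
			if !seen[h] {
				return fmt.Errorf("agent %q hands off to unknown agent %q", a.Name, h)
			}
		}
	}
	return nil
}

func main() {
	wf := WorkflowDecl{Name: "support", StartingAgent: "triage", Agents: []AgentDecl{{Name: "triage", Handoffs: []string{"refunds"}}}}
	fmt.Println(validate(wf))
}
```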

Types

type AgentDeclaration

type AgentDeclaration struct {
	Name               string                 `json:"name"`
	DisplayName        string                 `json:"display_name,omitempty"`
	Instructions       string                 `json:"instructions,omitempty"`
	PromptID           string                 `json:"prompt_id,omitempty"`
	Model              *ModelDeclaration      `json:"model,omitempty"`
	Handoffs           []string               `json:"handoff,omitempty"`
	AgentTools         []AgentToolReference   `json:"agent_tools,omitempty"`
	Tools              []ToolDeclaration      `json:"tools,omitempty"`
	MCPServers         []MCPDeclaration       `json:"mcp,omitempty"`
	InputGuardrails    []GuardrailDeclaration `json:"input_guardrails,omitempty"`
	OutputGuardrails   []GuardrailDeclaration `json:"output_guardrails,omitempty"`
	OutputType         *OutputTypeDeclaration `json:"output_type,omitempty"`
	HandoffDescription string                 `json:"handoff_description,omitempty"`
	Annotations        map[string]any         `json:"annotations,omitempty"`
}

AgentDeclaration captures the configuration of a single agent.

type AgentToolReference

type AgentToolReference struct {
	AgentName   string `json:"agent_name"`
	ToolName    string `json:"tool_name,omitempty"`
	Description string `json:"description,omitempty"`
}

AgentToolReference allows referencing another agent as a tool.

type ApprovalRequestState

type ApprovalRequestState struct {
	RequestID   string    `json:"request_id"`
	AgentName   string    `json:"agent_name"`
	ToolName    string    `json:"tool_name"`
	ServerLabel string    `json:"server_label"`
	Arguments   string    `json:"arguments"`
	CreatedAt   time.Time `json:"created_at"`
}

type BuildResult

type BuildResult struct {
	StartingAgent *agents.Agent
	AgentMap      map[string]*agents.Agent
	Runner        agents.Runner
	Session       memory.Session
	WorkflowName  string
	TraceMetadata map[string]any
}

BuildResult contains the artifacts required to execute a workflow.

type Builder

type Builder struct {
	ToolFactories       map[string]ToolFactory
	OutputTypeFactories map[string]OutputTypeFactory
	SessionFactory      SessionFactory
}

Builder converts declarative workflow payloads into executable SDK primitives.

func NewDefaultBuilder

func NewDefaultBuilder() *Builder

NewDefaultBuilder returns a Builder with the built-in registries initialized.

func (*Builder) Build

func (b *Builder) Build(ctx context.Context, req WorkflowRequest) (*BuildResult, error)

Build constructs agents, run configuration, and session resources from the request.
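One plausible way to produce the StartingAgent and AgentMap fields of BuildResult is a two-pass build: create every agent, then wire handoffs by name, which tolerates any declaration order and even cycles. The two-pass design is an assumption about how such a builder could work; `buildGraph` is hypothetical and the toy `Agent` type stands in for `*agents.Agent`.

```go
package main

import "fmt"

type AgentDecl struct {
	Name     string
	Handoffs []string
}

// Agent is a toy stand-in for *agents.Agent.
type Agent struct {
	Name     string
	Handoffs []*Agent
}

// buildGraph instantiates every agent first, then resolves handoff names to
// pointers in a second pass, and finally looks up the starting agent.
func buildGraph(decls []AgentDecl, starting string) (*Agent, map[string]*Agent, error) {
	agentMap := make(map[string]*Agent, len(decls))
	for _, d := range decls {
		agentMap[d.Name] = &Agent{Name: d.Name}
	}
	for _, d := range decls {
		for _, target := range d.Handoffs {
			t, ok := agentMap[target]
			if !ok {
				return nil, nil, fmt.Errorf("agent %q: unknown handoff %q", d.Name, target)
			}
			agentMap[d.Name].Handoffs = append(agentMap[d.Name].Handoffs, t)
		}
	}
	start, ok := agentMap[starting]
	if !ok {
		return nil, nil, fmt.Errorf("unknown starting agent %q", starting)
	}
	return start, agentMap, nil
}

func main() {
	start, agents, err := buildGraph([]AgentDecl{
		{Name: "triage", Handoffs: []string{"billing"}},
		{Name: "billing", Handoffs: []string{"triage"}},
	}, "triage")
	if err != nil {
		panic(err)
	}
	fmt.Println(start.Name, len(agents), start.Handoffs[0].Name)
}
```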

type CallbackDeclaration

type CallbackDeclaration struct {
	Target string `json:"target"`
	Mode   string `json:"mode,omitempty"`
}

CallbackDeclaration describes how streaming events should be published.

func (*CallbackDeclaration) UnmarshalJSON

func (c *CallbackDeclaration) UnmarshalJSON(data []byte) error

UnmarshalJSON allows the callback to be provided either as a string or as an object.

func (*CallbackDeclaration) Validate

func (c *CallbackDeclaration) Validate() error

Validate performs shallow validation of the callback declaration.

type CallbackEvent

type CallbackEvent struct {
	Type      string         `json:"type"`
	Timestamp time.Time      `json:"timestamp"`
	Payload   any            `json:"payload,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

CallbackEvent describes an update emitted during a workflow run.

type CallbackPublisher

type CallbackPublisher interface {
	Publish(ctx context.Context, event CallbackEvent) error
}

CallbackPublisher publishes streaming events to an external sink.

type CredentialDeclaration

type CredentialDeclaration struct {
	UserID       string         `json:"user_id"`
	AccountID    string         `json:"account_id"`
	Capabilities []string       `json:"capabilities,omitempty"`
	Metadata     map[string]any `json:"metadata,omitempty"`
}

CredentialDeclaration contains minimal identity data used for validation and logging.

type ExecutionStateStore

type ExecutionStateStore interface {
	Save(ctx context.Context, state WorkflowExecutionState) error
	Load(ctx context.Context, sessionID string) (WorkflowExecutionState, bool, error)
	Clear(ctx context.Context, sessionID string) error
}

type ExecutionStatus

type ExecutionStatus string
const (
	ExecutionStatusIdle            ExecutionStatus = "idle"
	ExecutionStatusRunning         ExecutionStatus = "running"
	ExecutionStatusWaitingApproval ExecutionStatus = "waiting_approval"
	ExecutionStatusCompleted       ExecutionStatus = "completed"
	ExecutionStatusFailed          ExecutionStatus = "failed"
)

type GuardrailDeclaration

type GuardrailDeclaration struct {
	Name   string         `json:"name"`
	Config map[string]any `json:"config,omitempty"`
	Target string         `json:"target,omitempty"`
}

GuardrailDeclaration references a reusable guardrail preset.

type GuardrailFactory

type GuardrailFactory func(ctx context.Context, decl GuardrailDeclaration) (agents.InputGuardrail, error)

GuardrailFactory is a placeholder for future guardrail registry integration.

type HTTPCallbackPublisher

type HTTPCallbackPublisher struct {
	URL string
	// contains filtered or unexported fields
}

HTTPCallbackPublisher POSTs events to a configured endpoint as JSON.

func NewHTTPCallbackPublisher

func NewHTTPCallbackPublisher(url string, client *http.Client) *HTTPCallbackPublisher

NewHTTPCallbackPublisher constructs an HTTP publisher with an optional custom client.

func (*HTTPCallbackPublisher) Publish

func (p *HTTPCallbackPublisher) Publish(ctx context.Context, event CallbackEvent) error

type InMemoryExecutionStateStore

type InMemoryExecutionStateStore struct {
	// contains filtered or unexported fields
}

func NewInMemoryExecutionStateStore

func NewInMemoryExecutionStateStore() *InMemoryExecutionStateStore

func (*InMemoryExecutionStateStore) Clear

func (s *InMemoryExecutionStateStore) Clear(_ context.Context, sessionID string) error

func (*InMemoryExecutionStateStore) Load

func (*InMemoryExecutionStateStore) Save

type MCPDeclaration

type MCPDeclaration struct {
	Type            string         `json:"type,omitempty"`
	ServerLabel     string         `json:"server_label,omitempty"`
	Address         string         `json:"address"`
	RequireApproval string         `json:"require_approval,omitempty"`
	Additional      map[string]any `json:"additional,omitempty"`
}

MCPDeclaration configures hosted or stdio MCP servers.

type ModelDeclaration

type ModelDeclaration struct {
	Provider     string                `json:"provider,omitempty"`
	Model        string                `json:"model"`
	Temperature  *float64              `json:"temperature,omitempty"`
	TopP         *float64              `json:"top_p,omitempty"`
	MaxTokens    *int64                `json:"max_tokens,omitempty"`
	Reasoning    *ReasoningDeclaration `json:"reasoning,omitempty"`
	Verbosity    string                `json:"verbosity,omitempty"`
	Metadata     map[string]string     `json:"metadata,omitempty"`
	ExtraHeaders map[string]string     `json:"extra_headers,omitempty"`
	ExtraQuery   map[string]string     `json:"extra_query,omitempty"`
	ToolChoice   string                `json:"tool_choice,omitempty"`
}

ModelDeclaration selects the model and provider to use, plus optional settings.

type OutputTypeDeclaration

type OutputTypeDeclaration struct {
	Name   string         `json:"name"`
	Strict bool           `json:"strict,omitempty"`
	Schema map[string]any `json:"schema,omitempty"`
}

OutputTypeDeclaration describes the expected structured output.

type OutputTypeFactory

type OutputTypeFactory func(ctx context.Context, decl OutputTypeDeclaration) (agents.OutputTypeInterface, error)

OutputTypeFactory produces custom output type implementations.

type ReasoningDeclaration

type ReasoningDeclaration struct {
	Effort  string `json:"effort,omitempty"`
	Summary string `json:"summary,omitempty"`
}

ReasoningDeclaration mirrors the subset of OpenAI reasoning parameters we support.

type RunSummary

type RunSummary struct {
	WorkflowName   string           `json:"workflow_name"`
	SessionID      string           `json:"session_id"`
	FinalOutput    any              `json:"final_output"`
	NewItems       []agents.RunItem `json:"-"`
	LastResponseID string           `json:"last_response_id"`
	Error          error            `json:"error,omitempty"`
}

RunSummary holds metadata about a completed run.

type RunnerService

type RunnerService struct {
	Builder         *Builder
	CallbackFactory func(ctx context.Context, decl CallbackDeclaration) (CallbackPublisher, error)
	StateStore      ExecutionStateStore
}

RunnerService orchestrates building and executing workflow requests.

func NewRunnerService

func NewRunnerService(builder *Builder) *RunnerService

NewRunnerService constructs a RunnerService with sensible defaults.

func (*RunnerService) Execute

Execute validates, builds, and runs the workflow asynchronously.

type SessionDeclaration

type SessionDeclaration struct {
	SessionID   string                `json:"session_id"`
	HistorySize int                   `json:"history_size,omitempty"`
	MaxTurns    int                   `json:"max_turns,omitempty"`
	Credentials CredentialDeclaration `json:"credentials"`
}

SessionDeclaration carries caller-provided state and execution limits.

type SessionFactory

type SessionFactory func(ctx context.Context, decl SessionDeclaration) (memory.Session, error)

SessionFactory allocates or loads a conversational session.

func NewSQLiteSessionFactory

func NewSQLiteSessionFactory(baseDir string) SessionFactory

NewSQLiteSessionFactory stores sessions on-disk inside baseDir (created if needed).

type StdoutCallbackPublisher

type StdoutCallbackPublisher struct{}

StdoutCallbackPublisher prints events to stdout (useful for local testing).

func (StdoutCallbackPublisher) Publish

type ToolDeclaration

type ToolDeclaration struct {
	Type   string         `json:"type"`
	Name   string         `json:"name,omitempty"`
	Config map[string]any `json:"config,omitempty"`
}

ToolDeclaration represents a tool that should be attached to an agent.

type ToolFactory

type ToolFactory func(ctx context.Context, decl ToolDeclaration, env ToolFactoryEnv) (agents.Tool, error)

ToolFactory creates an agents.Tool from the declaration.

type ToolFactoryEnv

type ToolFactoryEnv struct {
	AgentName       string
	WorkflowName    string
	RequestMetadata map[string]any
}

ToolFactoryEnv provides context when constructing tools.

type WorkflowDeclaration

type WorkflowDeclaration struct {
	Name          string             `json:"name"`
	StartingAgent string             `json:"starting_agent"`
	Agents        []AgentDeclaration `json:"agents"`
	Metadata      map[string]any     `json:"metadata,omitempty"`
}

WorkflowDeclaration defines the agent graph that should be executed.

type WorkflowExecutionState

type WorkflowExecutionState struct {
	SessionID        string                 `json:"session_id"`
	WorkflowName     string                 `json:"workflow_name"`
	Status           ExecutionStatus        `json:"status"`
	LastAgent        string                 `json:"last_agent"`
	LastResponseID   string                 `json:"last_response_id"`
	LastQuery        string                 `json:"last_query"`
	LastError        string                 `json:"last_error"`
	PendingApprovals []ApprovalRequestState `json:"pending_approvals"`
	FinalOutput      any                    `json:"final_output,omitempty"`
	UpdatedAt        time.Time              `json:"updated_at"`
}

type WorkflowRequest

type WorkflowRequest struct {
	Query    string              `json:"query"`
	Session  SessionDeclaration  `json:"session"`
	Callback CallbackDeclaration `json:"callback"`
	Workflow WorkflowDeclaration `json:"workflow"`
	Metadata map[string]any      `json:"metadata,omitempty"`
	Context  map[string]any      `json:"context,omitempty"`
}

WorkflowRequest represents the top-level payload describing a workflow run.

Directories

  • examples/complex (command)
  • examples/simple (command)
