orchestrator

package
v0.4.2
Published: Oct 3, 2025 License: Apache-2.0 Imports: 5 Imported by: 1

Documentation

Overview

Package orchestrator exposes a lightweight programmatic emit/await API for task actions. It lets a task (e.g., a ReAct plan step) fan out child executions and optionally await their completion without modifying the workflow YAML. The orchestrator is injected into the execution context and retrieved inside the action via ctx.Value(orchestrator.OrchestratorContextKey) or the FromContext helper.

Constants

const OrchestratorContextKey = "fluxor.orchestrator"

OrchestratorContextKey is the string key used to inject the orchestrator into the per-execution context. The processor copies session execution-context entries into the action context under string keys such as this one.

Variables

This section is empty.

Functions

This section is empty.

Types

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator exposes programmatic fan-out/fan-in helpers that tasks can use from inside their action implementation.

func FromContext

func FromContext(ctx context.Context) (*Orchestrator, bool)

FromContext extracts an orchestrator previously injected into the execution context. Callers should check the boolean return value.

func New

func New(rt Runtime) *Orchestrator

New wraps the provided runtime to expose programmatic emit/await APIs to task code.

func (*Orchestrator) AwaitGroup

func (o *Orchestrator) AwaitGroup(ctx context.Context, id string, timeout time.Duration) ([]interface{}, error)

AwaitGroup blocks until the correlation group identified by id completes or the timeout elapses. It returns the aggregated child outputs.

func (*Orchestrator) AwaitResume

func (o *Orchestrator) AwaitResume(ctx context.Context, timeout time.Duration) (interface{}, error)

AwaitResume blocks until the current execution is unblocked by the allocator (i.e., the async correlation group completes) or the timeout elapses. It returns the updated execution output (already merged by the allocator according to the await/merge strategy) or an error.

func (*Orchestrator) Call

func (o *Orchestrator) Call(ctx context.Context, child *execution.Execution, timeout time.Duration) (interface{}, error)

Call schedules a single ad-hoc child execution on the shared queue and blocks until it completes or the timeout elapses. This path neither creates a correlation group nor switches the parent execution into waitAsync, which makes it well suited to synchronous function-call semantics during LLM streaming.

func (*Orchestrator) EmitChildren

func (o *Orchestrator) EmitChildren(ctx context.Context, children []*execution.Execution) (string, error)

EmitChildren emits the supplied child executions and transitions the current action's execution into waitAsync.

func (*Orchestrator) EmitForEach

func (o *Orchestrator) EmitForEach(ctx context.Context, items []interface{}, as string, template *graph.Task) (string, error)

EmitForEach clones the provided task template once for each element in items, registers the cloned tasks in the current process, and emits the corresponding child executions. The current action's execution is transitioned into waitAsync.
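The per-item cloning step can be sketched as follows. This is an illustration under stated assumptions: task is a hypothetical stand-in for graph.Task, the derived ID format is invented, and binding each item into an Input map under the name given by as is a guess at how the parameter is used, since the documentation does not spell it out.

```go
package main

import "fmt"

// task is a hypothetical stand-in for graph.Task.
type task struct {
	ID     string
	Action string
	Input  map[string]interface{}
}

// cloneForEach returns one copy of template per item. Each copy gets its own
// Input map (so the template is never mutated) with the item bound under as.
func cloneForEach(items []interface{}, as string, template *task) []*task {
	out := make([]*task, 0, len(items))
	for i, item := range items {
		clone := *template // copies scalar fields; Input is replaced below
		clone.ID = fmt.Sprintf("%s[%d]", template.ID, i)
		clone.Input = make(map[string]interface{}, len(template.Input)+1)
		for k, v := range template.Input {
			clone.Input[k] = v
		}
		clone.Input[as] = item
		out = append(out, &clone)
	}
	return out
}

func main() {
	template := &task{ID: "summarize", Action: "llm/summarize",
		Input: map[string]interface{}{"style": "brief"}}
	for _, t := range cloneForEach([]interface{}{"doc-a", "doc-b"}, "doc", template) {
		fmt.Println(t.ID, t.Input["doc"], t.Input["style"])
	}
}
```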

func (*Orchestrator) EmitOne

func (o *Orchestrator) EmitOne(ctx context.Context, child *execution.Execution) (string, error)

EmitOne is a convenience wrapper to emit a single child execution and create a correlation group with Expected=1.

type Runtime added in v0.2.0

type Runtime interface {
	EmitExecutions(ctx context.Context, parent *execution.Execution, children []*execution.Execution) (string, error)
	WaitForUnblock(ctx context.Context, execID string, timeout time.Duration) (*execution.Execution, error)
	AwaitGroup(ctx context.Context, id string, timeout time.Duration) ([]interface{}, error)
	ScheduleExecution(ctx context.Context, exec *execution.Execution) (func(duration time.Duration) (*execution.Execution, error), error)
}

Runtime abstracts the minimal subset of fluxor.Runtime used by the orchestrator. It is intentionally declared locally to avoid an import cycle between the root module and this package.
