async

package
v0.1.9 Latest
Warning

This package is not in the latest version of its module.

Published: May 27, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

const (
	DefaultPercentSignalThreshold = 5
	DefaultMaxPollFailures        = 3
	DefaultIdleTimeoutMs          = int((10 * time.Minute) / time.Millisecond)
)

Variables

This section is empty.

Functions

func ExecutionModeWaits added in v0.1.8

func ExecutionModeWaits(value string) bool

ExecutionModeWaits reports whether an op in this mode parks on the status barrier. Only "wait" waits; "detach" does not.

func ExtractIntent added in v0.1.8

func ExtractIntent(args map[string]any, path string, fallback string) string

func ExtractOperationID

func ExtractOperationID(raw string, path string) (string, error)

func ExtractSummary added in v0.1.8

func ExtractSummary(args map[string]any, paths []string) string

func NormalizeExecutionMode added in v0.1.8

func NormalizeExecutionMode(value string, fallback string) string

NormalizeExecutionMode coerces a mode string to the canonical enum value. Unknown values (historical "fork" entries, typos, empty) fall back to `fallback`, which itself defaults to "wait" if empty or also invalid.
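The fallback chain described above can be sketched in a few lines of Go. This is a hypothetical re-creation, not the package's code: `normalizeExecutionMode`, `executionModeWaits`, and `isCanonicalMode` are invented names, and exact, case-sensitive matching is an assumption (the documentation does not say whether input is trimmed or lower-cased).

```go
package main

// Hypothetical stand-ins for the package's two ExecutionMode values.
const (
	modeWait   = "wait"
	modeDetach = "detach"
)

// isCanonicalMode reports whether s is one of the two canonical values.
// Exact, case-sensitive matching is an assumption of this sketch.
func isCanonicalMode(s string) bool {
	return s == modeWait || s == modeDetach
}

// normalizeExecutionMode follows the documented fallback chain:
// value if canonical, else fallback if canonical, else "wait".
func normalizeExecutionMode(value, fallback string) string {
	if isCanonicalMode(value) {
		return value
	}
	if isCanonicalMode(fallback) {
		return fallback
	}
	return modeWait
}

// executionModeWaits reports whether a mode parks on the status barrier:
// only "wait" does.
func executionModeWaits(mode string) bool {
	return mode == modeWait
}
```

In this sketch, normalizing before asking whether a mode waits means a historical "fork" record (with an empty fallback) is treated as a waiting op.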

func WithManager added in v0.1.8

func WithManager(ctx context.Context, manager *Manager) context.Context

WithManager attaches the async Manager to ctx. It is intended for the runtime bootstrap layer, so that protocol-layer tools can retrieve the manager without importing the higher-level service/shared/toolexec package.

Callers that also populate toolexec.WithAsyncManager should call both so the two context surfaces stay in sync. The longer-term direction is to replace the two with a single canonical context key.

Types

type AggregatedItem added in v0.1.8

type AggregatedItem struct {
	OperationID      string          `json:"operationId,omitempty"`
	ToolName         string          `json:"toolName,omitempty"`
	OperationIntent  string          `json:"operationIntent,omitempty"`
	OperationSummary string          `json:"operationSummary,omitempty"`
	State            State           `json:"state,omitempty"`
	Reason           string          `json:"reason,omitempty"`
	Detail           string          `json:"detail,omitempty"`
	Payload          json.RawMessage `json:"payload,omitempty"`
}

type AggregatedResult added in v0.1.8

type AggregatedResult struct {
	Items          []AggregatedItem `json:"items,omitempty"`
	OpsStillActive bool             `json:"opsStillActive,omitempty"`
}

type CancelConfig

type CancelConfig struct {
	Tool           string `json:"tool" yaml:"tool"`
	OperationIDArg string `json:"operationIdArg" yaml:"operationIdArg"`
}

type ChangeEvent added in v0.1.8

type ChangeEvent struct {
	OperationID  string          `json:"operationId,omitempty"`
	PriorDigest  string          `json:"priorDigest,omitempty"`
	NewDigest    string          `json:"newDigest,omitempty"`
	Status       string          `json:"status,omitempty"`
	Message      string          `json:"message,omitempty"`
	MessageKind  string          `json:"messageKind,omitempty"`
	Percent      *int            `json:"percent,omitempty"`
	KeyData      json.RawMessage `json:"keyData,omitempty"`
	Error        string          `json:"error,omitempty"`
	State        State           `json:"state,omitempty"`
	ChangedAt    time.Time       `json:"changedAt,omitempty"`
	ToolName     string          `json:"toolName,omitempty"`
	Intent       string          `json:"intent,omitempty"`
	Summary      string          `json:"summary,omitempty"`
	Conversation string          `json:"conversationId,omitempty"`
	TurnID       string          `json:"turnId,omitempty"`
}

type Config

type Config struct {
	Run                  RunConfig     `json:"run" yaml:"run"`
	Status               StatusConfig  `json:"status" yaml:"status"`
	Cancel               *CancelConfig `json:"cancel,omitempty" yaml:"cancel,omitempty"`
	DefaultExecutionMode string        `json:"defaultExecutionMode,omitempty" yaml:"defaultExecutionMode,omitempty"`
	TimeoutMs            int           `json:"timeoutMs,omitempty" yaml:"timeoutMs,omitempty"`
	PollIntervalMs       int           `json:"pollIntervalMs,omitempty" yaml:"pollIntervalMs,omitempty"`
	IdleTimeoutMs        int           `json:"idleTimeoutMs,omitempty" yaml:"idleTimeoutMs,omitempty"`
	// Narration controls how wait-mode async progress is surfaced to the parent
	// turn. Supported values:
	//   - "none": silent
	//   - "keydata": no narrator LLM; surface the async update directly from
	//     selector-extracted payload data/message (used by llm/agents)
	//   - "template": deterministic template rendering
	//   - "llm": supervising narrator LLM summarizes each async update
	Narration         string `json:"narration,omitempty" yaml:"narration,omitempty"`
	NarrationTemplate string `json:"narrationTemplate,omitempty" yaml:"narrationTemplate,omitempty"`
}
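For orientation, a hypothetical configuration in the JSON form accepted by these tags might look like the following. The Go types are trimmed copies of Config, RunConfig, StatusConfig, Selector, and CancelConfig (JSON tags only, so the sketch needs only encoding/json). The status tool name, `operationIdPath`, and selector paths are invented, and the path syntax is an assumption because the documentation does not define it.

```go
package main

import "encoding/json"

// Trimmed copies of the package's config types (json tags only);
// fields not used in the example are omitted.
type selector struct {
	StatusPath       string   `json:"statusPath"`
	TerminalStatuses []string `json:"terminalStatuses,omitempty"`
}

type runConfig struct {
	Tool            string `json:"tool"`
	OperationIDPath string `json:"operationIdPath"`
}

type statusConfig struct {
	Tool           string   `json:"tool"`
	OperationIDArg string   `json:"operationIdArg"`
	Selector       selector `json:"selector"`
}

type cancelConfig struct {
	Tool           string `json:"tool"`
	OperationIDArg string `json:"operationIdArg"`
}

type config struct {
	Run                  runConfig     `json:"run"`
	Status               statusConfig  `json:"status"`
	Cancel               *cancelConfig `json:"cancel,omitempty"`
	DefaultExecutionMode string        `json:"defaultExecutionMode,omitempty"`
	PollIntervalMs       int           `json:"pollIntervalMs,omitempty"`
	Narration            string        `json:"narration,omitempty"`
}

// Hypothetical tool names and selector paths.
const exampleConfig = `{
  "run": {"tool": "llm/agents:start", "operationIdPath": "operationId"},
  "status": {
    "tool": "llm/agents:status",
    "operationIdArg": "operationId",
    "selector": {"statusPath": "status", "terminalStatuses": ["completed", "failed"]}
  },
  "defaultExecutionMode": "wait",
  "pollIntervalMs": 2000,
  "narration": "keydata"
}`

// parseConfig decodes a JSON config document into the trimmed types.
func parseConfig(raw string) (config, error) {
	var cfg config
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}
```

Because `cancel` is a pointer with `omitempty`, leaving it out of the document leaves `Cancel` nil, which distinguishes "no cancel tool" from an empty one.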

type ExecutionMode added in v0.1.8

type ExecutionMode string

ExecutionMode is the barrier/ownership decision for an async op.

Exactly two values:

  • "wait" (default): start registers the op and a later status call parks on the barrier; poller + narrator are engaged and TimeoutAt is enforced by the poller tick loop.
  • "detach": start returns and the op is fire-and-forget at the runtime layer: no PollAsyncOperation goroutine is spawned and no barrier is attached. `TimeoutAt` is still populated from `TimeoutMs` so the activated-status loop in `tool_executor.maybeExecuteActivatedStatusTool` can time-box its re-polls for a changed snapshot. When nothing observes the op, it is reclaimed by the Manager GC after the workspace-configured `default.async.gc.maxAge` idle window.

There was previously a third value, "fork", intended for child-conversation launches. It behaved identically to "wait" in this package and was never set by any production caller. The "launch a child and wait" vs. "wait inline" distinction lives at the skill / agents layer (see service/skill and protocol/skill), not on this enum. Keep the async layer minimal: the only question here is "does the status call park or not?"

const (
	ExecutionModeWait   ExecutionMode = "wait"
	ExecutionModeDetach ExecutionMode = "detach"
)

type Extracted

type Extracted struct {
	Status      string
	Message     string
	MessageKind string
	Percent     *int
	KeyData     json.RawMessage
	Error       string
}

func ExtractPayload

func ExtractPayload(raw string, selector Selector) (*Extracted, error)

type Filter added in v0.1.8

type Filter struct {
	ConversationID string // populated by runtime from context; never by LLM
	Tool           string // start-tool name, e.g. "llm/agents:start"
	ExecutionMode  string // "wait" | "detach"; "" = any
}

Filter is the generic selector for listing operations. Zero-value fields are treated as "don't filter by this."

INTERNAL USE ONLY. This struct is not part of any LLM-facing tool schema. In particular, `ConversationID` is always populated by the runtime from the request context — never from LLM-provided input. Exposing it to the LLM would let a prompt-injected or hallucinated conversation id leak ops across conversations; every path that builds a Filter must read ConversationID from trusted context, not from tool args.

Tool-agnostic: an op from any source (internal service, MCP, system, user tool) shows up in the same shape. Callers that legitimately want cross-conversation visibility (tests, admin flows) pass ConversationID = "".

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func ManagerFromContext added in v0.1.8

func ManagerFromContext(ctx context.Context) (*Manager, bool)

ManagerFromContext returns the *Manager attached to ctx by WithManager. Returns (nil, false) when no manager is present or the stored value is not of the expected concrete type.

func NewManager

func NewManager() *Manager

func (*Manager) ActiveOps

func (m *Manager) ActiveOps(_ context.Context, convID, turnID string) []*OperationRecord

func (*Manager) ActiveWaitOps

func (m *Manager) ActiveWaitOps(ctx context.Context, convID, turnID string) []*OperationRecord

func (*Manager) AdmitPoller added in v0.1.8

func (m *Manager) AdmitPoller(_ context.Context, id string, cancel context.CancelFunc) bool

AdmitPoller atomically admits a poller goroutine: refuses if the Manager is closed or a poller for the same op is already running, otherwise adds the map entry, stores the cancel func, and bumps pollerWG. Callers wrap a poller launch with:

pollCtx, cancel := context.WithCancel(...)
if !manager.AdmitPoller(ctx, opID, cancel) {
    cancel()
    return
}
go func() {
    defer manager.FinishPoller(ctx, opID) // always runs for an admitted poller
    defer cancel()
    // ... poll loop ...
}()

Storing the cancel in the same critical section as admission closes the race where TryStartPoller + StorePollerCancel were two separate calls — a concurrent Close firing between them would have left the admitted poller with no registered cancel.

func (*Manager) AwaitTerminal added in v0.1.8

func (m *Manager) AwaitTerminal(ctx context.Context, opIDs []string) <-chan AggregatedResult

func (*Manager) BindToolCarrier added in v0.1.8

func (m *Manager) BindToolCarrier(_ context.Context, id, toolCallID, toolMessageID, toolName string) (*OperationRecord, bool)

func (*Manager) CancelTurnPollers

func (m *Manager) CancelTurnPollers(_ context.Context, convID, turnID string)

CancelTurnPollers cancels every autonomous poller that belongs to the given conversation/turn. It is safe to call when no pollers are running (no-op). The service layer calls this when a turn is explicitly canceled so pollers do not outlive the turn they belong to.

func (*Manager) Close added in v0.1.8

func (m *Manager) Close()

Close releases Manager-owned resources synchronously. It:

  1. Cancels every active poller (equivalent to CancelTurnPollers across all turns) AND waits for each admitted poller goroutine to exit via pollerWG.
  2. Cancels every StartGC goroutine and waits for each to exit, so there are no leaked sweepers after Close returns.
  3. Closes every in-flight subscription channel (waking any consumers that were blocked on receive so they can exit cleanly).
  4. Signals every waiter from WaitForChange / WaitForNextPoll so their ctx-less partners see a wakeup.

After Close, no new Register / Subscribe / AwaitTerminal / Sweep / StartGC / AdmitPoller / TryStartPoller calls should be made — the Manager is no longer in a valid steady state. Post-close admissions/launches short-circuit. Close is idempotent: subsequent calls are no-ops.

Close is synchronous for every goroutine the Manager tracks: when it returns, all GC sweepers AND all admitted poller goroutines have exited. This is the correctness guarantee the cross-package poller code (service/shared/toolexec.PollAsyncOperation) relies on: canceled ctx propagates to the poll loop; the defer-chain calls FinishPoller; Close blocks on pollerWG until every such defer has run.

func (*Manager) ConsumeChanged

func (m *Manager) ConsumeChanged(convID, turnID string) []*OperationRecord

func (*Manager) FindActiveByRequest

func (m *Manager) FindActiveByRequest(_ context.Context, convID, turnID, toolName, requestArgsDigest string) (*OperationRecord, bool)

func (*Manager) FinishPoller

func (m *Manager) FinishPoller(_ context.Context, id string)

FinishPoller deregisters a poller. It is idempotent — only the first call (the one that actually removed the map entry) decrements pollerWG. This matters because dropOpLocked no longer touches m.pollers; the admitted goroutine's FinishPoller is the sole deregistration site, so the wg stays tied to goroutine lifecycle.

Callers MUST call FinishPoller exactly once per successful admission, typically via `defer manager.FinishPoller(ctx, opID)` placed BEFORE any early return in the poller goroutine (so partial setup doesn't leak the admission).

func (*Manager) Get

func (m *Manager) Get(_ context.Context, id string) (*OperationRecord, bool)

func (*Manager) HasActiveWaitOps

func (m *Manager) HasActiveWaitOps(ctx context.Context, convID, turnID string) bool

func (*Manager) ListOperations added in v0.1.8

func (m *Manager) ListOperations(f Filter) []PendingOp

ListOperations returns non-terminal operations matching the filter. Terminal ops (completed / failed / canceled) are excluded by construction: they are pruned by GC and carry no actionable state for callers enumerating outstanding work. Callers that want a historical view should query the conversation message store, not the live manager.

Tool-agnostic: an op from any source (internal service, MCP, system, user tool) is projected through the same `PendingOp` shape.
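A sketch of the matching rule implied by the Filter and ListOperations descriptions: zero-value fields match anything and terminal ops are skipped. The `op` and `filter` types and the `listOperations` function are hypothetical, and treating "completed", "failed", and "canceled" as the terminal states is an assumption based on the State constants.

```go
package main

// op is a hypothetical projection of an operation record.
type op struct {
	ID             string
	ConversationID string
	Tool           string
	ExecutionMode  string
	State          string
}

// filter mirrors Filter; ConversationID must come from trusted context.
type filter struct {
	ConversationID string
	Tool           string
	ExecutionMode  string
}

// terminal assumes the three terminal State values.
func terminal(state string) bool {
	switch state {
	case "completed", "failed", "canceled":
		return true
	}
	return false
}

// matches treats every zero-value field as "don't filter by this".
func (f filter) matches(o op) bool {
	if f.ConversationID != "" && o.ConversationID != f.ConversationID {
		return false
	}
	if f.Tool != "" && o.Tool != f.Tool {
		return false
	}
	if f.ExecutionMode != "" && o.ExecutionMode != f.ExecutionMode {
		return false
	}
	return true
}

// listOperations returns the non-terminal ops that match f.
func listOperations(ops []op, f filter) []op {
	var out []op
	for _, o := range ops {
		if terminal(o.State) || !f.matches(o) {
			continue
		}
		out = append(out, o)
	}
	return out
}
```

Passing `filter{}` yields every non-terminal op across conversations, which corresponds to the cross-conversation view the Filter documentation reserves for tests and admin flows.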

func (*Manager) OperationsForTurn

func (m *Manager) OperationsForTurn(_ context.Context, convID, turnID string) []*OperationRecord

func (*Manager) RecordPollFailure

func (m *Manager) RecordPollFailure(_ context.Context, id, errMsg string, transient bool) (*OperationRecord, bool)

func (*Manager) Register

func (m *Manager) Register(_ context.Context, input RegisterInput) (*OperationRecord, bool)

Register creates a new op record. Returns the canonical record and a flag indicating whether a prior record under the same id was overwritten. Overwrite is almost always a bug — a buggy OperationIDPath or a retried start-handler that double-registers — so it is logged at warn and counted in Stats(). The previous record's state is lost.

func (*Manager) ResetPollFailures

func (m *Manager) ResetPollFailures(_ context.Context, id string) (*OperationRecord, bool)

func (*Manager) SignalTurn added in v0.1.8

func (m *Manager) SignalTurn(_ context.Context, convID, turnID string) bool

func (*Manager) StartGC added in v0.1.8

func (m *Manager) StartGC(ctx context.Context, interval, maxAge time.Duration)

StartGC runs Sweep in a background goroutine until ctx is canceled OR the Manager is Close()d. Both signals cancel the goroutine immediately; whichever fires first wins.

interval controls the sweep cadence; maxAge is forwarded to Sweep. Both arguments MUST be positive — this package holds no defaults of its own. When either is non-positive, StartGC returns without launching a goroutine. The authoritative defaults live in the workspace `default.async` baseline (see `workspace/config.DefaultsWithFallback`); bootstrap resolves them to durations and forwards the values here via `protocol/async/wsconfig.WorkspaceConfig.Apply`.

The goroutine is cheap (one timer + one lock acquisition per tick) and exits cleanly. Callers that prefer explicit pacing (e.g. tied to a request/turn boundary) can call Sweep directly instead.

StartGC is a no-op after Close — the Manager is no longer in a valid steady state, so spawning a fresh sweeper would race with the teardown that already happened.

func (*Manager) Stats added in v0.1.8

func (m *Manager) Stats() ManagerStats

Stats returns a snapshot of lifetime counters and live gauges. Intended for operator-facing debug surfaces (expvar, admin endpoints) — not on any correctness path.

func (*Manager) StorePollerCancel deprecated

func (m *Manager) StorePollerCancel(_ context.Context, id string, cancel context.CancelFunc)

StorePollerCancel registers a cancel function for the given operation so that CancelTurnPollers / Close can reach it.

Deprecated: use AdmitPoller, which stores the cancel atomically with admission. StorePollerCancel remains for tests and legacy callers that paired it with TryStartPoller — they should migrate when convenient.

func (*Manager) Subscribe added in v0.1.8

func (m *Manager) Subscribe(opIDs []string) (<-chan ChangeEvent, uint64)

Subscribe returns a channel that receives ChangeEvents for any of the given op ids. The channel is buffered (capacity 16) and sends are non-blocking: once a slow consumer lets the buffer fill, further events are dropped. Callers must re-read op state on each received event and must not treat the absence of an event as absence of change.

The returned subscription ID can be passed to Unsubscribe to release the subscription early. This matters for callers that abandon the channel before all targets reach a terminal state; otherwise the subscription lingers and blocks GC of the ops it references.

When all targets are already terminal at subscribe time, the channel is closed immediately and subscribeID is 0.
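The buffered, non-blocking delivery described for Subscribe corresponds to a `select` with a `default` branch. Below is a minimal hypothetical model (`hub`, `subscribe`, `publish`, and `unsubscribe` are invented names); it omits the already-terminal case in which the real Subscribe returns a closed channel and id 0.

```go
package main

import "sync"

type changeEvent struct {
	OperationID string
	State       string
}

type subscription struct {
	ops map[string]bool
	ch  chan changeEvent
}

// hub is a hypothetical model of Subscribe/Unsubscribe fan-out.
type hub struct {
	mu     sync.Mutex
	nextID uint64
	subs   map[uint64]subscription
}

func newHub() *hub { return &hub{subs: make(map[uint64]subscription)} }

// subscribe registers interest in opIDs; ids start at 1 so 0 stays "no subscription".
func (h *hub) subscribe(opIDs []string) (<-chan changeEvent, uint64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.nextID++
	s := subscription{ops: make(map[string]bool), ch: make(chan changeEvent, 16)}
	for _, id := range opIDs {
		s.ops[id] = true
	}
	h.subs[h.nextID] = s
	return s.ch, h.nextID
}

// publish never blocks: if a subscriber's buffer is full, the event is
// dropped for that subscriber. It returns how many deliveries were dropped.
func (h *hub) publish(ev changeEvent) (dropped int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, s := range h.subs {
		if !s.ops[ev.OperationID] {
			continue
		}
		select {
		case s.ch <- ev:
		default:
			dropped++
		}
	}
	return dropped
}

// unsubscribe closes and forgets the subscription; repeated calls and id 0 are no-ops.
func (h *hub) unsubscribe(id uint64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if s, ok := h.subs[id]; ok {
		delete(h.subs, id)
		close(s.ch)
	}
}
```

Dropping rather than blocking keeps a stalled consumer from stalling the publisher, which is why the documentation asks consumers to re-read op state on every event instead of relying on event counts.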

func (*Manager) Sweep added in v0.1.8

func (m *Manager) Sweep(now time.Time, maxAge time.Duration) int

Sweep prunes op records that are safe to forget.

An op is a prune candidate when all of the following hold:

  1. It is terminal (completed / failed / canceled) OR it is a detach-mode op (no barrier and no auto-poller is attached to it, so the record is orphaned once detach is chosen).
  2. Its last update is older than maxAge.
  3. No current subscription references its id (pruning a subscribed op would silently strand a waiter, so we never do it).

Non-terminal wait-mode ops (including records left over from the retired "fork" mode) are never pruned: they represent live work that the barrier or a future status call is expected to observe. Callers that want timely cleanup of those ops should cancel them first so they transition to a terminal state.

Sweep is safe to call concurrently with other Manager operations. It returns the number of ops removed. maxAge <= 0 is a no-op.

func (*Manager) TerminalFailure

func (m *Manager) TerminalFailure(_ context.Context, convID, turnID string) (*OperationRecord, bool)

func (*Manager) TryStartPoller

func (m *Manager) TryStartPoller(_ context.Context, id string) bool

TryStartPoller is the legacy admission API: admits a poller without storing a cancel func. Prefer AdmitPoller in new code. Retained for tests and callers that register the cancel via StorePollerCancel separately. Refuses admission after Close.

func (*Manager) TurnSignalVersion added in v0.1.8

func (m *Manager) TurnSignalVersion(convID, turnID string) uint64

func (*Manager) Unsubscribe added in v0.1.8

func (m *Manager) Unsubscribe(subID uint64)

Unsubscribe releases a subscription identified by the id returned from Subscribe. Safe to call multiple times and with id == 0 (no-op). After Unsubscribe, the subscription no longer pins its target ops against GC and no further events are delivered on the associated channel. The channel is closed if it had not already been closed by all-targets-terminal.

Consumers that finish with a subscription (e.g. barrier waiter that returned an aggregated result, narrator that saw its parked call terminate) should call Unsubscribe to avoid lingering GC pins.

func (*Manager) Update

func (m *Manager) Update(_ context.Context, input UpdateInput) (*OperationRecord, bool)

func (*Manager) WaitForChange

func (m *Manager) WaitForChange(ctx context.Context, convID, turnID string) error

func (*Manager) WaitForChangeSince added in v0.1.8

func (m *Manager) WaitForChangeSince(ctx context.Context, convID, turnID string, signalAfter uint64) error

func (*Manager) WaitForNextPoll

func (m *Manager) WaitForNextPoll(ctx context.Context, convID, turnID string) error

type ManagerStats added in v0.1.8

type ManagerStats struct {
	// Lifetime counters.
	RegisterCount          int64 `json:"registerCount"`
	RegisterOverwriteCount int64 `json:"registerOverwriteCount"`
	UpdateCount            int64 `json:"updateCount"`
	UpdateChangedCount     int64 `json:"updateChangedCount"`
	SweepPruneCount        int64 `json:"sweepPruneCount"`
	SubscribeCount         int64 `json:"subscribeCount"`
	UnsubscribeCount       int64 `json:"unsubscribeCount"`

	// Live gauges at call time.
	ActiveOps           int `json:"activeOps"`
	ActiveSubscriptions int `json:"activeSubscriptions"`
	ActivePollers       int `json:"activePollers"`
}

ManagerStats is a read-only snapshot of Manager counters plus live gauges of in-flight state. Stats is safe to call concurrently with any other Manager operation. Counters are cumulative over the Manager's lifetime, starting from NewManager; gauges reflect state at call time.

type OperationRecord

type OperationRecord struct {
	ID                   string
	ParentConvID         string
	ParentTurnID         string
	ToolCallID           string
	ToolMessageID        string
	ToolName             string
	StatusToolName       string
	StatusOperationIDArg string
	SameToolRecall       bool
	StatusArgs           map[string]interface{}
	CancelToolName       string
	RequestArgsDigest    string
	RequestArgs          map[string]interface{}
	OperationIntent      string
	OperationSummary     string
	ExecutionMode        string
	State                State
	Status               string
	Message              string
	MessageKind          string
	Percent              *int
	LastSignaledPercent  *int
	KeyData              json.RawMessage
	Error                string
	CreatedAt            time.Time
	UpdatedAt            time.Time
	LastPayloadChangeAt  time.Time
	TimeoutAt            *time.Time
	TimeoutMs            int
	IdleTimeoutMs        int
	PollIntervalMs       int
	PollFailures         int
	// contains filtered or unexported fields
}

func (*OperationRecord) Terminal

func (r *OperationRecord) Terminal() bool

type PendingOp added in v0.1.8

type PendingOp struct {
	OperationID    string `json:"operationId"`
	Tool           string `json:"tool"`
	StatusTool     string `json:"statusTool,omitempty"`
	OperationIDArg string `json:"operationIdArg,omitempty"`
	SameToolRecall bool   `json:"sameToolRecall,omitempty"`
	// StatusArgs is the ready-to-send argument map for StatusTool.
	// Includes the op id under OperationIDArg plus any declared
	// StatusConfig.ExtraArgs (and, for same-tool-recall patterns, any
	// additional args needed to elicit status from the run tool).
	StatusArgs    map[string]interface{} `json:"statusArgs,omitempty"`
	ExecutionMode string                 `json:"executionMode,omitempty"`
	State         State                  `json:"state,omitempty"`
	Intent        string                 `json:"intent,omitempty"`
	Summary       string                 `json:"summary,omitempty"`
	UpdatedAt     time.Time              `json:"updatedAt,omitempty"`
}

PendingOp is a compact, LLM-oriented view of a non-terminal operation. Contains exactly what a caller needs to invoke the status tool.

Primary usage pattern: the LLM reads `statusTool` and sends `statusArgs` verbatim to it. `statusArgs` is always populated with a ready-to-send map — it includes the op id under the correct arg name plus any `StatusConfig.ExtraArgs` the tool expects. The LLM does not have to know or reconstruct extras.

`operationIdArg` and `sameToolRecall` are exposed for introspection (logging, debugging, LLM transparency) but are not required for a correct status call — `statusArgs` alone is sufficient.
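A sketch of how a ready-to-send `statusArgs` map could be assembled from the op id, the status tool's operation-id argument name, and the declared extra args. `buildStatusArgs` is a hypothetical helper; letting the op id override a same-named extra is an assumption, and the additional args used by same-tool-recall patterns are not modeled.

```go
package main

// buildStatusArgs copies the declared extras, then sets the op id under
// operationIDArg. The op id winning over a same-named extra is an assumption.
func buildStatusArgs(opID, operationIDArg string, extraArgs map[string]interface{}) map[string]interface{} {
	args := make(map[string]interface{}, len(extraArgs)+1)
	for k, v := range extraArgs {
		args[k] = v
	}
	args[operationIDArg] = opID
	return args
}
```

Copying into a fresh map keeps the configured ExtraArgs untouched when the result is handed to a caller.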

type RegisterInput

type RegisterInput struct {
	ID                   string
	ParentConvID         string
	ParentTurnID         string
	ToolCallID           string
	ToolMessageID        string
	ToolName             string
	StatusToolName       string
	StatusOperationIDArg string
	SameToolRecall       bool
	StatusArgs           map[string]interface{}
	CancelToolName       string
	RequestArgsDigest    string
	RequestArgs          map[string]interface{}
	OperationIntent      string
	OperationSummary     string
	ExecutionMode        string
	Status               string
	Message              string
	MessageKind          string
	Percent              *int
	KeyData              json.RawMessage
	Error                string
	TimeoutMs            int
	IdleTimeoutMs        int
	PollIntervalMs       int
}

type RunConfig

type RunConfig struct {
	Tool              string                 `json:"tool" yaml:"tool"`
	OperationIDPath   string                 `json:"operationIdPath" yaml:"operationIdPath"`
	ExecutionModePath string                 `json:"executionModePath,omitempty" yaml:"executionModePath,omitempty"`
	IntentPath        string                 `json:"intentPath,omitempty" yaml:"intentPath,omitempty"`
	SummaryPaths      []string               `json:"summaryPaths,omitempty" yaml:"summaryPaths,omitempty"`
	ExtraArgs         map[string]interface{} `json:"extraArgs,omitempty" yaml:"extraArgs,omitempty"`
	Selector          *Selector              `json:"selector,omitempty" yaml:"selector,omitempty"`
}

type Selector

type Selector struct {
	StatusPath       string   `json:"statusPath" yaml:"statusPath"`
	MessagePath      string   `json:"messagePath,omitempty" yaml:"messagePath,omitempty"`
	DataPath         string   `json:"dataPath,omitempty" yaml:"dataPath,omitempty"`
	ProgressPath     string   `json:"progressPath,omitempty" yaml:"progressPath,omitempty"`
	ErrorPath        string   `json:"errorPath,omitempty" yaml:"errorPath,omitempty"`
	TerminalStatuses []string `json:"terminalStatuses,omitempty" yaml:"terminalStatuses,omitempty"`
}
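ExtractPayload and Selector are documented only by their signatures, so the following is a hedged illustration of selector-driven extraction rather than the package's behavior. It assumes a simple dot-separated path syntax (e.g. "result.status"), reads only the status, message, and progress paths, and uses the hypothetical names `lookup`, `extractPayload`, `selector`, and `extracted`.

```go
package main

import (
	"encoding/json"
	"strings"
)

type selector struct {
	StatusPath, MessagePath, ProgressPath string
}

type extracted struct {
	Status  string
	Message string
	Percent *int
}

// lookup walks a dot-separated path through decoded JSON objects.
// The dot syntax is an assumption of this sketch.
func lookup(doc interface{}, path string) (interface{}, bool) {
	cur := doc
	for _, key := range strings.Split(path, ".") {
		obj, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		if cur, ok = obj[key]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// extractPayload decodes raw JSON and pulls the selected fields out of it.
func extractPayload(raw string, sel selector) (*extracted, error) {
	var doc interface{}
	if err := json.Unmarshal([]byte(raw), &doc); err != nil {
		return nil, err
	}
	out := &extracted{}
	if v, ok := lookup(doc, sel.StatusPath); ok {
		out.Status, _ = v.(string)
	}
	if sel.MessagePath != "" {
		if v, ok := lookup(doc, sel.MessagePath); ok {
			out.Message, _ = v.(string)
		}
	}
	if sel.ProgressPath != "" {
		if v, ok := lookup(doc, sel.ProgressPath); ok {
			if f, isNum := v.(float64); isNum { // JSON numbers decode as float64
				p := int(f)
				out.Percent = &p
			}
		}
	}
	return out, nil
}
```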

type State

type State string
const (
	StateStarted   State = "started"
	StateRunning   State = "running"
	StateWaiting   State = "waiting"
	StateCompleted State = "completed"
	StateFailed    State = "failed"
	StateCanceled  State = "canceled"
)

func DeriveState

func DeriveState(status, errMsg string, requested State) State

type StatusConfig

type StatusConfig struct {
	Tool           string                 `json:"tool" yaml:"tool"`
	OperationIDArg string                 `json:"operationIdArg" yaml:"operationIdArg"`
	ReuseRunArgs   bool                   `json:"reuseRunArgs,omitempty" yaml:"reuseRunArgs,omitempty"`
	ExtraArgs      map[string]interface{} `json:"extraArgs,omitempty" yaml:"extraArgs,omitempty"`
	Selector       Selector               `json:"selector" yaml:"selector"`
}

type UpdateInput

type UpdateInput struct {
	ID          string
	Status      string
	Message     string
	MessageKind string
	Percent     *int
	KeyData     json.RawMessage
	Error       string
	State       State
}

Directories

Path Synopsis
Package wsconfig applies operator-tunable async defaults to the Manager + narrator package.
