Documentation
¶
Index ¶
- Constants
- func ExecutionModeWaits(value string) bool
- func ExtractIntent(args map[string]any, path string, fallback string) string
- func ExtractOperationID(raw string, path string) (string, error)
- func ExtractSummary(args map[string]any, paths []string) string
- func NormalizeExecutionMode(value string, fallback string) string
- func WithManager(ctx context.Context, manager *Manager) context.Context
- type AggregatedItem
- type AggregatedResult
- type CancelConfig
- type ChangeEvent
- type Config
- type ExecutionMode
- type Extracted
- type Filter
- type Manager
- func (m *Manager) ActiveOps(_ context.Context, convID, turnID string) []*OperationRecord
- func (m *Manager) ActiveWaitOps(ctx context.Context, convID, turnID string) []*OperationRecord
- func (m *Manager) AdmitPoller(_ context.Context, id string, cancel context.CancelFunc) bool
- func (m *Manager) AwaitTerminal(ctx context.Context, opIDs []string) <-chan AggregatedResult
- func (m *Manager) BindToolCarrier(_ context.Context, id, toolCallID, toolMessageID, toolName string) (*OperationRecord, bool)
- func (m *Manager) CancelTurnPollers(_ context.Context, convID, turnID string)
- func (m *Manager) Close()
- func (m *Manager) ConsumeChanged(convID, turnID string) []*OperationRecord
- func (m *Manager) FindActiveByRequest(_ context.Context, convID, turnID, toolName, requestArgsDigest string) (*OperationRecord, bool)
- func (m *Manager) FinishPoller(_ context.Context, id string)
- func (m *Manager) Get(_ context.Context, id string) (*OperationRecord, bool)
- func (m *Manager) HasActiveWaitOps(ctx context.Context, convID, turnID string) bool
- func (m *Manager) ListOperations(f Filter) []PendingOp
- func (m *Manager) OperationsForTurn(_ context.Context, convID, turnID string) []*OperationRecord
- func (m *Manager) RecordPollFailure(_ context.Context, id, errMsg string, transient bool) (*OperationRecord, bool)
- func (m *Manager) Register(_ context.Context, input RegisterInput) (*OperationRecord, bool)
- func (m *Manager) ResetPollFailures(_ context.Context, id string) (*OperationRecord, bool)
- func (m *Manager) SignalTurn(_ context.Context, convID, turnID string) bool
- func (m *Manager) StartGC(ctx context.Context, interval, maxAge time.Duration)
- func (m *Manager) Stats() ManagerStats
- func (m *Manager) StorePollerCancel(_ context.Context, id string, cancel context.CancelFunc) (deprecated)
- func (m *Manager) Subscribe(opIDs []string) (<-chan ChangeEvent, uint64)
- func (m *Manager) Sweep(now time.Time, maxAge time.Duration) int
- func (m *Manager) TerminalFailure(_ context.Context, convID, turnID string) (*OperationRecord, bool)
- func (m *Manager) TryStartPoller(_ context.Context, id string) bool
- func (m *Manager) TurnSignalVersion(convID, turnID string) uint64
- func (m *Manager) Unsubscribe(subID uint64)
- func (m *Manager) Update(_ context.Context, input UpdateInput) (*OperationRecord, bool)
- func (m *Manager) WaitForChange(ctx context.Context, convID, turnID string) error
- func (m *Manager) WaitForChangeSince(ctx context.Context, convID, turnID string, signalAfter uint64) error
- func (m *Manager) WaitForNextPoll(ctx context.Context, convID, turnID string) error
- type ManagerStats
- type OperationRecord
- type PendingOp
- type RegisterInput
- type RunConfig
- type Selector
- type State
- type StatusConfig
- type UpdateInput
Constants ¶
const (
	DefaultPercentSignalThreshold = 5
	DefaultMaxPollFailures        = 3
	DefaultIdleTimeoutMs          = int((10 * time.Minute) / time.Millisecond)
)
Variables ¶
This section is empty.
Functions ¶
func ExecutionModeWaits ¶ added in v0.1.8
ExecutionModeWaits reports whether an op in this mode parks on the status barrier. Only "wait" waits; "detach" does not.
func ExtractIntent ¶ added in v0.1.8
func ExtractSummary ¶ added in v0.1.8
func NormalizeExecutionMode ¶ added in v0.1.8
NormalizeExecutionMode coerces a mode string to the canonical enum value. Unknown values (historical "fork" entries, typos, empty) fall back to `fallback`, which itself defaults to "wait" if empty or also invalid.
func WithManager ¶ added in v0.1.8
WithManager attaches the async Manager to ctx. Intended for use by the runtime bootstrap layer so protocol-layer tools can retrieve the manager without importing the higher-level service/shared/toolexec package.
Callers that populate toolexec.WithAsyncManager should also call WithManager so the two context surfaces stay in sync. The longer-term direction is a single canonical context key, which would remove the need to call both.
Types ¶
type AggregatedItem ¶ added in v0.1.8
type AggregatedItem struct {
OperationID string `json:"operationId,omitempty"`
ToolName string `json:"toolName,omitempty"`
OperationIntent string `json:"operationIntent,omitempty"`
OperationSummary string `json:"operationSummary,omitempty"`
State State `json:"state,omitempty"`
Reason string `json:"reason,omitempty"`
Detail string `json:"detail,omitempty"`
Payload json.RawMessage `json:"payload,omitempty"`
}
type AggregatedResult ¶ added in v0.1.8
type AggregatedResult struct {
Items []AggregatedItem `json:"items,omitempty"`
OpsStillActive bool `json:"opsStillActive,omitempty"`
}
type CancelConfig ¶
type ChangeEvent ¶ added in v0.1.8
type ChangeEvent struct {
OperationID string `json:"operationId,omitempty"`
PriorDigest string `json:"priorDigest,omitempty"`
NewDigest string `json:"newDigest,omitempty"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
MessageKind string `json:"messageKind,omitempty"`
Percent *int `json:"percent,omitempty"`
KeyData json.RawMessage `json:"keyData,omitempty"`
Error string `json:"error,omitempty"`
State State `json:"state,omitempty"`
ChangedAt time.Time `json:"changedAt,omitempty"`
ToolName string `json:"toolName,omitempty"`
Intent string `json:"intent,omitempty"`
Summary string `json:"summary,omitempty"`
Conversation string `json:"conversationId,omitempty"`
TurnID string `json:"turnId,omitempty"`
}
type Config ¶
type Config struct {
Run RunConfig `json:"run" yaml:"run"`
Status StatusConfig `json:"status" yaml:"status"`
Cancel *CancelConfig `json:"cancel,omitempty" yaml:"cancel,omitempty"`
DefaultExecutionMode string `json:"defaultExecutionMode,omitempty" yaml:"defaultExecutionMode,omitempty"`
TimeoutMs int `json:"timeoutMs,omitempty" yaml:"timeoutMs,omitempty"`
PollIntervalMs int `json:"pollIntervalMs,omitempty" yaml:"pollIntervalMs,omitempty"`
IdleTimeoutMs int `json:"idleTimeoutMs,omitempty" yaml:"idleTimeoutMs,omitempty"`
// Narration controls how wait-mode async progress is surfaced to the parent
// turn. Supported values:
// - "none": silent
// - "keydata": no narrator LLM; surface the async update directly from
// selector-extracted payload data/message (used by llm/agents)
// - "template": deterministic template rendering
// - "llm": supervising narrator LLM summarizes each async update
Narration string `json:"narration,omitempty" yaml:"narration,omitempty"`
NarrationTemplate string `json:"narrationTemplate,omitempty" yaml:"narrationTemplate,omitempty"`
}
type ExecutionMode ¶ added in v0.1.8
type ExecutionMode string
ExecutionMode is the barrier/ownership decision for an async op.
Exactly two values:
- "wait" (default): start registers the op and a later status call parks on the barrier; poller + narrator are engaged and TimeoutAt is enforced by the poller tick loop.
- "detach": start returns; the op is fire-and-forget at the runtime layer — no PollAsyncOperation goroutine is spawned and no barrier is attached. `TimeoutAt` is still populated from `TimeoutMs` so the activated-status loop in `tool_executor.maybeExecuteActivatedStatusTool` can time-box its re-polls for a changed snapshot. When nothing observes the op, it is reclaimed by the Manager GC after the workspace-configured `default.async.gc.maxAge` idle window.
There was previously a third value, "fork", intended for child-conversation launches. It behaved identically to "wait" in this package and was never set by any production caller. The "launch a child and wait" vs. "wait inline" distinction lives at the skill / agents layer (see service/skill and protocol/skill), not on this enum. Keep the async layer minimal: the only question here is "does the status call park or not?"
const (
	ExecutionModeWait   ExecutionMode = "wait"
	ExecutionModeDetach ExecutionMode = "detach"
)
type Extracted ¶
type Filter ¶ added in v0.1.8
type Filter struct {
ConversationID string // populated by runtime from context; never by LLM
Tool string // start-tool name, e.g. "llm/agents:start"
ExecutionMode string // "wait" | "detach"; "" = any
}
Filter is the generic selector for listing operations. Zero-value fields are treated as "don't filter by this."
INTERNAL USE ONLY. This struct is not part of any LLM-facing tool schema. In particular, `ConversationID` is always populated by the runtime from the request context — never from LLM-provided input. Exposing it to the LLM would let a prompt-injected or hallucinated conversation id leak ops across conversations; every path that builds a Filter must read ConversationID from trusted context, not from tool args.
Tool-agnostic: an op from any source (internal service, MCP, system, user tool) shows up in the same shape. Callers that legitimately want cross-conversation visibility (tests, admin flows) pass ConversationID = "".
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func ManagerFromContext ¶ added in v0.1.8
ManagerFromContext returns the *Manager attached to ctx by WithManager. Returns (nil, false) when no manager is present or the stored value is not of the expected concrete type.
func NewManager ¶
func NewManager() *Manager
func (*Manager) ActiveOps ¶
func (m *Manager) ActiveOps(_ context.Context, convID, turnID string) []*OperationRecord
func (*Manager) ActiveWaitOps ¶
func (m *Manager) ActiveWaitOps(ctx context.Context, convID, turnID string) []*OperationRecord
func (*Manager) AdmitPoller ¶ added in v0.1.8
AdmitPoller atomically admits a poller goroutine: refuses if the Manager is closed or a poller for the same op is already running, otherwise adds the map entry, stores the cancel func, and bumps pollerWG. Callers wrap a poller launch with:
pollCtx, cancel := context.WithCancel(...)
if !manager.AdmitPoller(ctx, opID, cancel) {
cancel()
return
}
go func() {
defer manager.FinishPoller(ctx, opID) // always runs for an admitted poller
defer cancel()
// ... poll loop ...
}()
Storing the cancel in the same critical section as admission closes the race where TryStartPoller + StorePollerCancel were two separate calls — a concurrent Close firing between them would have left the admitted poller with no registered cancel.
func (*Manager) AwaitTerminal ¶ added in v0.1.8
func (m *Manager) AwaitTerminal(ctx context.Context, opIDs []string) <-chan AggregatedResult
func (*Manager) BindToolCarrier ¶ added in v0.1.8
func (*Manager) CancelTurnPollers ¶
CancelTurnPollers cancels every autonomous poller that belongs to the given conversation/turn. It is safe to call when no pollers are running (no-op). The service layer calls this when a turn is explicitly canceled so pollers do not outlive the turn they belong to.
func (*Manager) Close ¶ added in v0.1.8
func (m *Manager) Close()
Close releases Manager-owned resources synchronously. It:
- Cancels every active poller (equivalent to CancelTurnPollers across all turns) AND waits for each admitted poller goroutine to exit via pollerWG.
- Cancels every StartGC goroutine and waits for each to exit, so there are no leaked sweepers after Close returns.
- Closes every in-flight subscription channel (waking any consumers that were blocked on receive so they can exit cleanly).
- Signals every waiter from WaitForChange / WaitForNextPoll so their ctx-less partners see a wakeup.
After Close, no new Register / Subscribe / AwaitTerminal / Sweep / StartGC / AdmitPoller / TryStartPoller calls should be made — the Manager is no longer in a valid steady state. Post-close admissions/launches short-circuit. Close is idempotent: subsequent calls are no-ops.
Close is synchronous for every goroutine the Manager tracks: when it returns, all GC sweepers AND all admitted poller goroutines have exited. This is the correctness guarantee the cross-package poller code (service/shared/toolexec.PollAsyncOperation) relies on: canceled ctx propagates to the poll loop; the defer-chain calls FinishPoller; Close blocks on pollerWG until every such defer has run.
func (*Manager) ConsumeChanged ¶
func (m *Manager) ConsumeChanged(convID, turnID string) []*OperationRecord
func (*Manager) FindActiveByRequest ¶
func (*Manager) FinishPoller ¶
FinishPoller deregisters a poller. It is idempotent — only the first call (the one that actually removed the map entry) decrements pollerWG. This matters because dropOpLocked no longer touches m.pollers; the admitted goroutine's FinishPoller is the sole deregistration site, so the wg stays tied to goroutine lifecycle.
Callers MUST call FinishPoller exactly once per successful admission, typically via `defer manager.FinishPoller(ctx, opID)` placed BEFORE any early return in the poller goroutine (so partial setup doesn't leak the admission).
func (*Manager) HasActiveWaitOps ¶
func (*Manager) ListOperations ¶ added in v0.1.8
ListOperations returns non-terminal operations matching the filter. Terminal ops (success / failure / canceled) are excluded by construction: they are pruned by GC and carry no actionable state for callers enumerating outstanding work. Callers that want a historical view should query the conversation message store, not the live manager.
Tool-agnostic: an op from any source (internal service, MCP, system, user tool) is projected through the same `PendingOp` shape.
func (*Manager) OperationsForTurn ¶
func (m *Manager) OperationsForTurn(_ context.Context, convID, turnID string) []*OperationRecord
func (*Manager) RecordPollFailure ¶
func (*Manager) Register ¶
func (m *Manager) Register(_ context.Context, input RegisterInput) (*OperationRecord, bool)
Register creates a new op record. Returns the canonical record and a flag indicating whether a prior record under the same id was overwritten. Overwrite is almost always a bug — a buggy OperationIDPath or a retried start-handler that double-registers — so it is logged at warn and counted in Stats(). The previous record's state is lost.
func (*Manager) ResetPollFailures ¶
func (*Manager) SignalTurn ¶ added in v0.1.8
func (*Manager) StartGC ¶ added in v0.1.8
StartGC runs Sweep in a background goroutine until ctx is canceled OR the Manager is Close()d. Both signals cancel the goroutine immediately; whichever fires first wins.
interval controls the sweep cadence; maxAge is forwarded to Sweep. Both arguments MUST be positive — this package holds no defaults of its own. When either is non-positive, StartGC returns without launching a goroutine. The authoritative defaults live in the workspace `default.async` baseline (see `workspace/config.DefaultsWithFallback`); bootstrap resolves them to durations and forwards the values here via `protocol/async/wsconfig.WorkspaceConfig.Apply`.
The goroutine is cheap (one timer + one lock acquisition per tick) and exits cleanly. Callers that prefer explicit pacing (e.g. tied to a request/turn boundary) can call Sweep directly instead.
StartGC is a no-op after Close — the Manager is no longer in a valid steady state, so spawning a fresh sweeper would race with the teardown that already happened.
func (*Manager) Stats ¶ added in v0.1.8
func (m *Manager) Stats() ManagerStats
Stats returns a snapshot of lifetime counters and live gauges. Intended for operator-facing debug surfaces (expvar, admin endpoints) — not on any correctness path.
func (*Manager) StorePollerCancel
deprecated
StorePollerCancel registers a cancel function for the given operation so that CancelTurnPollers / Close can reach it.
Deprecated: use AdmitPoller, which stores the cancel atomically with admission. StorePollerCancel remains for tests and legacy callers that paired it with TryStartPoller — they should migrate when convenient.
func (*Manager) Subscribe ¶ added in v0.1.8
func (m *Manager) Subscribe(opIDs []string) (<-chan ChangeEvent, uint64)
Subscribe returns a channel that receives ChangeEvents for any of the given op ids. The channel is buffered (16) and non-blocking on send — slow consumers drop events. Callers must re-read op state on each received event and must not treat the absence of an event as absence of change.
The returned subscription id can be passed to Unsubscribe to release the subscription early. This matters for callers that abandon the channel before all targets reach terminal; otherwise the subscription lingers and blocks GC of the ops it references.
When all targets are already terminal at subscribe time, the channel is closed immediately and the returned subscription id is 0.
func (*Manager) Sweep ¶ added in v0.1.8
Sweep prunes op records that are safe to forget.
An op is a prune candidate when all of the following hold:
- It is terminal (success / failure / canceled) OR it is a detach-mode op (no barrier and no auto-poller are attached to it — the record is orphaned once detach is chosen).
- Its last update is older than maxAge.
- No current subscription references its id (pruning a subscribed op would silently strand a waiter, so we never do it).
Non-terminal wait-mode ops are never pruned — they represent live work the barrier or a future status call is expected to observe. Callers that want timely cleanup of those should cancel them first so they transition to a terminal state.
Sweep is safe to call concurrently with other Manager operations. It returns the number of ops removed. maxAge <= 0 is a no-op.
func (*Manager) TerminalFailure ¶
func (*Manager) TryStartPoller ¶
TryStartPoller is the legacy admission API: admits a poller without storing a cancel func. Prefer AdmitPoller in new code. Retained for tests and callers that register the cancel via StorePollerCancel separately. Refuses admission after Close.
func (*Manager) TurnSignalVersion ¶ added in v0.1.8
func (*Manager) Unsubscribe ¶ added in v0.1.8
Unsubscribe releases a subscription identified by the id returned from Subscribe. Safe to call multiple times and with id == 0 (no-op). After Unsubscribe, the subscription no longer pins its target ops against GC and no further events are delivered on the associated channel. The channel is closed if it had not already been closed by all-targets-terminal.
Consumers that finish with a subscription (e.g. barrier waiter that returned an aggregated result, narrator that saw its parked call terminate) should call Unsubscribe to avoid lingering GC pins.
func (*Manager) Update ¶
func (m *Manager) Update(_ context.Context, input UpdateInput) (*OperationRecord, bool)
func (*Manager) WaitForChange ¶
func (*Manager) WaitForChangeSince ¶ added in v0.1.8
type ManagerStats ¶ added in v0.1.8
type ManagerStats struct {
// Lifetime counters.
RegisterCount int64 `json:"registerCount"`
RegisterOverwriteCount int64 `json:"registerOverwriteCount"`
UpdateCount int64 `json:"updateCount"`
UpdateChangedCount int64 `json:"updateChangedCount"`
SweepPruneCount int64 `json:"sweepPruneCount"`
SubscribeCount int64 `json:"subscribeCount"`
UnsubscribeCount int64 `json:"unsubscribeCount"`
// Live gauges at call time.
ActiveOps int `json:"activeOps"`
ActiveSubscriptions int `json:"activeSubscriptions"`
ActivePollers int `json:"activePollers"`
}
ManagerStats is a read-only snapshot of Manager counters plus live gauges of in-flight state. Safe to call concurrently with any other Manager operation. Counters are lifetime-cumulative from NewManager; gauges reflect state at call time.
type OperationRecord ¶
type OperationRecord struct {
ID string
ParentConvID string
ParentTurnID string
ToolCallID string
ToolMessageID string
ToolName string
StatusToolName string
StatusOperationIDArg string
SameToolRecall bool
StatusArgs map[string]interface{}
CancelToolName string
RequestArgsDigest string
RequestArgs map[string]interface{}
OperationIntent string
OperationSummary string
ExecutionMode string
State State
Status string
Message string
MessageKind string
Percent *int
LastSignaledPercent *int
KeyData json.RawMessage
Error string
CreatedAt time.Time
UpdatedAt time.Time
LastPayloadChangeAt time.Time
TimeoutAt *time.Time
TimeoutMs int
IdleTimeoutMs int
PollIntervalMs int
PollFailures int
// contains filtered or unexported fields
}
func (*OperationRecord) Terminal ¶
func (r *OperationRecord) Terminal() bool
type PendingOp ¶ added in v0.1.8
type PendingOp struct {
OperationID string `json:"operationId"`
Tool string `json:"tool"`
StatusTool string `json:"statusTool,omitempty"`
OperationIDArg string `json:"operationIdArg,omitempty"`
SameToolRecall bool `json:"sameToolRecall,omitempty"`
// StatusArgs is the ready-to-send argument map for StatusTool.
// Includes the op id under OperationIDArg plus any declared
// StatusConfig.ExtraArgs (and, for same-tool-recall patterns, any
// additional args needed to elicit status from the run tool).
StatusArgs map[string]interface{} `json:"statusArgs,omitempty"`
ExecutionMode string `json:"executionMode,omitempty"`
State State `json:"state,omitempty"`
Intent string `json:"intent,omitempty"`
Summary string `json:"summary,omitempty"`
UpdatedAt time.Time `json:"updatedAt,omitempty"`
}
PendingOp is a compact, LLM-oriented view of a non-terminal operation. Contains exactly what a caller needs to invoke the status tool.
Primary usage pattern: the LLM reads `statusTool` and sends `statusArgs` verbatim to it. `statusArgs` is always populated with a ready-to-send map — it includes the op id under the correct arg name plus any `StatusConfig.ExtraArgs` the tool expects. The LLM does not have to know or reconstruct extras.
`operationIdArg` and `sameToolRecall` are exposed for introspection (logging, debugging, LLM transparency) but are not required for a correct status call — `statusArgs` alone is sufficient.
type RegisterInput ¶
type RegisterInput struct {
ID string
ParentConvID string
ParentTurnID string
ToolCallID string
ToolMessageID string
ToolName string
StatusToolName string
StatusOperationIDArg string
SameToolRecall bool
StatusArgs map[string]interface{}
CancelToolName string
RequestArgsDigest string
RequestArgs map[string]interface{}
OperationIntent string
OperationSummary string
ExecutionMode string
Status string
Message string
MessageKind string
Percent *int
KeyData json.RawMessage
Error string
TimeoutMs int
IdleTimeoutMs int
PollIntervalMs int
}
type RunConfig ¶
type RunConfig struct {
Tool string `json:"tool" yaml:"tool"`
OperationIDPath string `json:"operationIdPath" yaml:"operationIdPath"`
ExecutionModePath string `json:"executionModePath,omitempty" yaml:"executionModePath,omitempty"`
IntentPath string `json:"intentPath,omitempty" yaml:"intentPath,omitempty"`
SummaryPaths []string `json:"summaryPaths,omitempty" yaml:"summaryPaths,omitempty"`
ExtraArgs map[string]interface{} `json:"extraArgs,omitempty" yaml:"extraArgs,omitempty"`
Selector *Selector `json:"selector,omitempty" yaml:"selector,omitempty"`
}
type Selector ¶
type Selector struct {
StatusPath string `json:"statusPath" yaml:"statusPath"`
MessagePath string `json:"messagePath,omitempty" yaml:"messagePath,omitempty"`
DataPath string `json:"dataPath,omitempty" yaml:"dataPath,omitempty"`
ProgressPath string `json:"progressPath,omitempty" yaml:"progressPath,omitempty"`
ErrorPath string `json:"errorPath,omitempty" yaml:"errorPath,omitempty"`
TerminalStatuses []string `json:"terminalStatuses,omitempty" yaml:"terminalStatuses,omitempty"`
}
type StatusConfig ¶
type StatusConfig struct {
Tool string `json:"tool" yaml:"tool"`
OperationIDArg string `json:"operationIdArg" yaml:"operationIdArg"`
ReuseRunArgs bool `json:"reuseRunArgs,omitempty" yaml:"reuseRunArgs,omitempty"`
ExtraArgs map[string]interface{} `json:"extraArgs,omitempty" yaml:"extraArgs,omitempty"`
Selector Selector `json:"selector" yaml:"selector"`
}