Documentation
¶
Overview ¶
Package engine is part of the GoFastr harness.
See docs/harness-architecture.md for the architecture this package implements.
Package engine implements the agent loop and its supporting machinery: typed event bus, request/tool middleware chains, cancellation tree, and the loop itself.
See docs/harness-architecture.md § The agent loop and § Extensibility.
Index ¶
- Constants
- Variables
- func CapToolResultContent(blocks []control.ContentBlock) []control.ContentBlock
- func FormatArgvSummary(toolName string, args []byte) string
- func FormatMessages(h []provider.Message) string
- func PermissionMiddleware(bus *Bus, eng *permission.Engine, router AnswerRouter, session ids.SessionID, ...) tool.Middleware
- func SimpleInput(text string) []control.ContentBlock
- func WithMutatingFlag(ctx context.Context, mutating bool) context.Context
- func WithOriginator(ctx context.Context, id ids.ClientID) context.Context
- type AnswerRouter
- type Bus
- func (b *Bus) Close()
- func (b *Bus) NextID() uint64
- func (b *Bus) Publish(e control.Event, originator ids.ClientID) (control.EventEnvelope, error)
- func (b *Bus) Replay(envelopes []control.EventEnvelope, dst chan<- control.EventEnvelope)
- func (b *Bus) Session() ids.SessionID
- func (b *Bus) Subscribe(ctx context.Context) <-chan control.EventEnvelope
- type CancelTree
- type ContextInjector
- type ContextSection
- type CostTracker
- type Dispatcher
- type Engine
- type PermissionAnswer
- type RequestHandler
- type RequestMiddleware
- type SimpleCostTracker
- type StreamSummary
Constants ¶
const SubAgentMaxIterations = 8
SubAgentMaxIterations is a tighter cap for sub-agent turns (Engine.Spawn). Sub-agents are "do one focused thing" workers — they should NOT do 20 provider rounds the way a top-level coordinator turn might. 8 leaves room for: plan → 3-4 tool rounds → wrap-up. Anything more is the model getting lost.
const UntrustedContentNotice = `` /* 412-byte string literal not displayed */
UntrustedContentNotice is the standing instruction emitted alongside untrusted-content tags. The model is told to treat the contents as data, not instructions.
Variables ¶
var ErrAnswerChannelClosed = errors.New("engine: permission answer channel closed")
ErrAnswerChannelClosed is returned when the multiplexer's answer channel closes without delivering an answer.
var ErrBudgetExceeded = errors.New("engine: cost budget exceeded")
ErrBudgetExceeded is returned when CostBudgetMiddleware aborts a request.
var ErrDispatcherNotReady = errors.New("engine: dispatcher not ready (empty registry)")
ErrDispatcherNotReady is returned when the dispatcher is invoked before its registry has any sources registered.
var ErrStreamClosed = errors.New("engine: provider stream closed unexpectedly")
ErrStreamClosed is returned by CollectStream when the provider stream closed unexpectedly.
Functions ¶
func CapToolResultContent ¶
func CapToolResultContent(blocks []control.ContentBlock) []control.ContentBlock
CapToolResultContent (exported for tests) enforces maxToolResultBytesPerBlock on each text block. Larger blocks are truncated with a clear suffix so the model knows content was elided.
func FormatArgvSummary ¶
FormatArgvSummary computes a short argv-style summary for a tool call, used by permission middleware to populate the PermissionRequested event and to match argv-glob rules. The form is best-effort and tool-specific:
- For Bash, the leading shell command string.
- For Read/Write/Edit/Glob/Ls, "<Tool>:<path>" or "<Tool>:<pattern>".
- Otherwise the bare tool name.
func FormatMessages ¶
FormatMessages is a developer helper for printing history during debugging. Not used at runtime.
func PermissionMiddleware ¶
func PermissionMiddleware( bus *Bus, eng *permission.Engine, router AnswerRouter, session ids.SessionID, timeout time.Duration, ) tool.Middleware
PermissionMiddleware constructs the middleware.
session is bound at middleware-construction time (one engine = one session); originator is also fixed per-turn and supplied via context (so the multiplexer can address the prompt to the right client).
func SimpleInput ¶
func SimpleInput(text string) []control.ContentBlock
SimpleInput converts a plain text string into a SendInput's content. Convenience for callers that don't need multi-block input.
func WithMutatingFlag ¶
WithMutatingFlag stamps a context with the tool's is_mutating declaration. Used by the dispatcher before calling the middleware chain.
Types ¶
type AnswerRouter ¶
type AnswerRouter interface {
Subscribe(session ids.SessionID, callID ids.CallID) <-chan PermissionAnswer
Unsubscribe(session ids.SessionID, callID ids.CallID)
}
AnswerRouter is wired by the multiplexer. It returns a channel that receives AnswerPermission events for the given session, and a function that subscribes the caller for the lifetime of a single PermissionRequested. The middleware uses Subscribe to wait for a matching answer; the multiplexer narrows by CallID and identity rules before forwarding.
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is the typed event bus per SessionID. Events have monotonically increasing IDs so any transport can implement resume-from-id without transport-specific bookkeeping.
Subscribers receive a buffered channel; slow subscribers risk dropped events. The Bus does NOT persist — see session.Store for the durable log.
func (*Bus) Publish ¶
Publish encodes the event into a canonical envelope, assigns the next sequence ID, and broadcasts to every active subscriber. It returns the envelope it published so callers can persist or inspect.
func (*Bus) Replay ¶
func (b *Bus) Replay(envelopes []control.EventEnvelope, dst chan<- control.EventEnvelope)
Replay sends pre-encoded envelopes to a single subscription. Used by transports implementing resume-from-id: they fetch missing events from session.Store and replay them before the live stream resumes.
Note: Replay does NOT assign new IDs; envelopes retain their original sequence numbers from the persistent log.
func (*Bus) Subscribe ¶
func (b *Bus) Subscribe(ctx context.Context) <-chan control.EventEnvelope
Subscribe returns a receive-only channel of events. The channel is closed when ctx is cancelled or the Bus is closed.
Buffer is sized to absorb short bursts without blocking the engine. Subscribers that fall behind have their stale events dropped to preserve liveness — see § Per-transport rules → Backpressure for the policy.
type CancelTree ¶
type CancelTree struct {
// contains filtered or unexported fields
}
CancelTree wires cancellation hierarchically so cancelling a turn propagates to its tool calls and to any child engines it spawned (e.g., via the `delegate` tool).
The doc commits to: cancellation tree (turn → tool calls → child engines). v0.x scopes `delegate` to sync-only-blocking; the child engine has its own CancelTree rooted at the delegate's call.
func NewCancelTree ¶
func NewCancelTree(parent context.Context) *CancelTree
NewCancelTree returns a root CancelTree bound to parent.
func (*CancelTree) Cancel ¶
func (t *CancelTree) Cancel(cause error)
Cancel cancels this node and all descendants with the given cause.
func (*CancelTree) Cause ¶
func (t *CancelTree) Cause() error
Cause returns the reason this node was cancelled (or nil if not).
func (*CancelTree) Child ¶
func (t *CancelTree) Child() *CancelTree
Child returns a child node. Cancelling the parent cancels every child (transitively). Cancelling a child does not affect the parent or siblings.
func (*CancelTree) Context ¶
func (t *CancelTree) Context() context.Context
Context returns the context tied to this node.
type ContextInjector ¶
type ContextInjector func(ctx context.Context) []ContextSection
ContextInjectionMiddleware appends untrusted-content sections to the System prompt. Per hard rule 12, every byte from outside the trust boundary is wrapped in <untrusted-...> tags with a standing instruction not to follow instructions inside.
The injector callback returns the (section name, content) pairs to inject — empty content sections are skipped. Typical sources: AGENTS.md (wrapped as <untrusted-agents-md>), memory entries, skill metadata.
type ContextSection ¶
type CostTracker ¶
CostBudgetMiddleware enforces a per-session USD cap. Aborts the request before sending if the running total would exceed the cap.
Total is updated externally as CostIncremented events fire on the bus; the middleware reads from the supplied tracker.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher routes ToolCalls through the tool middleware chain to the registered tool. It also bridges the EventSink into the per-session event bus so tool progress events get broadcast.
The dispatcher is the only thing in the engine that knows about concrete tools — middleware sees the abstract Tool interface and the EventSink, never the registry.
func NewDispatcher ¶
func NewDispatcher(bus *Bus, registry *tool.Registry, mws ...tool.Middleware) *Dispatcher
NewDispatcher builds a Dispatcher for one session.
func (*Dispatcher) Dispatch ¶
func (d *Dispatcher) Dispatch(ctx context.Context, originator ids.ClientID, call tool.ToolCall) (*tool.ToolResult, error)
Dispatch executes a tool call through the middleware chain.
originator is the Client.ID() that triggered this turn — surfaces in PermissionRequested events so the multiplexer can address answers correctly and so identity_class self-approval guards apply.
func (*Dispatcher) Registry ¶
func (d *Dispatcher) Registry() *tool.Registry
Registry returns the dispatcher's tool registry. Used by the engine to inject the registry into the dispatch ctx for meta tools.
type Engine ¶
type Engine struct {
Session ids.SessionID
Bus *Bus
Provider provider.Provider
Model string
Dispatcher *Dispatcher
Middleware []RequestMiddleware
// History is the canonical-form message list. The loop appends
// to this list as turns complete; middleware can read it via
// Request.Messages.
History []provider.Message
// Tools snapshot for the next Request. Refreshed when the
// registry changes.
Tools []provider.ToolSchema
// Tree owns cancellation for the active turn. Child turns and
// child engines branch off this.
Tree *CancelTree
// OnTurnEnd, when non-nil, runs JUST BEFORE the engine publishes
// TurnEnded — gives the multiplex a chance to release its
// turn-busy slot so subscribers reacting to TurnEnded can
// immediately send the next input without hitting TurnInProgress.
OnTurnEnd func()
}
Engine is the agent loop bound to one EngineRun. It owns:
- the per-session event bus
- the tool dispatcher (which owns the registry reference)
- the request middleware chain wrapping Provider.Chat
- the conversation history (in-memory; persisted by session.Store via an event subscriber)
One Engine runs one session. Multiple engines coexist in one harness process; the multiplexer routes commands to the right one.
func NewEngine ¶
func NewEngine(session ids.SessionID, bus *Bus, p provider.Provider, model string, d *Dispatcher) *Engine
NewEngine constructs an Engine. Caller is responsible for wiring Provider, Model, Dispatcher, and Middleware before calling Run.
func (*Engine) RunTurn ¶
func (e *Engine) RunTurn(ctx context.Context, originator ids.ClientID, input []control.ContentBlock) error
RunTurn processes a single turn:
- Append the input as a user message.
- Build a Request and pass through the request middleware chain.
- Send to Provider.Chat; collect the stream into a summary.
- If the model emitted tool_use blocks, dispatch each through the tool middleware chain and append the results as a tool_result-bearing user message. Loop back to step 2.
- Otherwise yield with TurnEnded.
originator is the Client that sent the SendInput.
Cancellation: ctx (typically derived from e.Tree.Context()) is honored at every await point; partial state is emitted as best effort.
func (*Engine) Spawn ¶
Spawn runs a sub-agent inside this engine's environment: same Provider, Model, Tools, and Dispatcher, but a FRESH conversation history. Used by the Agent tool. systemHint is prepended as an extra system message on the sub-conversation only; userPrompt is the initial user turn. Returns the final assistant text after the sub-loop terminates (or empty string on error).
Sub-agent events flow on the SAME bus so all attached clients can observe what the sub-agent does in real time. The parent and child turn numbers share the same monotonic counter; this is acceptable because a sub-agent is conceptually one large tool call.
type PermissionAnswer ¶
type PermissionAnswer struct {
CallID ids.CallID
Allow bool
Scope control.PermitScope
Source ids.ClientID
}
PermissionMiddleware returns a tool.Middleware that consults the permission engine before invoking the wrapped tool. The middleware:
- Computes the argv summary for the call (FormatArgvSummary).
- Asks the permission engine for a Decision.
- On DecisionAllow, calls next.
- On DecisionDeny, returns an error result.
- On DecisionAsk, publishes a PermissionRequested event and blocks waiting for an AnswerPermission via the answer channel (typically wired by the multiplexer). Times out per PermissionTimeout; on timeout, denies the call.
Per hard rule 11, the multiplexer enforces "agents cannot self-approve their own turn" before delivering an answer to this middleware. This middleware does not re-check identity.
PermissionTimeout is the maximum time to wait for a human ack before denying. The doc commits to 60s as default.
type RequestHandler ¶
type RequestHandler func(ctx context.Context, req *provider.Request) (<-chan provider.StreamEvent, error)
RequestHandler is the leaf in the request middleware chain — it invokes the provider's Chat method and returns the streaming channel.
func ChainRequest ¶
func ChainRequest(base RequestHandler, ms ...RequestMiddleware) RequestHandler
ChainRequest composes a RequestHandler from a base handler and a sequence of middleware. First middleware listed is outermost.
type RequestMiddleware ¶
type RequestMiddleware func(ctx context.Context, req *provider.Request, next RequestHandler) (<-chan provider.StreamEvent, error)
RequestMiddleware wraps a RequestHandler. Used for AGENTS.md injection, skill activation, memory selection, history compaction, cost-budget enforcement, provider routing, etc.
func ContextInjectionMiddleware ¶
func ContextInjectionMiddleware(inject ContextInjector) RequestMiddleware
func CostBudgetMiddleware ¶
func CostBudgetMiddleware(tracker CostTracker, session ids.SessionID, capUSD float64, bus *Bus, originator ids.ClientID) RequestMiddleware
func SystemPromptMiddleware ¶
func SystemPromptMiddleware(header string) RequestMiddleware
SystemPromptMiddleware prepends the given header to every request's System string. Used by the profile loader to inject the profile's prompt_header.
type SimpleCostTracker ¶
type SimpleCostTracker struct {
// contains filtered or unexported fields
}
SimpleCostTracker is an in-memory CostTracker subscribed to the per-session bus. Aggregates CostIncremented events.
func NewSimpleCostTracker ¶
func NewSimpleCostTracker() *SimpleCostTracker
type StreamSummary ¶
type StreamSummary struct {
Text string // concatenated TextDelta payloads
Thinking []json.RawMessage // provider-stamped thinking blocks
ToolUses []control.ToolUse // any tool_use blocks the model emitted
Usage provider.Usage // final accounting at message_stop
FinishReason string // raw provider finish_reason
}
StreamCollector consumes a Provider's StreamEvent channel, publishes canonical control.Events to the per-session bus, and returns a summary describing what the model emitted in the turn.
The summary feeds the loop's "decide loop-or-yield" step:
- If ToolUses is non-empty, the loop dispatches them and feeds ToolResults back as input.
- Otherwise the loop yields (text-only response) unless FinishReason is "length" / "error", in which case the loop surfaces an Error event and yields.
func CollectStream ¶
func CollectStream( ctx context.Context, bus *Bus, originator ids.ClientID, stream <-chan provider.StreamEvent, ) (StreamSummary, error)
CollectStream pumps events from a Provider stream channel into the per-session Bus and returns the summary when the channel closes (or ctx is done).