engine

package
v0.3.0
Published: Jun 8, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package engine is part of the GoFastr harness.

See docs/harness-architecture.md for the architecture this package implements.

Package engine implements the agent loop and its supporting machinery: typed event bus, request/tool middleware chains, cancellation tree, and the loop itself.

See docs/harness-architecture.md § The agent loop and § Extensibility.

Index

Constants

const SubAgentMaxIterations = 8

SubAgentMaxIterations is a tighter cap for sub-agent turns (Engine.Spawn). Sub-agents are "do one focused thing" workers — they should NOT do 20 provider rounds the way a top-level coordinator turn might. 8 leaves room for: plan → 3-4 tool rounds → wrap-up. Anything more is the model getting lost.

const UntrustedContentNotice = `` /* 412-byte string literal not displayed */

UntrustedContentNotice is the standing instruction emitted alongside untrusted-content tags. The model is told to treat the contents as data, not instructions.

Variables

var ErrAnswerChannelClosed = errors.New("engine: permission answer channel closed")

ErrAnswerChannelClosed is returned when the multiplexer's answer channel closes without delivering an answer.

var ErrBudgetExceeded = errors.New("engine: cost budget exceeded")

ErrBudgetExceeded is returned when CostBudgetMiddleware aborts a request.

var ErrDispatcherNotReady = errors.New("engine: dispatcher not ready (empty registry)")

ErrDispatcherNotReady is returned when the dispatcher is invoked before its registry has any sources registered.

var ErrStreamClosed = errors.New("engine: provider stream closed unexpectedly")

ErrStreamClosed is returned by CollectStream when the provider stream closed unexpectedly.

Functions

func CapToolResultContent

func CapToolResultContent(blocks []control.ContentBlock) []control.ContentBlock

CapToolResultContent (exported for tests) enforces maxToolResultBytesPerBlock on each text block. Larger blocks are truncated with a clear suffix so the model knows content was elided.
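The truncation described above might look roughly like the following sketch. It is not the package's implementation: maxBytes stands in for the unexported maxToolResultBytesPerBlock, capText is a hypothetical helper that works on a plain string rather than a control.ContentBlock, and the suffix wording is invented for illustration.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// maxBytes is a hypothetical stand-in for maxToolResultBytesPerBlock.
const maxBytes = 8

// capText returns s unchanged when it fits within maxBytes. Otherwise it
// keeps a prefix of at most maxBytes bytes and appends a suffix saying how
// much was elided.
func capText(s string) string {
	if len(s) <= maxBytes {
		return s
	}
	cut := maxBytes
	// Back up so the cut never lands inside a multi-byte UTF-8 sequence.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return fmt.Sprintf("%s ...[truncated %d bytes]", s[:cut], len(s)-cut)
}

func main() {
	fmt.Println(capText("short"))
	fmt.Println(capText("abcdefghijkl"))
}
```

Cutting on a rune boundary keeps the truncated block valid UTF-8, which is one reasonable design choice; whether the real function does so is not stated in this documentation.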

func FormatArgvSummary

func FormatArgvSummary(toolName string, args []byte) string

FormatArgvSummary computes a short argv-style summary for a tool call, used by permission middleware to populate the PermissionRequested event and to match argv-glob rules. The form is best-effort and tool-specific:

  • For Bash, the leading shell command string.
  • For Read/Write/Edit/Glob/Ls, "<Tool>:<path>" or "<Tool>:<pattern>".
  • Otherwise the bare tool name.
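As a rough illustration of the three cases above, the sketch below re-creates the behaviour with a hypothetical argvSummary function. The JSON field names ("command", "path", "pattern") are assumptions made for the example; the real argument schemas are not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// argvSummary is a hypothetical re-creation of the rules listed above.
// The JSON field names are assumptions, not the package's schema.
func argvSummary(toolName string, args []byte) string {
	var a struct {
		Command string `json:"command"`
		Path    string `json:"path"`
		Pattern string `json:"pattern"`
	}
	// Best effort: malformed or empty args leave the fields blank, so the
	// function falls through to the bare tool name.
	_ = json.Unmarshal(args, &a)

	switch toolName {
	case "Bash":
		if a.Command != "" {
			return a.Command
		}
	case "Read", "Write", "Edit", "Glob", "Ls":
		if a.Path != "" {
			return toolName + ":" + a.Path
		}
		if a.Pattern != "" {
			return toolName + ":" + a.Pattern
		}
	}
	return toolName
}

func main() {
	fmt.Println(argvSummary("Bash", []byte(`{"command":"ls -la"}`)))
	fmt.Println(argvSummary("Glob", []byte(`{"pattern":"**/*.go"}`)))
	fmt.Println(argvSummary("Fetch", nil))
}
```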

func FormatMessages

func FormatMessages(h []provider.Message) string

FormatMessages is a developer helper for printing history during debugging. Not used at runtime.

func PermissionMiddleware

func PermissionMiddleware(
	bus *Bus,
	eng *permission.Engine,
	router AnswerRouter,
	session ids.SessionID,
	timeout time.Duration,
) tool.Middleware

PermissionMiddleware constructs the middleware.

session is bound at middleware-construction time (one engine = one session). originator is fixed per turn and supplied via context (see WithOriginator), so the multiplexer can address the prompt to the right client.

func SimpleInput

func SimpleInput(text string) []control.ContentBlock

SimpleInput converts a plain text string into a SendInput's content. Convenience for callers that don't need multi-block input.

func WithMutatingFlag

func WithMutatingFlag(ctx context.Context, mutating bool) context.Context

WithMutatingFlag stamps a context with the tool's is_mutating declaration. Used by the dispatcher before calling the middleware chain.

func WithOriginator

func WithOriginator(ctx context.Context, id ids.ClientID) context.Context

WithOriginator stamps a context with the originator ClientID. The engine loop applies this when invoking the dispatcher for a turn.

Types

type AnswerRouter

type AnswerRouter interface {
	Subscribe(session ids.SessionID, callID ids.CallID) <-chan PermissionAnswer
	Unsubscribe(session ids.SessionID, callID ids.CallID)
}

AnswerRouter is wired by the multiplexer. Subscribe returns a channel that receives the PermissionAnswer for the given session and CallID; Unsubscribe releases that subscription. A subscription lasts for the lifetime of a single PermissionRequested. The middleware uses Subscribe to wait for a matching answer; the multiplexer narrows by CallID and identity rules before forwarding.

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the typed event bus per SessionID. Events have monotonically increasing IDs so any transport can implement resume-from-id without transport-specific bookkeeping.

Subscribers receive a buffered channel; slow subscribers risk dropped events. The Bus does NOT persist — see session.Store for the durable log.

func NewBus

func NewBus(session ids.SessionID) *Bus

NewBus returns a new Bus for the given session.

func (*Bus) Close

func (b *Bus) Close()

Close terminates the bus and all subscriptions.

func (*Bus) NextID

func (b *Bus) NextID() uint64

NextID returns the next sequence ID that will be assigned. Useful for testing.

func (*Bus) Publish

func (b *Bus) Publish(e control.Event, originator ids.ClientID) (control.EventEnvelope, error)

Publish encodes the event into a canonical envelope, assigns the next sequence ID, and broadcasts to every active subscriber. It returns the envelope it published so callers can persist or inspect.

func (*Bus) Replay

func (b *Bus) Replay(envelopes []control.EventEnvelope, dst chan<- control.EventEnvelope)

Replay sends pre-encoded envelopes to a single subscription. Used by transports implementing resume-from-id: they fetch missing events from session.Store and replay them before the live stream resumes.

Note: Replay does NOT assign new IDs; envelopes retain their original sequence numbers from the persistent log.

func (*Bus) Session

func (b *Bus) Session() ids.SessionID

Session returns the SessionID this bus is bound to.

func (*Bus) Subscribe

func (b *Bus) Subscribe(ctx context.Context) <-chan control.EventEnvelope

Subscribe returns a receive-only channel of events. The channel is closed when ctx is cancelled or the Bus is closed.

Buffer is sized to absorb short bursts without blocking the engine. Subscribers that fall behind have their stale events dropped to preserve liveness — see § Per-transport rules → Backpressure for the policy.
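A minimal sketch of a non-blocking broadcast with monotonically increasing IDs is shown below. miniBus and envelope are hypothetical stand-ins for Bus and control.EventEnvelope. The drop-oldest policy is one reading of "stale events dropped" and is an assumption, as is starting IDs at 1.

```go
package main

import (
	"fmt"
	"sync"
)

// envelope is a hypothetical stand-in for control.EventEnvelope.
type envelope struct {
	ID      uint64
	Payload string
}

// miniBus is a hypothetical, much reduced stand-in for Bus.
type miniBus struct {
	mu     sync.Mutex
	nextID uint64
	subs   []chan envelope
}

// subscribe registers a buffered channel of the given capacity.
func (b *miniBus) subscribe(buf int) <-chan envelope {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan envelope, buf)
	b.subs = append(b.subs, ch)
	return ch
}

// publish assigns the next ID and offers the event to every subscriber
// without ever blocking the publisher.
func (b *miniBus) publish(payload string) envelope {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	env := envelope{ID: b.nextID, Payload: payload}
	for _, ch := range b.subs {
		select {
		case ch <- env:
		default:
			// Buffer full: discard the oldest buffered event (assumed
			// policy), then retry once without blocking.
			select {
			case <-ch:
			default:
			}
			select {
			case ch <- env:
			default:
			}
		}
	}
	return env
}

func main() {
	b := &miniBus{}
	ch := b.subscribe(2)
	for _, p := range []string{"a", "b", "c"} {
		b.publish(p)
	}
	fmt.Println((<-ch).ID, (<-ch).ID)
}
```

Because IDs are assigned before delivery, a subscriber that sees a gap in the sequence can tell that events were dropped and fetch them from the durable log.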

type CancelTree

type CancelTree struct {
	// contains filtered or unexported fields
}

CancelTree wires cancellation hierarchically so cancelling a turn propagates to its tool calls and to any child engines it spawned (e.g., via the `delegate` tool).

The doc commits to: cancellation tree (turn → tool calls → child engines). v0.x scopes `delegate` to sync-only-blocking; the child engine has its own CancelTree rooted at the delegate's call.

func NewCancelTree

func NewCancelTree(parent context.Context) *CancelTree

NewCancelTree returns a root CancelTree bound to parent.

func (*CancelTree) Cancel

func (t *CancelTree) Cancel(cause error)

Cancel cancels this node and all descendants with the given cause.

func (*CancelTree) Cause

func (t *CancelTree) Cause() error

Cause returns the reason this node was cancelled (or nil if not).

func (*CancelTree) Child

func (t *CancelTree) Child() *CancelTree

Child returns a child node. Cancelling the parent cancels every child (transitively). Cancelling a child does not affect the parent or siblings.

func (*CancelTree) Context

func (t *CancelTree) Context() context.Context

Context returns the context tied to this node.

type ContextInjector

type ContextInjector func(ctx context.Context) []ContextSection

ContextInjector is the callback consumed by ContextInjectionMiddleware, which appends untrusted-content sections to the System prompt. Per hard rule 12, every byte from outside the trust boundary is wrapped in <untrusted-...> tags with a standing instruction not to follow instructions inside.

The injector callback returns the (section name, content) pairs to inject — empty content sections are skipped. Typical sources: AGENTS.md (wrapped as <untrusted-agents-md>), memory entries, skill metadata.
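A sketch of the wrapping step, under stated assumptions: section mirrors ContextSection, renderSections is a hypothetical helper, and the exact layout (newlines, ordering) is invented for the example. The standing notice (UntrustedContentNotice) is omitted because its text is not displayed in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// section mirrors the shape of ContextSection for this example.
type section struct {
	Name    string
	Content string
}

// renderSections appends each non-empty section to the system prompt,
// wrapped in <untrusted-NAME> tags. The layout is an assumption.
func renderSections(system string, sections []section) string {
	var sb strings.Builder
	sb.WriteString(system)
	for _, s := range sections {
		if s.Content == "" {
			continue // empty sections are skipped
		}
		fmt.Fprintf(&sb, "\n<untrusted-%s>\n%s\n</untrusted-%s>", s.Name, s.Content, s.Name)
	}
	return sb.String()
}

func main() {
	out := renderSections("You are a coding agent.", []section{
		{Name: "agents-md", Content: "Run tests before committing."},
		{Name: "memory", Content: ""},
	})
	fmt.Println(out)
}
```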

type ContextSection

type ContextSection struct {
	Name    string // becomes <untrusted-NAME>...</untrusted-NAME>; lowercase recommended
	Content string
}

type CostTracker

type CostTracker interface {
	SpentUSD(session ids.SessionID) float64
}

CostTracker reports the running USD spend that CostBudgetMiddleware checks. CostBudgetMiddleware enforces a per-session USD cap: it aborts the request before sending if the running total would exceed the cap.

Total is updated externally as CostIncremented events fire on the bus; the middleware reads from the supplied tracker.
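The check-before-send pattern can be sketched as follows. tracker, guard, and errBudgetExceeded are hypothetical names, sessions are plain strings rather than ids.SessionID, and the comparison used here (spend already at or above the cap) is an assumption; the real middleware may estimate the cost of the pending request instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errBudgetExceeded plays the role of ErrBudgetExceeded in this sketch.
var errBudgetExceeded = errors.New("cost budget exceeded")

// tracker is a hypothetical in-memory spend table keyed by session.
type tracker map[string]float64

// SpentUSD returns the running total for a session.
func (t tracker) SpentUSD(session string) float64 { return t[session] }

// guard calls send only while the session is under its cap. The tracker
// is updated elsewhere; guard only reads it.
func guard(t tracker, session string, capUSD float64, send func() error) error {
	if t.SpentUSD(session) >= capUSD {
		return errBudgetExceeded // abort before contacting the provider
	}
	return send()
}

func main() {
	spend := tracker{"s1": 0.40}
	send := func() error { fmt.Println("request sent"); return nil }

	fmt.Println(guard(spend, "s1", 1.00, send))
	spend["s1"] = 1.25 // e.g. after cost events were accumulated
	fmt.Println(guard(spend, "s1", 1.00, send))
}
```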

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher routes ToolCalls through the tool middleware chain to the registered tool. It also bridges the EventSink into the per-session event bus so tool progress events get broadcast.

The dispatcher is the only thing in the engine that knows about concrete tools — middleware sees the abstract Tool interface and the EventSink, never the registry.

func NewDispatcher

func NewDispatcher(bus *Bus, registry *tool.Registry, mws ...tool.Middleware) *Dispatcher

NewDispatcher builds a Dispatcher for one session.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx context.Context, originator ids.ClientID, call tool.ToolCall) (*tool.ToolResult, error)

Dispatch executes a tool call through the middleware chain.

originator is the Client.ID() that triggered this turn — surfaces in PermissionRequested events so the multiplexer can address answers correctly and so identity_class self-approval guards apply.

func (*Dispatcher) Registry

func (d *Dispatcher) Registry() *tool.Registry

Registry returns the dispatcher's tool registry. Used by the engine to inject the registry into the dispatch ctx for meta tools.

type Engine

type Engine struct {
	Session  ids.SessionID
	Bus      *Bus
	Provider provider.Provider
	Model    string

	Dispatcher *Dispatcher
	Middleware []RequestMiddleware

	// History is the canonical-form message list. The loop appends
	// to this list as turns complete; middleware can read it via
	// Request.Messages.
	History []provider.Message

	// Tools snapshot for the next Request. Refreshed when the
	// registry changes.
	Tools []provider.ToolSchema

	// Tree owns cancellation for the active turn. Child turns and
	// child engines branch off this.
	Tree *CancelTree

	// OnTurnEnd, when non-nil, runs JUST BEFORE the engine publishes
	// TurnEnded — gives the multiplex a chance to release its
	// turn-busy slot so subscribers reacting to TurnEnded can
	// immediately send the next input without hitting TurnInProgress.
	OnTurnEnd func()
}

Engine is the agent loop bound to one EngineRun. It owns:

  • the per-session event bus
  • the tool dispatcher (which owns the registry reference)
  • the request middleware chain wrapping Provider.Chat
  • the conversation history (in-memory; persisted by session.Store via an event subscriber)

One Engine runs one session. Multiple engines coexist in one harness process; the multiplexer routes commands to the right one.

func NewEngine

func NewEngine(session ids.SessionID, bus *Bus, p provider.Provider, model string, d *Dispatcher) *Engine

NewEngine constructs an Engine. The caller is responsible for wiring Provider, Model, Dispatcher, and Middleware before calling RunTurn.

func (*Engine) RunTurn

func (e *Engine) RunTurn(ctx context.Context, originator ids.ClientID, input []control.ContentBlock) error

RunTurn processes a single turn:

  1. Append the input as a user message.
  2. Build a Request and pass through the request middleware chain.
  3. Send to Provider.Chat; collect the stream into a summary.
  4. If the model emitted tool_use blocks, dispatch each through the tool middleware chain and append the results as a tool_result-bearing user message. Loop back to step 2.
  5. Otherwise yield with TurnEnded.

originator is the Client that sent the SendInput.

Cancellation: ctx (typically derived from e.Tree.Context()) is honored at every await point; partial state is emitted as best effort.
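The five steps above can be sketched as a plain loop. Everything here is a simplified stand-in: message and reply are hypothetical types, chat replaces the middleware chain plus Provider.Chat, runTool replaces the dispatcher, and the maxIter cap is an assumption (the documentation only states a cap for sub-agents). Cancellation and event publishing are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// message and reply are hypothetical, simplified stand-ins.
type message struct{ Role, Text string }

type reply struct {
	Text     string
	ToolUses []string // names of tools the model asked to run
}

// runTurn loops until the model answers without requesting tools.
func runTurn(history []message, input string, maxIter int,
	chat func([]message) reply, runTool func(string) string) ([]message, error) {
	history = append(history, message{Role: "user", Text: input}) // step 1
	for i := 0; i < maxIter; i++ {
		r := chat(history) // steps 2 and 3
		history = append(history, message{Role: "assistant", Text: r.Text})
		if len(r.ToolUses) == 0 {
			return history, nil // step 5: yield
		}
		// Step 4: run every requested tool and feed the results back as
		// a single user message, then loop.
		results := make([]string, 0, len(r.ToolUses))
		for _, name := range r.ToolUses {
			results = append(results, name+" => "+runTool(name))
		}
		history = append(history, message{
			Role: "user",
			Text: "tool_result: " + strings.Join(results, "; "),
		})
	}
	return history, fmt.Errorf("no final answer after %d iterations", maxIter)
}

func main() {
	calls := 0
	chat := func([]message) reply {
		calls++
		if calls == 1 {
			return reply{ToolUses: []string{"Ls"}}
		}
		return reply{Text: "done"}
	}
	h, err := runTurn(nil, "list files", 8, chat, func(string) string { return "a.go" })
	fmt.Println(len(h), h[len(h)-1].Text, err)
}
```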

func (*Engine) Spawn

func (e *Engine) Spawn(ctx context.Context, systemHint, userPrompt string) (string, error)

Spawn runs a sub-agent inside this engine's environment: same Provider, Model, Tools, and Dispatcher, but a FRESH conversation history. Used by the Agent tool. systemHint is prepended as an extra system message on the sub-conversation only; userPrompt is the initial user turn. Returns the final assistant text after the sub-loop terminates (or empty string on error).

Sub-agent events flow on the SAME bus so all attached clients can observe what the sub-agent does in real time. The parent and child turn numbers share the same monotonic counter; this is acceptable because a sub-agent is conceptually one large tool call.

func (*Engine) String

func (e *Engine) String() string

String returns a developer-friendly representation of an Engine.

type PermissionAnswer

type PermissionAnswer struct {
	CallID ids.CallID
	Allow  bool
	Scope  control.PermitScope
	Source ids.ClientID
}

PermissionAnswer is the answer that PermissionMiddleware waits for. PermissionMiddleware returns a tool.Middleware that consults the permission engine before invoking the wrapped tool. The middleware:

  • Computes the argv summary for the call (FormatArgvSummary).
  • Asks the permission engine for a Decision.
  • On DecisionAllow, calls next.
  • On DecisionDeny, returns an error result.
  • On DecisionAsk, publishes a PermissionRequested event and blocks waiting for an AnswerPermission via the answer channel (typically wired by the multiplexer). Times out per PermissionTimeout; on timeout, denies the call.

Per hard rule 11, the multiplexer enforces "agents cannot self-approve their own turn" before delivering an answer to this middleware. This middleware does not re-check identity.

PermissionTimeout is the maximum time to wait for a human ack before denying. The doc commits to 60s as default.

type RequestHandler

type RequestHandler func(ctx context.Context, req *provider.Request) (<-chan provider.StreamEvent, error)

RequestHandler is the leaf in the request middleware chain — it invokes the provider's Chat method and returns the streaming channel.

func ChainRequest

func ChainRequest(base RequestHandler, ms ...RequestMiddleware) RequestHandler

ChainRequest composes a RequestHandler from a base handler and a sequence of middleware. First middleware listed is outermost.
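The "first listed is outermost" ordering falls out of composing from the end of the slice backwards. The sketch below uses simplified string-based handler and middleware types as stand-ins for RequestHandler and RequestMiddleware; chain and tag are hypothetical helpers.

```go
package main

import "fmt"

// handler and middleware are simplified stand-ins for RequestHandler and
// RequestMiddleware.
type handler func(req string) string
type middleware func(req string, next handler) string

// chain wraps base so that ms[0] runs first (outermost) and ms[len-1]
// runs last, just before base.
func chain(base handler, ms ...middleware) handler {
	h := base
	for i := len(ms) - 1; i >= 0; i-- {
		m, next := ms[i], h // capture per-iteration values
		h = func(req string) string { return m(req, next) }
	}
	return h
}

// tag returns a middleware that brackets the inner result with its name,
// making the nesting order visible.
func tag(name string) middleware {
	return func(req string, next handler) string {
		return name + "(" + next(req) + ")"
	}
}

func main() {
	base := func(req string) string { return req }
	h := chain(base, tag("budget"), tag("inject"))
	fmt.Println(h("request")) // prints "budget(inject(request))"
}
```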

type RequestMiddleware

type RequestMiddleware func(ctx context.Context, req *provider.Request, next RequestHandler) (<-chan provider.StreamEvent, error)

RequestMiddleware wraps a RequestHandler. Used for AGENTS.md injection, skill activation, memory selection, history compaction, cost-budget enforcement, provider routing, etc.

func ContextInjectionMiddleware

func ContextInjectionMiddleware(inject ContextInjector) RequestMiddleware

func CostBudgetMiddleware

func CostBudgetMiddleware(tracker CostTracker, session ids.SessionID, capUSD float64, bus *Bus, originator ids.ClientID) RequestMiddleware

func SystemPromptMiddleware

func SystemPromptMiddleware(header string) RequestMiddleware

SystemPromptMiddleware prepends the given header to every request's System string. Used by the profile loader to inject the profile's prompt_header.

type SimpleCostTracker

type SimpleCostTracker struct {
	// contains filtered or unexported fields
}

SimpleCostTracker is an in-memory CostTracker subscribed to the per-session bus. Aggregates CostIncremented events.

func NewSimpleCostTracker

func NewSimpleCostTracker() *SimpleCostTracker

func (*SimpleCostTracker) Add

func (c *SimpleCostTracker) Add(s ids.SessionID, usd float64)

Add accumulates additional spend for a session. Typically wired by subscribing to the bus and calling Add on every CostIncremented.

func (*SimpleCostTracker) SpentUSD

func (c *SimpleCostTracker) SpentUSD(s ids.SessionID) float64

SpentUSD returns the running USD total for a session.

type StreamSummary

type StreamSummary struct {
	Text         string            // concatenated TextDelta payloads
	Thinking     []json.RawMessage // provider-stamped thinking blocks
	ToolUses     []control.ToolUse // any tool_use blocks the model emitted
	Usage        provider.Usage    // final accounting at message_stop
	FinishReason string            // raw provider finish_reason
}

StreamSummary is the result of CollectStream, which consumes a Provider's StreamEvent channel, publishes canonical control.Events to the per-session bus, and returns a summary describing what the model emitted in the turn.

The summary feeds the loop's "decide loop-or-yield" step:

  • If ToolUses is non-empty, the loop dispatches them and feeds ToolResults back as input.
  • Otherwise the loop yields (text-only response) unless FinishReason is "length" / "error", in which case the loop surfaces an Error event and yields.
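The two rules above reduce to a small decision function. The sketch below is illustrative only: action, its constants, and decide are hypothetical names, and the function takes a tool-use count and finish reason instead of a full StreamSummary.

```go
package main

import "fmt"

// action names the loop's next step; the type is hypothetical.
type action string

const (
	dispatchTools  action = "dispatch-tools"
	yield          action = "yield"
	yieldWithError action = "yield-with-error"
)

// decide applies the loop-or-yield rules: tool uses win, then abnormal
// finish reasons surface an error, otherwise the turn simply ends.
func decide(toolUses int, finishReason string) action {
	if toolUses > 0 {
		return dispatchTools
	}
	if finishReason == "length" || finishReason == "error" {
		return yieldWithError
	}
	return yield
}

func main() {
	fmt.Println(decide(2, ""))
	fmt.Println(decide(0, "length"))
	fmt.Println(decide(0, ""))
}
```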

func CollectStream

func CollectStream(
	ctx context.Context,
	bus *Bus,
	originator ids.ClientID,
	stream <-chan provider.StreamEvent,
) (StreamSummary, error)

CollectStream pumps events from a Provider stream channel into the per-session Bus and returns the summary when the channel closes (or ctx is done).
