Documentation
¶
Index ¶
Constants ¶
const ( // ReasonClosed — the caller closed the subscription, or the bus itself // was closed. No events were lost from the caller's perspective. ReasonClosed = "closed" // ReasonOverflow — the subscriber's buffer filled and the bus // terminated the subscription to preserve publisher progress. The // client missed one or more events; it should reconnect and (once the // replay/resume story lands in Phase 2) resume from LastSeq(). ReasonOverflow = "overflow" )
Close reasons surfaced via Subscription.Reason() after the subscription channel closes. SSE / API handlers should read the reason after their `for ev := range sub.C()` loop exits to distinguish a clean end-of-stream from a forced termination.
Variables ¶
var ErrClosed = errors.New("streaming bus is closed")
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus interface {
Publisher
Subscriber
}
type Event ¶
type Event struct {
ID string `json:"id,omitempty"`
StreamID string `json:"streamId,omitempty"`
ConversationID string `json:"conversationId,omitempty"`
TurnID string `json:"turnId,omitempty"`
MessageID string `json:"messageId,omitempty"`
EventSeq int64 `json:"eventSeq,omitempty"`
AgentIDUsed string `json:"agentIdUsed,omitempty"`
AgentName string `json:"agentName,omitempty"`
AssistantMessageID string `json:"assistantMessageId,omitempty"`
ParentMessageID string `json:"parentMessageId,omitempty"`
RequestID string `json:"requestId,omitempty"`
ResponseID string `json:"responseId,omitempty"`
OperationID string `json:"operationId,omitempty"`
ToolCallID string `json:"toolCallId,omitempty"`
ToolMessageID string `json:"toolMessageId,omitempty"`
RequestPayloadID string `json:"requestPayloadId,omitempty"`
ResponsePayloadID string `json:"responsePayloadId,omitempty"`
ProviderRequestPayloadID string `json:"providerRequestPayloadId,omitempty"`
ProviderResponsePayloadID string `json:"providerResponsePayloadId,omitempty"`
StreamPayloadID string `json:"streamPayloadId,omitempty"`
LinkedConversationID string `json:"linkedConversationId,omitempty"`
LinkedConversationAgentID string `json:"linkedConversationAgentId,omitempty"`
LinkedConversationTitle string `json:"linkedConversationTitle,omitempty"`
ExecutionRole string `json:"executionRole,omitempty"`
Phase string `json:"phase,omitempty"`
PageID string `json:"pageId,omitempty"`
Mode string `json:"mode,omitempty"`
Type EventType `json:"type"`
Op string `json:"op,omitempty"`
Patch map[string]interface{} `json:"patch,omitempty"`
Content string `json:"content,omitempty"`
Narration string `json:"narration,omitempty"`
// NarrationSource identifies the author of a narration payload.
// Populated on narration events and on model_completed events that
// carry an inline Narration field. Values:
//
// - "model" — emitted by the model as pre-tool-call framing
// - "reasoning" — model's thinking / reasoning content
// - "narrator" — emitted by the async runtime narrator pipeline
//
// Empty when no narration is present on the event. Clients that want
// to render reasoning differently from progress commentary (e.g. to
// hide reasoning, tag it, or group it separately) branch on this
// field. Tool-agnostic and event-agnostic.
NarrationSource string `json:"narrationSource,omitempty"`
ToolName string `json:"toolName,omitempty"`
SkillName string `json:"skillName,omitempty"`
SkillExecutionID string `json:"skillExecutionId,omitempty"`
Arguments map[string]interface{} `json:"arguments,omitempty"`
Error string `json:"error,omitempty"`
Status string `json:"status,omitempty"`
Iteration int `json:"iteration,omitempty"`
PageIndex int `json:"pageIndex,omitempty"`
PageCount int `json:"pageCount,omitempty"`
LatestPage bool `json:"latestPage,omitempty"`
// FinalResponse is DEPRECATED on streaming events. The "final
// assistant message" concept has been removed — end-of-turn is
// signaled by EventTypeTurnCompleted / EventTypeTurnFailed /
// EventTypeTurnCanceled; individual messages are all equal-rank.
// The field remains on the wire for transcript-snapshot fidelity
// (see sdk/api.ExecutionPage.FinalResponse) but MUST NOT be set on
// live stream emissions. New code sets it only when copying from
// persisted page state in a transcript-refresh path.
FinalResponse bool `json:"finalResponse,omitempty"`
Model *EventModel `json:"model,omitempty"`
ToolCallsPlanned []PlannedToolCall `json:"toolCallsPlanned,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
ElicitationID string `json:"elicitationId,omitempty"`
ElicitationData map[string]interface{} `json:"elicitationData,omitempty"`
CallbackURL string `json:"callbackUrl,omitempty"`
ResponsePayload map[string]interface{} `json:"responsePayload,omitempty"`
CompletedAt *time.Time `json:"completedAt,omitempty"`
StartedAt *time.Time `json:"startedAt,omitempty"`
UserMessageID string `json:"userMessageId,omitempty"`
// Queue fields — present on turn_queued events.
QueueSeq int `json:"queueSeq,omitempty"`
StartedByMessageID string `json:"startedByMessageId,omitempty"`
ModelCallID string `json:"modelCallId,omitempty"`
Provider string `json:"provider,omitempty"`
ModelName string `json:"modelName,omitempty"`
// Tool feed fields.
FeedID string `json:"feedId,omitempty"`
FeedTitle string `json:"feedTitle,omitempty"`
FeedItemCount int `json:"feedItemCount,omitempty"`
FeedData interface{} `json:"feedData,omitempty"`
// Planner fields.
PlannerTrigger string `json:"plannerTrigger,omitempty"`
PlannerStaticProfile string `json:"plannerStaticProfile,omitempty"`
PlannerStrategyFamily string `json:"plannerStrategyFamily,omitempty"`
PlannerAttempt int `json:"plannerAttempt,omitempty"`
PlannerValidated *bool `json:"plannerValidated,omitempty"`
PlannerSecondPolicy string `json:"plannerSecondPolicy,omitempty"`
PlannerOutputPayloadID string `json:"plannerOutputPayloadId,omitempty"`
// Conversation usage summary fields.
UsageInputTokens int `json:"usageInputTokens,omitempty"`
UsageOutputTokens int `json:"usageOutputTokens,omitempty"`
UsageEmbeddingTokens int `json:"usageEmbeddingTokens,omitempty"`
UsageTotalTokens int `json:"usageTotalTokens,omitempty"`
}
Event is a transport-neutral streaming event.
func FromLLMEvent ¶
func FromLLMEvent(streamID string, in llm.StreamEvent) *Event
FromLLMEvent converts an llm stream event to a generic streaming event. When the event carries typed Kind fields, those take precedence over the legacy Response-based inference.
func (*Event) NormalizeIdentity ¶ added in v0.1.7
NormalizeIdentity fills canonical transport identity fields without overwriting explicit values already set by the caller.
type EventModel ¶
type EventType ¶
type EventType string
const ( // Stream delta types — fine-grained provider output. EventTypeTextDelta EventType = "text_delta" EventTypeReasoningDelta EventType = "reasoning_delta" EventTypeToolCallDelta EventType = "tool_call_delta" EventTypeError EventType = "error" // Control events (patch-based updates). EventTypeControl EventType = "control" // Turn lifecycle. EventTypeTurnStarted EventType = "turn_started" EventTypeTurnCompleted EventType = "turn_completed" EventTypeTurnFailed EventType = "turn_failed" EventTypeTurnCanceled EventType = "turn_canceled" // EventTypeTurnQueued is emitted when a new turn is enqueued behind an active turn. EventTypeTurnQueued EventType = "turn_queued" // Model lifecycle. EventTypeModelStarted EventType = "model_started" EventTypeModelCompleted EventType = "model_completed" // Assistant content (aggregated). // // EventTypeNarration carries ephemeral assistant-side text — formerly // split conceptually between "preamble" (model-emitted, pre-tool-call) // and "narration" (runtime-emitted, during-tool-call). These are one // concept: running commentary. Wire payload fields: messageId, // content, and optionally toolCallId when emitted by the async // narrator pipeline. EventTypeNarration EventType = "narration" // EventTypeAssistant fires for a real turn message row (user or // assistant, NOT interim narration). One event type covers all // message appends — the `patch.role` field distinguishes user vs. // assistant. Replaces legacy final/control message events with a // single semantic surface. // // Contract: // - Idempotent by messageId. First emission creates the client // bubble; subsequent emissions update its fields. // - Carries semantic fields: role (in patch), content, mode, // status (in patch), sequence (in patch), createdAt. Never // raw DB column diffs. // - Fires only for real messages (interim=0). Interim // commentary flows through `narration`. // - A turn may carry any number of these; NONE is "the final". // End-of-turn is signaled ONLY by EventTypeTurnCompleted / // EventTypeTurnFailed / EventTypeTurnCanceled. // // The wire value is `"assistant"` for historical / path clarity // on the main case (assistant message append); user messages // carry `patch.role = "user"` on the same event type. EventTypeAssistant EventType = "assistant" // EventTypeMessageAppended is an alias of EventTypeAssistant kept // for in-flight session code that typed the name before the rename // settled. Deprecated: use EventTypeAssistant. EventTypeMessageAppended = EventTypeAssistant // Tool call lifecycle. EventTypeToolCallsPlanned EventType = "tool_calls_planned" EventTypeToolCallStarted EventType = "tool_call_started" EventTypeToolCallWaiting EventType = "tool_call_waiting" EventTypeToolCallCompleted EventType = "tool_call_completed" EventTypeToolCallFailed EventType = "tool_call_failed" EventTypeToolCallCanceled EventType = "tool_call_canceled" // Stream metadata / completion. EventTypeItemCompleted EventType = "item_completed" EventTypeUsage EventType = "usage" // Elicitation lifecycle. EventTypeElicitationRequested EventType = "elicitation_requested" EventTypeElicitationResolved EventType = "elicitation_resolved" // Linked conversation. EventTypeLinkedConversationAttached EventType = "linked_conversation_attached" // Skill lifecycle. EventTypeSkillStarted EventType = "skill_started" EventTypeSkillCompleted EventType = "skill_completed" EventTypeSkillRegistryUpdated EventType = "skill_registry_updated" // Intake lifecycle (workspace-intake LLM router). // // EventTypeIntakeWorkspaceCompleted fires when the workspace-intake LLM // call finishes successfully and produced a usable ClassifierResult. // Patch payload (consistent shape regardless of action): // - "action" — "route" | "answer" | "clarify" // - "selectedAgentId" — agent id (only when action="route") // - "answerLen" — length of the answer text (only when action="answer") // - "questionLen" — length of the clarification question (only when action="clarify") // - "durationMs" — wall-clock duration of the LLM call // - "model" — model id used // - "source" — always "workspace" // // EventTypeIntakeWorkspaceFailed fires when the workspace-intake LLM // call errors, times out, or returns unparseable output. Patch payload: // - "reason" — short failure reason (e.g. "llm_error", "parse_error") // - "fallbackAgentId" — agent id the runtime fell back to (when applicable) // - "model" — model id attempted // - "errMessage" — trimmed error message (when present) EventTypeIntakeWorkspaceCompleted EventType = "intake.workspace.completed" EventTypeIntakeWorkspaceFailed EventType = "intake.workspace.failed" // Planner lifecycle. EventTypePlannerSelected EventType = "planner.selected" EventTypePlannerOutput EventType = "planner.output" EventTypePlannerValidated EventType = "planner.validated" EventTypePlannerFailed EventType = "planner.failed" // Tool feed lifecycle. EventTypeToolFeedActive EventType = "tool_feed_active" EventTypeToolFeedInactive EventType = "tool_feed_inactive" // Conversation metadata. // Emitted whenever server-side code changes conversation-level fields // (title, summary, agentId, …) so clients can update sidebar/header state // without polling. Patch contains only the changed fields. EventTypeConversationMetaUpdated EventType = "conversation_meta_updated" // Stream terminated by the bus because the subscriber's buffer filled // up. This is a control event injected by SSE handlers (not a regular // bus event) to signal clients that one or more events may have been // missed and they should reconnect. The event's EventSeq carries the // last successfully delivered sequence so clients know where to // resume from (once Phase 2 resume support lands). EventTypeStreamOverflow EventType = "stream_overflow" )
type MemoryBus ¶
type MemoryBus struct {
// contains filtered or unexported fields
}
func NewMemoryBus ¶
func (*MemoryBus) Subscribe ¶
Subscribe creates a subscription with the bus default buffer size. Prefer SubscribeOpts for per-subscriber tuning.
func (*MemoryBus) SubscribeOpts ¶ added in v0.1.8
func (b *MemoryBus) SubscribeOpts(_ context.Context, opts ...SubscribeOption) (Subscription, error)
SubscribeOpts creates a subscription with optional per-subscriber overrides (buffer size, filter). Back-pressure tolerance is a consumer concern, so UI clients that fall behind during heavy deltas can request a larger buffer via WithBuffer(N).
type PlannedToolCall ¶
type SubscribeOption ¶ added in v0.1.8
type SubscribeOption func(*subscribeConfig)
SubscribeOption configures a new subscription.
func WithBuffer ¶ added in v0.1.8
func WithBuffer(n int) SubscribeOption
WithBuffer sets the subscriber-side event buffer. Values <= 0 fall back to the bus default. UI clients that tolerate lag can pick a deeper buffer to avoid overflow-driven disconnects; fast in-process consumers can stay with the default.
func WithFilter ¶ added in v0.1.8
func WithFilter(f Filter) SubscribeOption
WithFilter installs an event filter on the subscription.
type Subscriber ¶
type Subscriber interface {
Subscribe(ctx context.Context, filter Filter) (Subscription, error)
}
type Subscription ¶
type Subscription interface {
ID() string
C() <-chan *Event
Close() error
// Reason returns why the subscription channel closed. Empty while the
// subscription is still live. One of: ReasonClosed (normal close) or
// ReasonOverflow (buffer filled and bus dropped the subscription).
// Consumers should check Reason() after their `range C()` loop exits
// to distinguish a clean end-of-stream from a forced disconnect.
Reason() string
// LastSeq returns the highest EventSeq delivered on this subscription.
// After an overflow close a client can reconnect and resume from this
// sequence (Phase 2 of the streaming backpressure work).
LastSeq() int64
}