streaming

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: Apache-2.0 Imports: 10 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// ReasonClosed — the caller closed the subscription, or the bus itself
	// was closed. No events were lost from the caller's perspective.
	ReasonClosed = "closed"
	// ReasonOverflow — the subscriber's buffer filled and the bus
	// terminated the subscription to preserve publisher progress. The
	// client missed one or more events; it should reconnect and (once the
	// replay/resume story lands in Phase 2) resume from LastSeq().
	ReasonOverflow = "overflow"
)

Close reasons surfaced via Subscription.Reason() after the subscription channel closes. SSE / API handlers should read the reason after their `for ev := range sub.C()` loop exits to distinguish a clean end-of-stream from a forced termination.

Variables

View Source
var ErrClosed = errors.New("streaming bus is closed")

Functions

This section is empty.

Types

type Bus

type Bus interface {
	Publisher
	Subscriber
}

type Event

type Event struct {
	ID                        string                 `json:"id,omitempty"`
	StreamID                  string                 `json:"streamId,omitempty"`
	ConversationID            string                 `json:"conversationId,omitempty"`
	TurnID                    string                 `json:"turnId,omitempty"`
	MessageID                 string                 `json:"messageId,omitempty"`
	EventSeq                  int64                  `json:"eventSeq,omitempty"`
	AgentIDUsed               string                 `json:"agentIdUsed,omitempty"`
	AgentName                 string                 `json:"agentName,omitempty"`
	AssistantMessageID        string                 `json:"assistantMessageId,omitempty"`
	ParentMessageID           string                 `json:"parentMessageId,omitempty"`
	RequestID                 string                 `json:"requestId,omitempty"`
	ResponseID                string                 `json:"responseId,omitempty"`
	OperationID               string                 `json:"operationId,omitempty"`
	ToolCallID                string                 `json:"toolCallId,omitempty"`
	ToolMessageID             string                 `json:"toolMessageId,omitempty"`
	RequestPayloadID          string                 `json:"requestPayloadId,omitempty"`
	ResponsePayloadID         string                 `json:"responsePayloadId,omitempty"`
	ProviderRequestPayloadID  string                 `json:"providerRequestPayloadId,omitempty"`
	ProviderResponsePayloadID string                 `json:"providerResponsePayloadId,omitempty"`
	StreamPayloadID           string                 `json:"streamPayloadId,omitempty"`
	LinkedConversationID      string                 `json:"linkedConversationId,omitempty"`
	LinkedConversationAgentID string                 `json:"linkedConversationAgentId,omitempty"`
	LinkedConversationTitle   string                 `json:"linkedConversationTitle,omitempty"`
	ExecutionRole             string                 `json:"executionRole,omitempty"`
	Phase                     string                 `json:"phase,omitempty"`
	PageID                    string                 `json:"pageId,omitempty"`
	Mode                      string                 `json:"mode,omitempty"`
	Type                      EventType              `json:"type"`
	Op                        string                 `json:"op,omitempty"`
	Patch                     map[string]interface{} `json:"patch,omitempty"`
	Content                   string                 `json:"content,omitempty"`
	Narration                 string                 `json:"narration,omitempty"`
	// NarrationSource identifies the author of a narration payload.
	// Populated on narration events and on model_completed events that
	// carry an inline Narration field. Values:
	//
	//   - "model"     — emitted by the model as pre-tool-call framing
	//   - "reasoning" — model's thinking / reasoning content
	//   - "narrator"  — emitted by the async runtime narrator pipeline
	//
	// Empty when no narration is present on the event. Clients that want
	// to render reasoning differently from progress commentary (e.g. to
	// hide reasoning, tag it, or group it separately) branch on this
	// field. Tool-agnostic and event-agnostic.
	NarrationSource  string                 `json:"narrationSource,omitempty"`
	ToolName         string                 `json:"toolName,omitempty"`
	SkillName        string                 `json:"skillName,omitempty"`
	SkillExecutionID string                 `json:"skillExecutionId,omitempty"`
	Arguments        map[string]interface{} `json:"arguments,omitempty"`
	Error            string                 `json:"error,omitempty"`
	Status           string                 `json:"status,omitempty"`
	Iteration        int                    `json:"iteration,omitempty"`
	PageIndex        int                    `json:"pageIndex,omitempty"`
	PageCount        int                    `json:"pageCount,omitempty"`
	LatestPage       bool                   `json:"latestPage,omitempty"`
	// FinalResponse is DEPRECATED on streaming events. The "final
	// assistant message" concept has been removed — end-of-turn is
	// signaled by EventTypeTurnCompleted / EventTypeTurnFailed /
	// EventTypeTurnCanceled; individual messages are all equal-rank.
	// The field remains on the wire for transcript-snapshot fidelity
	// (see sdk/api.ExecutionPage.FinalResponse) but MUST NOT be set on
	// live stream emissions. New code sets it only when copying from
	// persisted page state in a transcript-refresh path.
	FinalResponse    bool                   `json:"finalResponse,omitempty"`
	Model            *EventModel            `json:"model,omitempty"`
	ToolCallsPlanned []PlannedToolCall      `json:"toolCallsPlanned,omitempty"`
	CreatedAt        time.Time              `json:"createdAt,omitempty"`
	ElicitationID    string                 `json:"elicitationId,omitempty"`
	ElicitationData  map[string]interface{} `json:"elicitationData,omitempty"`
	CallbackURL      string                 `json:"callbackUrl,omitempty"`
	ResponsePayload  map[string]interface{} `json:"responsePayload,omitempty"`
	CompletedAt      *time.Time             `json:"completedAt,omitempty"`
	StartedAt        *time.Time             `json:"startedAt,omitempty"`
	UserMessageID    string                 `json:"userMessageId,omitempty"`
	// Queue fields — present on turn_queued events.
	QueueSeq           int    `json:"queueSeq,omitempty"`
	StartedByMessageID string `json:"startedByMessageId,omitempty"`
	ModelCallID        string `json:"modelCallId,omitempty"`
	Provider           string `json:"provider,omitempty"`
	ModelName          string `json:"modelName,omitempty"`
	// Tool feed fields.
	FeedID        string      `json:"feedId,omitempty"`
	FeedTitle     string      `json:"feedTitle,omitempty"`
	FeedItemCount int         `json:"feedItemCount,omitempty"`
	FeedData      interface{} `json:"feedData,omitempty"`
	// Planner fields.
	PlannerTrigger         string `json:"plannerTrigger,omitempty"`
	PlannerStaticProfile   string `json:"plannerStaticProfile,omitempty"`
	PlannerStrategyFamily  string `json:"plannerStrategyFamily,omitempty"`
	PlannerAttempt         int    `json:"plannerAttempt,omitempty"`
	PlannerValidated       *bool  `json:"plannerValidated,omitempty"`
	PlannerSecondPolicy    string `json:"plannerSecondPolicy,omitempty"`
	PlannerOutputPayloadID string `json:"plannerOutputPayloadId,omitempty"`
	// Conversation usage summary fields.
	UsageInputTokens     int `json:"usageInputTokens,omitempty"`
	UsageOutputTokens    int `json:"usageOutputTokens,omitempty"`
	UsageEmbeddingTokens int `json:"usageEmbeddingTokens,omitempty"`
	UsageTotalTokens     int `json:"usageTotalTokens,omitempty"`
}

Event is a transport-neutral streaming event.

func FromLLMEvent

func FromLLMEvent(streamID string, in llm.StreamEvent) *Event

FromLLMEvent converts an llm stream event to a generic streaming event. When the event carries typed Kind fields, those take precedence over the legacy Response-based inference.

func (*Event) NormalizeIdentity added in v0.1.7

func (e *Event) NormalizeIdentity(fallbackConversationID, fallbackTurnID string)

NormalizeIdentity fills canonical transport identity fields without overwriting explicit values already set by the caller.

type EventModel

type EventModel struct {
	Provider string `json:"provider,omitempty"`
	Model    string `json:"model,omitempty"`
	Kind     string `json:"kind,omitempty"`
}

type EventType

type EventType string
const (
	// Stream delta types — fine-grained provider output.
	EventTypeTextDelta      EventType = "text_delta"
	EventTypeReasoningDelta EventType = "reasoning_delta"
	EventTypeToolCallDelta  EventType = "tool_call_delta"
	EventTypeError          EventType = "error"

	// Control events (patch-based updates).
	EventTypeControl EventType = "control"

	// Turn lifecycle.
	EventTypeTurnStarted   EventType = "turn_started"
	EventTypeTurnCompleted EventType = "turn_completed"
	EventTypeTurnFailed    EventType = "turn_failed"
	EventTypeTurnCanceled  EventType = "turn_canceled"
	// EventTypeTurnQueued is emitted when a new turn is enqueued behind an active turn.
	EventTypeTurnQueued EventType = "turn_queued"

	// Model lifecycle.
	EventTypeModelStarted   EventType = "model_started"
	EventTypeModelCompleted EventType = "model_completed"

	// Assistant content (aggregated).
	//
	// EventTypeNarration carries ephemeral assistant-side text — formerly
	// split conceptually between "preamble" (model-emitted, pre-tool-call)
	// and "narration" (runtime-emitted, during-tool-call). These are one
	// concept: running commentary. Wire payload fields: messageId,
	// content, and optionally toolCallId when emitted by the async
	// narrator pipeline.
	EventTypeNarration EventType = "narration"

	// EventTypeAssistant fires for a real turn message row (user or
	// assistant, NOT interim narration). One event type covers all
	// message appends — the `patch.role` field distinguishes user vs.
	// assistant. Replaces legacy final/control message events with a
	// single semantic surface.
	//
	// Contract:
	//   - Idempotent by messageId. First emission creates the client
	//     bubble; subsequent emissions update its fields.
	//   - Carries semantic fields: role (in patch), content, mode,
	//     status (in patch), sequence (in patch), createdAt. Never
	//     raw DB column diffs.
	//   - Fires only for real messages (interim=0). Interim
	//     commentary flows through `narration`.
	//   - A turn may carry any number of these; NONE is "the final".
	//     End-of-turn is signaled ONLY by EventTypeTurnCompleted /
	//     EventTypeTurnFailed / EventTypeTurnCanceled.
	//
	// The wire value is `"assistant"` for historical / path clarity
	// on the main case (assistant message append); user messages
	// carry `patch.role = "user"` on the same event type.
	EventTypeAssistant EventType = "assistant"

	// EventTypeMessageAppended is an alias of EventTypeAssistant kept
	// for in-flight session code that typed the name before the rename
	// settled. Deprecated: use EventTypeAssistant.
	EventTypeMessageAppended = EventTypeAssistant

	// Tool call lifecycle.
	EventTypeToolCallsPlanned  EventType = "tool_calls_planned"
	EventTypeToolCallStarted   EventType = "tool_call_started"
	EventTypeToolCallWaiting   EventType = "tool_call_waiting"
	EventTypeToolCallCompleted EventType = "tool_call_completed"
	EventTypeToolCallFailed    EventType = "tool_call_failed"
	EventTypeToolCallCanceled  EventType = "tool_call_canceled"

	// Stream metadata / completion.
	EventTypeItemCompleted EventType = "item_completed"
	EventTypeUsage         EventType = "usage"

	// Elicitation lifecycle.
	EventTypeElicitationRequested EventType = "elicitation_requested"
	EventTypeElicitationResolved  EventType = "elicitation_resolved"

	// Linked conversation.
	EventTypeLinkedConversationAttached EventType = "linked_conversation_attached"

	// Skill lifecycle.
	EventTypeSkillStarted         EventType = "skill_started"
	EventTypeSkillCompleted       EventType = "skill_completed"
	EventTypeSkillRegistryUpdated EventType = "skill_registry_updated"

	// Intake lifecycle (workspace-intake LLM router).
	//
	// EventTypeIntakeWorkspaceCompleted fires when the workspace-intake LLM
	// call finishes successfully and produced a usable ClassifierResult.
	// Patch payload (consistent shape regardless of action):
	//   - "action"          — "route" | "answer" | "clarify"
	//   - "selectedAgentId" — agent id (only when action="route")
	//   - "answerLen"       — length of the answer text (only when action="answer")
	//   - "questionLen"     — length of the clarification question (only when action="clarify")
	//   - "durationMs"      — wall-clock duration of the LLM call
	//   - "model"           — model id used
	//   - "source"          — always "workspace"
	//
	// EventTypeIntakeWorkspaceFailed fires when the workspace-intake LLM
	// call errors, times out, or returns unparseable output. Patch payload:
	//   - "reason"          — short failure reason (e.g. "llm_error", "parse_error")
	//   - "fallbackAgentId" — agent id the runtime fell back to (when applicable)
	//   - "model"           — model id attempted
	//   - "errMessage"      — trimmed error message (when present)
	EventTypeIntakeWorkspaceCompleted EventType = "intake.workspace.completed"
	EventTypeIntakeWorkspaceFailed    EventType = "intake.workspace.failed"

	// Planner lifecycle.
	EventTypePlannerSelected  EventType = "planner.selected"
	EventTypePlannerOutput    EventType = "planner.output"
	EventTypePlannerValidated EventType = "planner.validated"
	EventTypePlannerFailed    EventType = "planner.failed"

	// Tool feed lifecycle.
	EventTypeToolFeedActive   EventType = "tool_feed_active"
	EventTypeToolFeedInactive EventType = "tool_feed_inactive"

	// Conversation metadata.
	// Emitted whenever server-side code changes conversation-level fields
	// (title, summary, agentId, …) so clients can update sidebar/header state
	// without polling. Patch contains only the changed fields.
	EventTypeConversationMetaUpdated EventType = "conversation_meta_updated"

	// Stream terminated by the bus because the subscriber's buffer filled
	// up. This is a control event injected by SSE handlers (not a regular
	// bus event) to signal clients that one or more events may have been
	// missed and they should reconnect. The event's EventSeq carries the
	// last successfully delivered sequence so clients know where to
	// resume from (once Phase 2 resume support lands).
	EventTypeStreamOverflow EventType = "stream_overflow"
)

type Filter

type Filter func(*Event) bool

type MemoryBus

type MemoryBus struct {
	// contains filtered or unexported fields
}

func NewMemoryBus

func NewMemoryBus(buffer int) *MemoryBus

func (*MemoryBus) Close

func (b *MemoryBus) Close() error

func (*MemoryBus) Publish

func (b *MemoryBus) Publish(ctx context.Context, event *Event) error

func (*MemoryBus) Subscribe

func (b *MemoryBus) Subscribe(_ context.Context, filter Filter) (Subscription, error)

Subscribe creates a subscription with the bus default buffer size. Prefer SubscribeOpts for per-subscriber tuning.

func (*MemoryBus) SubscribeOpts added in v0.1.8

func (b *MemoryBus) SubscribeOpts(_ context.Context, opts ...SubscribeOption) (Subscription, error)

SubscribeOpts creates a subscription with optional per-subscriber overrides (buffer size, filter). Back-pressure tolerance is a consumer concern, so UI clients that fall behind during heavy deltas can request a larger buffer via WithBuffer(N).

type PlannedToolCall

type PlannedToolCall struct {
	ToolCallID string `json:"toolCallId,omitempty"`
	ToolName   string `json:"toolName,omitempty"`
}

type Publisher

type Publisher interface {
	Publish(ctx context.Context, event *Event) error
}

type SubscribeOption added in v0.1.8

type SubscribeOption func(*subscribeConfig)

SubscribeOption configures a new subscription.

func WithBuffer added in v0.1.8

func WithBuffer(n int) SubscribeOption

WithBuffer sets the subscriber-side event buffer. Values <= 0 fall back to the bus default. UI clients that tolerate lag can pick a deeper buffer to avoid overflow-driven disconnects; fast in-process consumers can stay with the default.

func WithFilter added in v0.1.8

func WithFilter(f Filter) SubscribeOption

WithFilter installs an event filter on the subscription.

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, filter Filter) (Subscription, error)
}

type Subscription

type Subscription interface {
	ID() string
	C() <-chan *Event
	Close() error
	// Reason returns why the subscription channel closed. Empty while the
	// subscription is still live. One of: ReasonClosed (normal close) or
	// ReasonOverflow (buffer filled and bus dropped the subscription).
	// Consumers should check Reason() after their `range C()` loop exits
	// to distinguish a clean end-of-stream from a forced disconnect.
	Reason() string
	// LastSeq returns the highest EventSeq delivered on this subscription.
	// After an overflow close a client can reconnect and resume from this
	// sequence (Phase 2 of the streaming backpressure work).
	LastSeq() int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL