Documentation ¶
Overview ¶
Package events provides the process-local runtime event bus used to observe PicoClaw components without coupling them to agent-specific event envelopes.
Index ¶
- Variables
- type BackpressurePolicy
- type Bus
- type ConcurrencyKind
- type Correlation
- type Event
- type EventBus
- type EventChannel
- type Filter
- type Handler
- type Kind
- type PanicPolicy
- type PublishResult
- type Scope
- type ScopeFilter
- type Severity
- type Source
- type Stats
- type SubscribeOptions
- type SubscriberStats
- type Subscription
Constants ¶
This section is empty.
Variables ¶
var (
// ErrBusClosed is returned when subscribing to a closed event bus.
ErrBusClosed = errors.New("events: bus is closed")
// ErrNilHandler is returned when subscribing without a handler.
ErrNilHandler = errors.New("events: handler is nil")
)
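Callers can tell the two sentinel errors apart with errors.Is. The following is a minimal sketch, not the package's own code: it re-declares the sentinels locally so it compiles on its own, and `subscribe` and `classify` are hypothetical helpers introduced only for illustration. The order of the checks and the wrapping with `%w` are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors, so the sketch is self-contained.
var (
	ErrBusClosed  = errors.New("events: bus is closed")
	ErrNilHandler = errors.New("events: handler is nil")
)

// subscribe is a hypothetical stand-in for a Subscribe call. The check order
// is an assumption; ErrBusClosed is wrapped only to show that errors.Is
// still matches a wrapped sentinel.
func subscribe(closed bool, handler func()) error {
	if handler == nil {
		return ErrNilHandler
	}
	if closed {
		return fmt.Errorf("subscribe: %w", ErrBusClosed)
	}
	return nil
}

// classify maps a subscribe error to a short label using errors.Is.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrBusClosed):
		return "closed"
	case errors.Is(err, ErrNilHandler):
		return "nil-handler"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(subscribe(false, func() {}))) // ok
	fmt.Println(classify(subscribe(true, func() {})))  // closed
	fmt.Println(classify(subscribe(false, nil)))       // nil-handler
}
```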
Functions ¶
This section is empty.
Types ¶
type BackpressurePolicy ¶
type BackpressurePolicy string
BackpressurePolicy controls delivery when a subscription queue is full.
const (
// DropNewest drops the event being published when the queue is full.
DropNewest BackpressurePolicy = "drop_newest"
// DropOldest drops one queued event and enqueues the event being published.
DropOldest BackpressurePolicy = "drop_oldest"
// Block waits for queue capacity until Publish's context is canceled.
Block BackpressurePolicy = "block"
)
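The three policies can be illustrated on a bounded queue modeled as a buffered channel. This is a sketch under stated assumptions, not the bus's implementation: `enqueue` and `demo` are hypothetical helpers, the queue holds plain strings instead of Event values, and a single publisher is assumed.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type BackpressurePolicy string

const (
	DropNewest BackpressurePolicy = "drop_newest"
	DropOldest BackpressurePolicy = "drop_oldest"
	Block      BackpressurePolicy = "block"
)

// enqueue (hypothetical) applies policy to a full or non-full queue and
// reports whether evt ended up in the queue.
func enqueue(ctx context.Context, q chan string, evt string, policy BackpressurePolicy) bool {
	select {
	case q <- evt: // capacity available: every policy behaves the same
		return true
	default:
	}
	switch policy {
	case DropOldest:
		select {
		case <-q: // discard the oldest queued event
		default:
		}
		select {
		case q <- evt:
			return true
		default:
			return false
		}
	case Block:
		select {
		case q <- evt:
			return true
		case <-ctx.Done(): // give up once the publish context is canceled
			return false
		}
	default: // DropNewest: the event being published is dropped
		return false
	}
}

// demo fills a one-slot queue, publishes a second event under policy, and
// returns whether the second event was enqueued and which event the queue holds.
func demo(policy BackpressurePolicy) (bool, string) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	q := make(chan string, 1)
	enqueue(ctx, q, "first", policy)
	ok := enqueue(ctx, q, "second", policy)
	return ok, <-q
}

func main() {
	for _, p := range []BackpressurePolicy{DropNewest, DropOldest, Block} {
		ok, head := demo(p)
		fmt.Printf("%s: second enqueued=%v, queue holds %q\n", p, ok, head)
	}
}
```

In this sketch nothing reads from the queue, so the Block case gives up when the 10 ms context expires and the first event stays queued.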
type Bus ¶
type Bus interface {
Publish(ctx context.Context, evt Event) PublishResult
PublishNonBlocking(evt Event) PublishResult
Channel() EventChannel
Close() error
Stats() Stats
}
Bus publishes runtime events and creates filtered channels.
type ConcurrencyKind ¶
type ConcurrencyKind string
ConcurrencyKind controls how handler subscriptions process queued events.
const (
// Concurrent processes each event in its own goroutine.
Concurrent ConcurrencyKind = "concurrent"
// Locked processes events sequentially in subscription order.
Locked ConcurrencyKind = "locked"
// Keyed is reserved for keyed sequential processing and currently behaves as Locked.
Keyed ConcurrencyKind = "keyed"
)
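The practical difference between Concurrent and Locked is ordering. The sketch below is an illustration only: `drain` and `collect` are hypothetical helpers, events are plain ints, and the real worker loop may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

type ConcurrencyKind string

const (
	Concurrent ConcurrencyKind = "concurrent"
	Locked     ConcurrencyKind = "locked"
	Keyed      ConcurrencyKind = "keyed"
)

// drain (hypothetical) runs handler for every queued event and returns once
// all calls have finished. Concurrent starts one goroutine per event; Locked
// and Keyed call the handler one event at a time, in queue order.
func drain(kind ConcurrencyKind, queue []int, handler func(int)) {
	if kind != Concurrent {
		for _, evt := range queue {
			handler(evt)
		}
		return
	}
	var wg sync.WaitGroup
	for _, evt := range queue {
		wg.Add(1)
		go func(e int) {
			defer wg.Done()
			handler(e)
		}(evt)
	}
	wg.Wait()
}

// collect returns the order in which the handler observed the events.
func collect(kind ConcurrencyKind, queue []int) []int {
	var mu sync.Mutex
	var seen []int
	drain(kind, queue, func(e int) {
		mu.Lock()
		seen = append(seen, e)
		mu.Unlock()
	})
	return seen
}

func main() {
	queue := []int{1, 2, 3, 4}
	fmt.Println("locked:    ", collect(Locked, queue))     // always queue order
	fmt.Println("concurrent:", collect(Concurrent, queue)) // order may vary between runs
}
```

A handler used with Concurrent therefore needs its own synchronization (the mutex in `collect`), while a Locked handler never runs in parallel with itself in this model.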
type Correlation ¶
type Correlation struct {
TraceID string `json:"trace_id,omitempty"`
ParentTurnID string `json:"parent_turn_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
ReplyToID string `json:"reply_to_id,omitempty"`
}
Correlation carries cross-event tracing fields.
type Event ¶
type Event struct {
ID string `json:"id"`
Kind Kind `json:"kind"`
Time time.Time `json:"time"`
Source Source `json:"source"`
Scope Scope `json:"scope,omitempty"`
Correlation Correlation `json:"correlation,omitempty"`
Severity Severity `json:"severity,omitempty"`
Payload any `json:"payload,omitempty"`
Attrs map[string]any `json:"attrs,omitempty"`
}
Event is the runtime event envelope shared across PicoClaw components.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus is an in-process runtime event broadcaster.
func (*EventBus) Channel ¶
func (b *EventBus) Channel() EventChannel
Channel returns the root event channel for this bus.
func (*EventBus) Publish ¶
func (b *EventBus) Publish(ctx context.Context, evt Event) PublishResult
Publish broadcasts evt to subscriptions whose filters match it.
func (*EventBus) PublishNonBlocking ¶
func (b *EventBus) PublishNonBlocking(evt Event) PublishResult
PublishNonBlocking broadcasts evt without waiting for subscriber queue capacity.
type EventChannel ¶
type EventChannel interface {
Filter(filter Filter) EventChannel
OfKind(kinds ...Kind) EventChannel
KindPrefix(prefix string) EventChannel
Source(component string, names ...string) EventChannel
Scope(scope ScopeFilter) EventChannel
Subscribe(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error)
SubscribeChan(ctx context.Context, opts SubscribeOptions) (Subscription, <-chan Event, error)
SubscribeOnce(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error)
}
EventChannel is a filtered view over an EventBus.
type Filter ¶
Filter decides whether an event should pass through an EventChannel.
func MatchKindPrefix ¶
MatchKindPrefix matches events whose kind starts with prefix.
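Prefix matching is convenient because the declared kinds are dot-separated. The sketch below is an assumption-heavy illustration: this page does not show the Filter type or the MatchKindPrefix signature, so `filter` (a func(Event) bool) and `matchKindPrefix` are hypothetical local stand-ins, and Event is trimmed to its Kind field.

```go
package main

import (
	"fmt"
	"strings"
)

type Kind string

// Event is trimmed to the one field this sketch needs.
type Event struct {
	Kind Kind
}

// filter is an assumed shape for the package's Filter type.
type filter func(Event) bool

// matchKindPrefix (hypothetical) matches events whose kind starts with prefix.
func matchKindPrefix(prefix string) filter {
	return func(evt Event) bool {
		return strings.HasPrefix(string(evt.Kind), prefix)
	}
}

func main() {
	agentTool := matchKindPrefix("agent.tool.")
	kinds := []Kind{
		"agent.tool.exec_start",
		"agent.tool.exec_end",
		"agent.turn.start",
		"mcp.tool.call.start",
	}
	for _, k := range kinds {
		fmt.Printf("%-24s %v\n", k, agentTool(Event{Kind: k}))
	}
}
```

Because this is a plain string prefix, including the trailing dot ("agent.tool.") keeps the match to one namespace segment.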
func MatchScope ¶
func MatchScope(scope ScopeFilter) Filter
MatchScope matches events whose Scope contains all non-empty filter fields.
func MatchSource ¶
MatchSource matches events emitted by component and, optionally, one of names.
type Kind ¶
type Kind string
Kind identifies a runtime event category.
const (
// KindAgentTurnStart is emitted when an agent turn starts.
KindAgentTurnStart Kind = "agent.turn.start"
// KindAgentTurnEnd is emitted when an agent turn ends.
KindAgentTurnEnd Kind = "agent.turn.end"
// KindAgentLLMRequest is emitted before an LLM request.
KindAgentLLMRequest Kind = "agent.llm.request"
// KindAgentLLMDelta is emitted for streaming LLM deltas.
KindAgentLLMDelta Kind = "agent.llm.delta"
// KindAgentLLMResponse is emitted after an LLM response.
KindAgentLLMResponse Kind = "agent.llm.response"
// KindAgentLLMRetry is emitted before retrying an LLM request.
KindAgentLLMRetry Kind = "agent.llm.retry"
// KindAgentContextCompress is emitted when agent context is compressed.
KindAgentContextCompress Kind = "agent.context.compress"
// KindAgentSessionSummarize is emitted when session summarization completes.
KindAgentSessionSummarize Kind = "agent.session.summarize"
// KindAgentToolExecStart is emitted before a tool executes.
KindAgentToolExecStart Kind = "agent.tool.exec_start"
// KindAgentToolExecEnd is emitted after a tool finishes.
KindAgentToolExecEnd Kind = "agent.tool.exec_end"
// KindAgentToolExecSkipped is emitted when a tool call is skipped.
KindAgentToolExecSkipped Kind = "agent.tool.exec_skipped"
// KindAgentSteeringInjected is emitted when steering is injected into context.
KindAgentSteeringInjected Kind = "agent.steering.injected"
// KindAgentFollowUpQueued is emitted when async follow-up input is queued.
KindAgentFollowUpQueued Kind = "agent.follow_up.queued"
// KindAgentInterruptReceived is emitted when a turn interrupt is accepted.
KindAgentInterruptReceived Kind = "agent.interrupt.received"
// KindAgentSubTurnSpawn is emitted when a sub-turn is spawned.
KindAgentSubTurnSpawn Kind = "agent.subturn.spawn"
// KindAgentSubTurnEnd is emitted when a sub-turn ends.
KindAgentSubTurnEnd Kind = "agent.subturn.end"
// KindAgentSubTurnResultDelivered is emitted when a sub-turn result is delivered.
KindAgentSubTurnResultDelivered Kind = "agent.subturn.result_delivered"
// KindAgentSubTurnOrphan is emitted when a sub-turn result cannot be delivered.
KindAgentSubTurnOrphan Kind = "agent.subturn.orphan"
// KindAgentError is emitted when agent execution reports an error.
KindAgentError Kind = "agent.error"
// KindChannelLifecycleStarted is emitted when a channel starts.
KindChannelLifecycleStarted Kind = "channel.lifecycle.started"
// KindChannelLifecycleInitialized is emitted when a channel is initialized.
KindChannelLifecycleInitialized Kind = "channel.lifecycle.initialized"
// KindChannelLifecycleStartFailed is emitted when a channel fails to start.
KindChannelLifecycleStartFailed Kind = "channel.lifecycle.start_failed"
// KindChannelLifecycleStopped is emitted when a channel stops.
KindChannelLifecycleStopped Kind = "channel.lifecycle.stopped"
// KindChannelWebhookRegistered is emitted when a channel webhook is registered.
KindChannelWebhookRegistered Kind = "channel.webhook.registered"
// KindChannelWebhookUnregistered is emitted when a channel webhook is unregistered.
KindChannelWebhookUnregistered Kind = "channel.webhook.unregistered"
// KindChannelMessageOutboundQueued is emitted when an outbound message is queued.
KindChannelMessageOutboundQueued Kind = "channel.message.outbound_queued"
// KindChannelMessageOutboundSent is emitted when an outbound channel message is sent.
KindChannelMessageOutboundSent Kind = "channel.message.outbound_sent"
// KindChannelMessageOutboundFailed is emitted when an outbound channel message fails.
KindChannelMessageOutboundFailed Kind = "channel.message.outbound_failed"
// KindChannelRateLimited is emitted when channel rate limiting blocks delivery.
KindChannelRateLimited Kind = "channel.rate_limited"
// KindBusPublishFailed is emitted when message bus publish fails.
KindBusPublishFailed Kind = "bus.publish.failed"
// KindBusCloseStarted is emitted when message bus close starts.
KindBusCloseStarted Kind = "bus.close.started"
// KindBusCloseCompleted is emitted when message bus close completes.
KindBusCloseCompleted Kind = "bus.close.completed"
// KindBusCloseDrained is emitted when message bus close drains buffered messages.
KindBusCloseDrained Kind = "bus.close.drained"
// KindGatewayStart is emitted when gateway startup reaches runtime bootstrap.
KindGatewayStart Kind = "gateway.start"
// KindGatewayReady is emitted when gateway services are started and ready.
KindGatewayReady Kind = "gateway.ready"
// KindGatewayShutdown is emitted when gateway shutdown starts.
KindGatewayShutdown Kind = "gateway.shutdown"
// KindGatewayReloadStarted is emitted when gateway reload starts.
KindGatewayReloadStarted Kind = "gateway.reload.started"
// KindGatewayReloadCompleted is emitted when gateway reload completes.
KindGatewayReloadCompleted Kind = "gateway.reload.completed"
// KindGatewayReloadFailed is emitted when gateway reload fails.
KindGatewayReloadFailed Kind = "gateway.reload.failed"
// KindMCPServerConnected is emitted when an MCP server connects.
KindMCPServerConnected Kind = "mcp.server.connected"
// KindMCPServerConnecting is emitted before connecting to an MCP server.
KindMCPServerConnecting Kind = "mcp.server.connecting"
// KindMCPServerFailed is emitted when an MCP server fails.
KindMCPServerFailed Kind = "mcp.server.failed"
// KindMCPToolDiscovered is emitted when an MCP tool is discovered.
KindMCPToolDiscovered Kind = "mcp.tool.discovered"
// KindMCPToolCallStart is emitted when an MCP tool call starts.
KindMCPToolCallStart Kind = "mcp.tool.call.start"
// KindMCPToolCallEnd is emitted when an MCP tool call ends.
KindMCPToolCallEnd Kind = "mcp.tool.call.end"
)
func KnownKinds ¶
func KnownKinds() []Kind
KnownKinds returns the runtime event kinds declared by this package.
type PanicPolicy ¶
type PanicPolicy string
PanicPolicy controls handler panic behavior.
const (
// RecoverAndLog recovers handler panics and records them in subscription stats.
RecoverAndLog PanicPolicy = "recover_and_log"
// Crash lets handler panics propagate from the worker goroutine.
Crash PanicPolicy = "crash"
)
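The two policies differ only in whether a deferred recover is installed around the handler call. The following is a minimal sketch of that idea, assuming a plain `func()` handler and a bare counter in place of subscription stats; `invoke` and `countPanics` are hypothetical helpers.

```go
package main

import "fmt"

type PanicPolicy string

const (
	RecoverAndLog PanicPolicy = "recover_and_log"
	Crash         PanicPolicy = "crash"
)

// invoke (hypothetical) calls handler once. Under RecoverAndLog a panic is
// recovered and counted in *panicked. Under Crash no recover is installed,
// so the panic propagates to the caller's goroutine.
func invoke(policy PanicPolicy, handler func(), panicked *uint64) {
	if policy == RecoverAndLog {
		defer func() {
			if r := recover(); r != nil {
				*panicked++
				fmt.Println("recovered handler panic:", r)
			}
		}()
	}
	handler()
}

// countPanics runs n panicking handler calls under RecoverAndLog and
// returns how many panics were recorded.
func countPanics(n int) uint64 {
	var panicked uint64
	for i := 0; i < n; i++ {
		invoke(RecoverAndLog, func() { panic("boom") }, &panicked)
	}
	return panicked
}

func main() {
	fmt.Println("panicked:", countPanics(2)) // panicked: 2
}
```

An unrecovered panic in any goroutine terminates the whole Go program, so Crash affects the process, not only the subscription.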
type PublishResult ¶
PublishResult reports per-publish delivery outcomes.
type Scope ¶
type Scope struct {
RuntimeID string `json:"runtime_id,omitempty"`
AgentID string `json:"agent_id,omitempty"`
SessionKey string `json:"session_key,omitempty"`
TurnID string `json:"turn_id,omitempty"`
Channel string `json:"channel,omitempty"`
Account string `json:"account,omitempty"`
ChatID string `json:"chat_id,omitempty"`
TopicID string `json:"topic_id,omitempty"`
SpaceID string `json:"space_id,omitempty"`
SpaceType string `json:"space_type,omitempty"`
ChatType string `json:"chat_type,omitempty"`
SenderID string `json:"sender_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
}
Scope identifies the runtime ownership of an event.
Scope is intentionally limited to agent, session, turn, channel, chat, message, and sender identity. Tool, provider, model, and MCP details belong in Source, Payload, or Attrs.
type ScopeFilter ¶
type ScopeFilter struct {
AgentID string
SessionKey string
TurnID string
Channel string
ChatID string
MessageID string
}
ScopeFilter matches selected non-empty fields against Event.Scope.
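The matching rule (empty filter fields are ignored, non-empty fields must all match) can be sketched as follows. This is an illustration under assumptions: `matchScope` is a hypothetical local function, Scope is trimmed to the six fields that ScopeFilter can test, and exact string equality is assumed for the comparison.

```go
package main

import "fmt"

// Scope is trimmed to the fields ScopeFilter can test.
type Scope struct {
	AgentID    string
	SessionKey string
	TurnID     string
	Channel    string
	ChatID     string
	MessageID  string
}

type ScopeFilter struct {
	AgentID    string
	SessionKey string
	TurnID     string
	Channel    string
	ChatID     string
	MessageID  string
}

// matchScope (hypothetical) reports whether every non-empty filter field
// equals the corresponding scope field.
func matchScope(f ScopeFilter, s Scope) bool {
	pairs := [][2]string{
		{f.AgentID, s.AgentID},
		{f.SessionKey, s.SessionKey},
		{f.TurnID, s.TurnID},
		{f.Channel, s.Channel},
		{f.ChatID, s.ChatID},
		{f.MessageID, s.MessageID},
	}
	for _, p := range pairs {
		if p[0] != "" && p[0] != p[1] {
			return false
		}
	}
	return true
}

func main() {
	scope := Scope{AgentID: "main", Channel: "telegram", ChatID: "42"}
	fmt.Println(matchScope(ScopeFilter{}, scope))                               // true: empty filter matches everything
	fmt.Println(matchScope(ScopeFilter{AgentID: "main", ChatID: "42"}, scope))  // true
	fmt.Println(matchScope(ScopeFilter{AgentID: "main", TurnID: "t-1"}, scope)) // false: scope has no TurnID
}
```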
type Severity ¶
type Severity string
Severity describes the operational severity of an event.
const ( // SeverityDebug is used for verbose diagnostic events. SeverityDebug Severity = "debug" // SeverityInfo is used for normal lifecycle and activity events. SeverityInfo Severity = "info" // SeverityWarn is used for recoverable abnormal events. SeverityWarn Severity = "warn" // SeverityError is used for failed operations and unrecoverable events. SeverityError Severity = "error" )
type Stats ¶
type Stats struct {
Published uint64
Matched uint64
Delivered uint64
Dropped uint64
Blocked uint64
Closed bool
Subscribers int
SubscriberStats []SubscriberStats
}
Stats reports aggregate EventBus counters.
type SubscribeOptions ¶
type SubscribeOptions struct {
Name string
Buffer int
Priority int
Concurrency ConcurrencyKind
Backpressure BackpressurePolicy
// Timeout bounds how long the subscription worker waits for one handler call.
// Handlers should still honor ctx cancellation; timed-out calls keep running
// until their handler returns.
Timeout time.Duration
PanicPolicy PanicPolicy
}
SubscribeOptions controls how a subscription receives events.
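The Timeout comment describes a bounded wait, not a forced stop: the worker stops waiting, but the handler call continues until it returns. A minimal sketch of that pattern follows, assuming a handler of the form `func(ctx context.Context)` (the Handler type is not shown on this page); `callWithTimeout` and `timesOut` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// callWithTimeout (hypothetical) waits at most timeout for handler to return.
// It reports whether the wait timed out and returns a channel that is closed
// when the handler goroutine actually finishes.
func callWithTimeout(timeout time.Duration, handler func(ctx context.Context)) (bool, <-chan struct{}) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // on timeout this asks the handler to stop; it cannot force it

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		handler(ctx)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-finished:
		return false, finished
	case <-timer.C:
		return true, finished // the handler goroutine is still running here
	}
}

// timesOut reports whether a handler that works for handlerMs milliseconds
// (unless canceled first) exceeds a timeout of timeoutMs milliseconds.
func timesOut(handlerMs, timeoutMs int) bool {
	handler := func(ctx context.Context) {
		select {
		case <-time.After(time.Duration(handlerMs) * time.Millisecond):
		case <-ctx.Done(): // honor cancellation so the goroutine exits promptly
		}
	}
	timedOut, done := callWithTimeout(time.Duration(timeoutMs)*time.Millisecond, handler)
	<-done // wait for the handler so the sketch exits cleanly
	return timedOut
}

func main() {
	fmt.Println("fast handler timed out:", timesOut(1, 500))
	fmt.Println("slow handler timed out:", timesOut(500, 10))
}
```

A handler that ignores ctx would keep its goroutine alive after the timeout, which is why the comment asks handlers to honor cancellation.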
type SubscriberStats ¶
type SubscriberStats struct {
ID uint64
Name string
Received uint64
Handled uint64
Failed uint64
Dropped uint64
Panicked uint64
TimedOut uint64
}
SubscriberStats reports counters for one subscription.
type Subscription ¶
type Subscription interface {
ID() uint64
Name() string
Close() error
Done() <-chan struct{}
Stats() SubscriberStats
}
Subscription represents an active event subscription.