events

package
v0.2.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package events provides the process-local runtime event bus used to observe PicoClaw components without coupling them to agent-specific event envelopes.

Index

Constants

This section is empty.

Variables

View Source
// ErrBusClosed reports an attempt to subscribe on an event bus that has
// already been closed.
var ErrBusClosed = errors.New("events: bus is closed")

// ErrNilHandler reports an attempt to subscribe without supplying a handler.
var ErrNilHandler = errors.New("events: handler is nil")

Functions

This section is empty.

Types

type BackpressurePolicy

type BackpressurePolicy string

BackpressurePolicy controls delivery when a subscription queue is full.

// Supported BackpressurePolicy values. The policy decides what happens to an
// event when the target subscription's queue is already full.
//
// NOTE(review): PublishNonBlocking is documented as never waiting for queue
// capacity, so Block presumably cannot wait on that path — confirm what it
// degrades to there.
const (
	// DropNewest drops the event being published when the queue is full.
	DropNewest BackpressurePolicy = "drop_newest"
	// DropOldest drops one queued event and enqueues the event being published.
	DropOldest BackpressurePolicy = "drop_oldest"
	// Block waits for queue capacity until Publish's context is canceled.
	Block BackpressurePolicy = "block"
)

type Bus

// Bus is the publishing side of the runtime event system: it broadcasts
// events and hands out filtered channels for subscribing. The method
// descriptions below follow the documentation of the EventBus methods of the
// same name.
type Bus interface {
	// Publish broadcasts evt to subscriptions whose filters match it and
	// returns the per-publish delivery outcome. Cancelling ctx ends the wait
	// of a subscription that uses the Block backpressure policy.
	Publish(ctx context.Context, evt Event) PublishResult
	// PublishNonBlocking broadcasts evt without waiting for subscriber queue
	// capacity.
	PublishNonBlocking(evt Event) PublishResult
	// Channel returns the root event channel for this bus.
	Channel() EventChannel
	// Close closes the bus and all active subscriptions.
	Close() error
	// Stats returns a snapshot of bus and subscription counters.
	Stats() Stats
}

Bus publishes runtime events and creates filtered channels.

type ConcurrencyKind

type ConcurrencyKind string

ConcurrencyKind controls how handler subscriptions process queued events.

// Supported ConcurrencyKind values. The kind selects how a handler
// subscription works through its queued events.
const (
	// Concurrent processes each event in its own goroutine.
	Concurrent ConcurrencyKind = "concurrent"
	// Locked processes events sequentially in subscription order.
	Locked ConcurrencyKind = "locked"
	// Keyed is reserved for keyed sequential processing and currently behaves as Locked.
	Keyed ConcurrencyKind = "keyed"
)

type Correlation

// Correlation carries cross-event tracing fields.
//
// All four fields are strings tagged omitempty, so an unset field is left out
// of the JSON encoding.
//
// NOTE(review): the exact meaning of each ID (for example whether
// ParentTurnID names the turn that spawned a sub-turn, or what ReplyToID
// replies to) is not visible here — confirm against the emitters.
type Correlation struct {
	TraceID      string `json:"trace_id,omitempty"`
	ParentTurnID string `json:"parent_turn_id,omitempty"`
	RequestID    string `json:"request_id,omitempty"`
	ReplyToID    string `json:"reply_to_id,omitempty"`
}

Correlation carries cross-event tracing fields.

type Event

// Event is the runtime event envelope shared across PicoClaw components.
//
// JSON encoding:
//   - ID, Kind, Time, and Source are always emitted.
//   - Severity, Payload, and Attrs use omitempty and are dropped when empty
//     (empty string, nil interface, nil or zero-length map).
//   - Scope and Correlation are struct values. encoding/json never treats a
//     struct as "empty", so omitempty had no effect on them and a zero value
//     was always written as "{}". They use omitzero instead: with Go 1.24+ a
//     zero-valued Scope or Correlation is omitted from the output. Older
//     toolchains ignore the unknown option and keep emitting "{}", exactly as
//     before. Decoding is unaffected: a missing key leaves the zero value.
type Event struct {
	ID          string         `json:"id"`
	Kind        Kind           `json:"kind"`
	Time        time.Time      `json:"time"`
	Source      Source         `json:"source"`
	Scope       Scope          `json:"scope,omitzero"`
	Correlation Correlation    `json:"correlation,omitzero"`
	Severity    Severity       `json:"severity,omitempty"`
	Payload     any            `json:"payload,omitempty"`
	Attrs       map[string]any `json:"attrs,omitempty"`
}

Event is the runtime event envelope shared across PicoClaw components.

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is an in-process runtime event broadcaster.

func NewBus

func NewBus() *EventBus

NewBus creates an in-process runtime event bus.

func (*EventBus) Channel

func (b *EventBus) Channel() EventChannel

Channel returns the root event channel for this bus.

func (*EventBus) Close

func (b *EventBus) Close() error

Close closes the bus and all active subscriptions.

func (*EventBus) Publish

func (b *EventBus) Publish(ctx context.Context, evt Event) PublishResult

Publish broadcasts evt to subscriptions whose filters match it.

func (*EventBus) PublishNonBlocking

func (b *EventBus) PublishNonBlocking(evt Event) PublishResult

PublishNonBlocking broadcasts evt without waiting for subscriber queue capacity.

func (*EventBus) Stats

func (b *EventBus) Stats() Stats

Stats returns a snapshot of bus and subscription counters.

type EventChannel

// EventChannel is a filtered view over an EventBus. The first five methods
// each return an EventChannel, which allows them to be chained; the
// Subscribe* methods attach a consumer to the view.
//
// NOTE(review): whether the filter methods return a new narrowed view or
// modify the receiver is not visible here — confirm before sharing a channel
// between callers.
type EventChannel interface {
	// Filter returns a channel restricted by the given Filter predicate.
	Filter(filter Filter) EventChannel
	// OfKind restricts by event kind; presumably the channel form of
	// MatchKind(kinds...) — confirm.
	OfKind(kinds ...Kind) EventChannel
	// KindPrefix restricts by kind prefix; presumably the channel form of
	// MatchKindPrefix(prefix) — confirm.
	KindPrefix(prefix string) EventChannel
	// Source restricts by emitting component and optional names; presumably
	// the channel form of MatchSource(component, names...) — confirm.
	Source(component string, names ...string) EventChannel
	// Scope restricts by scope fields; presumably the channel form of
	// MatchScope(scope) — confirm.
	Scope(scope ScopeFilter) EventChannel

	// Subscribe registers handler for events on this channel using opts.
	// The package declares ErrBusClosed for subscribing to a closed bus and
	// ErrNilHandler for subscribing without a handler.
	Subscribe(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error)
	// SubscribeChan subscribes without a handler and returns a receive-only
	// channel of events alongside the Subscription.
	SubscribeChan(ctx context.Context, opts SubscribeOptions) (Subscription, <-chan Event, error)
	// SubscribeOnce has the same signature as Subscribe; presumably the
	// handler runs for at most one event before the subscription ends —
	// confirm.
	SubscribeOnce(ctx context.Context, opts SubscribeOptions, handler Handler) (Subscription, error)
}

EventChannel is a filtered view over an EventBus.

type Filter

type Filter func(Event) bool

Filter decides whether an event should pass through an EventChannel.

func And

func And(filters ...Filter) Filter

And combines filters and short-circuits on the first non-match.

func MatchKind

func MatchKind(kinds ...Kind) Filter

MatchKind matches events whose kind is in kinds. Empty kinds match all events.

func MatchKindPrefix

func MatchKindPrefix(prefix string) Filter

MatchKindPrefix matches events whose kind starts with prefix.

func MatchScope

func MatchScope(scope ScopeFilter) Filter

MatchScope matches events whose Scope contains all non-empty filter fields.

func MatchSource

func MatchSource(component string, names ...string) Filter

MatchSource matches events emitted by component and, optionally, one of names.

func Or

func Or(filters ...Filter) Filter

Or combines filters and short-circuits on the first match.

type Handler

type Handler func(context.Context, Event) error

Handler processes a runtime event delivered to a subscription.

type Kind

type Kind string

Kind identifies a runtime event category.

// Runtime event kinds declared by this package. Kind strings are
// dot-separated and grouped by component prefix — "agent.", "channel.",
// "bus.", "gateway.", and "mcp." — so a whole family can be selected with
// MatchKindPrefix, which matches events whose kind starts with a prefix.
const (
	// KindAgentTurnStart is emitted when an agent turn starts.
	KindAgentTurnStart Kind = "agent.turn.start"
	// KindAgentTurnEnd is emitted when an agent turn ends.
	KindAgentTurnEnd Kind = "agent.turn.end"

	// KindAgentLLMRequest is emitted before an LLM request.
	KindAgentLLMRequest Kind = "agent.llm.request"
	// KindAgentLLMDelta is emitted for streaming LLM deltas.
	KindAgentLLMDelta Kind = "agent.llm.delta"
	// KindAgentLLMResponse is emitted after an LLM response.
	KindAgentLLMResponse Kind = "agent.llm.response"
	// KindAgentLLMRetry is emitted before retrying an LLM request.
	KindAgentLLMRetry Kind = "agent.llm.retry"

	// KindAgentContextCompress is emitted when agent context is compressed.
	KindAgentContextCompress Kind = "agent.context.compress"
	// KindAgentSessionSummarize is emitted when session summarization completes.
	KindAgentSessionSummarize Kind = "agent.session.summarize"

	// KindAgentToolExecStart is emitted before a tool executes.
	KindAgentToolExecStart Kind = "agent.tool.exec_start"
	// KindAgentToolExecEnd is emitted after a tool finishes.
	KindAgentToolExecEnd Kind = "agent.tool.exec_end"
	// KindAgentToolExecSkipped is emitted when a tool call is skipped.
	KindAgentToolExecSkipped Kind = "agent.tool.exec_skipped"

	// KindAgentSteeringInjected is emitted when steering is injected into context.
	KindAgentSteeringInjected Kind = "agent.steering.injected"
	// KindAgentFollowUpQueued is emitted when async follow-up input is queued.
	KindAgentFollowUpQueued Kind = "agent.follow_up.queued"
	// KindAgentInterruptReceived is emitted when a turn interrupt is accepted.
	KindAgentInterruptReceived Kind = "agent.interrupt.received"

	// KindAgentSubTurnSpawn is emitted when a sub-turn is spawned.
	KindAgentSubTurnSpawn Kind = "agent.subturn.spawn"
	// KindAgentSubTurnEnd is emitted when a sub-turn ends.
	KindAgentSubTurnEnd Kind = "agent.subturn.end"
	// KindAgentSubTurnResultDelivered is emitted when a sub-turn result is delivered.
	KindAgentSubTurnResultDelivered Kind = "agent.subturn.result_delivered"
	// KindAgentSubTurnOrphan is emitted when a sub-turn result cannot be delivered.
	KindAgentSubTurnOrphan Kind = "agent.subturn.orphan"
	// KindAgentError is emitted when agent execution reports an error.
	KindAgentError Kind = "agent.error"

	// KindChannelLifecycleStarted is emitted when a channel starts.
	KindChannelLifecycleStarted Kind = "channel.lifecycle.started"
	// KindChannelLifecycleInitialized is emitted when a channel is initialized.
	KindChannelLifecycleInitialized Kind = "channel.lifecycle.initialized"
	// KindChannelLifecycleStartFailed is emitted when a channel fails to start.
	KindChannelLifecycleStartFailed Kind = "channel.lifecycle.start_failed"
	// KindChannelLifecycleStopped is emitted when a channel stops.
	KindChannelLifecycleStopped Kind = "channel.lifecycle.stopped"
	// KindChannelWebhookRegistered is emitted when a channel webhook is registered.
	KindChannelWebhookRegistered Kind = "channel.webhook.registered"
	// KindChannelWebhookUnregistered is emitted when a channel webhook is unregistered.
	KindChannelWebhookUnregistered Kind = "channel.webhook.unregistered"
	// KindChannelMessageOutboundQueued is emitted when an outbound message is queued.
	KindChannelMessageOutboundQueued Kind = "channel.message.outbound_queued"
	// KindChannelMessageOutboundSent is emitted when an outbound channel message is sent.
	KindChannelMessageOutboundSent Kind = "channel.message.outbound_sent"
	// KindChannelMessageOutboundFailed is emitted when an outbound channel message fails.
	KindChannelMessageOutboundFailed Kind = "channel.message.outbound_failed"
	// KindChannelRateLimited is emitted when channel rate limiting blocks delivery.
	KindChannelRateLimited Kind = "channel.rate_limited"

	// KindBusPublishFailed is emitted when message bus publish fails.
	KindBusPublishFailed Kind = "bus.publish.failed"
	// KindBusCloseStarted is emitted when message bus close starts.
	KindBusCloseStarted Kind = "bus.close.started"
	// KindBusCloseCompleted is emitted when message bus close completes.
	KindBusCloseCompleted Kind = "bus.close.completed"
	// KindBusCloseDrained is emitted when message bus close drains buffered messages.
	KindBusCloseDrained Kind = "bus.close.drained"

	// KindGatewayStart is emitted when gateway startup reaches runtime bootstrap.
	KindGatewayStart Kind = "gateway.start"
	// KindGatewayReady is emitted when gateway services are started and ready.
	KindGatewayReady Kind = "gateway.ready"
	// KindGatewayShutdown is emitted when gateway shutdown starts.
	KindGatewayShutdown Kind = "gateway.shutdown"
	// KindGatewayReloadStarted is emitted when gateway reload starts.
	KindGatewayReloadStarted Kind = "gateway.reload.started"
	// KindGatewayReloadCompleted is emitted when gateway reload completes.
	KindGatewayReloadCompleted Kind = "gateway.reload.completed"
	// KindGatewayReloadFailed is emitted when gateway reload fails.
	KindGatewayReloadFailed Kind = "gateway.reload.failed"

	// KindMCPServerConnected is emitted when an MCP server connects.
	KindMCPServerConnected Kind = "mcp.server.connected"
	// KindMCPServerConnecting is emitted before connecting to an MCP server.
	KindMCPServerConnecting Kind = "mcp.server.connecting"
	// KindMCPServerFailed is emitted when an MCP server fails.
	KindMCPServerFailed Kind = "mcp.server.failed"
	// KindMCPToolDiscovered is emitted when an MCP tool is discovered.
	KindMCPToolDiscovered Kind = "mcp.tool.discovered"
	// KindMCPToolCallStart is emitted when an MCP tool call starts.
	KindMCPToolCallStart Kind = "mcp.tool.call.start"
	// KindMCPToolCallEnd is emitted when an MCP tool call ends.
	KindMCPToolCallEnd Kind = "mcp.tool.call.end"
)

func KnownKinds

func KnownKinds() []Kind

KnownKinds returns the runtime event kinds declared by this package.

func (Kind) String

func (k Kind) String() string

String returns the string representation of the event kind.

type PanicPolicy

type PanicPolicy string

PanicPolicy controls handler panic behavior.

// Supported PanicPolicy values. The policy decides what happens when a
// subscription handler panics.
const (
	// RecoverAndLog recovers handler panics and records them in subscription stats.
	RecoverAndLog PanicPolicy = "recover_and_log"
	// Crash lets handler panics propagate from the worker goroutine.
	// In Go, a panic that no deferred function in that goroutine recovers
	// terminates the whole program.
	Crash PanicPolicy = "crash"
)

type PublishResult

// PublishResult reports the delivery outcome of a single publish call.
//
// NOTE(review): the field meanings below are read from the names and the
// surrounding API docs; confirm against the EventBus implementation.
//   - Matched: presumably the number of subscriptions whose filters matched.
//   - Delivered: presumably how many of those had the event enqueued.
//   - Dropped: presumably how many lost an event to a full queue
//     (DropNewest / DropOldest).
//   - Blocked: presumably how many made the publisher wait (Block policy).
//   - Closed: presumably true when the bus was already closed.
type PublishResult struct {
	Matched   int
	Delivered int
	Dropped   int
	Blocked   int
	Closed    bool
}

PublishResult reports per-publish delivery outcomes.

type Scope

// Scope identifies the runtime ownership of an event. Every field is an
// optional string tagged omitempty, so unset fields are left out of the JSON
// encoding.
type Scope struct {
	// Runtime identity.
	RuntimeID string `json:"runtime_id,omitempty"`

	// Agent, session, and turn identity.
	AgentID    string `json:"agent_id,omitempty"`
	SessionKey string `json:"session_key,omitempty"`
	TurnID     string `json:"turn_id,omitempty"`

	// Channel-side addressing: channel, account, chat, and topic.
	Channel string `json:"channel,omitempty"`
	Account string `json:"account,omitempty"`
	ChatID  string `json:"chat_id,omitempty"`
	TopicID string `json:"topic_id,omitempty"`

	// Space and chat classification.
	// NOTE(review): the set of valid SpaceType / ChatType values is not
	// visible here — confirm against the channel implementations.
	SpaceID   string `json:"space_id,omitempty"`
	SpaceType string `json:"space_type,omitempty"`
	ChatType  string `json:"chat_type,omitempty"`

	// Sender and message identity.
	SenderID  string `json:"sender_id,omitempty"`
	MessageID string `json:"message_id,omitempty"`
}

Scope identifies the runtime ownership of an event.

Scope is intentionally limited to identity fields: runtime, agent, session, and turn; channel, account, chat, and topic; space (ID and type) and chat type; and sender and message. Tool, provider, model, and MCP details belong in Source, Payload, or Attrs.

type ScopeFilter

// ScopeFilter selects events by Scope. Only non-empty fields take part in
// matching, and an event matches when its Scope contains all of them, so the
// zero ScopeFilter places no constraint on scope.
//
// The filter covers a subset of Scope: AgentID, SessionKey, TurnID, Channel,
// ChatID, and MessageID. The remaining Scope fields (such as RuntimeID,
// Account, TopicID, SpaceID, or SenderID) cannot be matched through this
// type; use a custom Filter for those.
type ScopeFilter struct {
	AgentID    string
	SessionKey string
	TurnID     string
	Channel    string
	ChatID     string
	MessageID  string
}

ScopeFilter matches selected non-empty fields against Event.Scope.

type Severity

type Severity string

Severity describes the operational severity of an event.

// Supported Severity values, listed from least to most severe.
const (
	// SeverityDebug is used for verbose diagnostic events.
	SeverityDebug Severity = "debug"
	// SeverityInfo is used for normal lifecycle and activity events.
	SeverityInfo Severity = "info"
	// SeverityWarn is used for recoverable abnormal events.
	SeverityWarn Severity = "warn"
	// SeverityError is used for failed operations and unrecoverable events.
	SeverityError Severity = "error"
)

type Source

// Source identifies the component that emitted an event.
//
// Component is always written to JSON (no omitempty); Name is optional and is
// left out when empty. MatchSource selects events by Component and,
// optionally, by one of several Name values.
type Source struct {
	Component string `json:"component"`
	Name      string `json:"name,omitempty"`
}

Source identifies the component that emitted an event.

type Stats

// Stats is the snapshot of bus and subscription counters returned by a bus's
// Stats method.
type Stats struct {
	// Bus-wide counters and state.
	// NOTE(review): Matched, Delivered, Dropped, and Blocked share their
	// names with the PublishResult fields and are presumably running totals
	// of them; Subscribers is presumably the number of currently active
	// subscriptions — confirm against the implementation.
	Published   uint64
	Matched     uint64
	Delivered   uint64
	Dropped     uint64
	Blocked     uint64
	Closed      bool
	Subscribers int

	// SubscriberStats holds per-subscription counters.
	// NOTE(review): ordering, and whether closed subscriptions are included,
	// is not visible here.
	SubscriberStats []SubscriberStats
}

Stats reports aggregate EventBus counters.

type SubscribeOptions

// SubscribeOptions controls how a subscription receives events.
//
//   - Name labels the subscription; Subscription.Name and
//     SubscriberStats.Name expose a name of the same type.
//   - Buffer is presumably the capacity of the subscription queue — confirm.
//   - Priority: NOTE(review) ordering semantics are not visible here.
//   - Concurrency selects how a handler subscription processes queued
//     events (see ConcurrencyKind).
//   - Backpressure selects what happens when the subscription queue is full
//     (see BackpressurePolicy).
//   - PanicPolicy selects how handler panics are treated (see PanicPolicy).
//
// NOTE(review): the defaults applied to zero-valued fields are not visible
// here — confirm before relying on the zero SubscribeOptions.
type SubscribeOptions struct {
	Name         string
	Buffer       int
	Priority     int
	Concurrency  ConcurrencyKind
	Backpressure BackpressurePolicy
	// Timeout bounds how long the subscription worker waits for one handler call.
	// Handlers should still honor ctx cancellation; timed-out calls keep running
	// until their handler returns.
	Timeout     time.Duration
	PanicPolicy PanicPolicy
}

SubscribeOptions controls how a subscription receives events.

type SubscriberStats

// SubscriberStats reports counters for one subscription. ID and Name have the
// same types as the values returned by Subscription.ID and Subscription.Name.
//
// NOTE(review): counter meanings are read from the names and related docs;
// confirm against the implementation.
//   - Received: presumably events enqueued for this subscription.
//   - Handled: presumably handler calls that completed without error.
//   - Failed: presumably handler calls that returned a non-nil error.
//   - Dropped: presumably events discarded by the backpressure policy.
//   - Panicked: handler panics; the RecoverAndLog policy is documented as
//     recording panics in subscription stats.
//   - TimedOut: presumably handler calls that exceeded
//     SubscribeOptions.Timeout.
type SubscriberStats struct {
	ID       uint64
	Name     string
	Received uint64
	Handled  uint64
	Failed   uint64
	Dropped  uint64
	Panicked uint64
	TimedOut uint64
}

SubscriberStats reports counters for one subscription.

type Subscription

// Subscription represents an active event subscription, as returned by the
// EventChannel Subscribe* methods.
type Subscription interface {
	// ID returns the subscription's numeric identifier.
	ID() uint64
	// Name returns the subscription's name.
	// NOTE(review): presumably SubscribeOptions.Name — confirm.
	Name() string
	// Close ends the subscription.
	// NOTE(review): whether Close waits for in-flight handlers, and whether
	// it is safe to call twice, is not visible here.
	Close() error
	// Done returns a channel for observing the end of the subscription;
	// presumably it is closed once the subscription has shut down — confirm.
	Done() <-chan struct{}
	// Stats returns the counters for this subscription.
	Stats() SubscriberStats
}

Subscription represents an active event subscription.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL