Documentation
¶
Overview ¶
Package pulse exposes a stream.Sink implementation that publishes runtime events to goa.design/pulse streams. It mirrors the layering used by existing Pulse deployments: services build a Redis client, pass it to the Pulse client, and hand the resulting sink to the runtime.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Envelope ¶
type Envelope struct {
// Type identifies the event kind (e.g., "tool_end", "assistant_reply").
Type string `json:"type"`
// EventKey is the stable logical identity propagated from the originating
// hook event when one exists.
EventKey string `json:"event_key,omitempty"`
// RunID links the event to a specific workflow execution.
RunID string `json:"run_id"`
// SessionID links the event to the logical session that owns the run.
SessionID string `json:"session_id,omitempty"`
// Timestamp records when the event was published (UTC).
Timestamp time.Time `json:"timestamp"`
// Payload contains the event-specific data, if any.
Payload any `json:"payload,omitempty"`
// ServerData carries server-only metadata for events that support it
// (currently `tool_end`). It is never forwarded to model providers, but
// downstream subscribers (e.g., persistence drains) may consume it.
ServerData rawjson.Message `json:"server_data,omitempty"`
}
Envelope wraps runtime events for transmission over Pulse streams. It adds metadata and serializes the event content as JSON.
Envelope is part of the sink's public configuration surface to support callers that need to customize JSON serialization (e.g., for tests or transport interop). The sink always publishes an envelope as the value stored in Pulse; only the marshaler is customizable.
type EnvelopeDecoder ¶
EnvelopeDecoder converts raw payloads read from Pulse into runtime stream events. Custom decoders can be provided to handle non-standard envelope formats.
type Options ¶
type Options struct {
// Client is the Pulse client used to publish events. Required.
Client pulse.Client
// StreamID derives the target Pulse stream from an event. Defaults to
// `session/<SessionID>`.
StreamID func(stream.Event) (string, error)
// MarshalEnvelope allows overriding the envelope serialization (primarily for tests).
MarshalEnvelope func(Envelope) ([]byte, error)
// OnPublished, when set, is invoked after an event has been successfully
// written to the underlying Pulse stream. If it returns an error, Send
// fails and callers should treat the event as not fully emitted.
OnPublished func(context.Context, PublishedEvent) error
}
Options configures the Pulse sink.
type PublishedEvent ¶
PublishedEvent describes a runtime event that has been successfully written to a Pulse stream. It carries the original event together with the concrete stream name and the Redis-assigned entry ID.
type RuntimeStreams ¶
type RuntimeStreams struct {
// contains filtered or unexported fields
}
RuntimeStreams wires a caller-provided Pulse client into loom-mcp's runtime. It owns a publishing sink (used by runtime.Options.Stream) and can spawn subscribers that reuse the same client so services do not need to manage multiple Pulse connections.
func NewRuntimeStreams ¶
func NewRuntimeStreams(opts RuntimeStreamsOptions) (*RuntimeStreams, error)
NewRuntimeStreams constructs helpers for publishing runtime hook events to Pulse and subscribing to the resulting streams. Callers pass the returned sink to runtime.Options.Stream and keep the helper around to create subscribers (e.g., SSE fan-out) later on.
func (*RuntimeStreams) Close ¶
func (r *RuntimeStreams) Close(ctx context.Context) error
Close shuts down the publishing sink (and therefore the underlying Pulse client). Call this during service shutdown after all subscribers have been canceled.
func (*RuntimeStreams) NewSubscriber ¶
func (r *RuntimeStreams) NewSubscriber(opts SubscriberOptions) (*Subscriber, error)
NewSubscriber constructs a Pulse-backed subscriber that reuses the helper's client. This keeps stream publishing and consumption on the same Redis connection pool for efficiency.
func (*RuntimeStreams) Sink ¶
func (r *RuntimeStreams) Sink() stream.Sink
Sink exposes the publishing sink so callers can pass it to runtime.Options.
type RuntimeStreamsOptions ¶
type RuntimeStreamsOptions struct {
// Client is the Pulse client used for both publishing and subscribing. It is
// required and typically built via features/stream/pulse/clients/pulse.
Client clientspulse.Client
// Sink holds optional overrides for the publishing sink (stream ID derivation,
// marshaling). Leave zero-valued for defaults.
Sink Options
}
RuntimeStreamsOptions configures the helper returned by NewRuntimeStreams.
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
Sink publishes runtime Event values into Pulse streams. It delegates serialization to the configured envelope marshaler. Thread-safe for concurrent Send operations.
func NewSink ¶
NewSink constructs a Pulse-backed stream sink. The Client field in opts is required; StreamID and MarshalEnvelope default to the built-in implementations if not provided.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber consumes Pulse streams and emits runtime stream events. It wraps a Pulse sink (consumer group) and decodes incoming payloads into stream.Event values.
func NewSubscriber ¶
func NewSubscriber(opts SubscriberOptions) (*Subscriber, error)
NewSubscriber constructs a Pulse-backed subscriber. The Client field in opts is required; SinkName, Buffer, and Decoder default to sensible values if not provided (see SubscriberOptions field documentation).
func (*Subscriber) Subscribe ¶
func (s *Subscriber) Subscribe( ctx context.Context, streamID string, opts ...streamopts.Sink, ) (<-chan stream.Event, <-chan error, context.CancelFunc, error)
Subscribe opens a Pulse sink on the given stream ID and returns channels for events and errors. It spawns a goroutine that consumes from the sink, decodes payloads, and emits stream events. The returned cancel function stops consumption, closes the sink, and closes both channels.
Usage:
events, errs, cancel, err := sub.Subscribe(ctx, "session/abc123")
defer cancel()
for evt := range events {
// process event
}
type SubscriberOptions ¶
type SubscriberOptions struct {
// Client is the Pulse client used to consume events. Required.
Client clientspulse.Client
// SinkName identifies the Pulse consumer group. Defaults to "loom_mcp_subscriber".
SinkName string
// Buffer specifies the event channel capacity. Defaults to 64.
Buffer int
// Decoder deserializes event payloads. Defaults to the built-in JSON decoder.
Decoder EnvelopeDecoder
}
SubscriberOptions configures a Pulse-backed subscriber.