pulse

package
v1.0.4 Latest
Published: Mar 23, 2026 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Overview

Package pulse exposes a stream.Sink implementation that publishes runtime events to goa.design/pulse streams. It mirrors the layering used by existing Pulse deployments: services build a Redis client, pass it to the Pulse client, and hand the resulting sink to the runtime.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Envelope

type Envelope struct {
	// Type identifies the event kind (e.g., "tool_end", "assistant_reply").
	Type string `json:"type"`
	// EventKey is the stable logical identity propagated from the originating
	// hook event when one exists.
	EventKey string `json:"event_key,omitempty"`
	// RunID links the event to a specific workflow execution.
	RunID string `json:"run_id"`
	// SessionID links the event to the logical session that owns the run.
	SessionID string `json:"session_id,omitempty"`
	// Timestamp records when the event was published (UTC).
	Timestamp time.Time `json:"timestamp"`
	// Payload contains the event-specific data, if any.
	Payload any `json:"payload,omitempty"`
	// ServerData carries server-only metadata for events that support it
	// (currently `tool_end`). It is never forwarded to model providers, but
	// downstream subscribers (e.g., persistence drains) may consume it.
	ServerData rawjson.Message `json:"server_data,omitempty"`
}

Envelope wraps runtime events for transmission over Pulse streams. It adds metadata and serializes the event content as JSON.

Envelope is part of the sink's public configuration surface to support callers that need to customize JSON serialization (e.g., for tests or transport interop). The sink always publishes an envelope as the value stored in Pulse; only the marshaler is customizable.
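The JSON shape implied by the struct tags above can be sketched as follows. This is a minimal illustration, not the package's own marshaler: `envelope` is a local stand-in that mirrors the documented fields, `marshalEnvelope` is a hypothetical function with the same shape as Options.MarshalEnvelope, and `json.RawMessage` is assumed as an approximation of the ServerData type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// envelope is a local stand-in mirroring the documented Envelope fields.
// ServerData uses json.RawMessage as an assumption about the real type.
type envelope struct {
	Type       string          `json:"type"`
	EventKey   string          `json:"event_key,omitempty"`
	RunID      string          `json:"run_id"`
	SessionID  string          `json:"session_id,omitempty"`
	Timestamp  time.Time       `json:"timestamp"`
	Payload    any             `json:"payload,omitempty"`
	ServerData json.RawMessage `json:"server_data,omitempty"`
}

// marshalEnvelope has the same shape as Options.MarshalEnvelope and simply
// delegates to encoding/json.
func marshalEnvelope(e envelope) ([]byte, error) {
	return json.Marshal(e)
}

func main() {
	e := envelope{
		Type:      "tool_end",
		RunID:     "run-1",
		SessionID: "abc123",
		Timestamp: time.Date(2026, 3, 23, 12, 0, 0, 0, time.UTC),
		Payload:   map[string]any{"ok": true},
	}
	b, err := marshalEnvelope(e)
	if err != nil {
		panic(err)
	}
	// Fields tagged omitempty (event_key, server_data) are absent when unset.
	fmt.Println(string(b))
}
```

A custom marshaler of this shape is mostly useful in tests, for example to pin the timestamp or to assert on the exact bytes that would be stored in the stream.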

type EnvelopeDecoder

type EnvelopeDecoder func([]byte) (stream.Event, error)

EnvelopeDecoder converts raw payloads read from Pulse into runtime stream events. Custom decoders can be provided to handle non-standard envelope formats.
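As a rough illustration of what a custom decoder might look like, the sketch below validates required fields before producing an event. The `event` struct and `decoder` type are hypothetical stand-ins for stream.Event and EnvelopeDecoder, since the real event type is not shown on this page; the rejection rule is also an assumption chosen for the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for stream.Event.
type event struct {
	Type    string
	RunID   string
	Payload json.RawMessage
}

// decoder mirrors the shape of EnvelopeDecoder using the stand-in event type.
type decoder func([]byte) (event, error)

// strictDecoder parses an envelope and rejects it when the type or run ID is
// missing. The strictness is an illustrative choice, not documented behavior.
func strictDecoder(raw []byte) (event, error) {
	var env struct {
		Type    string          `json:"type"`
		RunID   string          `json:"run_id"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &env); err != nil {
		return event{}, fmt.Errorf("decode envelope: %w", err)
	}
	if env.Type == "" || env.RunID == "" {
		return event{}, errors.New("decode envelope: missing type or run_id")
	}
	return event{Type: env.Type, RunID: env.RunID, Payload: env.Payload}, nil
}

func main() {
	var dec decoder = strictDecoder
	evt, err := dec([]byte(`{"type":"tool_end","run_id":"run-1","payload":{"ok":true}}`))
	fmt.Println(evt.Type, evt.RunID, string(evt.Payload), err)
}
```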

type Options

type Options struct {
	// Client is the Pulse client used to publish events. Required.
	Client pulse.Client
	// StreamID derives the target Pulse stream from an event. Defaults to
	// `session/<SessionID>`.
	StreamID func(stream.Event) (string, error)
	// MarshalEnvelope allows overriding the envelope serialization (primarily for tests).
	MarshalEnvelope func(Envelope) ([]byte, error)
	// OnPublished, when set, is invoked after an event has been successfully
	// written to the underlying Pulse stream. If it returns an error, Send
	// fails and callers should treat the event as not fully emitted.
	OnPublished func(context.Context, PublishedEvent) error
}

Options configures the Pulse sink.
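The StreamID hook maps an event to a stream name. The sketch below shows one plausible reading of the documented `session/<SessionID>` default alongside a per-run override. The `event` struct is a hypothetical stand-in for stream.Event, and returning an error for an empty session ID is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for stream.Event.
type event struct {
	SessionID string
	RunID     string
}

// sessionStreamID follows the documented default format session/<SessionID>.
// Failing on an empty session ID is an assumption made for this sketch.
func sessionStreamID(e event) (string, error) {
	if e.SessionID == "" {
		return "", errors.New("stream id: event has no session ID")
	}
	return "session/" + e.SessionID, nil
}

// runStreamID is an example override that routes each run to its own stream.
func runStreamID(e event) (string, error) {
	if e.RunID == "" {
		return "", errors.New("stream id: event has no run ID")
	}
	return "run/" + e.RunID, nil
}

func main() {
	e := event{SessionID: "abc123", RunID: "run-1"}
	for _, derive := range []func(event) (string, error){sessionStreamID, runStreamID} {
		id, err := derive(e)
		fmt.Println(id, err)
	}
}
```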

type PublishedEvent

type PublishedEvent struct {
	Event    stream.Event
	StreamID string
	EntryID  string
}

PublishedEvent describes a runtime event that has been successfully written to a Pulse stream. It carries the original event together with the concrete stream name and the Redis-assigned entry ID.

type RuntimeStreams

type RuntimeStreams struct {
	// contains filtered or unexported fields
}

RuntimeStreams wires a caller-provided Pulse client into loom-mcp's runtime. It owns a publishing sink (used by runtime.Options.Stream) and can spawn subscribers that reuse the same client so services do not need to manage multiple Pulse connections.

func NewRuntimeStreams

func NewRuntimeStreams(opts RuntimeStreamsOptions) (*RuntimeStreams, error)

NewRuntimeStreams constructs helpers for publishing runtime hook events to Pulse and subscribing to the resulting streams. Callers pass the returned sink to runtime.Options.Stream and keep the helper around to create subscribers (e.g., SSE fan-out) later on.

func (*RuntimeStreams) Close

func (r *RuntimeStreams) Close(ctx context.Context) error

Close shuts down the publishing sink (and therefore the underlying Pulse client). Call this during service shutdown after all subscribers have been canceled.

func (*RuntimeStreams) NewSubscriber

func (r *RuntimeStreams) NewSubscriber(opts SubscriberOptions) (*Subscriber, error)

NewSubscriber constructs a Pulse-backed subscriber that reuses the helper's client. This keeps stream publishing and consumption on the same Redis connection pool for efficiency.

func (*RuntimeStreams) Sink

func (r *RuntimeStreams) Sink() stream.Sink

Sink exposes the publishing sink so callers can pass it to runtime.Options.

type RuntimeStreamsOptions

type RuntimeStreamsOptions struct {
	// Client is the Pulse client used for both publishing and subscribing. It is
	// required and typically built via features/stream/pulse/clients/pulse.
	Client clientspulse.Client
	// Sink holds optional overrides for the publishing sink (stream ID derivation,
	// marshaling). Leave zero-valued for defaults.
	Sink Options
}

RuntimeStreamsOptions configures the helper returned by NewRuntimeStreams.

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

Sink publishes runtime Event values into Pulse streams. It delegates serialization to the configured envelope marshaler. Send is safe for concurrent use.

func NewSink

func NewSink(opts Options) (*Sink, error)

NewSink constructs a Pulse-backed stream sink. The Client field in opts is required; StreamID and MarshalEnvelope default to the built-in implementations if not provided.

func (*Sink) Close

func (s *Sink) Close(ctx context.Context) error

Close releases resources owned by the sink. This delegates to the underlying Pulse client, which may or may not close the Redis connection depending on the client implementation.

func (*Sink) Send

func (s *Sink) Send(ctx context.Context, event stream.Event) error

Send publishes the event to the derived Pulse stream. It derives the stream ID, wraps the event in an envelope, marshals it to JSON, and publishes it via the Pulse client. Send is safe for concurrent use.
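The steps Send performs can be sketched end to end with in-memory stand-ins. Everything here is hypothetical: `event` stands in for stream.Event, `publisher` and `memPublisher` stand in for the Pulse client, and the `send` function only approximates the described order of operations, including the OnPublished callback documented on Options.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// event is a hypothetical stand-in for stream.Event.
type event struct {
	Type      string
	RunID     string
	SessionID string
}

// publisher is a hypothetical stand-in for the Pulse client's publish call.
type publisher interface {
	Publish(ctx context.Context, streamID string, payload []byte) (string, error)
}

// memPublisher records payloads in memory and fabricates entry IDs.
type memPublisher struct {
	entries map[string][][]byte
}

func (m *memPublisher) Publish(_ context.Context, streamID string, payload []byte) (string, error) {
	m.entries[streamID] = append(m.entries[streamID], payload)
	return fmt.Sprintf("%d-0", len(m.entries[streamID])), nil
}

// send derives the stream ID, builds and marshals an envelope, publishes it,
// and then invokes the optional callback. A callback error fails the send.
func send(ctx context.Context, p publisher, e event, onPublished func(streamID, entryID string) error) error {
	if e.SessionID == "" {
		return errors.New("send: missing session ID")
	}
	streamID := "session/" + e.SessionID
	body, err := json.Marshal(map[string]any{
		"type":       e.Type,
		"run_id":     e.RunID,
		"session_id": e.SessionID,
		"timestamp":  time.Now().UTC(),
	})
	if err != nil {
		return fmt.Errorf("send: marshal: %w", err)
	}
	entryID, err := p.Publish(ctx, streamID, body)
	if err != nil {
		return fmt.Errorf("send: publish: %w", err)
	}
	if onPublished != nil {
		if err := onPublished(streamID, entryID); err != nil {
			return fmt.Errorf("send: on published: %w", err)
		}
	}
	return nil
}

// runDemo publishes one event and reports what the callback observed.
func runDemo() (streamID, entryID string, stored int, err error) {
	p := &memPublisher{entries: map[string][][]byte{}}
	err = send(context.Background(), p, event{Type: "tool_end", RunID: "run-1", SessionID: "abc123"},
		func(s, id string) error {
			streamID, entryID = s, id
			return nil
		})
	return streamID, entryID, len(p.entries["session/abc123"]), err
}

func main() {
	fmt.Println(runDemo())
}
```

Note that in this sketch a failing callback returns an error even though the entry is already stored, which matches the documented guidance that callers should treat such an event as not fully emitted.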

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber consumes Pulse streams and emits runtime stream events. It wraps a Pulse sink (consumer group) and decodes incoming payloads into stream.Event values.

func NewSubscriber

func NewSubscriber(opts SubscriberOptions) (*Subscriber, error)

NewSubscriber constructs a Pulse-backed subscriber. The Client field in opts is required. When not provided, SinkName defaults to "loom_mcp_subscriber", Buffer defaults to 64, and Decoder defaults to the built-in JSON decoder (see SubscriberOptions field documentation).

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(
	ctx context.Context,
	streamID string,
	opts ...streamopts.Sink,
) (<-chan stream.Event, <-chan error, context.CancelFunc, error)

Subscribe opens a Pulse sink on the given stream ID and returns channels for events and errors. It spawns a goroutine that consumes from the sink, decodes payloads, and emits stream events. The returned cancel function stops consumption, closes the sink, and closes both channels.

Usage:

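events, errs, cancel, err := sub.Subscribe(ctx, "session/abc123")
if err != nil {
    return err // check the error before using the channels or cancel
}
defer cancel()
go func() { for err := range errs { log.Println(err) } }() // drain errors
for evt := range events {
    _ = evt // process event
}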
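An alternative consumption pattern reads both channels in a single select loop, which avoids a separate goroutine for errors. The sketch below is self-contained and uses hypothetical stand-ins: `event` for stream.Event and `fakeSubscribe` for (*Subscriber).Subscribe. It assumes, as described above, that both channels are closed once consumption stops.

```go
package main

import (
	"context"
	"fmt"
)

// event is a hypothetical stand-in for stream.Event.
type event struct{ Type string }

// fakeSubscribe is a hypothetical stand-in for (*Subscriber).Subscribe. It
// emits one event per given type and then closes both channels.
func fakeSubscribe(ctx context.Context, types ...string) (<-chan event, <-chan error, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)
	events := make(chan event)
	errs := make(chan error)
	go func() {
		defer close(events)
		defer close(errs)
		for _, t := range types {
			select {
			case events <- event{Type: t}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return events, errs, cancel
}

// consume reads until the event channel closes, an error arrives, or ctx ends.
func consume(ctx context.Context, events <-chan event, errs <-chan error) ([]string, error) {
	var seen []string
	for {
		select {
		case evt, ok := <-events:
			if !ok {
				return seen, nil
			}
			seen = append(seen, evt.Type)
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed: a nil channel is never selected again
				continue
			}
			return seen, err
		case <-ctx.Done():
			return seen, ctx.Err()
		}
	}
}

// runDemo wires the fake subscription to the consume loop.
func runDemo() ([]string, error) {
	ctx := context.Background()
	events, errs, cancel := fakeSubscribe(ctx, "tool_end", "assistant_reply")
	defer cancel()
	return consume(ctx, events, errs)
}

func main() {
	seen, err := runDemo()
	fmt.Println(seen, err)
}
```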

type SubscriberOptions

type SubscriberOptions struct {
	// Client is the Pulse client used to consume events. Required.
	Client clientspulse.Client
	// SinkName identifies the Pulse consumer group. Defaults to "loom_mcp_subscriber".
	SinkName string
	// Buffer specifies the event channel capacity. Defaults to 64.
	Buffer int
	// Decoder deserializes event payloads. Defaults to the built-in JSON decoder.
	Decoder EnvelopeDecoder
}

SubscriberOptions configures a Pulse-backed subscriber.
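The documented defaults can be pictured as a small normalization step applied to the options. This is a sketch only: `subscriberOptions` is a local stand-in that omits Client and Decoder, `withDefaults` is a hypothetical helper, and treating a non-positive Buffer as "unset" is an assumption.

```go
package main

import "fmt"

// subscriberOptions is a local stand-in for the SinkName and Buffer fields of
// SubscriberOptions. Client and Decoder are omitted for brevity.
type subscriberOptions struct {
	SinkName string
	Buffer   int
}

// withDefaults fills in the documented defaults for unset fields. Treating
// Buffer <= 0 as unset is an assumption made for this sketch.
func withDefaults(o subscriberOptions) subscriberOptions {
	if o.SinkName == "" {
		o.SinkName = "loom_mcp_subscriber"
	}
	if o.Buffer <= 0 {
		o.Buffer = 64
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", withDefaults(subscriberOptions{}))
	fmt.Printf("%+v\n", withDefaults(subscriberOptions{SinkName: "sse_fanout", Buffer: 8}))
}
```

Because SinkName identifies the consumer group, giving distinct subscribers distinct names is one way to keep their consumption independent.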

Directories

Path            Synopsis
clients/pulse   Package pulse provides a thin loom-mcp specific wrapper around Pulse streams.
