eventstream

package
v2.0.2
Warning

This package is not in the latest version of its module.

Published: Mar 23, 2026 License: AGPL-3.0 Imports: 7 Imported by: 0

Documentation

Overview

Package eventstream provides event streaming abstractions for server-to-client and bidirectional communication over HTTP, with implementations for SSE and WebSocket.

Types

type BidirectionalEventStream

type BidirectionalEventStream interface {
	EventStream
	// Receive returns a channel of inbound events from the client.
	Receive() <-chan *Event
}

BidirectionalEventStream extends EventStream with client-to-server receiving.
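A typical consumer of a bidirectional stream loops over Receive while watching Done. The sketch below mirrors the documented interfaces locally so that it compiles on its own; the echo function, the fakeStream type, and the demo helper are hypothetical names used only for illustration, and treating a closed Receive channel as "client gone" is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types, so the sketch is self-contained.
type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

type EventStream interface {
	Send(ctx context.Context, event *Event) error
	Done() <-chan struct{}
	Close() error
}

type BidirectionalEventStream interface {
	EventStream
	Receive() <-chan *Event
}

// echo sends every inbound event back to the client until the context is
// cancelled, the stream terminates, or the inbound channel is closed.
func echo(ctx context.Context, s BidirectionalEventStream) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-s.Done():
			return nil
		case evt, ok := <-s.Receive():
			if !ok {
				return nil // assumption: a closed channel means no more input
			}
			if err := s.Send(ctx, evt); err != nil {
				return err
			}
		}
	}
}

// fakeStream is a hypothetical in-memory stream used to exercise echo.
type fakeStream struct {
	in   chan *Event
	out  []*Event
	done chan struct{}
}

func (f *fakeStream) Send(_ context.Context, e *Event) error { f.out = append(f.out, e); return nil }
func (f *fakeStream) Done() <-chan struct{}                  { return f.done }
func (f *fakeStream) Close() error                           { close(f.done); return nil }
func (f *fakeStream) Receive() <-chan *Event                 { return f.in }

// demo queues two inbound events, runs echo, and reports how many were sent back.
func demo() (int, error) {
	f := &fakeStream{in: make(chan *Event, 2), done: make(chan struct{})}
	f.in <- &Event{Type: "a"}
	f.in <- &Event{Type: "b"}
	close(f.in)
	err := echo(context.Background(), f)
	return len(f.out), err
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```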

type BidirectionalEventStreamUpgrader

type BidirectionalEventStreamUpgrader interface {
	UpgradeToBidirectionalStream(w http.ResponseWriter, r *http.Request) (BidirectionalEventStream, error)
}

BidirectionalEventStreamUpgrader upgrades an HTTP connection to a BidirectionalEventStream.

type Event

type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

Event represents a typed event with a JSON payload.
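Because Payload is a json.RawMessage, callers marshal their own payload type first and then wrap the bytes in an Event. A minimal sketch, assuming a local copy of the Event struct; newEvent, encode, and chatMessage are hypothetical names and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the documented struct so this sketch compiles on its own.
type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

// chatMessage is a hypothetical payload type used only for illustration.
type chatMessage struct {
	Text string `json:"text"`
}

// newEvent marshals payload to JSON and wraps it in an Event.
func newEvent(eventType string, payload any) (*Event, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal payload: %w", err)
	}
	return &Event{Type: eventType, Payload: raw}, nil
}

// encode renders an event in its JSON wire form.
func encode(evt *Event) (string, error) {
	wire, err := json.Marshal(evt)
	if err != nil {
		return "", err
	}
	return string(wire), nil
}

func main() {
	evt, err := newEvent("chat.message", chatMessage{Text: "hello"})
	if err != nil {
		panic(err)
	}
	wire, err := encode(evt)
	if err != nil {
		panic(err)
	}
	fmt.Println(wire) // {"type":"chat.message","payload":{"text":"hello"}}

	// With no payload set, omitempty drops the field from the output.
	wire, err = encode(&Event{Type: "ping"})
	if err != nil {
		panic(err)
	}
	fmt.Println(wire) // {"type":"ping"}
}
```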

type EventStream

type EventStream interface {
	// Send pushes an event to the client.
	Send(ctx context.Context, event *Event) error
	// Done returns a channel that closes when the stream terminates.
	Done() <-chan struct{}
	// Close terminates the stream.
	Close() error
}

EventStream is a unidirectional server-to-client event stream.
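Any type with these three methods satisfies EventStream, which makes an in-memory fake convenient for tests. The following is a sketch under stated assumptions: memoryStream and demo are hypothetical, the interface and Event are mirrored locally, and returning an error from Send after Close is a choice made here, not documented behavior of the package's SSE or WebSocket implementations.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
)

// Local mirrors of the documented types, so the sketch is self-contained.
type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

type EventStream interface {
	Send(ctx context.Context, event *Event) error
	Done() <-chan struct{}
	Close() error
}

// memoryStream is a hypothetical EventStream that records sent events.
type memoryStream struct {
	mu     sync.Mutex
	sent   []*Event
	closed bool
	done   chan struct{}
}

func newMemoryStream() *memoryStream {
	return &memoryStream{done: make(chan struct{})}
}

// Send records the event unless the context is cancelled or the stream is closed.
func (s *memoryStream) Send(ctx context.Context, event *Event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errors.New("stream closed") // assumption: sends fail after Close
	}
	s.sent = append(s.sent, event)
	return nil
}

func (s *memoryStream) Done() <-chan struct{} { return s.done }

// Close is idempotent: the done channel is closed exactly once.
func (s *memoryStream) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		s.closed = true
		close(s.done)
	}
	return nil
}

// Compile-time check that memoryStream satisfies the interface.
var _ EventStream = (*memoryStream)(nil)

// demo sends once, closes the stream, then tries to send again.
// It returns the number of recorded events and the error from the second send.
func demo() (int, error) {
	s := newMemoryStream()
	if err := s.Send(context.Background(), &Event{Type: "ping"}); err != nil {
		return 0, nil
	}
	_ = s.Close()
	<-s.Done() // returns immediately once the stream is closed
	err := s.Send(context.Background(), &Event{Type: "ping"})
	return len(s.sent), err
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```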

type EventStreamUpgrader

type EventStreamUpgrader interface {
	UpgradeToEventStream(w http.ResponseWriter, r *http.Request) (EventStream, error)
}

EventStreamUpgrader upgrades an HTTP connection to a unidirectional EventStream.
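To illustrate what an upgrader might do for SSE, here is a rough sketch built only on the standard library. It is not the package's SSE implementation: sseUpgrader, sseStream, and demo are hypothetical, the interface and Event are mirrored locally, and writing each event as a single "data:" line holding the event's JSON is an assumption about the wire format.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// Local mirrors of the documented types, so the sketch is self-contained.
type Event struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload,omitempty"`
}

type EventStream interface {
	Send(ctx context.Context, event *Event) error
	Done() <-chan struct{}
	Close() error
}

// sseStream is a hypothetical EventStream writing SSE frames to the response.
type sseStream struct {
	mu      sync.Mutex
	w       http.ResponseWriter
	flusher http.Flusher
	once    sync.Once
	done    chan struct{}
}

// Send writes one SSE frame and flushes it to the client.
func (s *sseStream) Send(ctx context.Context, e *Event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	data, err := json.Marshal(e)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, err := fmt.Fprintf(s.w, "data: %s\n\n", data); err != nil {
		return err
	}
	s.flusher.Flush()
	return nil
}

func (s *sseStream) Done() <-chan struct{} { return s.done }
func (s *sseStream) Close() error          { s.once.Do(func() { close(s.done) }); return nil }

// sseUpgrader is a hypothetical upgrader with the documented method shape.
type sseUpgrader struct{}

func (sseUpgrader) UpgradeToEventStream(w http.ResponseWriter, r *http.Request) (EventStream, error) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		return nil, errors.New("response writer does not support flushing")
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	s := &sseStream{w: w, flusher: flusher, done: make(chan struct{})}
	go func() { // close the stream when the client disconnects
		select {
		case <-r.Context().Done():
			_ = s.Close()
		case <-s.done:
		}
	}()
	return s, nil
}

// demo upgrades a recorded request, sends one event, and returns the body.
func demo() (string, error) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	stream, err := sseUpgrader{}.UpgradeToEventStream(rec, req)
	if err != nil {
		return "", err
	}
	defer stream.Close()
	if err := stream.Send(req.Context(), &Event{Type: "ping"}); err != nil {
		return "", err
	}
	return rec.Body.String(), nil
}

func main() {
	body, err := demo()
	fmt.Printf("%q %v\n", body, err)
}
```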

type StreamManager

type StreamManager[S EventStream] struct {
	// contains filtered or unexported fields
}

StreamManager manages active event streams grouped by group ID and member ID.

func NewStreamManager

func NewStreamManager[S EventStream](
	tracerProvider tracing.TracerProvider,
	logger logging.Logger,
) *StreamManager[S]

NewStreamManager creates a new StreamManager.

func (*StreamManager[S]) Add

func (m *StreamManager[S]) Add(ctx context.Context, groupID, memberID string, stream S)

Add registers a stream for a group and member.

func (*StreamManager[S]) BroadcastToGroup

func (m *StreamManager[S]) BroadcastToGroup(ctx context.Context, groupID string, event *Event)

BroadcastToGroup sends an event to all streams in a group.

func (*StreamManager[S]) BroadcastToGroupFiltered

func (m *StreamManager[S]) BroadcastToGroupFiltered(ctx context.Context, groupID string, event *Event, includeFunc func(memberID string) bool)

BroadcastToGroupFiltered sends an event to streams in a group for which includeFunc returns true.
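The includeFunc parameter is an ordinary predicate over member IDs, so filters can be built as small closures. The helpers below, excludeMember and onlyMembers, are hypothetical and not part of the package; the manager call appears only in a comment because constructing a StreamManager requires tracing and logging dependencies that are not shown on this page.

```go
package main

import "fmt"

// excludeMember returns a filter that accepts every member except skipID,
// for example to avoid echoing an event back to its sender.
func excludeMember(skipID string) func(memberID string) bool {
	return func(memberID string) bool {
		return memberID != skipID
	}
}

// onlyMembers returns a filter that accepts just the listed member IDs.
func onlyMembers(ids ...string) func(memberID string) bool {
	allowed := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		allowed[id] = struct{}{}
	}
	return func(memberID string) bool {
		_, ok := allowed[memberID]
		return ok
	}
}

func main() {
	notAlice := excludeMember("alice")
	fmt.Println(notAlice("alice"), notAlice("bob")) // false true

	moderators := onlyMembers("carol", "dave")
	fmt.Println(moderators("carol"), moderators("bob")) // true false

	// With a real manager, the call would have this shape:
	//   manager.BroadcastToGroupFiltered(ctx, "room-1", evt, excludeMember("alice"))
}
```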

func (*StreamManager[S]) Get

func (m *StreamManager[S]) Get(ctx context.Context, groupID, memberID string) S

Get returns a specific stream, or the zero value if not found.
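For interface or pointer instantiations of S, the zero value is nil, so a missing stream can be detected with a nil check. The sketch below uses a hypothetical registry type as a stand-in for the manager's bookkeeping; it is not the package's implementation and only illustrates the zero-value behavior of a generic lookup.

```go
package main

import "fmt"

// registry is a hypothetical stand-in that stores values by group and member.
type registry[S any] struct {
	groups map[string]map[string]S
}

// add stores s under the given group and member, creating maps as needed.
func (r *registry[S]) add(groupID, memberID string, s S) {
	if r.groups == nil {
		r.groups = make(map[string]map[string]S)
	}
	if r.groups[groupID] == nil {
		r.groups[groupID] = make(map[string]S)
	}
	r.groups[groupID][memberID] = s
}

// get returns the stored value, or the zero value of S when nothing is stored.
// Indexing a missing (nil) inner map is safe in Go and yields the zero value.
func (r *registry[S]) get(groupID, memberID string) S {
	return r.groups[groupID][memberID]
}

// fakeStream is a hypothetical stream type used only for illustration.
type fakeStream struct{ name string }

func main() {
	var r registry[*fakeStream]
	r.add("room-1", "alice", &fakeStream{name: "alice-conn"})

	if s := r.get("room-1", "alice"); s != nil {
		fmt.Println("found:", s.name)
	}
	if s := r.get("room-1", "bob"); s == nil {
		fmt.Println("bob has no stream")
	}
}
```

If S were instantiated with a non-pointer struct type, the zero value would be an empty struct rather than nil, so a nil check would not compile; pointer or interface instantiations avoid that ambiguity.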

func (*StreamManager[S]) GetGroupStreams

func (m *StreamManager[S]) GetGroupStreams(ctx context.Context, groupID string) []S

GetGroupStreams returns all streams for a group.

func (*StreamManager[S]) GetStreamCount

func (m *StreamManager[S]) GetStreamCount(ctx context.Context, groupID string) int

GetStreamCount returns the number of streams for a group.

func (*StreamManager[S]) GroupHasStreams

func (m *StreamManager[S]) GroupHasStreams(ctx context.Context, groupID string) bool

GroupHasStreams returns whether a group has any active streams.

func (*StreamManager[S]) Remove

func (m *StreamManager[S]) Remove(ctx context.Context, groupID, memberID string)

Remove unregisters the stream for the given group and member.

func (*StreamManager[S]) SendToMember

func (m *StreamManager[S]) SendToMember(ctx context.Context, groupID, memberID string, event *Event) error

SendToMember sends an event to a specific member in a group.
