Documentation
¶
Overview ¶
Package eventstream provides event streaming abstractions for server-to-client and bidirectional communication over HTTP, with implementations for SSE and WebSocket.
Index ¶
- type BidirectionalEventStream
- type BidirectionalEventStreamUpgrader
- type Event
- type EventStream
- type EventStreamUpgrader
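- type StreamManager
- func NewStreamManager[S EventStream](tracerProvider tracing.TracerProvider, logger logging.Logger) *StreamManager[S]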
- func (m *StreamManager[S]) Add(ctx context.Context, groupID, memberID string, stream S)
- func (m *StreamManager[S]) BroadcastToGroup(ctx context.Context, groupID string, event *Event)
- func (m *StreamManager[S]) BroadcastToGroupFiltered(ctx context.Context, groupID string, event *Event, ...)
- func (m *StreamManager[S]) Get(ctx context.Context, groupID, memberID string) S
- func (m *StreamManager[S]) GetGroupStreams(ctx context.Context, groupID string) []S
- func (m *StreamManager[S]) GetStreamCount(ctx context.Context, groupID string) int
- func (m *StreamManager[S]) GroupHasStreams(ctx context.Context, groupID string) bool
- func (m *StreamManager[S]) Remove(ctx context.Context, groupID, memberID string)
- func (m *StreamManager[S]) SendToMember(ctx context.Context, groupID, memberID string, event *Event) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BidirectionalEventStream ¶
type BidirectionalEventStream interface {
EventStream
// Receive returns a channel of inbound events from the client.
Receive() <-chan *Event
}
BidirectionalEventStream extends EventStream with client-to-server receiving.
type BidirectionalEventStreamUpgrader ¶
type BidirectionalEventStreamUpgrader interface {
UpgradeToBidirectionalStream(w http.ResponseWriter, r *http.Request) (BidirectionalEventStream, error)
}
BidirectionalEventStreamUpgrader upgrades an HTTP connection to a BidirectionalEventStream.
type Event ¶
type Event struct {
Type string `json:"type"`
Payload json.RawMessage `json:"payload,omitempty"`
}
Event represents a typed event with a JSON payload.
type EventStream ¶
type EventStream interface {
// Send pushes an event to the client.
Send(ctx context.Context, event *Event) error
// Done returns a channel that closes when the stream terminates.
Done() <-chan struct{}
// Close terminates the stream.
Close() error
}
EventStream is a unidirectional server-to-client event stream.
type EventStreamUpgrader ¶
type EventStreamUpgrader interface {
UpgradeToEventStream(w http.ResponseWriter, r *http.Request) (EventStream, error)
}
EventStreamUpgrader upgrades an HTTP connection to a unidirectional EventStream.
type StreamManager ¶
type StreamManager[S EventStream] struct {
	// contains filtered or unexported fields
}
StreamManager manages active event streams grouped by group ID and member ID.
func NewStreamManager ¶
func NewStreamManager[S EventStream](
	tracerProvider tracing.TracerProvider,
	logger logging.Logger,
) *StreamManager[S]
NewStreamManager creates a new StreamManager.
func (*StreamManager[S]) Add ¶
func (m *StreamManager[S]) Add(ctx context.Context, groupID, memberID string, stream S)
Add registers a stream for a group and member.
func (*StreamManager[S]) BroadcastToGroup ¶
func (m *StreamManager[S]) BroadcastToGroup(ctx context.Context, groupID string, event *Event)
BroadcastToGroup sends an event to all streams in a group.
func (*StreamManager[S]) BroadcastToGroupFiltered ¶
func (m *StreamManager[S]) BroadcastToGroupFiltered(ctx context.Context, groupID string, event *Event, includeFunc func(memberID string) bool)
BroadcastToGroupFiltered sends an event to streams in a group for which includeFunc returns true.
func (*StreamManager[S]) Get ¶
func (m *StreamManager[S]) Get(ctx context.Context, groupID, memberID string) S
Get returns the stream registered for the given group and member, or the zero value of S if not found.
func (*StreamManager[S]) GetGroupStreams ¶
func (m *StreamManager[S]) GetGroupStreams(ctx context.Context, groupID string) []S
GetGroupStreams returns all streams for a group.
func (*StreamManager[S]) GetStreamCount ¶
func (m *StreamManager[S]) GetStreamCount(ctx context.Context, groupID string) int
GetStreamCount returns the number of streams for a group.
func (*StreamManager[S]) GroupHasStreams ¶
func (m *StreamManager[S]) GroupHasStreams(ctx context.Context, groupID string) bool
GroupHasStreams returns whether a group has any active streams.
func (*StreamManager[S]) Remove ¶
func (m *StreamManager[S]) Remove(ctx context.Context, groupID, memberID string)
Remove removes the stream registered for the given group and member.
func (*StreamManager[S]) SendToMember ¶
func (m *StreamManager[S]) SendToMember(ctx context.Context, groupID, memberID string, event *Event) error
SendToMember sends an event to a specific member in a group.