streaming

package module
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

README

Streaming Extension

Real-time streaming extension for Forge with WebSocket/SSE support, rooms, channels, presence tracking, typing indicators, and distributed coordination.

Features

  • WebSocket & SSE - Built on top of Forge's core router streaming support
  • Rooms - Create chat rooms with members, roles, and permissions
  • Channels - Pub/sub channels with filters and subscriptions
  • Presence - Track online/offline/away status across users
  • Typing Indicators - Real-time typing indicators per room
  • Message History - Persist and retrieve message history
  • Distributed - Redis/NATS backends for multi-node deployments
  • Interface-First - All major components are interfaces for testability

Installation

import "github.com/xraph/forge/extensions/streaming"

Quick Start

Basic Setup (Local Backend)
package main

import (
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/streaming"
)

func main() {
    // Create container and app
    container := forge.NewContainer()
    app := forge.NewApp(container)
    router := forge.NewRouter(forge.WithContainer(container))

    // Create streaming extension with local backend
    streamExt := streaming.NewExtension(
        streaming.WithLocalBackend(),
        streaming.WithFeatures(true, true, true, true, true), // rooms, channels, presence, typing, history
    )

    // Register and start
    app.Use(streamExt)
    app.Start(context.Background())

    // Register streaming routes
    streamExt.RegisterRoutes(router, "/ws", "/sse")

    // Start server
    http.ListenAndServe(":8080", router)
}
With Redis Backend (Distributed)
streamExt := streaming.NewExtension(
    streaming.WithRedisBackend("redis://localhost:6379"),
    streaming.WithFeatures(true, true, true, true, true),
    streaming.WithNodeID("node-1"), // Optional, auto-generated if not set
)

Architecture

Interface-First Design

All major components are defined as interfaces with multiple implementations:

streaming (interfaces)
├── Manager              - Central orchestrator
├── EnhancedConnection   - WebSocket connection with metadata
├── Room                 - Room management
├── RoomStore           - Room persistence backend
├── Channel             - Pub/sub channel
├── ChannelStore        - Channel persistence backend
├── PresenceTracker     - Presence tracking
├── PresenceStore       - Presence persistence backend
├── TypingTracker       - Typing indicators
├── TypingStore         - Typing persistence backend
├── MessageStore        - Message history
└── DistributedBackend  - Cross-node coordination
Package Structure
v2/extensions/streaming/
├── streaming.go         # Core interfaces (Manager, EnhancedConnection)
├── room.go             # Room interfaces
├── channel.go          # Channel interfaces
├── presence.go         # Presence interfaces
├── typing.go           # Typing interfaces
├── persistence.go      # Message store interfaces
├── distributed.go      # Distributed backend interfaces
├── config.go           # Configuration
├── errors.go           # Domain errors
├── manager.go          # Manager implementation
├── connection.go       # Enhanced connection implementation
├── extension.go        # Extension entry point
├── backends/
│   ├── factory.go      # Store factory
│   ├── local/          # In-memory implementations
│   ├── redis/          # Redis implementations (TODO)
│   └── nats/           # NATS implementations (TODO)
└── trackers/
    ├── presence_tracker.go  # Presence tracker implementation
    └── typing_tracker.go    # Typing tracker implementation

Usage Examples

Custom WebSocket Handler
router.WebSocket("/chat", func(ctx forge.Context, conn forge.Connection) error {
    // Get streaming manager from DI
    var manager streaming.Manager
    ctx.Container().Resolve(&manager)

    // Get user from auth
    userID := ctx.Get("user_id").(string)

    // Create enhanced connection
    enhanced := streaming.NewEnhancedConnection(conn)
    enhanced.SetUserID(userID)
    enhanced.SetSessionID(uuid.New().String())

    // Register
    manager.Register(enhanced)
    defer manager.Unregister(conn.ID())

    // Set online
    manager.SetPresence(ctx.Request().Context(), userID, streaming.StatusOnline)
    defer manager.SetPresence(ctx.Request().Context(), userID, streaming.StatusOffline)

    // Message loop
    for {
        var msg streaming.Message
        if err := conn.ReadJSON(&msg); err != nil {
            return err
        }

        // Handle message
        switch msg.Type {
        case streaming.MessageTypeMessage:
            if msg.RoomID != "" {
                manager.BroadcastToRoom(ctx.Request().Context(), msg.RoomID, &msg)
            }
        case streaming.MessageTypeJoin:
            manager.JoinRoom(ctx.Request().Context(), conn.ID(), msg.RoomID)
        case streaming.MessageTypeLeave:
            manager.LeaveRoom(ctx.Request().Context(), conn.ID(), msg.RoomID)
        }
    }
})
Room Management REST API
api := router.Group("/api/v1")

// Create room
api.POST("/rooms", func(ctx forge.Context, req *CreateRoomRequest) error {
    var manager streaming.Manager
    ctx.Container().Resolve(&manager)

    userID := ctx.Get("user_id").(string)

    room := streaming.RoomOptions{
        ID:          uuid.New().String(),
        Name:        req.Name,
        Description: req.Description,
        Owner:       userID,
    }

    if err := manager.CreateRoom(ctx.Request().Context(), room); err != nil {
        return err
    }

    return ctx.JSON(200, room)
})

// Get room history
api.GET("/rooms/:id/history", func(ctx forge.Context) error {
    var manager streaming.Manager
    ctx.Container().Resolve(&manager)

    roomID := ctx.Param("id")

    messages, err := manager.GetHistory(ctx.Request().Context(), roomID, streaming.HistoryQuery{
        Limit: 100,
    })
    if err != nil {
        return err
    }

    return ctx.JSON(200, messages)
})

Configuration

Complete Configuration Example
streamExt := streaming.NewExtension(
    // Backend
    streaming.WithBackend("redis"),
    streaming.WithBackendURLs("redis://localhost:6379"),
    streaming.WithAuthentication("username", "password"),

    // Features
    streaming.WithFeatures(true, true, true, true, true),

    // Limits
    streaming.WithConnectionLimits(5, 50, 100), // conns/user, rooms/user, channels/user
    streaming.WithMessageLimits(64*1024, 100),  // max size, max/second

    // Timeouts
    streaming.WithTimeouts(30*time.Second, 10*time.Second, 10*time.Second), // ping, pong, write

    // Retention
    streaming.WithMessageRetention(30 * 24 * time.Hour), // 30 days

    // Distributed
    streaming.WithNodeID("node-1"),

    // TLS
    streaming.WithTLS("cert.pem", "key.pem", "ca.pem"),
)
Configuration from File
# config.yaml
extensions:
  streaming:
    backend: redis
    backend_urls:
      - redis://localhost:6379
    enable_rooms: true
    enable_channels: true
    enable_presence: true
    enable_typing_indicators: true
    enable_message_history: true
    max_connections_per_user: 5
    max_rooms_per_user: 50
    max_message_size: 65536
    message_retention: 720h # 30 days
// Automatically loads from config
streamExt := streaming.NewExtension()

Message Protocol

Message Structure
type Message struct {
    ID        string         `json:"id"`
    Type      string         `json:"type"`      // "message", "presence", "typing", "system"
    Event     string         `json:"event,omitempty"`
    RoomID    string         `json:"room_id,omitempty"`
    ChannelID string         `json:"channel_id,omitempty"`
    UserID    string         `json:"user_id"`
    Data      any            `json:"data"`
    Metadata  map[string]any `json:"metadata,omitempty"`
    Timestamp time.Time      `json:"timestamp"`
    ThreadID  string         `json:"thread_id,omitempty"`
}
Message Types
  • MessageTypeMessage - Regular chat message
  • MessageTypePresence - Presence update (online/offline/away)
  • MessageTypeTyping - Typing indicator
  • MessageTypeSystem - System notification
  • MessageTypeJoin - User joined room
  • MessageTypeLeave - User left room
  • MessageTypeError - Error message

Backend Comparison

Feature Local Redis NATS
Single Node
Multi-Node
Persistence Memory Disk Disk
Message History Limited Full Full
Presence Sync
Performance Fastest Fast Fastest
Setup None Redis NATS Server

Production Considerations

Scaling

For distributed deployments:

  1. Use Redis or NATS backend
  2. Set unique node IDs per instance
  3. Configure proper timeouts and limits
  4. Enable message persistence
  5. Monitor metrics
Security
  • Always use authentication middleware before WebSocket routes
  • Validate user permissions for room/channel access
  • Rate limit connections per user
  • Use TLS in production
  • Never log sensitive message content
Monitoring

Key metrics to monitor:

  • streaming.connections.active - Active connections
  • streaming.connections.total - Total connections created
  • streaming.messages.broadcast - Messages broadcast
  • streaming.rooms.joins - Room joins
  • streaming.presence.updates - Presence updates
Health Checks
// Extension provides health check
if err := streamExt.Health(ctx); err != nil {
    log.Error("streaming unhealthy", err)
}

Testing

Mock Implementations

All interfaces can be easily mocked for testing:

type mockManager struct {
    streaming.Manager
    registerCalls int
}

func (m *mockManager) Register(conn streaming.EnhancedConnection) error {
    m.registerCalls++
    return nil
}

func TestMyHandler(t *testing.T) {
    manager := &mockManager{}
    // Test with mock manager
}
Integration Tests
// Use local backend for tests
streamExt := streaming.NewExtension(streaming.WithLocalBackend())
// Run tests against real implementation

Roadmap

  • Redis backend implementation
  • NATS backend implementation
  • Message compression for old messages
  • Advanced filtering and search
  • WebRTC signaling support
  • GraphQL subscriptions integration
  • Admin dashboard for monitoring

License

Part of Forge framework.

Documentation

Index

Constants

View Source
const (
	TransportWebSocket = "websocket"
	TransportSSE       = "sse"
)

Transport type constants.

View Source
const ContentTypeBinary = internal.ContentTypeBinary
View Source
const ContentTypeJSON = internal.ContentTypeJSON

Content type constants.

View Source
const ContentTypeMsgPack = internal.ContentTypeMsgPack
View Source
const ContentTypeProtobuf = internal.ContentTypeProtobuf
View Source
const ContentTypeText = internal.ContentTypeText
View Source
const (
	// ManagerKey is the DI key for the streaming manager.
	ManagerKey = "streaming"
)

DI container keys for streaming extension services.

View Source
const MessageTypeJoin = internal.MessageTypeJoin
View Source
const MessageTypeLeave = internal.MessageTypeLeave
View Source
const MessageTypeMessage = internal.MessageTypeMessage

Message type constants.

View Source
const MessageTypePresence = internal.MessageTypePresence
View Source
const MessageTypeTyping = internal.MessageTypeTyping
View Source
const StatusAway = internal.StatusAway
View Source
const StatusBusy = internal.StatusBusy
View Source
const StatusOffline = internal.StatusOffline
View Source
const StatusOnline = internal.StatusOnline

Status constants.

Variables

View Source
var DefaultPresenceOptions = internal.DefaultPresenceOptions

Default option functions.

View Source
var DefaultTypingOptions = internal.DefaultTypingOptions
View Source
var ErrAlreadyRoomMember = internal.ErrAlreadyRoomMember
View Source
var ErrAlreadySubscribed = internal.ErrAlreadySubscribed
View Source
var ErrBackendNotFound = internal.ErrBackendNotConnected
View Source
var ErrBackendTimeout = internal.ErrBackendTimeout
View Source
var ErrBackendUnavailable = internal.ErrBackendUnavailable
View Source
var ErrChannelAlreadyExists = internal.ErrChannelAlreadyExists
View Source
var ErrChannelNotFound = internal.ErrChannelNotFound

Channel errors.

View Source
var ErrConnectionClosed = internal.ErrConnectionClosed
View Source
var ErrConnectionLimitReached = internal.ErrConnectionLimitReached
View Source
var ErrConnectionNotFound = internal.ErrConnectionNotFound

Connection errors.

View Source
var ErrInsufficientRole = internal.ErrInsufficientRole
View Source
var ErrInvalidChannel = internal.ErrInvalidChannel
View Source
var ErrInvalidConfig = internal.ErrInvalidConfig
View Source
var ErrInvalidConnection = internal.ErrInvalidConnection
View Source
var ErrInvalidMessage = internal.ErrInvalidMessage
View Source
var ErrInvalidPermission = internal.ErrInvalidPermission
View Source
var ErrInvalidRoom = internal.ErrInvalidRoom
View Source
var ErrInvalidStatus = internal.ErrInvalidStatus
View Source
var ErrInviteExpired = internal.ErrInviteExpired
View Source
var ErrInviteNotFound = internal.ErrInviteNotFound

Invite errors.

View Source
var ErrLockAcquisitionFailed = internal.ErrLockAcquisitionFailed
View Source
var ErrLockNotHeld = internal.ErrLockNotHeld
View Source
var ErrMessageNotFound = internal.ErrMessageNotFound
View Source
var ErrMessageTooLarge = internal.ErrMessageTooLarge
View Source
var ErrNodeNotFound = internal.ErrNodeNotFound
View Source
var ErrNotRoomMember = internal.ErrNotRoomMember
View Source
var ErrNotSubscribed = internal.ErrNotSubscribed
View Source
var ErrPermissionDenied = internal.ErrPermissionDenied

Permission errors.

View Source
var ErrPresenceNotFound = internal.ErrPresenceNotFound
View Source
var ErrRoomAlreadyExists = internal.ErrRoomAlreadyExists
View Source
var ErrRoomFull = internal.ErrRoomFull
View Source
var ErrRoomLimitReached = internal.ErrRoomLimitReached
View Source
var ErrRoomNotFound = internal.ErrRoomNotFound

Room errors.

View Source
var NewBackendError = internal.NewBackendError
View Source
var NewChannelError = internal.NewChannelError
View Source
var NewConnectionError = internal.NewConnectionError

Error constructors.

View Source
var NewMessageError = internal.NewMessageError
View Source
var NewRoomError = internal.NewRoomError

Functions

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new streaming extension with functional options.

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new streaming extension with a complete config.

func NewLocalRoom

func NewLocalRoom(opts RoomOptions) *local.LocalRoom

Room creation.

func NewSSEConnection added in v0.10.0

func NewSSEConnection(stream forge.Stream, remoteAddr, localAddr string) forge.Connection

NewSSEConnection creates a new SSE connection adapter.

Types

type ActivityInfo

type ActivityInfo = internal.ActivityInfo

type AnalyticsEvent

type AnalyticsEvent = internal.AnalyticsEvent

type AnalyticsQuery

type AnalyticsQuery = internal.AnalyticsQuery

type AnalyticsResult

type AnalyticsResult = internal.AnalyticsResult

Analytics types.

type Availability

type Availability = internal.Availability

type BackendError

type BackendError = internal.BackendError

Backend error.

type BinaryCodec added in v1.3.0

type BinaryCodec struct{}

BinaryCodec handles raw binary data. On decode, it stores the raw bytes in msg.RawData and sets the content type. On encode, it returns msg.RawData directly.

func (*BinaryCodec) ContentType added in v1.3.0

func (c *BinaryCodec) ContentType() string

func (*BinaryCodec) Decode added in v1.3.0

func (c *BinaryCodec) Decode(data []byte, msg *streaming.Message) error

func (*BinaryCodec) Encode added in v1.3.0

func (c *BinaryCodec) Encode(msg *streaming.Message) ([]byte, error)

type Channel

type Channel = internal.Channel

type ChannelStore

type ChannelStore = internal.ChannelStore

type Codec added in v1.3.0

type Codec interface {
	// ContentType returns the MIME type this codec handles (e.g. "application/json").
	ContentType() string
	// Encode serializes a message to bytes.
	Encode(msg *streaming.Message) ([]byte, error)
	// Decode deserializes bytes into a message.
	Decode(data []byte, msg *streaming.Message) error
}

Codec handles encoding and decoding messages for a specific content type.

type CodecRegistry added in v1.3.0

type CodecRegistry struct {
	// contains filtered or unexported fields
}

CodecRegistry manages codecs by content type and provides encode/decode dispatch based on message content type.

func NewCodecRegistry added in v1.3.0

func NewCodecRegistry() *CodecRegistry

NewCodecRegistry creates a new codec registry pre-loaded with a JSON codec as default.

func (*CodecRegistry) Decode added in v1.3.0

func (r *CodecRegistry) Decode(data []byte, msg *streaming.Message) error

Decode deserializes bytes into a message using the default codec. If decoding fails with the default codec, it returns the error.

func (*CodecRegistry) DecodeWithType added in v1.3.0

func (r *CodecRegistry) DecodeWithType(contentType string, data []byte, msg *streaming.Message) error

DecodeWithType deserializes bytes into a message using the codec for the given content type.

func (*CodecRegistry) Default added in v1.3.0

func (r *CodecRegistry) Default() Codec

Default returns the default codec (JSON).

func (*CodecRegistry) Encode added in v1.3.0

func (r *CodecRegistry) Encode(msg *streaming.Message) ([]byte, error)

Encode serializes a message using the codec matching msg.ContentType, or the default codec if ContentType is empty.

func (*CodecRegistry) Get added in v1.3.0

func (r *CodecRegistry) Get(contentType string) (Codec, bool)

Get returns the codec for the given content type.

func (*CodecRegistry) Register added in v1.3.0

func (r *CodecRegistry) Register(codec Codec)

Register adds a codec. If a codec for the same content type already exists, it is replaced.

func (*CodecRegistry) SetDefault added in v1.3.0

func (r *CodecRegistry) SetDefault(contentType string) error

SetDefault changes the default codec to the one registered for the given content type.

type Config

type Config = internal.Config

Configuration.

func DefaultConfig

func DefaultConfig() Config

type ConfigOption

type ConfigOption = internal.ConfigOption

func WithAuthentication

func WithAuthentication(username, password string) ConfigOption

func WithBackend

func WithBackend(backend string) ConfigOption

func WithBackendURLs

func WithBackendURLs(urls ...string) ConfigOption

func WithBufferSizes

func WithBufferSizes(read, write int) ConfigOption

func WithConfig

func WithConfig(config Config) ConfigOption

func WithConnectionLimits

func WithConnectionLimits(perUser, roomsPerUser, channelsPerUser int) ConfigOption

func WithFeatures

func WithFeatures(rooms, channels, presence, typing, history bool) ConfigOption

func WithLocalBackend

func WithLocalBackend() ConfigOption

func WithMessageLimits

func WithMessageLimits(maxSize, maxPerSecond int) ConfigOption

func WithMessageRetention

func WithMessageRetention(retention time.Duration) ConfigOption

func WithNATSBackend

func WithNATSBackend(urls ...string) ConfigOption

func WithNodeID

func WithNodeID(nodeID string) ConfigOption

func WithPresenceTimeout

func WithPresenceTimeout(timeout time.Duration) ConfigOption

func WithRedisBackend

func WithRedisBackend(url string) ConfigOption

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ConfigOption

func WithTimeouts

func WithTimeouts(ping, pong, write time.Duration) ConfigOption

func WithTypingTimeout

func WithTypingTimeout(timeout time.Duration) ConfigOption

type Connection

type Connection = internal.EnhancedConnection

func NewConnection

func NewConnection(conn forge.Connection) Connection

NewConnection creates a new enhanced connection with default transport "websocket".

func NewConnectionWithTransport added in v1.3.0

func NewConnectionWithTransport(conn forge.Connection, transport string) Connection

NewConnectionWithTransport creates a new enhanced connection with a specified transport type.

type ConnectionHook added in v1.3.0

type ConnectionHook interface {
	StreamingHook
	// OnConnect is called before registration. Return error to reject connection.
	OnConnect(ctx context.Context, conn Connection) error
	// OnDisconnect is called after unregistration.
	OnDisconnect(ctx context.Context, conn Connection)
}

ConnectionHook fires on connection lifecycle events.

type ConnectionInfo

type ConnectionInfo = internal.ConnectionInfo

Connection types.

type DeviceInfo

type DeviceInfo = internal.DeviceInfo

type DistributedBackend

type DistributedBackend = internal.DistributedBackend

Distributed backend.

type DistributedBackendOptions

type DistributedBackendOptions = internal.DistributedBackendOptions

type ErrorHook added in v1.3.0

type ErrorHook interface {
	StreamingHook
	// OnError is called when a message handling error occurs.
	OnError(ctx context.Context, conn Connection, err error)
}

ErrorHook fires on message handling errors.

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for streaming functionality.

func (*Extension) AsyncAPISpec

func (e *Extension) AsyncAPISpec() *forge.AsyncAPISpec

AsyncAPISpec generates AsyncAPI 3.0.0 specification for the streaming extension This documents all streaming channels, operations, and message types.

func (*Extension) Codecs added in v1.3.0

func (e *Extension) Codecs() *CodecRegistry

Codecs returns the codec registry for direct access.

func (*Extension) DashboardContributor added in v1.3.0

func (e *Extension) DashboardContributor() contributor.LocalContributor

DashboardContributor implements dashboard.DashboardAware. Returns a streaming dashboard contributor for auto-registration. Uses resolver closures so the manager/config are resolved at render time, not at discovery time (when they may not yet be initialized).

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks if the streaming extension is healthy.

func (*Extension) Hooks added in v1.3.0

func (e *Extension) Hooks() *HookRegistry

Hooks returns the hook registry for direct access.

func (*Extension) Manager

func (e *Extension) Manager() Manager

Manager returns the streaming manager (for advanced usage).

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the streaming extension with the app.

func (*Extension) RegisterCodec added in v1.3.0

func (e *Extension) RegisterCodec(codec Codec)

RegisterCodec adds a message codec for a specific content type.

func (*Extension) RegisterDashboardBridge added in v1.3.0

func (e *Extension) RegisterDashboardBridge(b *bridge.Bridge) error

RegisterDashboardBridge implements dashboard.BridgeAware. Registers streaming bridge functions for Go↔JS communication. Uses resolver closures so the manager/config are resolved at request time.

func (*Extension) RegisterHook added in v1.3.0

func (e *Extension) RegisterHook(hook StreamingHook)

RegisterHook adds a streaming hook for lifecycle events. Hooks can implement one or more hook interfaces (ConnectionHook, MessageHook, RawMessageHook, RoomHook, PresenceHook, ErrorHook).

func (*Extension) RegisterRoutes

func (e *Extension) RegisterRoutes(router forge.Router, wsPath, ssePath string) error

RegisterRoutes is a helper to register WebSocket and SSE routes with the router.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start starts the streaming extension.

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop stops the streaming extension.

func (*Extension) UnregisterHook added in v1.3.0

func (e *Extension) UnregisterHook(name string)

UnregisterHook removes a streaming hook by name.

type FileInfo

type FileInfo = internal.FileInfo

type FileQuery

type FileQuery = internal.FileQuery

type FileUpload

type FileUpload = internal.FileUpload

File types.

type HistoryQuery

type HistoryQuery = internal.HistoryQuery

Query types.

type HookRegistry added in v1.3.0

type HookRegistry struct {
	// contains filtered or unexported fields
}

HookRegistry manages streaming hooks and dispatches events.

func NewHookRegistry added in v1.3.0

func NewHookRegistry() *HookRegistry

NewHookRegistry creates a new hook registry.

func (*HookRegistry) Add added in v1.3.0

func (r *HookRegistry) Add(hook StreamingHook)

Add registers a hook. The hook is type-asserted to categorize it into the appropriate dispatch lists.

func (*HookRegistry) FireOnConnect added in v1.3.0

func (r *HookRegistry) FireOnConnect(ctx context.Context, conn streaming.EnhancedConnection) error

FireOnConnect fires ConnectionHook.OnConnect for all registered hooks. Returns the first error encountered, which should be used to reject the connection.

func (*HookRegistry) FireOnDisconnect added in v1.3.0

func (r *HookRegistry) FireOnDisconnect(ctx context.Context, conn streaming.EnhancedConnection)

FireOnDisconnect fires ConnectionHook.OnDisconnect for all registered hooks. Errors are ignored (post-hook).

func (*HookRegistry) FireOnError added in v1.3.0

func (r *HookRegistry) FireOnError(ctx context.Context, conn streaming.EnhancedConnection, err error)

FireOnError fires ErrorHook.OnError for all hooks (post-hook).

func (*HookRegistry) FireOnMessageDelivered added in v1.3.0

func (r *HookRegistry) FireOnMessageDelivered(ctx context.Context, conn streaming.EnhancedConnection, msg *streaming.Message)

FireOnMessageDelivered fires MessageHook.OnMessageDelivered asynchronously. This is non-blocking to avoid slowing down message delivery.

func (*HookRegistry) FireOnMessageReceived added in v1.3.0

func (r *HookRegistry) FireOnMessageReceived(ctx context.Context, conn streaming.EnhancedConnection, msg *streaming.Message) (*streaming.Message, error)

FireOnMessageReceived fires MessageHook.OnMessageReceived for all hooks in sequence. Each hook can transform or block (return nil) the message.

func (*HookRegistry) FireOnPresenceChange added in v1.3.0

func (r *HookRegistry) FireOnPresenceChange(ctx context.Context, userID, oldStatus, newStatus string)

FireOnPresenceChange fires PresenceHook.OnPresenceChange for all hooks (post-hook).

func (*HookRegistry) FireOnRawMessage added in v1.3.0

func (r *HookRegistry) FireOnRawMessage(ctx context.Context, conn streaming.EnhancedConnection, data []byte) ([]byte, error)

FireOnRawMessage fires RawMessageHook.OnRawMessage for all hooks in sequence. Each hook can transform the bytes or block (return error) the message.

func (*HookRegistry) FireOnRoomCreate added in v1.3.0

func (r *HookRegistry) FireOnRoomCreate(ctx context.Context, room streaming.Room) error

FireOnRoomCreate fires RoomHook.OnRoomCreate for all hooks. Returns the first error encountered, which should be used to reject creation.

func (*HookRegistry) FireOnRoomDelete added in v1.3.0

func (r *HookRegistry) FireOnRoomDelete(ctx context.Context, roomID string)

FireOnRoomDelete fires RoomHook.OnRoomDelete for all hooks (post-hook).

func (*HookRegistry) FireOnRoomJoin added in v1.3.0

func (r *HookRegistry) FireOnRoomJoin(ctx context.Context, conn streaming.EnhancedConnection, roomID string) error

FireOnRoomJoin fires RoomHook.OnRoomJoin for all hooks. Returns the first error encountered, which should be used to reject the join.

func (*HookRegistry) FireOnRoomLeave added in v1.3.0

func (r *HookRegistry) FireOnRoomLeave(ctx context.Context, conn streaming.EnhancedConnection, roomID string)

FireOnRoomLeave fires RoomHook.OnRoomLeave for all hooks (post-hook).

func (*HookRegistry) List added in v1.3.0

func (r *HookRegistry) List() []StreamingHook

List returns all registered hooks.

func (*HookRegistry) Remove added in v1.3.0

func (r *HookRegistry) Remove(name string)

Remove unregisters a hook by name.

type Invite

type Invite = internal.Invite

Room types.

type InviteOptions

type InviteOptions = internal.InviteOptions

type JSONCodec added in v1.3.0

type JSONCodec struct{}

JSONCodec encodes/decodes messages as JSON. This is the default codec.

func (*JSONCodec) ContentType added in v1.3.0

func (c *JSONCodec) ContentType() string

func (*JSONCodec) Decode added in v1.3.0

func (c *JSONCodec) Decode(data []byte, msg *streaming.Message) error

func (*JSONCodec) Encode added in v1.3.0

func (c *JSONCodec) Encode(msg *streaming.Message) ([]byte, error)

type LocalRoom

type LocalRoom = *local.LocalRoom

Room creation (local backend).

type Lock

type Lock = internal.Lock

type Manager

type Manager = internal.Manager

Core interfaces.

func GetManager

func GetManager(c forge.Container) (Manager, error)

GetManager retrieves the streaming Manager from the container. Returns error if not found or type assertion fails.

func GetManagerFromApp

func GetManagerFromApp(app forge.App) (Manager, error)

GetManagerFromApp retrieves the streaming Manager from the app. Returns error if not found or type assertion fails.

func MustGetManager

func MustGetManager(c forge.Container) Manager

MustGetManager retrieves the streaming Manager from the container. Panics if not found or type assertion fails.

func MustGetManagerFromApp

func MustGetManagerFromApp(app forge.App) Manager

MustGetManagerFromApp retrieves the streaming Manager from the app. Panics if not found or type assertion fails.

func NewManager

func NewManager(
	config Config,
	roomStore RoomStore,
	channelStore ChannelStore,
	messageStore MessageStore,
	presenceTracker PresenceTracker,
	typingTracker TypingTracker,
	distributed DistributedBackend,
	logger forge.Logger,
	metrics forge.Metrics,
	opts ...ManagerOption,
) Manager

NewManager creates a new streaming manager.

type ManagerOption added in v0.10.0

type ManagerOption func(*manager)

ManagerOption configures the manager.

func WithCodecRegistry added in v1.3.0

func WithCodecRegistry(cr *CodecRegistry) ManagerOption

WithCodecRegistry sets the codec registry for message encoding/decoding.

func WithCoordinator added in v0.10.0

func WithCoordinator(c coordinator.StreamCoordinator) ManagerOption

WithCoordinator sets the distributed coordinator.

func WithFilterChain added in v0.10.0

func WithFilterChain(fc filters.FilterChain) ManagerOption

WithFilterChain sets the message filter chain.

func WithHookRegistry added in v1.3.0

func WithHookRegistry(hr *HookRegistry) ManagerOption

WithHookRegistry sets the hook registry for lifecycle hooks.

func WithManagerHealthChecker added in v0.10.0

func WithManagerHealthChecker(hc lb.HealthChecker) ManagerOption

WithManagerHealthChecker sets the health checker.

func WithManagerLoadBalancer added in v0.10.0

func WithManagerLoadBalancer(l lb.LoadBalancer) ManagerOption

WithManagerLoadBalancer sets the load balancer.

func WithManagerNodeID added in v0.10.0

func WithManagerNodeID(id string) ManagerOption

WithNodeID sets the node ID for distributed mode.

func WithRateLimiter added in v0.10.0

func WithRateLimiter(rl ratelimit.RateLimiter) ManagerOption

WithRateLimiter sets the rate limiter.

func WithSessionStore added in v0.10.0

func WithSessionStore(ss SessionStore) ManagerOption

WithSessionStore sets the session store for session resumption.

func WithValidator added in v0.10.0

func WithValidator(v validation.MessageValidator) ManagerOption

WithValidator sets the message validator.

type ManagerStats

type ManagerStats = internal.ManagerStats

type Member

type Member = internal.Member

type MemberOptions

type MemberOptions = internal.MemberOptions

type Message

type Message = internal.Message

Message types.

type MessageEdit

type MessageEdit = internal.MessageEdit

type MessageHandler

type MessageHandler = internal.MessageHandler

type MessageHook added in v1.3.0

type MessageHook interface {
	StreamingHook
	// OnMessageReceived is called before message processing. Can transform or block (return nil).
	OnMessageReceived(ctx context.Context, conn Connection, msg *Message) (*Message, error)
	// OnMessageDelivered is called after delivery (non-blocking, runs async).
	OnMessageDelivered(ctx context.Context, conn Connection, msg *Message)
}

MessageHook fires on message events.

type MessageReaction

type MessageReaction = internal.MessageReaction

type MessageSearchQuery

type MessageSearchQuery = internal.MessageSearchQuery

type MessageStore

type MessageStore = internal.MessageStore

type ModerationEvent

type ModerationEvent = internal.ModerationEvent

type ModerationStatus

type ModerationStatus = internal.ModerationStatus

Moderation types.

type NodeChangeEvent

type NodeChangeEvent = internal.NodeChangeEvent

type NodeChangeHandler

type NodeChangeHandler = internal.NodeChangeHandler

type NodeInfo

type NodeInfo = internal.NodeInfo

type OnlineStats

type OnlineStats = internal.OnlineStats

type PresenceEvent

type PresenceEvent = internal.PresenceEvent

Presence types.

type PresenceFilters

type PresenceFilters = internal.PresenceFilters

type PresenceHook added in v1.3.0

type PresenceHook interface {
	StreamingHook
	// OnPresenceChange is called after a user's presence status changes.
	OnPresenceChange(ctx context.Context, userID, oldStatus, newStatus string)
}

PresenceHook fires on presence changes.

type PresenceOptions

type PresenceOptions = internal.PresenceOptions

type PresenceStore

type PresenceStore = internal.PresenceStore

type PresenceTracker

type PresenceTracker = internal.PresenceTracker

Tracker interfaces.

type RateLimitStatus

type RateLimitStatus = internal.RateLimitStatus

Rate limiting.

type RawMessageHook added in v1.3.0

type RawMessageHook interface {
	StreamingHook
	// OnRawMessage processes raw bytes before decoding. Return error to drop the message.
	OnRawMessage(ctx context.Context, conn Connection, data []byte) ([]byte, error)
}

RawMessageHook fires before deserialization on raw bytes from the connection.

type Room

type Room = internal.Room

type RoomBan

type RoomBan = internal.RoomBan

type RoomEvent

type RoomEvent = internal.RoomEvent

type RoomHook added in v1.3.0

type RoomHook interface {
	StreamingHook
	// OnRoomJoin is called before join. Return error to reject.
	OnRoomJoin(ctx context.Context, conn Connection, roomID string) error
	// OnRoomLeave is called after leave.
	OnRoomLeave(ctx context.Context, conn Connection, roomID string)
	// OnRoomCreate is called before room creation. Return error to reject.
	OnRoomCreate(ctx context.Context, room Room) error
	// OnRoomDelete is called after room deletion.
	OnRoomDelete(ctx context.Context, roomID string)
}

RoomHook fires on room lifecycle events.

type RoomOptions

type RoomOptions = internal.RoomOptions

type RoomStats

type RoomStats = internal.RoomStats

Statistics types.

type RoomStore

type RoomStore = internal.RoomStore

Store interfaces.

type SessionSnapshot added in v0.10.0

type SessionSnapshot struct {
	SessionID      string            `json:"session_id"`
	UserID         string            `json:"user_id"`
	Rooms          []string          `json:"rooms"`
	Channels       []string          `json:"channels"`
	Metadata       map[string]string `json:"metadata,omitempty"`
	DisconnectedAt time.Time         `json:"disconnected_at"`
}

SessionSnapshot captures the state of a connection for resumption.

type SessionStore added in v0.10.0

type SessionStore interface {
	// Save stores a session snapshot with a TTL.
	Save(ctx context.Context, snapshot *SessionSnapshot, ttl time.Duration) error

	// Get retrieves a session snapshot by session ID.
	Get(ctx context.Context, sessionID string) (*SessionSnapshot, error)

	// Delete removes a session snapshot.
	Delete(ctx context.Context, sessionID string) error
}

SessionStore stores session snapshots for resumption.

func NewInMemorySessionStore added in v0.10.0

func NewInMemorySessionStore() SessionStore

NewInMemorySessionStore creates an in-memory session store.

type StreamingHook added in v1.3.0

type StreamingHook interface {
	Name() string
}

StreamingHook is the base interface for all streaming hooks. Hooks implement one or more of the optional hook interfaces below.

type TextCodec added in v1.3.0

type TextCodec struct{}

TextCodec handles plain text data. On decode, it stores the text in msg.Data as a string. On encode, it converts msg.Data to a string and returns bytes.

func (*TextCodec) ContentType added in v1.3.0

func (c *TextCodec) ContentType() string

func (*TextCodec) Decode added in v1.3.0

func (c *TextCodec) Decode(data []byte, msg *streaming.Message) error

func (*TextCodec) Encode added in v1.3.0

func (c *TextCodec) Encode(msg *streaming.Message) ([]byte, error)

type TypingOptions

type TypingOptions = internal.TypingOptions

Typing.

type TypingStore

type TypingStore = internal.TypingStore

type TypingTracker

type TypingTracker = internal.TypingTracker

type UserPresence

type UserPresence = internal.UserPresence

type UserPresenceStats

type UserPresenceStats = internal.UserPresenceStats

type UserStats

type UserStats = internal.UserStats

type WebhookConfig

type WebhookConfig = internal.WebhookConfig

Webhook types.

Directories

Path Synopsis
examples
chat command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL