realtime

package
v0.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package realtime is a high-level realtime gateway for lagodev, modelled on Laravel Echo / NestJS gateways. It layers channels, presence and per-channel authorization on top of a transport-agnostic connection interface.

The package is a dependency-free leaf: it imports only the standard library. A WebSocket transport (RFC 6455 server handshake + framing) ships in-box (ws.go / upgrader.go), but the Hub is written against the Conn interface so it is fully testable without real sockets.

Layers:

  • Conn — transport abstraction: ReadMessage / WriteMessage / Close. Implemented by wsConn (stdlib WebSocket) and by the in-memory fake used in tests.
  • Client — a Conn registered on a Hub, with a bounded outbound buffer and slow-consumer handling.
  • Channel — a named room. Public, Private (authorized) or Presence (authorized + membership roster with join/leave events).
  • Hub — the per-app singleton tracking clients and channels; subscribe / unsubscribe / broadcast / direct send / presence.
  • Authorizer — hook deciding whether a client may join a private or presence channel.

Broadcasting bridge: realtime does not import the broadcasting package (avoids a dependency cycle and keeps this a leaf). Instead, callers wire events in by passing the Hub's Broadcast method to a broadcasting subscription. See Hub.Broadcast and the package example in doc.go.

Example

Example shows the core realtime flow a server performs per connection: register the Conn on the Hub, subscribe it to a public channel, broadcast a message, and read the exact frame the client received.

package main

import (
	"fmt"

	"github.com/devituz/lagodev/realtime"
)

// pipeConn is a minimal in-memory realtime.Conn. Outbound frames written by
// the Hub's writer goroutine land on the buffered writes channel, which the
// example drains synchronously to observe exactly what reached the wire.
type pipeConn struct {
	writes chan string
}

func newPipeConn() *pipeConn {
	return &pipeConn{writes: make(chan string, 8)}
}

// ReadMessage blocks forever: this example is send-only, so the peer never
// produces inbound frames. The Hub drives writes; reads are irrelevant here.
func (c *pipeConn) ReadMessage() (realtime.MessageType, []byte, error) {
	select {} // park; the example never starts a read loop
}

func (c *pipeConn) WriteMessage(_ realtime.MessageType, data []byte) error {
	c.writes <- string(data)
	return nil
}

func (c *pipeConn) Close() error { return nil }

// Example shows the core realtime flow a server performs per connection:
// register the Conn on the Hub, subscribe it to a public channel, broadcast a
// message, and read the exact frame the client received.
func main() {
	hub := realtime.NewHub()
	defer hub.Close()

	conn := newPipeConn()
	client, err := hub.Add(conn, nil)
	if err != nil {
		fmt.Println("add:", err)
		return
	}

	if err := hub.Subscribe(client, "orders"); err != nil {
		fmt.Println("subscribe:", err)
		return
	}

	if err := hub.Broadcast("orders", []byte(`{"event":"created","id":42}`)); err != nil {
		fmt.Println("broadcast:", err)
		return
	}

	// The writer goroutine drains the outbox onto the Conn; receiving the
	// frame here makes the example deterministic without any sleeps.
	fmt.Println("subscribers:", hub.ChannelCount("orders"))
	fmt.Println("frame:", <-conn.writes)

}
Output:
subscribers: 1
frame: {"event":"created","id":42}

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("realtime: closed")

ErrClosed is returned by Conn operations after the connection — or the owning Hub — has been closed.

View Source
var ErrUnauthorized = errors.New("realtime: unauthorized")

ErrUnauthorized is returned by Subscribe when a private/presence join is denied by the Authorizer (or when none is configured).

Functions

This section is empty.

Types

type AuthRequest

type AuthRequest struct {
	// Channel is the target channel name.
	Channel string
	// Type is the channel's classification.
	Type ChannelType
	// ClientID is the Hub-assigned id of the joining client.
	ClientID string
	// Auth carries client-supplied credentials (e.g. a signed token)
	// extracted at connect time; opaque to the Hub.
	Auth []byte
}

AuthRequest is handed to the Authorizer when a client attempts to join a private or presence channel.

type AuthResult

type AuthResult struct {
	Allowed bool
	Member  Member
}

AuthResult is returned by an Authorizer. Allowed gates the join. For presence channels, Member supplies the identity published to the roster.

type Authorizer

type Authorizer interface {
	Authorize(AuthRequest) AuthResult
}

Authorizer decides whether a client may join a private or presence channel. It is never consulted for public channels. A nil Authorizer on the Hub denies every private/presence join.

type AuthorizerFunc

type AuthorizerFunc func(AuthRequest) AuthResult

AuthorizerFunc adapts a plain function to the Authorizer interface.

func (AuthorizerFunc) Authorize

func (f AuthorizerFunc) Authorize(r AuthRequest) AuthResult

Authorize implements Authorizer.

type ChannelType

type ChannelType int

ChannelType classifies a channel's authorization and presence semantics, following Laravel Echo's channel naming convention.

const (
	// Public channels require no authorization; any client may join.
	Public ChannelType = iota
	// Private channels require the Authorizer to approve the join.
	Private
	// Presence channels require authorization and additionally maintain
	// a roster broadcast to members via join/leave events.
	Presence
)

func ClassifyChannel

func ClassifyChannel(name string) ChannelType

ClassifyChannel derives a channel's type from its name prefix, mirroring Laravel Echo: "private-*" is Private, "presence-*" is Presence, anything else is Public.

func (ChannelType) String

func (t ChannelType) String() string

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Conn registered on a Hub. It owns a single writer goroutine draining a bounded outbox; the caller owns the reader via Serve. A Client is safe for concurrent use: enqueue/Close may be called from any goroutine.

func (*Client) Close

func (c *Client) Close() error

Close tears the client out of its Hub and closes the transport. It is idempotent and safe to call concurrently.

func (*Client) Drops

func (c *Client) Drops() uint64

Drops returns the number of messages dropped for this client under the DropMessage slow-consumer policy.

func (*Client) ID

func (c *Client) ID() string

ID returns the Hub-assigned client identifier.

func (*Client) Serve

func (c *Client) Serve(onMessage func(*Client, MessageType, []byte))

Serve runs the inbound read loop, invoking onMessage for every frame until the peer closes or the connection errors. It blocks, so callers typically run it in a goroutine. Serve always tears the client down on return, firing presence-leave events for every joined channel. onMessage may be nil to ignore inbound frames (a send-only client).

type Conn

type Conn interface {
	ReadMessage() (MessageType, []byte, error)
	WriteMessage(MessageType, []byte) error
	Close() error
}

Conn is the transport abstraction the Hub is built against. A single reader goroutine and a single writer goroutine touch a Conn, so implementations need not make ReadMessage concurrent with itself nor WriteMessage concurrent with itself; Close must be safe to call concurrently with both.

ReadMessage returns the next inbound application message. It returns io.EOF (or a wrapped transport error) once the peer closes.

WriteMessage delivers one application message to the peer.

Close terminates the connection; it is idempotent.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the per-app realtime gateway. It is safe for concurrent use.

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub returns an empty Hub.

func (*Hub) Add

func (h *Hub) Add(conn Conn, auth []byte) (*Client, error)

Add registers conn on the Hub and returns the resulting Client. The caller owns the read loop: call Client.Serve(onMessage) to pump inbound frames (typically in a fresh goroutine), and Client.Close / Hub.Close to tear down. auth carries optional client credentials forwarded to the Authorizer on private/presence joins.

func (*Hub) Broadcast

func (h *Hub) Broadcast(channel string, payload []byte) error

Broadcast delivers payload (as a text frame) to every client subscribed to channel. Per-client delivery is non-blocking subject to the Hub's slow-consumer policy. Safe to pass as a broadcasting.Handler bridge.

func (*Hub) BroadcastBinary

func (h *Hub) BroadcastBinary(channel string, payload []byte) error

BroadcastBinary is Broadcast for binary frames.

func (*Hub) BroadcastExcept

func (h *Hub) BroadcastExcept(channel string, payload []byte, exceptID string) error

BroadcastExcept is Broadcast that skips one client id (e.g. the sender), matching Echo's toOthers semantics.

func (*Hub) ChannelCount

func (h *Hub) ChannelCount(channel string) int

ChannelCount returns the number of subscribers on channel.

func (*Hub) Close

func (h *Hub) Close()

Close terminates every client and rejects further Add calls.

func (*Hub) Count

func (h *Hub) Count() int

Count returns the number of live clients.

func (*Hub) OnPresence

func (h *Hub) OnPresence(fn func(PresenceEvent))

OnPresence registers a callback invoked for every presence join/leave. It runs synchronously on the goroutine driving the roster change, so it must not block. Replaces any previous callback.

func (*Hub) Presence

func (h *Hub) Presence(channel string) []Member

Presence returns the current roster of a presence channel. The slice is a copy; order is unspecified. Returns nil for unknown or non-presence channels.

func (*Hub) SendTo

func (h *Hub) SendTo(clientID string, payload []byte) error

SendTo delivers payload directly to a single client by id.

func (*Hub) Subscribe

func (h *Hub) Subscribe(c *Client, channel string) error

Subscribe joins client to channel, applying authorization for private/presence channels and emitting presence join events. It returns ErrUnauthorized if the Authorizer denies the join.

func (*Hub) Unsubscribe

func (h *Hub) Unsubscribe(c *Client, name string)

Unsubscribe removes client from channel, emitting a presence leave event for presence channels. Idempotent.

type HubOption

type HubOption func(*Hub)

HubOption customises Hub construction.

func WithAuthorizer

func WithAuthorizer(a Authorizer) HubOption

WithAuthorizer installs the hook consulted for private/presence joins. Without it, every private/presence join is denied.

func WithLogger

func WithLogger(fn func(string, ...any)) HubOption

WithLogger installs a structured logger callback invoked on drops and authorization denials. Default silent.

func WithOutbox

func WithOutbox(n int) HubOption

WithOutbox sets the per-client outbound buffer size. Default 64.

func WithSlowConsumerPolicy

func WithSlowConsumerPolicy(p SlowConsumerPolicy) HubOption

WithSlowConsumerPolicy selects the overflow behaviour. Default DropMessage.

type Member

type Member struct {
	ID   string
	Info []byte
}

Member is one participant's public identity inside a presence channel. ID must be stable per user; Info is an opaque payload (typically JSON) surfaced to other members in presence events.

type MessageType

type MessageType int

MessageType distinguishes a UTF-8 text frame from a binary frame.

const (
	// TextMessage carries UTF-8 text (WebSocket opcode 0x1).
	TextMessage MessageType = 1
	// BinaryMessage carries opaque bytes (WebSocket opcode 0x2).
	BinaryMessage MessageType = 2
)

type PresenceEvent

type PresenceEvent struct {
	Channel string
	// Joined is set on a join, Left on a leave.
	Joined bool
	Member Member
}

PresenceEvent describes a roster change on a presence channel. It is delivered to the application via the OnPresence hook, separate from the wire frames the Hub sends to members.

type SlowConsumerPolicy

type SlowConsumerPolicy int

SlowConsumerPolicy selects what the Hub does when a client's outbound buffer is full at enqueue time.

const (
	// DropMessage discards the message and bumps the client's drop
	// counter. The connection survives. Good for lossy telemetry feeds.
	DropMessage SlowConsumerPolicy = iota
	// DisconnectClient closes the connection on the first overflow. Good
	// for ordered, can't-miss streams where a gap is worse than a drop.
	DisconnectClient
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL