Documentation ¶
Overview ¶
Package realtime is a high-level realtime gateway for lagodev, modelled on Laravel Echo / NestJS gateways. It layers channels, presence and per-channel authorization on top of a transport-agnostic connection interface.
The package is a dependency-free leaf: it imports only the standard library. A WebSocket transport (RFC 6455 server handshake + framing) ships in-box (ws.go / upgrader.go), but the Hub is written against the Conn interface so it is fully testable without real sockets.
Layers:
- Conn — transport abstraction: ReadMessage / WriteMessage / Close. Implemented by wsConn (stdlib WebSocket) and by the in-memory fake used in tests.
- Client — a Conn registered on a Hub, with a bounded outbound buffer and slow-consumer handling.
- Channel — a named room. Public, Private (authorized) or Presence (authorized + membership roster with join/leave events).
- Hub — the per-app singleton tracking clients and channels; subscribe / unsubscribe / broadcast / direct send / presence.
- Authorizer — hook deciding whether a client may join a private or presence channel.
Broadcasting bridge: realtime does not import the broadcasting package (avoids a dependency cycle and keeps this a leaf). Instead, callers wire events in by passing the Hub's Broadcast method to a broadcasting subscription. See Hub.Broadcast and the package example in doc.go.
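The bridge described above relies on a Go method value: a bound method such as the Hub's Broadcast can be handed to any API that accepts a func(string, []byte) error. The following is a minimal sketch of that wiring. The bus, handler and hub types here are hypothetical local stand-ins; the real broadcasting package's subscription API is not shown in this documentation and may look different.

```go
package main

import "fmt"

// handler is a hypothetical callback shape for an event bus. The real
// broadcasting package may define its handler type differently.
type handler func(channel string, payload []byte) error

// bus is a hypothetical stand-in for a broadcasting subscription source.
type bus struct{ handlers []handler }

func (b *bus) subscribe(h handler) { b.handlers = append(b.handlers, h) }

// publish forwards an event to every subscribed handler, stopping at the
// first error.
func (b *bus) publish(channel string, payload []byte) error {
	for _, h := range b.handlers {
		if err := h(channel, payload); err != nil {
			return err
		}
	}
	return nil
}

// hub is a local stand-in that only records what it was asked to broadcast.
type hub struct{ delivered []string }

// Broadcast has the same signature as the documented Hub.Broadcast.
func (h *hub) Broadcast(channel string, payload []byte) error {
	h.delivered = append(h.delivered, channel+": "+string(payload))
	return nil
}

func main() {
	h := &hub{}
	b := &bus{}

	// The method value h.Broadcast is bound to h, so neither side needs to
	// import the other's package.
	b.subscribe(h.Broadcast)

	if err := b.publish("orders", []byte(`{"event":"created"}`)); err != nil {
		fmt.Println("publish:", err)
		return
	}
	fmt.Println(h.delivered)
}
```

Because only a function value crosses the boundary, the dependency points from the caller to both packages and never between them.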
Example ¶
Example shows the core realtime flow a server performs per connection: register the Conn on the Hub, subscribe it to a public channel, broadcast a message, and read the exact frame the client received.
package main

import (
	"fmt"

	"github.com/devituz/lagodev/realtime"
)

// pipeConn is a minimal in-memory realtime.Conn. Outbound frames written by
// the Hub's writer goroutine land on the buffered writes channel, which the
// example drains synchronously to observe exactly what reached the wire.
type pipeConn struct {
	writes chan string
}

func newPipeConn() *pipeConn {
	return &pipeConn{writes: make(chan string, 8)}
}

// ReadMessage blocks forever: this example is send-only, so the peer never
// produces inbound frames. The Hub drives writes; reads are irrelevant here.
func (c *pipeConn) ReadMessage() (realtime.MessageType, []byte, error) {
	select {} // park; the example never starts a read loop
}

func (c *pipeConn) WriteMessage(_ realtime.MessageType, data []byte) error {
	c.writes <- string(data)
	return nil
}

func (c *pipeConn) Close() error { return nil }

// Example shows the core realtime flow a server performs per connection:
// register the Conn on the Hub, subscribe it to a public channel, broadcast a
// message, and read the exact frame the client received.
func main() {
	hub := realtime.NewHub()
	defer hub.Close()

	conn := newPipeConn()
	client, err := hub.Add(conn, nil)
	if err != nil {
		fmt.Println("add:", err)
		return
	}

	if err := hub.Subscribe(client, "orders"); err != nil {
		fmt.Println("subscribe:", err)
		return
	}

	if err := hub.Broadcast("orders", []byte(`{"event":"created","id":42}`)); err != nil {
		fmt.Println("broadcast:", err)
		return
	}

	// The writer goroutine drains the outbox onto the Conn; receiving the
	// frame here makes the example deterministic without any sleeps.
	fmt.Println("subscribers:", hub.ChannelCount("orders"))
	fmt.Println("frame:", <-conn.writes)
}
Output:
subscribers: 1
frame: {"event":"created","id":42}
Index ¶
- Variables
- type AuthRequest
- type AuthResult
- type Authorizer
- type AuthorizerFunc
- type ChannelType
- type Client
- type Conn
- type Hub
- func (h *Hub) Add(conn Conn, auth []byte) (*Client, error)
- func (h *Hub) Broadcast(channel string, payload []byte) error
- func (h *Hub) BroadcastBinary(channel string, payload []byte) error
- func (h *Hub) BroadcastExcept(channel string, payload []byte, exceptID string) error
- func (h *Hub) ChannelCount(channel string) int
- func (h *Hub) Close()
- func (h *Hub) Count() int
- func (h *Hub) OnPresence(fn func(PresenceEvent))
- func (h *Hub) Presence(channel string) []Member
- func (h *Hub) SendTo(clientID string, payload []byte) error
- func (h *Hub) Subscribe(c *Client, channel string) error
- func (h *Hub) Unsubscribe(c *Client, name string)
- type HubOption
- type Member
- type MessageType
- type PresenceEvent
- type SlowConsumerPolicy
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("realtime: closed")
ErrClosed is returned by Conn operations after the connection — or the owning Hub — has been closed.
ErrUnauthorized is returned by Subscribe when a private/presence join is denied by the Authorizer (or when none is configured).
Functions ¶
This section is empty.
Types ¶
type AuthRequest ¶
type AuthRequest struct {
// Channel is the target channel name.
Channel string
// Type is the channel's classification.
Type ChannelType
// ClientID is the Hub-assigned id of the joining client.
ClientID string
// Auth carries client-supplied credentials (e.g. a signed token)
// extracted at connect time; opaque to the Hub.
Auth []byte
}
AuthRequest is handed to the Authorizer when a client attempts to join a private or presence channel.
type AuthResult ¶
AuthResult is returned by an Authorizer. Allowed gates the join. For presence channels, Member supplies the identity published to the roster.
type Authorizer ¶
type Authorizer interface {
Authorize(AuthRequest) AuthResult
}
Authorizer decides whether a client may join a private or presence channel. It is never consulted for public channels. A nil Authorizer on the Hub denies every private/presence join.
type AuthorizerFunc ¶
type AuthorizerFunc func(AuthRequest) AuthResult
AuthorizerFunc adapts a plain function to the Authorizer interface.
func (AuthorizerFunc) Authorize ¶
func (f AuthorizerFunc) Authorize(r AuthRequest) AuthResult
Authorize implements Authorizer.
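The adapter pattern behind AuthorizerFunc can be sketched as follows. The types are local copies so the block compiles on its own. Only the fields needed here are modelled, and the bool type of Allowed is an assumption. The sharedSecret helper is hypothetical and stands in for whatever token verification an application actually uses.

```go
package main

import (
	"crypto/subtle"
	"fmt"
)

// Local stand-ins mirroring the documented shapes.
type AuthRequest struct {
	Channel  string
	ClientID string
	Auth     []byte
}

// AuthResult models only Allowed; its bool type is an assumption.
type AuthResult struct {
	Allowed bool
}

type Authorizer interface {
	Authorize(AuthRequest) AuthResult
}

// AuthorizerFunc lets a plain function satisfy Authorizer.
type AuthorizerFunc func(AuthRequest) AuthResult

func (f AuthorizerFunc) Authorize(r AuthRequest) AuthResult { return f(r) }

// sharedSecret is a hypothetical authorizer: it allows a join only when the
// client-supplied credentials equal the secret. The comparison is
// constant-time to avoid leaking how many bytes matched.
func sharedSecret(secret []byte) Authorizer {
	return AuthorizerFunc(func(r AuthRequest) AuthResult {
		ok := subtle.ConstantTimeCompare(r.Auth, secret) == 1
		return AuthResult{Allowed: ok}
	})
}

func main() {
	a := sharedSecret([]byte("s3cret"))
	good := AuthRequest{Channel: "private-orders", Auth: []byte("s3cret")}
	bad := AuthRequest{Channel: "private-orders", Auth: []byte("guess")}
	fmt.Println(a.Authorize(good).Allowed) // true
	fmt.Println(a.Authorize(bad).Allowed)  // false
}
```

In the real package, a value built this way would presumably be installed with WithAuthorizer.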
type ChannelType ¶
type ChannelType int
ChannelType classifies a channel's authorization and presence semantics, following Laravel Echo's channel naming convention.
const (
	// Public channels require no authorization; any client may join.
	Public ChannelType = iota
	// Private channels require the Authorizer to approve the join.
	Private
	// Presence channels require authorization and additionally maintain
	// a roster broadcast to members via join/leave events.
	Presence
)
func ClassifyChannel ¶
func ClassifyChannel(name string) ChannelType
ClassifyChannel derives a channel's type from its name prefix, mirroring Laravel Echo: "private-*" is Private, "presence-*" is Presence, anything else is Public.
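The prefix rule can be illustrated with a small re-implementation. This is a sketch of the documented behaviour only: classify and the local ChannelType are stand-ins, and the real ClassifyChannel may differ in detail (for example in how it treats case or empty names).

```go
package main

import (
	"fmt"
	"strings"
)

// ChannelType is a local stand-in for the package's type of the same name.
type ChannelType int

const (
	Public ChannelType = iota
	Private
	Presence
)

// classify is a hypothetical re-implementation of the documented rule:
// "private-*" is Private, "presence-*" is Presence, anything else is Public.
func classify(name string) ChannelType {
	switch {
	case strings.HasPrefix(name, "private-"):
		return Private
	case strings.HasPrefix(name, "presence-"):
		return Presence
	default:
		return Public
	}
}

func main() {
	for _, name := range []string{"orders", "private-orders.42", "presence-lobby"} {
		fmt.Printf("%-18s -> %d\n", name, classify(name))
	}
}
```

Under this rule a name such as "privateorders", which lacks the hyphen, is classified as Public.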
func (ChannelType) String ¶
func (t ChannelType) String() string
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Conn registered on a Hub. It owns a single writer goroutine draining a bounded outbox; the caller owns the reader via Serve. A Client is safe for concurrent use: enqueue/Close may be called from any goroutine.
func (*Client) Close ¶
Close tears the client out of its Hub and closes the transport. It is idempotent and safe to call concurrently.
func (*Client) Drops ¶
Drops returns the number of messages dropped for this client under the DropMessage slow-consumer policy.
func (*Client) Serve ¶
func (c *Client) Serve(onMessage func(*Client, MessageType, []byte))
Serve runs the inbound read loop, invoking onMessage for every frame until the peer closes or the connection errors. It blocks, so callers typically run it in a goroutine. Serve always tears the client down on return, firing presence-leave events for every joined channel. onMessage may be nil to ignore inbound frames (a send-only client).
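The contract described for Serve (loop until the first read error, tolerate a nil callback, always tear down on return) can be sketched as below. The reader interface, the scripted transport and the serve function are hypothetical local stand-ins, not the package's implementation.

```go
package main

import (
	"fmt"
	"io"
)

type MessageType int

const TextMessage MessageType = 1

// reader is the read half of the documented Conn interface.
type reader interface {
	ReadMessage() (MessageType, []byte, error)
}

// scripted is a hypothetical transport that yields a fixed list of frames
// and then reports io.EOF, as a peer that closed would.
type scripted struct{ frames []string }

func (s *scripted) ReadMessage() (MessageType, []byte, error) {
	if len(s.frames) == 0 {
		return 0, nil, io.EOF
	}
	f := s.frames[0]
	s.frames = s.frames[1:]
	return TextMessage, []byte(f), nil
}

// serve is a sketch of a blocking read loop. The deferred teardown runs on
// every exit path, which is how "always tears the client down" can hold.
func serve(r reader, onMessage func(MessageType, []byte), teardown func()) {
	defer teardown()
	for {
		t, data, err := r.ReadMessage()
		if err != nil {
			return // peer closed or transport failed
		}
		if onMessage != nil { // nil means a send-only client
			onMessage(t, data)
		}
	}
}

func main() {
	conn := &scripted{frames: []string{"ping", "pong"}}
	serve(conn,
		func(_ MessageType, data []byte) { fmt.Println("got:", string(data)) },
		func() { fmt.Println("torn down") },
	)
}
```

Since the loop blocks until the transport fails, a server would typically start one such loop per connection in its own goroutine.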
type Conn ¶
type Conn interface {
ReadMessage() (MessageType, []byte, error)
WriteMessage(MessageType, []byte) error
Close() error
}
Conn is the transport abstraction the Hub is built against. A single reader goroutine and a single writer goroutine touch a Conn, so implementations need not support concurrent calls to ReadMessage or concurrent calls to WriteMessage. Close, however, must be safe to call concurrently with both.
ReadMessage returns the next inbound application message. It returns io.EOF (or a wrapped transport error) once the peer closes.
WriteMessage delivers one application message to the peer.
Close terminates the connection; it is idempotent.
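A test double that satisfies these three contracts might look like the sketch below. memConn, frame and errClosed are hypothetical names, and the types are local stand-ins so the block compiles without the package. It uses sync.Once to make Close idempotent and a done channel so that Close can safely interrupt a blocked read or write.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

type MessageType int

const TextMessage MessageType = 1

// errClosed is a local stand-in for the package's ErrClosed.
var errClosed = errors.New("closed")

type frame struct {
	typ  MessageType
	data []byte
}

// memConn is a hypothetical in-memory transport for tests.
type memConn struct {
	in   chan frame    // frames the fake peer "sends" to us
	out  chan frame    // frames written towards the fake peer
	done chan struct{} // closed exactly once by Close
	once sync.Once
}

func newMemConn() *memConn {
	return &memConn{
		in:   make(chan frame, 8),
		out:  make(chan frame, 8),
		done: make(chan struct{}),
	}
}

// ReadMessage blocks until a frame arrives or the connection is closed.
func (c *memConn) ReadMessage() (MessageType, []byte, error) {
	select {
	case f := <-c.in:
		return f.typ, f.data, nil
	case <-c.done:
		return 0, nil, io.EOF
	}
}

// WriteMessage refuses writes after Close and never blocks past it.
func (c *memConn) WriteMessage(t MessageType, data []byte) error {
	select {
	case <-c.done:
		return errClosed
	default:
	}
	select {
	case c.out <- frame{t, data}:
		return nil
	case <-c.done:
		return errClosed
	}
}

// Close is idempotent and safe to call from any goroutine.
func (c *memConn) Close() error {
	c.once.Do(func() { close(c.done) })
	return nil
}

func main() {
	c := newMemConn()
	c.in <- frame{TextMessage, []byte("hello")}
	_, data, err := c.ReadMessage()
	fmt.Println(string(data), err)

	c.Close()
	c.Close() // second call is a no-op
	_, _, err = c.ReadMessage()
	fmt.Println(err == io.EOF)
}
```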
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is the per-app realtime gateway. It is safe for concurrent use.
func (*Hub) Add ¶
Add registers conn on the Hub and returns the resulting Client. The caller owns the read loop: call Client.Serve(onMessage) to pump inbound frames (typically in a fresh goroutine), and Client.Close / Hub.Close to tear down. auth carries optional client credentials forwarded to the Authorizer on private/presence joins.
func (*Hub) Broadcast ¶
Broadcast delivers payload (as a text frame) to every client subscribed to channel. Per-client delivery is non-blocking subject to the Hub's slow-consumer policy. Safe to pass as a broadcasting.Handler bridge.
func (*Hub) BroadcastBinary ¶
BroadcastBinary is Broadcast for binary frames.
func (*Hub) BroadcastExcept ¶
BroadcastExcept behaves like Broadcast but skips the client whose id is exceptID (e.g. the sender), matching Laravel Echo's toOthers semantics.
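The skip-one-recipient fan-out can be sketched with plain data structures. Everything below is a hypothetical stand-in: the real Hub enqueues onto per-client outboxes and returns an error, whereas this sketch appends to a slice and returns the delivery count to keep the idea visible.

```go
package main

import "fmt"

// client is a stand-in that records what it would have been sent.
type client struct {
	id    string
	inbox []string
}

// hub maps a channel name to its subscribers.
type hub struct {
	channels map[string][]*client
}

// broadcastExcept delivers payload to every subscriber of channel whose id
// differs from exceptID and reports how many clients received it.
func (h *hub) broadcastExcept(channel, payload, exceptID string) int {
	n := 0
	for _, c := range h.channels[channel] {
		if c.id == exceptID {
			continue // skip the excluded client, typically the sender
		}
		c.inbox = append(c.inbox, payload)
		n++
	}
	return n
}

func main() {
	alice := &client{id: "alice"}
	bob := &client{id: "bob"}
	h := &hub{channels: map[string][]*client{"chat": {alice, bob}}}

	// alice sends a message that everyone except alice should see.
	n := h.broadcastExcept("chat", "hello from alice", "alice")
	fmt.Println(n, len(alice.inbox), len(bob.inbox)) // 1 0 1
}
```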
func (*Hub) ChannelCount ¶
ChannelCount returns the number of subscribers on channel.
func (*Hub) Close ¶
func (h *Hub) Close()
Close terminates every client and rejects further Add calls.
func (*Hub) OnPresence ¶
func (h *Hub) OnPresence(fn func(PresenceEvent))
OnPresence registers a callback invoked for every presence join/leave. It runs synchronously on the goroutine driving the roster change, so it must not block. Replaces any previous callback.
func (*Hub) Presence ¶
Presence returns the current roster of a presence channel. The slice is a copy; order is unspecified. Returns nil for unknown or non-presence channels.
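The copy-on-read behaviour can be sketched as follows. The roster type is hypothetical, and the field types of the local Member (string ID, []byte Info) are assumptions, since the documentation names the fields but does not show their types.

```go
package main

import (
	"fmt"
	"sync"
)

// Member is a local stand-in; its field types are assumed.
type Member struct {
	ID   string
	Info []byte
}

// roster is a hypothetical presence store keyed by channel, then member ID.
type roster struct {
	mu       sync.Mutex
	channels map[string]map[string]Member
}

func newRoster() *roster {
	return &roster{channels: make(map[string]map[string]Member)}
}

func (r *roster) join(channel string, m Member) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.channels[channel] == nil {
		r.channels[channel] = make(map[string]Member)
	}
	r.channels[channel][m.ID] = m
}

func (r *roster) leave(channel, id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.channels[channel], id) // deleting from a nil map is a no-op
}

// presence returns a fresh slice so callers cannot mutate the roster.
// Map iteration makes the order unspecified; unknown channels yield nil.
func (r *roster) presence(channel string) []Member {
	r.mu.Lock()
	defer r.mu.Unlock()
	members, ok := r.channels[channel]
	if !ok {
		return nil
	}
	out := make([]Member, 0, len(members))
	for _, m := range members {
		out = append(out, m)
	}
	return out
}

func main() {
	r := newRoster()
	r.join("presence-lobby", Member{ID: "u1", Info: []byte(`{"name":"Ada"}`)})
	r.join("presence-lobby", Member{ID: "u2"})
	fmt.Println(len(r.presence("presence-lobby"))) // 2
	fmt.Println(r.presence("orders") == nil)       // true
}
```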
func (*Hub) Subscribe ¶
Subscribe joins client to channel, applying authorization for private/presence channels and emitting presence join events. It returns ErrUnauthorized if the Authorizer denies the join.
func (*Hub) Unsubscribe ¶
Unsubscribe removes client from channel, emitting a presence leave event for presence channels. Idempotent.
type HubOption ¶
type HubOption func(*Hub)
HubOption customises Hub construction.
func WithAuthorizer ¶
func WithAuthorizer(a Authorizer) HubOption
WithAuthorizer installs the hook consulted for private/presence joins. Without it, every private/presence join is denied.
func WithLogger ¶
WithLogger installs a structured logger callback invoked on drops and authorization denials. Default silent.
func WithOutbox ¶
WithOutbox sets the per-client outbound buffer size. Default 64.
func WithSlowConsumerPolicy ¶
func WithSlowConsumerPolicy(p SlowConsumerPolicy) HubOption
WithSlowConsumerPolicy selects the overflow behaviour. Default DropMessage.
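HubOption follows Go's functional-options pattern: the constructor sets defaults, then applies each option in order. A minimal sketch of that mechanism is below. The hub, option and newHub names are local stand-ins, the int parameter of withOutbox is an assumption because the WithOutbox signature is not shown here, and the defaults (64, DropMessage) are the ones stated in this documentation.

```go
package main

import "fmt"

type SlowConsumerPolicy int

const (
	DropMessage SlowConsumerPolicy = iota
	DisconnectClient
)

// hub is a stand-in holding only the two settings discussed here.
type hub struct {
	outbox int
	policy SlowConsumerPolicy
}

// option mirrors the shape of HubOption: a function that mutates the hub
// under construction.
type option func(*hub)

// withOutbox assumes an int buffer size.
func withOutbox(n int) option {
	return func(h *hub) { h.outbox = n }
}

func withSlowConsumerPolicy(p SlowConsumerPolicy) option {
	return func(h *hub) { h.policy = p }
}

// newHub applies the documented defaults first, then each option in order,
// so later options override earlier ones.
func newHub(opts ...option) *hub {
	h := &hub{outbox: 64, policy: DropMessage}
	for _, opt := range opts {
		opt(h)
	}
	return h
}

func main() {
	def := newHub()
	strict := newHub(withOutbox(256), withSlowConsumerPolicy(DisconnectClient))
	fmt.Println(def.outbox, def.policy == DropMessage)            // 64 true
	fmt.Println(strict.outbox, strict.policy == DisconnectClient) // 256 true
}
```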
type Member ¶
Member is one participant's public identity inside a presence channel. ID must be stable per user; Info is an opaque payload (typically JSON) surfaced to other members in presence events.
type MessageType ¶
type MessageType int
MessageType distinguishes a UTF-8 text frame from a binary frame.
const (
	// TextMessage carries UTF-8 text (WebSocket opcode 0x1).
	TextMessage MessageType = 1
	// BinaryMessage carries opaque bytes (WebSocket opcode 0x2).
	BinaryMessage MessageType = 2
)
type PresenceEvent ¶
type PresenceEvent struct {
Channel string
// Joined is true on a join and false on a leave.
Joined bool
Member Member
}
PresenceEvent describes a roster change on a presence channel. It is delivered to the application via the OnPresence hook, separate from the wire frames the Hub sends to members.
type SlowConsumerPolicy ¶
type SlowConsumerPolicy int
SlowConsumerPolicy selects what the Hub does when a client's outbound buffer is full at enqueue time.
const (
	// DropMessage discards the message and bumps the client's drop
	// counter. The connection survives. Good for lossy telemetry feeds.
	DropMessage SlowConsumerPolicy = iota
	// DisconnectClient closes the connection on the first overflow. Good
	// for ordered, can't-miss streams where a gap is worse than a drop.
	DisconnectClient
)
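The two policies can be illustrated with a non-blocking send on a bounded channel. This is a sketch under stated assumptions rather than the package's code: the client struct and its enqueue method are hypothetical, and "closing the connection" is reduced to setting a flag.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type SlowConsumerPolicy int

const (
	DropMessage SlowConsumerPolicy = iota
	DisconnectClient
)

// client is a stand-in with a bounded outbox, a drop counter and a flag
// that represents a closed connection.
type client struct {
	outbox chan []byte
	drops  atomic.Int64
	closed atomic.Bool
}

// enqueue never blocks. If the outbox is full it applies the policy and
// reports false so the caller knows the message was not queued.
func (c *client) enqueue(msg []byte, p SlowConsumerPolicy) bool {
	select {
	case c.outbox <- msg:
		return true
	default:
		// Outbox full: fall through to the overflow policy.
	}
	switch p {
	case DisconnectClient:
		c.closed.Store(true) // a real hub would close the transport here
	default:
		c.drops.Add(1) // connection survives, message is lost
	}
	return false
}

func main() {
	c := &client{outbox: make(chan []byte, 1)}
	fmt.Println(c.enqueue([]byte("a"), DropMessage)) // true: buffer had room
	fmt.Println(c.enqueue([]byte("b"), DropMessage)) // false: buffer full
	fmt.Println(c.drops.Load(), c.closed.Load())     // 1 false

	c.enqueue([]byte("c"), DisconnectClient)
	fmt.Println(c.closed.Load()) // true
}
```

The select with a default case is what keeps a broadcast from stalling on one slow client: the sender either queues the message immediately or moves on.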