postgres

Version: v3.0.2
Published: Apr 20, 2026 License: MIT Imports: 10 Imported by: 0

README

socket.io-go-postgres

Description

A PostgreSQL adapter for the Socket.IO server in Go. It lets you scale Socket.IO applications across multiple processes or servers using PostgreSQL's LISTEN/NOTIFY mechanism.

Installation

go get github.com/zishang520/socket.io/adapters/postgres/v3

Features

  • Multiple servers support via PostgreSQL LISTEN/NOTIFY
  • Automatic large payload handling via attachment table
  • Heartbeat-based node failure detection
  • Real-time communication between processes
  • Custom PostgreSQL configuration

How to use

Adapter
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/zishang520/socket.io/adapters/postgres/v3"
    pgadapter "github.com/zishang520/socket.io/adapters/postgres/v3/adapter"
    "github.com/zishang520/socket.io/servers/socket/v3"
)

func main() {
    pool, err := pgxpool.New(context.Background(), "postgres://user:password@localhost:5432/mydb")
    if err != nil {
        panic(err)
    }
    defer pool.Close()

    pgClient := postgres.NewPostgresClient(context.TODO(), pool)

    io := socket.NewServer(nil, nil)
    io.SetAdapter(&pgadapter.PostgresAdapterBuilder{
        Postgres: pgClient,
    })

    io.On("connection", func(args ...any) {
        s := args[0].(*socket.Socket)
        fmt.Printf("connect %s\n", s.Id())
    })

    exit := make(chan struct{})
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sig
        close(exit)
    }()
    <-exit
}
Emitter
package main

import (
    "context"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/zishang520/socket.io/adapters/postgres/v3"
    pgemitter "github.com/zishang520/socket.io/adapters/postgres/v3/emitter"
)

func main() {
    pool, err := pgxpool.New(context.Background(), "postgres://user:password@localhost:5432/mydb")
    if err != nil {
        panic(err)
    }
    defer pool.Close()

    pgClient := postgres.NewPostgresClient(context.TODO(), pool)

    emitter := pgemitter.NewEmitter(pgClient, nil)
    emitter.Emit("hello", "world")
    emitter.To("room1").Emit("hello", "world")
}

Configuration Options

Adapter Options
type PostgresAdapterOptions struct {
    Key               string        // PostgreSQL channel prefix (default: "socket.io")
    TableName         string        // Attachment storage table name (default: "socket_io_attachments")
    PayloadThreshold  int           // Byte threshold for attachment storage (default: 8000)
    CleanupInterval   int64         // Cleanup interval in milliseconds (default: 30000)
    HeartbeatInterval time.Duration // Interval between heartbeats (default: 5000ms)
    HeartbeatTimeout  int64         // Heartbeat response timeout (default: 10000)
    ErrorHandler      func(error)   // Custom error handler callback
}
Emitter Options
type EmitterOptions struct {
    Key              string // PostgreSQL channel prefix (default: "socket.io")
    TableName        string // Attachment storage table name (default: "socket_io_attachments")
    PayloadThreshold int    // Byte threshold for attachment storage (default: 8000)
}

Architecture

The PostgreSQL adapter uses two mechanisms for inter-node communication:

  1. LISTEN/NOTIFY — Lightweight pub/sub for messages under the payload threshold
  2. Attachment Table — Stores large payloads or binary data that exceed the NOTIFY limit

Messages are serialized as JSON for direct NOTIFY, or MessagePack for attachment storage. This ensures compatibility with the Node.js socket.io-postgres-adapter, allowing mixed Go/Node.js deployments in the same cluster.
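
The choice between the two mechanisms can be sketched as a small routing function. This is an illustration under stated assumptions, not the adapter's code: chooseRoute and the route type are hypothetical names, and sending a payload whose length exactly equals the threshold to the attachment table is a guess based on the wording "under the payload threshold".

```go
package main

import "fmt"

// route names the transport chosen for one encoded message.
type route string

const (
	routeNotify     route = "notify"     // payload travels inside NOTIFY
	routeAttachment route = "attachment" // payload is stored in the table
)

// chooseRoute picks a transport for an encoded message.
// Messages with binary data, or at or above the threshold, go to the table.
func chooseRoute(payload []byte, hasBinary bool, threshold int) route {
	if hasBinary || len(payload) >= threshold {
		return routeAttachment
	}
	return routeNotify
}

func main() {
	const threshold = 8000 // documented default for PayloadThreshold
	fmt.Println(chooseRoute(make([]byte, 120), false, threshold))  // notify
	fmt.Println(chooseRoute(make([]byte, 9000), false, threshold)) // attachment
	fmt.Println(chooseRoute(make([]byte, 120), true, threshold))   // attachment
}
```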

Database Schema

The adapter automatically creates the attachment table on startup:

CREATE TABLE IF NOT EXISTS socket_io_attachments (
    id bigserial UNIQUE,
    created_at timestamptz DEFAULT NOW(),
    payload bytea
);

Mixed Deployment

This Go adapter is wire-compatible with the Node.js socket.io-postgres-adapter and socket.io-postgres-emitter. You can mix Go and Node.js servers in the same cluster, as long as:

  • Both use the same channel prefix (default: socket.io)
  • Both use the same attachment table name (default: socket_io_attachments)
  • Both use the same namespace names

Testing

Run the test suite with:

make test

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Support

If you encounter any issues or have questions, please file them in the issues section.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package postgres defines constants for PostgreSQL-based message types used in the Socket.IO adapter. These message types are used for inter-node communication in a clustered Socket.IO environment.

Package postgres provides a PostgreSQL client wrapper for the Socket.IO PostgreSQL adapter. The package offers a unified interface for PostgreSQL operations, with event handling support that uses LISTEN/NOTIFY for pub/sub communication.

Package postgres provides PostgreSQL-based adapter types and interfaces for Socket.IO clustering. These types define the message structures used for inter-node communication via PostgreSQL LISTEN/NOTIFY.

Index

Constants

const (
	// SOCKETS requests a list of socket IDs from other nodes.
	SOCKETS adapter.MessageType = iota

	// ALL_ROOMS requests a list of all rooms from other nodes.
	ALL_ROOMS

	// REMOTE_JOIN instructs other nodes to join a socket to specified rooms.
	REMOTE_JOIN

	// REMOTE_LEAVE instructs other nodes to remove a socket from specified rooms.
	REMOTE_LEAVE

	// REMOTE_DISCONNECT instructs other nodes to disconnect a specific socket.
	REMOTE_DISCONNECT

	// REMOTE_FETCH requests detailed socket information from other nodes.
	REMOTE_FETCH

	// SERVER_SIDE_EMIT broadcasts a server-side event to other nodes.
	SERVER_SIDE_EMIT

	// BROADCAST sends a packet to clients across all nodes.
	BROADCAST

	// BROADCAST_CLIENT_COUNT reports the number of clients that will receive a broadcast.
	BROADCAST_CLIENT_COUNT

	// BROADCAST_ACK sends acknowledgement responses for broadcast operations.
	BROADCAST_ACK
)

Message types for Socket.IO PostgreSQL adapter inter-node communication. These constants define the different operations that can be performed across multiple Socket.IO server nodes using PostgreSQL as the message broker.
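
Because the constants are declared with iota, their numeric values follow from declaration order. The sketch below mirrors the block with a local MessageType type so that it runs on its own. It assumes adapter.MessageType is an integer type; the String method and the names table are hypothetical additions for debugging output.

```go
package main

import "fmt"

// MessageType is a local stand-in for adapter.MessageType
// (assumed here to be an integer type).
type MessageType int

const (
	SOCKETS MessageType = iota
	ALL_ROOMS
	REMOTE_JOIN
	REMOTE_LEAVE
	REMOTE_DISCONNECT
	REMOTE_FETCH
	SERVER_SIDE_EMIT
	BROADCAST
	BROADCAST_CLIENT_COUNT
	BROADCAST_ACK
)

// names maps each value to its identifier, in declaration order.
var names = [...]string{
	"SOCKETS", "ALL_ROOMS", "REMOTE_JOIN", "REMOTE_LEAVE", "REMOTE_DISCONNECT",
	"REMOTE_FETCH", "SERVER_SIDE_EMIT", "BROADCAST", "BROADCAST_CLIENT_COUNT",
	"BROADCAST_ACK",
}

// String returns the identifier, or a numeric fallback for unknown values.
func (m MessageType) String() string {
	if m < 0 || int(m) >= len(names) {
		return fmt.Sprintf("MessageType(%d)", int(m))
	}
	return names[m]
}

func main() {
	fmt.Println(BROADCAST, int(BROADCAST)) // BROADCAST 7
}
```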

const VERSION = version.VERSION

Variables

var ErrNilPostgresPacket = errors.New("cannot unmarshal into nil PostgresPacket")

ErrNilPostgresPacket indicates an attempt to unmarshal into a nil PostgresPacket.

Functions

This section is empty.

Types

type NotificationMessage (added in v3.0.2)

type NotificationMessage struct {
	Uid          adapter.ServerId    `json:"uid,omitempty"  msgpack:"uid,omitempty"`
	Type         adapter.MessageType `json:"type,omitempty"  msgpack:"type,omitempty"`
	AttachmentId string              `json:"attachmentId,omitempty"  msgpack:"attachmentId,omitempty"`
}

NotificationMessage represents a message received via PostgreSQL LISTEN/NOTIFY. It can either contain the full payload or a reference to an attachment.
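
A receiver has to tell the two cases apart. The following sketch decodes a NOTIFY payload with encoding/json and checks the attachmentId field. It uses a local notification struct with plain string and int fields in place of adapter.ServerId and adapter.MessageType, which is an assumption about those types. The helper name needsAttachment and the sample payloads are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notification is a local stand-in for NotificationMessage.
type notification struct {
	Uid          string `json:"uid,omitempty"`
	Type         int    `json:"type,omitempty"`
	AttachmentId string `json:"attachmentId,omitempty"`
}

// needsAttachment decodes a NOTIFY payload and reports whether the message
// body has to be fetched from the attachment table.
func needsAttachment(raw string) (notification, bool, error) {
	var n notification
	if err := json.Unmarshal([]byte(raw), &n); err != nil {
		return n, false, err
	}
	return n, n.AttachmentId != "", nil
}

func main() {
	_, viaTable, err := needsAttachment(`{"uid":"node-a","type":7,"attachmentId":"42"}`)
	fmt.Println(viaTable, err) // true <nil>

	_, viaTable, err = needsAttachment(`{"uid":"node-a","type":7}`)
	fmt.Println(viaTable, err) // false <nil>
}
```

Note that AttachmentId is a string while GetAttachment takes an int64, so a caller would presumably convert it (for example with strconv.ParseInt) before the lookup.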

type Parser

type Parser interface {
	// Encode serializes the given value into a byte slice.
	Encode(any) ([]byte, error)

	// Decode deserializes the byte slice into the given value.
	Decode([]byte, any) error
}

Parser defines the interface for encoding and decoding data for PostgreSQL communication. Implementations must be thread-safe as they may be called from multiple goroutines.
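
As an illustration, here is a minimal implementation backed by encoding/json. The interface is redeclared locally so the sketch compiles on its own, and jsonParser is a hypothetical name. The type holds no state, so concurrent calls from multiple goroutines are safe.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Parser is redeclared here to keep the example self-contained.
type Parser interface {
	Encode(any) ([]byte, error)
	Decode([]byte, any) error
}

// jsonParser is a stateless Parser backed by encoding/json.
type jsonParser struct{}

func (jsonParser) Encode(v any) ([]byte, error) { return json.Marshal(v) }
func (jsonParser) Decode(b []byte, v any) error { return json.Unmarshal(b, v) }

// Compile-time check that jsonParser satisfies Parser.
var _ Parser = jsonParser{}

func main() {
	var p Parser = jsonParser{}

	b, err := p.Encode(map[string]any{"room": "room1"})
	if err != nil {
		panic(err)
	}

	var out map[string]any
	if err := p.Decode(b, &out); err != nil {
		panic(err)
	}
	fmt.Println(out["room"]) // room1
}
```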

type PostgresClient

type PostgresClient struct {
	types.EventEmitter

	// Pool is the connection pool used for write operations
	// (pg_notify, INSERT, DELETE, SELECT, etc.).
	Pool *pgxpool.Pool

	// Context is the context used for PostgreSQL operations.
	// This context controls the lifecycle of subscriptions and operations.
	Context context.Context
	// contains filtered or unexported fields
}

PostgresClient wraps a pgxpool.Pool and provides context management and event emitting capabilities for the Socket.IO PostgreSQL adapter.

The client supports a separate listener connection for LISTEN/NOTIFY operations. The Pool is used for write operations (pg_notify, INSERT, DELETE, etc.) and the Listener connection is used for LISTEN operations.

The client supports error event emission, which allows higher-level components to handle PostgreSQL-related errors gracefully.

func NewPostgresClient

func NewPostgresClient(ctx context.Context, pool *pgxpool.Pool) *PostgresClient

NewPostgresClient creates a new PostgresClient with the given context and connection pool.

Parameters:

  • ctx: The context that controls the lifecycle of PostgreSQL operations. When canceled, all subscriptions and pending operations will be terminated.
  • pool: A pgxpool.Pool instance that handles the actual PostgreSQL communication.

Returns:

  • A pointer to the initialized PostgresClient instance.

Example:

pool, _ := pgxpool.New(context.Background(), "postgres://user:pass@localhost:5432/db")
pgClient := NewPostgresClient(context.Background(), pool)

func (*PostgresClient) CleanupAttachments

func (c *PostgresClient) CleanupAttachments(ctx context.Context, tableName string, cleanupIntervalMs int64) error

CleanupAttachments deletes attachments older than the specified interval.

Parameters:

  • ctx: The context for the operation.
  • tableName: The name of the attachment table.
  • cleanupIntervalMs: The age threshold in milliseconds; attachments older than this are deleted.

func (*PostgresClient) Close

func (c *PostgresClient) Close()

Close releases the listener connection if it was acquired.

func (*PostgresClient) EnsureTable

func (c *PostgresClient) EnsureTable(ctx context.Context, tableName string) error

EnsureTable creates the attachment table if it does not exist. This table is used to store large payloads that exceed the pg_notify limit.

Parameters:

  • ctx: The context for the operation.
  • tableName: The name of the table to create.

func (*PostgresClient) GetAttachment

func (c *PostgresClient) GetAttachment(ctx context.Context, tableName string, id int64) ([]byte, error)

GetAttachment retrieves a payload from the attachment table by ID.

Parameters:

  • ctx: The context for the operation.
  • tableName: The name of the attachment table.
  • id: The attachment ID.

func (*PostgresClient) InsertAttachment

func (c *PostgresClient) InsertAttachment(ctx context.Context, tableName string, payload []byte) (int64, error)

InsertAttachment inserts a payload into the attachment table and returns its generated ID.

Parameters:

  • ctx: The context for the operation.
  • tableName: The name of the attachment table.
  • payload: The binary payload to store.

func (*PostgresClient) Listen

func (c *PostgresClient) Listen(ctx context.Context, channels ...string) error

Listen subscribes to the specified PostgreSQL notification channels using LISTEN. A dedicated connection is used to ensure notifications are not lost.

Parameters:

  • ctx: The context for the LISTEN operation.
  • channels: One or more channel names to listen on.

func (*PostgresClient) Notify

func (c *PostgresClient) Notify(ctx context.Context, channel, payload string) error

Notify sends a NOTIFY on the specified channel with the given payload. Uses pg_notify() to send the notification through the connection pool.

Parameters:

  • ctx: The context for the notification operation.
  • channel: The notification channel name.
  • payload: The notification payload string.

func (*PostgresClient) Unlisten

func (c *PostgresClient) Unlisten(ctx context.Context, channels ...string) error

Unlisten unsubscribes from the specified PostgreSQL notification channels using UNLISTEN.

Parameters:

  • ctx: The context for the UNLISTEN operation.
  • channels: One or more channel names to unlisten from.

func (*PostgresClient) WaitForNotification

func (c *PostgresClient) WaitForNotification(ctx context.Context) (*pgconn.Notification, error)

WaitForNotification waits for a notification on the listener connection. This method blocks until a notification is received or the context is canceled.

Returns the received notification or an error if the wait was interrupted.

Directories

Path     Synopsis
adapter  Package adapter provides configuration options for the PostgreSQL-based Socket.IO adapter.
emitter  Package emitter provides broadcast capabilities for Socket.IO via PostgreSQL LISTEN/NOTIFY.
