websocketspec

package
v1.0.2 Latest
Warning: This package is not in the latest version of its module.
Published: Dec 30, 2025 License: MIT Imports: 16 Imported by: 0

README

WebSocketSpec - Real-Time WebSocket API Framework

WebSocketSpec provides a WebSocket-based API specification for real-time, bidirectional communication with full CRUD operations, subscriptions, and lifecycle hooks.

Features

  • Real-Time Bidirectional Communication: WebSocket-based persistent connections
  • Full CRUD Operations: Create, Read, Update, Delete with rich query options
  • Real-Time Subscriptions: Subscribe to entity changes with filter support
  • Automatic Notifications: Server pushes updates to subscribed clients
  • Lifecycle Hooks: Before/after hooks for all operations
  • Database Agnostic: Works with GORM and Bun ORM through adapters
  • Connection Management: Automatic connection tracking and cleanup
  • Request/Response Correlation: Message IDs for tracking requests
  • Filter & Sort: Advanced filtering, sorting, pagination, and preloading

Installation

go get github.com/bitechdev/ResolveSpec

Quick Start

Server Setup
package main

import (
    "log"
    "net/http"

    "github.com/bitechdev/ResolveSpec/pkg/websocketspec"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

func main() {
    // Connect to database
    db, err := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
    if err != nil {
        log.Fatal(err)
    }

    // Create WebSocket handler
    handler := websocketspec.NewHandlerWithGORM(db)

    // Register models (Registry is a method on Handler)
    handler.Registry().RegisterModel("public.users", &User{})
    handler.Registry().RegisterModel("public.posts", &Post{})

    // Setup WebSocket endpoint
    http.HandleFunc("/ws", handler.HandleWebSocket)

    // Start server; ListenAndServe only returns on error
    log.Fatal(http.ListenAndServe(":8080", nil))
}

type User struct {
    ID     uint   `json:"id" gorm:"primaryKey"`
    Name   string `json:"name"`
    Email  string `json:"email"`
    Status string `json:"status"`
}

type Post struct {
    ID      uint   `json:"id" gorm:"primaryKey"`
    Title   string `json:"title"`
    Content string `json:"content"`
    UserID  uint   `json:"user_id"`
}

Client Setup (JavaScript)
const ws = new WebSocket("ws://localhost:8080/ws");

ws.onopen = () => {
    console.log("Connected to WebSocket");
};

ws.onmessage = (event) => {
    const message = JSON.parse(event.data);
    console.log("Received:", message);
};

ws.onerror = (error) => {
    console.error("WebSocket error:", error);
};

Message Protocol

All messages are JSON-encoded with the following structure:

interface Message {
    id: string;                  // Unique message ID for correlation
    type: "request" | "response" | "notification" | "subscription";
    operation?: "read" | "create" | "update" | "delete" | "subscribe" | "unsubscribe" | "meta";
    schema?: string;             // Database schema
    entity: string;              // Table/model name
    record_id?: string;          // For single-record operations
    data?: any;                  // Request/response payload
    options?: QueryOptions;      // Filters, sorting, pagination
    subscription_id?: string;    // For subscription messages
    success?: boolean;           // Response success indicator
    error?: ErrorInfo;           // Error details
    metadata?: Record<string, any>; // Additional metadata
    timestamp?: string;          // Message timestamp
}

interface QueryOptions {
    filters?: FilterOption[];
    columns?: string[];
    preload?: PreloadOption[];
    sort?: SortOption[];
    limit?: number;
    offset?: number;
}
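
For Go clients or tests, the message envelope above can be mirrored with plain structs. The following is a minimal sketch: the type names, the `newReadRequest` and `encode` helpers, and the choice of `omitempty` tags are assumptions made for illustration, not the package's own definitions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FilterOption, SortOption, QueryOptions and Message are hypothetical Go
// mirrors of the TypeScript interfaces above, not the package's own types.
type FilterOption struct {
	Column   string      `json:"column"`
	Operator string      `json:"operator"`
	Value    interface{} `json:"value"`
}

type SortOption struct {
	Column    string `json:"column"`
	Direction string `json:"direction"`
}

type QueryOptions struct {
	Filters []FilterOption `json:"filters,omitempty"`
	Columns []string       `json:"columns,omitempty"`
	Sort    []SortOption   `json:"sort,omitempty"`
	Limit   int            `json:"limit,omitempty"`
	Offset  int            `json:"offset,omitempty"`
}

type Message struct {
	ID        string          `json:"id"`
	Type      string          `json:"type"`
	Operation string          `json:"operation,omitempty"`
	Schema    string          `json:"schema,omitempty"`
	Entity    string          `json:"entity"`
	RecordID  string          `json:"record_id,omitempty"`
	Data      json.RawMessage `json:"data,omitempty"`
	Options   *QueryOptions   `json:"options,omitempty"`
}

// newReadRequest builds a "read" request for schema.entity.
func newReadRequest(id, schema, entity string, opts *QueryOptions) Message {
	return Message{ID: id, Type: "request", Operation: "read", Schema: schema, Entity: entity, Options: opts}
}

// encode renders a message as the JSON text that would be sent over the socket.
func encode(m Message) (string, error) {
	out, err := json.Marshal(m)
	return string(out), err
}

func main() {
	msg := newReadRequest("msg-2", "public", "users", &QueryOptions{
		Filters: []FilterOption{{Column: "status", Operator: "eq", Value: "active"}},
		Limit:   10,
	})
	text, err := encode(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(text)
}
```

Optional fields are left out of the JSON when unset, which keeps requests close to the hand-written examples in the next section.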

CRUD Operations

CREATE - Create New Records

Request:

{
    "id": "msg-1",
    "type": "request",
    "operation": "create",
    "schema": "public",
    "entity": "users",
    "data": {
        "name": "John Doe",
        "email": "john@example.com",
        "status": "active"
    }
}

Response:

{
    "id": "msg-1",
    "type": "response",
    "success": true,
    "data": {
        "id": 123,
        "name": "John Doe",
        "email": "john@example.com",
        "status": "active"
    },
    "timestamp": "2025-12-12T10:30:00Z"
}

READ - Query Records

Read Multiple Records:

{
    "id": "msg-2",
    "type": "request",
    "operation": "read",
    "schema": "public",
    "entity": "users",
    "options": {
        "filters": [
            {"column": "status", "operator": "eq", "value": "active"}
        ],
        "columns": ["id", "name", "email"],
        "sort": [
            {"column": "name", "direction": "asc"}
        ],
        "limit": 10,
        "offset": 0
    }
}

Read Single Record:

{
    "id": "msg-3",
    "type": "request",
    "operation": "read",
    "schema": "public",
    "entity": "users",
    "record_id": "123"
}

Response (to the multi-record request, msg-2):

{
    "id": "msg-2",
    "type": "response",
    "success": true,
    "data": [
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
        {"id": 2, "name": "Bob", "email": "bob@example.com"}
    ],
    "metadata": {
        "total": 50,
        "count": 2
    },
    "timestamp": "2025-12-12T10:30:00Z"
}

UPDATE - Update Records
{
    "id": "msg-4",
    "type": "request",
    "operation": "update",
    "schema": "public",
    "entity": "users",
    "record_id": "123",
    "data": {
        "name": "John Updated",
        "email": "john.updated@example.com"
    }
}

DELETE - Delete Records
{
    "id": "msg-5",
    "type": "request",
    "operation": "delete",
    "schema": "public",
    "entity": "users",
    "record_id": "123"
}

Subscriptions

Subscriptions allow clients to receive real-time notifications when entities change.

Subscribe to Changes
{
    "id": "sub-1",
    "type": "subscription",
    "operation": "subscribe",
    "schema": "public",
    "entity": "users",
    "options": {
        "filters": [
            {"column": "status", "operator": "eq", "value": "active"}
        ]
    }
}

Response:

{
    "id": "sub-1",
    "type": "response",
    "success": true,
    "data": {
        "subscription_id": "sub-abc123",
        "schema": "public",
        "entity": "users"
    },
    "timestamp": "2025-12-12T10:30:00Z"
}

Receive Notifications

When a subscribed entity changes, clients automatically receive notifications:

{
    "type": "notification",
    "operation": "create",
    "subscription_id": "sub-abc123",
    "schema": "public",
    "entity": "users",
    "data": {
        "id": 124,
        "name": "Jane Smith",
        "email": "jane@example.com",
        "status": "active"
    },
    "timestamp": "2025-12-12T10:35:00Z"
}

Notification Operations:

  • create - New record created
  • update - Record updated
  • delete - Record deleted

Unsubscribe
{
    "id": "unsub-1",
    "type": "subscription",
    "operation": "unsubscribe",
    "subscription_id": "sub-abc123"
}
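
To make the idea of filtered subscriptions concrete, here is a small sketch of how a changed record could be checked against a subscription's filters before a notification is pushed. It is an illustration only: the `Filter` type and `matches` function are hypothetical, only `eq` and `neq` are handled, and the package's real matching logic may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// Filter mirrors one entry of the "filters" array in a subscribe message.
// It is a hypothetical type for this sketch.
type Filter struct {
	Column   string
	Operator string
	Value    interface{}
}

// matches reports whether a changed record satisfies every filter.
// Only eq and neq are handled here; any other operator counts as a non-match.
func matches(record map[string]interface{}, filters []Filter) bool {
	for _, f := range filters {
		got, present := record[f.Column]
		switch f.Operator {
		case "eq":
			if !present || !reflect.DeepEqual(got, f.Value) {
				return false
			}
		case "neq":
			if present && reflect.DeepEqual(got, f.Value) {
				return false
			}
		default:
			return false
		}
	}
	return true
}

func main() {
	activeOnly := []Filter{{Column: "status", Operator: "eq", Value: "active"}}
	jane := map[string]interface{}{"id": 124, "name": "Jane Smith", "status": "active"}
	bob := map[string]interface{}{"id": 125, "name": "Bob", "status": "inactive"}

	fmt.Println(matches(jane, activeOnly)) // true: a notification would be sent
	fmt.Println(matches(bob, activeOnly))  // false: the subscriber is skipped
}
```

Values are compared with `reflect.DeepEqual`, so both sides should come from the same decoding path (for example, both decoded from JSON) to avoid int versus float64 mismatches.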

Lifecycle Hooks

Hooks allow you to intercept and modify operations at various points in the lifecycle.

Available Hook Types
  • BeforeRead / AfterRead
  • BeforeCreate / AfterCreate
  • BeforeUpdate / AfterUpdate
  • BeforeDelete / AfterDelete
  • BeforeSubscribe / AfterSubscribe
  • BeforeConnect / AfterConnect
  • AfterDisconnect

Hook Example
handler := websocketspec.NewHandlerWithGORM(db)

// Authorization hook
handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error {
    // Check permissions
    userID, ok := ctx.Connection.GetMetadata("user_id")
    if !ok {
        return fmt.Errorf("unauthorized: user not authenticated")
    }

    // Add filter to only show user's own records
    if ctx.Entity == "posts" {
        ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{
            Column:   "user_id",
            Operator: "eq",
            Value:    userID,
        })
    }

    return nil
})

// Logging hook
handler.Hooks().RegisterAfter(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error {
    log.Printf("Created %v in %s.%s", ctx.Result, ctx.Schema, ctx.Entity)
    return nil
})

// Validation hook
handler.Hooks().RegisterBefore(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error {
    // Validate data before creation
    if data, ok := ctx.Data.(map[string]interface{}); ok {
        if email, exists := data["email"]; !exists || email == "" {
            return fmt.Errorf("email is required")
        }
    }
    return nil
})
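
The control flow behind before/after hooks can be sketched independently of the package. The registry below is a simplified stand-in: `HookRegistry`, its `Run` method, and the reduced `HookContext` are hypothetical, and the sketch assumes that an error from a before-hook aborts the operation, as the authorization and validation examples above imply.

```go
package main

import (
	"errors"
	"fmt"
)

// HookContext and HookRegistry are simplified stand-ins for illustration;
// the package's real types carry more fields.
type HookContext struct {
	Entity string
	Data   map[string]interface{}
}

type HookFunc func(ctx *HookContext) error

type HookRegistry struct {
	before map[string][]HookFunc
	after  map[string][]HookFunc
}

func NewHookRegistry() *HookRegistry {
	return &HookRegistry{before: map[string][]HookFunc{}, after: map[string][]HookFunc{}}
}

func (r *HookRegistry) RegisterBefore(op string, fn HookFunc) {
	r.before[op] = append(r.before[op], fn)
}

func (r *HookRegistry) RegisterAfter(op string, fn HookFunc) {
	r.after[op] = append(r.after[op], fn)
}

// Run executes the before-hooks, then the operation, then the after-hooks.
// The first error stops the chain, so a failing before-hook prevents the operation.
func (r *HookRegistry) Run(op string, ctx *HookContext, do func() error) error {
	for _, h := range r.before[op] {
		if err := h(ctx); err != nil {
			return fmt.Errorf("before %s hook: %w", op, err)
		}
	}
	if err := do(); err != nil {
		return err
	}
	for _, h := range r.after[op] {
		if err := h(ctx); err != nil {
			return fmt.Errorf("after %s hook: %w", op, err)
		}
	}
	return nil
}

var errEmailRequired = errors.New("email is required")

// requireEmail is a validation hook like the one in the example above.
func requireEmail(ctx *HookContext) error {
	if email, ok := ctx.Data["email"].(string); !ok || email == "" {
		return errEmailRequired
	}
	return nil
}

func main() {
	hooks := NewHookRegistry()
	hooks.RegisterBefore("create", requireEmail)

	ran := false
	ctx := &HookContext{Entity: "users", Data: map[string]interface{}{"name": "Alice"}}
	err := hooks.Run("create", ctx, func() error { ran = true; return nil })
	fmt.Println("error:", err, "| operation ran:", ran)
}
```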

Client Examples

JavaScript/TypeScript Client
class WebSocketClient {
    private ws: WebSocket;
    private ready: Promise<void>;
    private messageHandlers: Map<string, (data: any) => void> = new Map();
    private subscriptions: Map<string, (data: any) => void> = new Map();

    constructor(url: string) {
        this.ws = new WebSocket(url);
        // Resolves once the socket is open, so nothing is sent while it is still connecting
        this.ready = new Promise((resolve, reject) => {
            this.ws.onopen = () => resolve();
            this.ws.onerror = (error) => reject(error);
        });
        this.ws.onmessage = (event) => this.handleMessage(event);
    }

    // Send request and wait for response
    async request(operation: string, entity: string, options?: any): Promise<any> {
        await this.ready;
        const id = this.generateId();

        return new Promise((resolve, reject) => {
            this.messageHandlers.set(id, (data) => {
                if (data.success) {
                    resolve(data.data);
                } else {
                    reject(data.error);
                }
            });

            this.ws.send(JSON.stringify({
                id,
                type: "request",
                operation,
                entity,
                ...options
            }));
        });
    }

    // Subscribe to entity changes
    async subscribe(entity: string, filters?: any[], callback?: (data: any) => void): Promise<string> {
        await this.ready;
        const id = this.generateId();

        return new Promise((resolve, reject) => {
            this.messageHandlers.set(id, (data) => {
                if (data.success) {
                    const subId = data.data.subscription_id;
                    if (callback) {
                        this.subscriptions.set(subId, callback);
                    }
                    resolve(subId);
                } else {
                    reject(data.error);
                }
            });

            this.ws.send(JSON.stringify({
                id,
                type: "subscription",
                operation: "subscribe",
                entity,
                options: { filters }
            }));
        });
    }

    // Route responses by message ID and notifications by subscription ID
    private handleMessage(event: MessageEvent) {
        const message = JSON.parse(event.data);

        if (message.type === "response") {
            const handler = this.messageHandlers.get(message.id);
            if (handler) {
                handler(message);
                this.messageHandlers.delete(message.id);
            }
        } else if (message.type === "notification") {
            const callback = this.subscriptions.get(message.subscription_id);
            if (callback) {
                callback(message);
            }
        }
    }

    private generateId(): string {
        return `msg-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
    }
}

// Usage
const client = new WebSocketClient("ws://localhost:8080/ws");

// Read users
const users = await client.request("read", "users", {
    options: {
        filters: [{ column: "status", operator: "eq", value: "active" }],
        limit: 10
    }
});

// Subscribe to user changes
await client.subscribe("users",
    [{ column: "status", operator: "eq", value: "active" }],
    (notification) => {
        console.log("User changed:", notification.operation, notification.data);
    }
);

// Create user
const newUser = await client.request("create", "users", {
    data: {
        name: "Alice",
        email: "alice@example.com",
        status: "active"
    }
});

Python Client Example
import asyncio
import websockets
import json
import uuid

class WebSocketClient:
    def __init__(self, url):
        self.url = url
        self.ws = None
        self.handlers = {}
        self.subscriptions = {}

    async def connect(self):
        self.ws = await websockets.connect(self.url)
        # Keep a reference so the listener task is not garbage-collected
        self._listener = asyncio.create_task(self.listen())

    async def listen(self):
        async for message in self.ws:
            data = json.loads(message)

            if data["type"] == "response":
                handler = self.handlers.get(data["id"])
                if handler:
                    handler(data)
                    del self.handlers[data["id"]]

            elif data["type"] == "notification":
                callback = self.subscriptions.get(data["subscription_id"])
                if callback:
                    callback(data)

    async def request(self, operation, entity, **kwargs):
        msg_id = str(uuid.uuid4())
        future = asyncio.Future()

        self.handlers[msg_id] = lambda data: future.set_result(data)

        await self.ws.send(json.dumps({
            "id": msg_id,
            "type": "request",
            "operation": operation,
            "entity": entity,
            **kwargs
        }))

        result = await future
        if result["success"]:
            return result["data"]
        else:
            raise Exception(result["error"]["message"])

    async def subscribe(self, entity, callback, filters=None):
        msg_id = str(uuid.uuid4())
        future = asyncio.Future()

        self.handlers[msg_id] = lambda data: future.set_result(data)

        await self.ws.send(json.dumps({
            "id": msg_id,
            "type": "subscription",
            "operation": "subscribe",
            "entity": entity,
            "options": {"filters": filters} if filters else {}
        }))

        result = await future
        if result["success"]:
            sub_id = result["data"]["subscription_id"]
            self.subscriptions[sub_id] = callback
            return sub_id
        else:
            raise Exception(result["error"]["message"])

# Usage
async def main():
    client = WebSocketClient("ws://localhost:8080/ws")
    await client.connect()

    # Read users
    users = await client.request("read", "users",
        options={
            "filters": [{"column": "status", "operator": "eq", "value": "active"}],
            "limit": 10
        }
    )
    print("Users:", users)

    # Subscribe to changes
    def on_user_change(notification):
        print(f"User {notification['operation']}: {notification['data']}")

    await client.subscribe("users", on_user_change,
        filters=[{"column": "status", "operator": "eq", "value": "active"}]
    )

asyncio.run(main())
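
Both clients above rely on the same idea: responses are matched to requests by message ID, and notifications are routed by subscription ID. A Go client could follow the same pattern. The sketch below covers only that routing step and leaves out the WebSocket transport. `Dispatcher` and its methods are hypothetical names, not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Response holds only the fields needed for correlation and routing.
type Response struct {
	ID             string          `json:"id"`
	Type           string          `json:"type"`
	SubscriptionID string          `json:"subscription_id"`
	Success        bool            `json:"success"`
	Data           json.RawMessage `json:"data"`
}

// Dispatcher is a hypothetical helper: it hands responses to the waiting
// caller by message ID and notifications to a callback by subscription ID.
type Dispatcher struct {
	mu      sync.Mutex
	pending map[string]chan Response
	subs    map[string]func(Response)
}

func NewDispatcher() *Dispatcher {
	return &Dispatcher{pending: map[string]chan Response{}, subs: map[string]func(Response){}}
}

// Expect registers a message ID and returns the channel its response arrives on.
func (d *Dispatcher) Expect(id string) <-chan Response {
	ch := make(chan Response, 1)
	d.mu.Lock()
	d.pending[id] = ch
	d.mu.Unlock()
	return ch
}

// OnNotification registers a callback for one subscription ID.
func (d *Dispatcher) OnNotification(subID string, fn func(Response)) {
	d.mu.Lock()
	d.subs[subID] = fn
	d.mu.Unlock()
}

// Dispatch decodes one raw frame and routes it.
// It reports whether the frame was delivered to a waiter or callback.
func (d *Dispatcher) Dispatch(raw []byte) (bool, error) {
	var msg Response
	if err := json.Unmarshal(raw, &msg); err != nil {
		return false, err
	}
	switch msg.Type {
	case "response":
		d.mu.Lock()
		ch, ok := d.pending[msg.ID]
		delete(d.pending, msg.ID) // each request gets exactly one response
		d.mu.Unlock()
		if ok {
			ch <- msg // buffered channel, so this does not block
		}
		return ok, nil
	case "notification":
		d.mu.Lock()
		fn, ok := d.subs[msg.SubscriptionID]
		d.mu.Unlock()
		if ok {
			fn(msg)
		}
		return ok, nil
	}
	return false, nil
}

func main() {
	d := NewDispatcher()
	reply := d.Expect("msg-1")
	d.OnNotification("sub-abc123", func(n Response) {
		fmt.Println("notification:", string(n.Data))
	})

	// In a real client these frames would come from the WebSocket read loop.
	d.Dispatch([]byte(`{"id":"msg-1","type":"response","success":true,"data":[{"id":1}]}`))
	d.Dispatch([]byte(`{"type":"notification","subscription_id":"sub-abc123","data":{"id":124}}`))

	r := <-reply
	fmt.Println("response ok:", r.Success, string(r.Data))
}
```

Removing the pending entry as soon as a response arrives keeps the map from growing, which mirrors the `delete` calls in the TypeScript and Python handlers.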

Authentication

Implement authentication using hooks:

handler := websocketspec.NewHandlerWithGORM(db)

// Authentication on connection
handler.Hooks().Register(websocketspec.BeforeConnect, func(ctx *websocketspec.HookContext) error {
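    // Extract token from query params or headers.
    // NOTE: ws is an unexported field of Connection, so the next line only
    // compiles inside the package, and RemoteAddr() yields the peer address
    // rather than the HTTP request. Adapt it to however your setup exposes
    // the upgrade request.
    r := ctx.Connection.ws.UnderlyingConn().RemoteAddr()

    // Validate token (extractToken and validateToken are your own auth logic)
    token := extractToken(r)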
    user, err := validateToken(token)
    if err != nil {
        return fmt.Errorf("authentication failed: %w", err)
    }

    // Store user info in connection metadata
    ctx.Connection.SetMetadata("user", user)
    ctx.Connection.SetMetadata("user_id", user.ID)

    return nil
})

// Check permissions for each operation
handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error {
    userID, ok := ctx.Connection.GetMetadata("user_id")
    if !ok {
        return fmt.Errorf("unauthorized")
    }

    // Add user-specific filters
    if ctx.Entity == "orders" {
        ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{
            Column:   "user_id",
            Operator: "eq",
            Value:    userID,
        })
    }

    return nil
})
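
The `extractToken` helper used above is left to the application. One possible shape, assuming the hook can reach the HTTP upgrade request, is sketched below. The `token` query parameter name and the `newRequest` test helper are assumptions for illustration. Browser WebSocket clients cannot set custom headers, which is why the sketch falls back to the query string.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// extractToken is a hypothetical helper. It prefers an
// "Authorization: Bearer <token>" header and falls back to a "token"
// query parameter. It returns "" when neither is present.
func extractToken(r *http.Request) string {
	auth := r.Header.Get("Authorization")
	if len(auth) > 7 && strings.EqualFold(auth[:7], "Bearer ") {
		return strings.TrimSpace(auth[7:])
	}
	return r.URL.Query().Get("token")
}

// newRequest builds a fake upgrade request for demonstration purposes.
func newRequest(target, authHeader string) *http.Request {
	r := httptest.NewRequest(http.MethodGet, target, nil)
	if authHeader != "" {
		r.Header.Set("Authorization", authHeader)
	}
	return r
}

func main() {
	fmt.Println(extractToken(newRequest("/ws?token=from-query", "")))
	fmt.Println(extractToken(newRequest("/ws?token=from-query", "Bearer from-header")))
}
```

Tokens passed in the query string can end up in server logs, so short-lived tokens are a sensible choice when that fallback is used.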

Error Handling

Errors are returned in a consistent format:

{
    "id": "msg-1",
    "type": "response",
    "success": false,
    "error": {
        "code": "validation_error",
        "message": "Email is required",
        "details": {
            "field": "email"
        }
    },
    "timestamp": "2025-12-12T10:30:00Z"
}

Common Error Codes:

  • invalid_message - Message format is invalid
  • model_not_found - Entity not registered
  • invalid_model - Model validation failed
  • read_error - Read operation failed
  • create_error - Create operation failed
  • update_error - Update operation failed
  • delete_error - Delete operation failed
  • hook_error - Hook execution failed
  • unauthorized - Authentication/authorization failed
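
On the client side, the error object can be turned into a regular Go error so that callers can branch on the code. This is a sketch under assumptions: `ErrorInfo` copies the fields shown above, while the `Error` method, `parseResponse`, and `errorCode` are hypothetical helpers and not part of the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ErrorInfo mirrors the error object shown above.
type ErrorInfo struct {
	Code    string                 `json:"code"`
	Message string                 `json:"message"`
	Details map[string]interface{} `json:"details,omitempty"`
}

// Error lets *ErrorInfo be returned as a Go error (added for this sketch).
func (e *ErrorInfo) Error() string { return e.Code + ": " + e.Message }

type response struct {
	Success bool            `json:"success"`
	Data    json.RawMessage `json:"data"`
	Error   *ErrorInfo      `json:"error"`
}

// parseResponse returns the payload of a successful response, or the
// server's ErrorInfo as an error when success is false.
func parseResponse(raw []byte) (json.RawMessage, error) {
	var resp response
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	if !resp.Success {
		if resp.Error == nil {
			return nil, errors.New("request failed without error details")
		}
		return nil, resp.Error
	}
	return resp.Data, nil
}

// errorCode extracts the protocol error code from err, or "" if there is none.
func errorCode(err error) string {
	var info *ErrorInfo
	if errors.As(err, &info) {
		return info.Code
	}
	return ""
}

func main() {
	raw := []byte(`{"id":"msg-1","type":"response","success":false,
		"error":{"code":"validation_error","message":"Email is required","details":{"field":"email"}}}`)
	_, err := parseResponse(raw)
	fmt.Println(errorCode(err), "|", err)
}
```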

Best Practices

  1. Always Use Message IDs: Correlate requests with responses using unique IDs
  2. Handle Reconnections: Implement automatic reconnection logic on the client
  3. Validate Data: Use before-hooks to validate data before operations
  4. Limit Subscriptions: Implement limits on subscriptions per connection
  5. Use Filters: Apply filters to subscriptions to reduce unnecessary notifications
  6. Implement Authentication: Always validate users before processing operations
  7. Handle Errors Gracefully: Display user-friendly error messages
  8. Clean Up: Unsubscribe when components unmount or disconnect
  9. Rate Limiting: Implement rate limiting to prevent abuse
  10. Monitor Connections: Track active connections and subscriptions
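
As an illustration of practice 2, reconnection is commonly implemented with exponential backoff. The sketch below shows the delay calculation and retry loop only. `backoff`, `reconnect`, the delay constants, and the `dial` callback are hypothetical, and a real client would presumably also re-send its subscriptions after reconnecting.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative delays; real clients would typically use seconds, not milliseconds.
const (
	baseDelay = 10 * time.Millisecond
	maxDelay  = 80 * time.Millisecond
)

var errDial = errors.New("dial failed")

// backoff returns the wait before retry number attempt (0-based):
// base doubled once per attempt, capped at max.
func backoff(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// reconnect calls dial until it succeeds or the attempts run out,
// sleeping with exponential backoff between failed tries.
func reconnect(dial func() error, attempts int, base, max time.Duration) error {
	err := errors.New("no attempts made")
	for i := 0; i < attempts; i++ {
		if err = dial(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(backoff(i, base, max))
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Stand-in for opening the WebSocket: fails twice, then succeeds.
	dial := func() error {
		calls++
		if calls < 3 {
			return errDial
		}
		return nil
	}
	err := reconnect(dial, 5, baseDelay, maxDelay)
	fmt.Println("attempts:", calls, "err:", err)
}
```

Capping the delay keeps a long outage from pushing retries arbitrarily far apart. Adding random jitter is a common refinement when many clients reconnect at once.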

Filter Operators

Supported filter operators:

  • eq - Equal (=)
  • neq - Not Equal (!=)
  • gt - Greater Than (>)
  • gte - Greater Than or Equal (>=)
  • lt - Less Than (<)
  • lte - Less Than or Equal (<=)
  • like - LIKE (case-sensitive)
  • ilike - ILIKE (case-insensitive)
  • in - IN (array of values)
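
The operator names map directly onto SQL comparisons. As a rough sketch of that mapping, and not the package's actual query builder, the hypothetical `toCondition` function below renders one filter as a parameterized condition. Column names are assumed to be validated elsewhere, and `ILIKE` is PostgreSQL-specific.

```go
package main

import (
	"fmt"
	"strings"
)

// FilterOption is a local stand-in for one entry of the "filters" array.
type FilterOption struct {
	Column   string
	Operator string
	Value    interface{}
}

// sqlOperators maps the operator names listed above to SQL operators.
var sqlOperators = map[string]string{
	"eq": "=", "neq": "!=", "gt": ">", "gte": ">=",
	"lt": "<", "lte": "<=", "like": "LIKE", "ilike": "ILIKE",
}

// toCondition renders one filter as a parameterized condition plus its
// arguments. Values are never interpolated into the SQL text.
func toCondition(f FilterOption) (string, []interface{}, error) {
	if f.Operator == "in" {
		values, ok := f.Value.([]interface{})
		if !ok || len(values) == 0 {
			return "", nil, fmt.Errorf("operator in needs a non-empty array, got %T", f.Value)
		}
		placeholders := strings.TrimSuffix(strings.Repeat("?, ", len(values)), ", ")
		return f.Column + " IN (" + placeholders + ")", values, nil
	}
	op, ok := sqlOperators[f.Operator]
	if !ok {
		return "", nil, fmt.Errorf("unsupported operator %q", f.Operator)
	}
	return f.Column + " " + op + " ?", []interface{}{f.Value}, nil
}

func main() {
	filters := []FilterOption{
		{Column: "status", Operator: "eq", Value: "active"},
		{Column: "name", Operator: "ilike", Value: "%smith%"},
		{Column: "id", Operator: "in", Value: []interface{}{1, 2, 3}},
	}
	for _, f := range filters {
		cond, args, err := toCondition(f)
		fmt.Println(cond, args, err)
	}
}
```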

Performance Considerations

  • Persistent Connections: Each client reuses one open WebSocket connection, avoiding per-request connection setup
  • Subscription Filtering: Only matching updates are sent to clients
  • Efficient Queries: Uses database adapters for optimized queries
  • Message Batching: Multiple messages can be sent in one write
  • Keepalive: Automatic ping/pong for connection health

Comparison with Other Specs

Feature       | WebSocketSpec  | RestHeadSpec     | ResolveSpec
--------------|----------------|------------------|----------------
Protocol      | WebSocket      | HTTP/REST        | HTTP/REST
Real-time     | ✅ Yes         | ❌ No            | ❌ No
Subscriptions | ✅ Yes         | ❌ No            | ❌ No
Bidirectional | ✅ Yes         | ❌ No            | ❌ No
Query Options | In Message     | In Headers       | In Body
Overhead      | Low            | Medium           | Medium
Use Case      | Real-time apps | Traditional APIs | Body-based APIs

License

MIT License - See LICENSE file for details

Documentation

Overview

Package websocketspec provides a WebSocket-based API specification for real-time CRUD operations with bidirectional communication and subscription support.

Key Features

  • Real-time bidirectional communication over WebSocket
  • CRUD operations (Create, Read, Update, Delete)
  • Real-time subscriptions with filtering
  • Lifecycle hooks for all operations
  • Database-agnostic: Works with GORM and Bun ORM through adapters
  • Automatic change notifications to subscribers
  • Connection and subscription management

Message Protocol

WebSocketSpec uses JSON messages for communication:

{
  "id": "unique-message-id",
  "type": "request|response|notification|subscription",
  "operation": "read|create|update|delete|subscribe|unsubscribe",
  "schema": "public",
  "entity": "users",
  "data": {...},
  "options": {
    "filters": [...],
    "columns": [...],
    "preload": [...],
    "sort": [...],
    "limit": 10
  }
}

Usage Example

// Create handler with GORM
handler := websocketspec.NewHandlerWithGORM(db)

// Register models
handler.Registry().RegisterModel("public.users", &User{})

// Setup WebSocket endpoint
http.HandleFunc("/ws", handler.HandleWebSocket)

// Start server
http.ListenAndServe(":8080", nil)

Client Example

// Connect to WebSocket
const ws = new WebSocket("ws://localhost:8080/ws")

// Wait for the connection to open before sending
ws.onopen = () => {
  // Send read request
  ws.send(JSON.stringify({
    id: "msg-1",
    type: "request",
    operation: "read",
    entity: "users",
    options: {
      filters: [{column: "status", operator: "eq", value: "active"}],
      limit: 10
    }
  }))

  // Subscribe to changes
  ws.send(JSON.stringify({
    id: "msg-2",
    type: "subscription",
    operation: "subscribe",
    entity: "users",
    options: {
      filters: [{column: "status", operator: "eq", value: "active"}]
    }
  }))
}

Example (BasicSetup)

Example_basicSetup demonstrates basic WebSocketSpec setup

package main

import (
	"log"
	"net/http"

	"github.com/bitechdev/ResolveSpec/pkg/websocketspec"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// User model example
type User struct {
	ID     uint   `json:"id" gorm:"primaryKey"`
	Name   string `json:"name"`
	Email  string `json:"email"`
	Status string `json:"status"`
}

// Post model example
type Post struct {
	ID      uint   `json:"id" gorm:"primaryKey"`
	Title   string `json:"title"`
	Content string `json:"content"`
	UserID  uint   `json:"user_id"`
	User    *User  `json:"user,omitempty" gorm:"foreignKey:UserID"`
}

func main() {
	// Connect to database
	db, err := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}

	// Create WebSocket handler
	handler := websocketspec.NewHandlerWithGORM(db)

	// Register models
	handler.Registry().RegisterModel("public.users", &User{})
	handler.Registry().RegisterModel("public.posts", &Post{})

	// Setup WebSocket endpoint
	http.HandleFunc("/ws", handler.HandleWebSocket)

	// Start server
	log.Println("WebSocket server starting on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

Example (ClientSide)

Example_clientSide shows client-side usage example

package main

import (
	"fmt"
)

func main() {
	// This is JavaScript code for documentation purposes
	jsCode := `
// JavaScript WebSocket Client Example

const ws = new WebSocket("ws://localhost:8080/ws");

ws.onopen = () => {
    console.log("Connected to WebSocket");

    // Read users
    ws.send(JSON.stringify({
        id: "msg-1",
        type: "request",
        operation: "read",
        schema: "public",
        entity: "users",
        options: {
            filters: [{column: "status", operator: "eq", value: "active"}],
            limit: 10
        }
    }));

    // Subscribe to user changes
    ws.send(JSON.stringify({
        id: "sub-1",
        type: "subscription",
        operation: "subscribe",
        schema: "public",
        entity: "users",
        options: {
            filters: [{column: "status", operator: "eq", value: "active"}]
        }
    }));
};

ws.onmessage = (event) => {
    const message = JSON.parse(event.data);

    if (message.type === "response") {
        if (message.success) {
            console.log("Response:", message.data);
        } else {
            console.error("Error:", message.error);
        }
    } else if (message.type === "notification") {
        console.log("Notification:", message.operation, message.data);
    }
};

ws.onerror = (error) => {
    console.error("WebSocket error:", error);
};

ws.onclose = () => {
    console.log("WebSocket connection closed");
    // Implement reconnection logic here
};
`

	fmt.Println(jsCode)
}

Example (Monitoring)

Example_monitoring demonstrates monitoring connections and subscriptions

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/bitechdev/ResolveSpec/pkg/websocketspec"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// User model example
type User struct {
	ID     uint   `json:"id" gorm:"primaryKey"`
	Name   string `json:"name"`
	Email  string `json:"email"`
	Status string `json:"status"`
}

func main() {
	db, err := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	handler := websocketspec.NewHandlerWithGORM(db)

	handler.Registry().RegisterModel("public.users", &User{})

	// Add connection tracking
	handler.Hooks().Register(websocketspec.AfterConnect, func(ctx *websocketspec.HookContext) error {
		count := handler.GetConnectionCount()
		log.Printf("Client connected. Total connections: %d", count)
		return nil
	})

	handler.Hooks().Register(websocketspec.AfterDisconnect, func(ctx *websocketspec.HookContext) error {
		count := handler.GetConnectionCount()
		log.Printf("Client disconnected. Total connections: %d", count)
		return nil
	})

	// Add subscription tracking
	handler.Hooks().Register(websocketspec.AfterSubscribe, func(ctx *websocketspec.HookContext) error {
		count := handler.GetSubscriptionCount()
		log.Printf("New subscription. Total subscriptions: %d", count)
		return nil
	})

	// Monitoring endpoint
	http.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "Active Connections: %d\n", handler.GetConnectionCount())
		fmt.Fprintf(w, "Active Subscriptions: %d\n", handler.GetSubscriptionCount())
	})

	http.HandleFunc("/ws", handler.HandleWebSocket)
	log.Println("Server with monitoring starting on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Example (WithHooks)

Example_withHooks demonstrates using lifecycle hooks

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/bitechdev/ResolveSpec/pkg/websocketspec"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// User model example
type User struct {
	ID     uint   `json:"id" gorm:"primaryKey"`
	Name   string `json:"name"`
	Email  string `json:"email"`
	Status string `json:"status"`
}

func main() {
	db, err := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
	if err != nil {
		log.Fatal(err)
	}
	handler := websocketspec.NewHandlerWithGORM(db)

	// Register models
	handler.Registry().RegisterModel("public.users", &User{})

	// Add authentication hook
	handler.Hooks().Register(websocketspec.BeforeConnect, func(ctx *websocketspec.HookContext) error {
		// Validate authentication token
		// (In real implementation, extract from query params or headers)
		userID := uint(123) // From token

		// Store in connection metadata
		ctx.Connection.SetMetadata("user_id", userID)
		log.Printf("User %d connected", userID)

		return nil
	})

	// Add authorization hook for read operations
	handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error {
		userID, ok := ctx.Connection.GetMetadata("user_id")
		if !ok {
			return fmt.Errorf("unauthorized: not authenticated")
		}

		log.Printf("User %v reading %s.%s", userID, ctx.Schema, ctx.Entity)

		// Add filter to only show user's own records
		if ctx.Entity == "posts" {
			// ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{
			// 	Column:   "user_id",
			// 	Operator: "eq",
			// 	Value:    userID,
			// })
		}

		return nil
	})

	// Add logging hook after create
	handler.Hooks().RegisterAfter(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error {
		userID, _ := ctx.Connection.GetMetadata("user_id")
		log.Printf("User %v created record in %s.%s", userID, ctx.Schema, ctx.Entity)
		return nil
	})

	// Add validation hook before create
	handler.Hooks().RegisterBefore(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error {
		// Validate required fields
		if data, ok := ctx.Data.(map[string]interface{}); ok {
			if ctx.Entity == "users" {
				if email, exists := data["email"]; !exists || email == "" {
					return fmt.Errorf("validation error: email is required")
				}
				if name, exists := data["name"]; !exists || name == "" {
					return fmt.Errorf("validation error: name is required")
				}
			}
		}
		return nil
	})

	// Add limit hook for subscriptions
	handler.Hooks().Register(websocketspec.BeforeSubscribe, func(ctx *websocketspec.HookContext) error {
		// Limit subscriptions per connection
		maxSubscriptions := 10
		// Note: In a real implementation, you would count subscriptions using the connection's methods
		// currentCount := len(ctx.Connection.subscriptions) // subscriptions is private

		// For demonstration purposes, we'll just log
		log.Printf("Creating subscription (max: %d)", maxSubscriptions)
		return nil
	})

	http.HandleFunc("/ws", handler.HandleWebSocket)
	log.Println("Server with hooks starting on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

Functions

func ExampleAuthentication

func ExampleAuthentication()

ExampleAuthentication shows how to implement authentication

func ExampleCRUDOperations

func ExampleCRUDOperations()

ExampleCRUDOperations shows basic CRUD operations

func ExampleWithBun

func ExampleWithBun(bunDB *bun.DB)

ExampleWithBun shows how to use WebSocketSpec with Bun ORM

func ExampleWithGORM

func ExampleWithGORM(db *gorm.DB)

ExampleWithGORM shows how to use WebSocketSpec with GORM

func ExampleWithHooks

func ExampleWithHooks(db *gorm.DB)

ExampleWithHooks shows how to use lifecycle hooks

func ExampleWithSubscriptions

func ExampleWithSubscriptions()

ExampleWithSubscriptions shows subscription usage

Types

type BroadcastMessage

type BroadcastMessage struct {
	// Message is the message to broadcast
	Message []byte

	// Filter is an optional function to filter which connections receive the message
	Filter func(*Connection) bool
}

BroadcastMessage represents a message to broadcast to multiple connections

type Connection

type Connection struct {
	// ID is a unique identifier for this connection
	ID string
	// contains filtered or unexported fields
}

Connection represents a WebSocket connection with its state

func NewConnection

func NewConnection(id string, ws *websocket.Conn, handler *Handler) *Connection

NewConnection creates a new WebSocket connection

func (*Connection) AddSubscription

func (c *Connection) AddSubscription(sub *Subscription)

AddSubscription adds a subscription to this connection

func (*Connection) Close

func (c *Connection) Close()

Close closes the connection

func (*Connection) GetMetadata

func (c *Connection) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves metadata for this connection

func (*Connection) GetSubscription

func (c *Connection) GetSubscription(subID string) (*Subscription, bool)

GetSubscription retrieves a subscription by ID

func (*Connection) ReadPump

func (c *Connection) ReadPump()

ReadPump reads messages from the WebSocket connection

func (*Connection) RemoveSubscription

func (c *Connection) RemoveSubscription(subID string)

RemoveSubscription removes a subscription from this connection

func (*Connection) Send

func (c *Connection) Send(message []byte) error

Send sends a message to this connection

func (*Connection) SendJSON

func (c *Connection) SendJSON(v interface{}) error

SendJSON sends a JSON-encoded message to this connection

func (*Connection) SetMetadata

func (c *Connection) SetMetadata(key string, value interface{})

SetMetadata sets metadata for this connection

func (*Connection) WritePump

func (c *Connection) WritePump()

WritePump writes messages to the WebSocket connection

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager manages all active WebSocket connections

func NewConnectionManager

func NewConnectionManager(ctx context.Context) *ConnectionManager

NewConnectionManager creates a new connection manager

func (*ConnectionManager) Broadcast

func (cm *ConnectionManager) Broadcast(message []byte, filter func(*Connection) bool)

Broadcast sends a message to all connections matching the filter

func (*ConnectionManager) Count

func (cm *ConnectionManager) Count() int

Count returns the number of active connections

func (*ConnectionManager) GetConnection

func (cm *ConnectionManager) GetConnection(id string) (*Connection, bool)

GetConnection retrieves a connection by ID

func (*ConnectionManager) Register

func (cm *ConnectionManager) Register(conn *Connection)

Register registers a new connection

func (*ConnectionManager) Run

func (cm *ConnectionManager) Run()

Run starts the connection manager event loop

func (*ConnectionManager) Shutdown

func (cm *ConnectionManager) Shutdown()

Shutdown gracefully shuts down the connection manager

func (*ConnectionManager) Unregister

func (cm *ConnectionManager) Unregister(conn *Connection)

Unregister removes a connection

type ErrorInfo

type ErrorInfo struct {
	// Code is the error code
	Code string `json:"code"`

	// Message is a human-readable error message
	Message string `json:"message"`

	// Details contains additional error context
	Details map[string]interface{} `json:"details,omitempty"`
}

ErrorInfo contains error details

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler handles WebSocket connections and messages

func NewHandler

func NewHandler(db common.Database, registry common.ModelRegistry) *Handler

NewHandler creates a new WebSocket handler

func NewHandlerWithBun

func NewHandlerWithBun(db *bun.DB) *Handler

NewHandlerWithBun creates a new Handler with Bun adapter

func NewHandlerWithDatabase

func NewHandlerWithDatabase(db common.Database, registry common.ModelRegistry) *Handler

NewHandlerWithDatabase creates a new Handler with a custom database adapter

func NewHandlerWithGORM

func NewHandlerWithGORM(db *gorm.DB) *Handler

NewHandlerWithGORM creates a new Handler with GORM adapter

func (*Handler) BroadcastMessage

func (h *Handler) BroadcastMessage(message interface{}, filter func(*Connection) bool) error

BroadcastMessage sends a message to all connections matching the filter

func (*Handler) GetConnection

func (h *Handler) GetConnection(id string) (*Connection, bool)

GetConnection retrieves a connection by ID

func (*Handler) GetConnectionCount

func (h *Handler) GetConnectionCount() int

GetConnectionCount returns the number of active connections

func (*Handler) GetDatabase

func (h *Handler) GetDatabase() common.Database

GetDatabase returns the underlying database connection. It implements the common.SpecHandler interface.

func (*Handler) GetRelationshipInfo

func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo

GetRelationshipInfo implements the RelationshipInfoProvider interface. This is a placeholder implementation; full relationship support can be added later.

func (*Handler) GetSubscriptionCount

func (h *Handler) GetSubscriptionCount() int

GetSubscriptionCount returns the number of active subscriptions

func (*Handler) HandleMessage

func (h *Handler) HandleMessage(conn *Connection, msg *Message)

HandleMessage routes incoming messages to appropriate handlers

func (*Handler) HandleWebSocket

func (h *Handler) HandleWebSocket(w http.ResponseWriter, r *http.Request)

HandleWebSocket upgrades HTTP connection to WebSocket

func (*Handler) Hooks

func (h *Handler) Hooks() *HookRegistry

Hooks returns the hook registry for this handler

func (*Handler) Registry

func (h *Handler) Registry() common.ModelRegistry

Registry returns the model registry for this handler

func (*Handler) Shutdown

func (h *Handler) Shutdown()

Shutdown gracefully shuts down the handler

type HookContext

type HookContext struct {
	// Context is the request context
	Context context.Context

	// Handler provides access to the handler, database, and registry
	Handler *Handler

	// Connection is the WebSocket connection
	Connection *Connection

	// Message is the original message
	Message *Message

	// Schema is the database schema
	Schema string

	// Entity is the table/model name
	Entity string

	// TableName is the actual database table name
	TableName string

	// Model is the registered model instance
	Model interface{}

	// ModelPtr is a pointer to the model for queries
	ModelPtr interface{}

	// Options contains the parsed request options
	Options *common.RequestOptions

	// ID is the record ID for single-record operations
	ID string

	// Data is the request data (for create/update operations)
	Data interface{}

	// Result is the operation result (for after hooks)
	Result interface{}

	// Subscription is the subscription being created/removed
	Subscription *Subscription

	// Error is any error that occurred (for after hooks)
	Error error

	// Metadata is additional context data
	Metadata map[string]interface{}
}

HookContext contains context information for hook execution

type HookFunc

type HookFunc func(*HookContext) error

HookFunc is a function that processes a hook

type HookRegistry

type HookRegistry struct {
	// contains filtered or unexported fields
}

HookRegistry manages lifecycle hooks

func NewHookRegistry

func NewHookRegistry() *HookRegistry

NewHookRegistry creates a new hook registry

func (*HookRegistry) Clear

func (hr *HookRegistry) Clear(hookType HookType)

Clear removes all hooks of a specific type

func (*HookRegistry) ClearAll

func (hr *HookRegistry) ClearAll()

ClearAll removes all registered hooks

func (*HookRegistry) Execute

func (hr *HookRegistry) Execute(hookType HookType, ctx *HookContext) error

Execute runs all hooks for a specific type

func (*HookRegistry) HasHooks

func (hr *HookRegistry) HasHooks(hookType HookType) bool

HasHooks checks if any hooks are registered for a hook type

func (*HookRegistry) Register

func (hr *HookRegistry) Register(hookType HookType, fn HookFunc)

Register registers a hook function for a specific hook type

func (*HookRegistry) RegisterAfter

func (hr *HookRegistry) RegisterAfter(operation OperationType, fn HookFunc)

RegisterAfter registers a hook that runs after an operation. It is a convenience method for AfterRead, AfterCreate, AfterUpdate, and AfterDelete.

func (*HookRegistry) RegisterBefore

func (hr *HookRegistry) RegisterBefore(operation OperationType, fn HookFunc)

RegisterBefore registers a hook that runs before an operation. It is a convenience method for BeforeRead, BeforeCreate, BeforeUpdate, and BeforeDelete.

type HookType

type HookType string

HookType represents the type of lifecycle hook

const (
	// BeforeRead is called before a read operation
	BeforeRead HookType = "before_read"
	// AfterRead is called after a read operation
	AfterRead HookType = "after_read"

	// BeforeCreate is called before a create operation
	BeforeCreate HookType = "before_create"
	// AfterCreate is called after a create operation
	AfterCreate HookType = "after_create"

	// BeforeUpdate is called before an update operation
	BeforeUpdate HookType = "before_update"
	// AfterUpdate is called after an update operation
	AfterUpdate HookType = "after_update"

	// BeforeDelete is called before a delete operation
	BeforeDelete HookType = "before_delete"
	// AfterDelete is called after a delete operation
	AfterDelete HookType = "after_delete"

	// BeforeSubscribe is called before creating a subscription
	BeforeSubscribe HookType = "before_subscribe"
	// AfterSubscribe is called after creating a subscription
	AfterSubscribe HookType = "after_subscribe"

	// BeforeUnsubscribe is called before removing a subscription
	BeforeUnsubscribe HookType = "before_unsubscribe"
	// AfterUnsubscribe is called after removing a subscription
	AfterUnsubscribe HookType = "after_unsubscribe"

	// BeforeConnect is called when a new connection is established
	BeforeConnect HookType = "before_connect"
	// AfterConnect is called after a connection is established
	AfterConnect HookType = "after_connect"

	// BeforeDisconnect is called before a connection is closed
	BeforeDisconnect HookType = "before_disconnect"
	// AfterDisconnect is called after a connection is closed
	AfterDisconnect HookType = "after_disconnect"
)
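To make the hook model concrete, here is a minimal sketch of a registry keyed by hook type, written with local stand-in types (`hookType`, `hookContext`, `hookFunc`, `registry`) instead of the package's own. Running hooks in registration order and stopping at the first error are assumptions of the sketch. The `requireEmail` hook is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the documented names; they are not the package's types.
type hookType string

type hookContext struct {
	Entity string
	Data   interface{}
}

type hookFunc func(*hookContext) error

type registry struct {
	hooks map[hookType][]hookFunc
}

func newRegistry() *registry {
	return &registry{hooks: make(map[hookType][]hookFunc)}
}

// register appends fn to the list of hooks for t.
func (r *registry) register(t hookType, fn hookFunc) {
	r.hooks[t] = append(r.hooks[t], fn)
}

// execute runs hooks in registration order and stops at the first error.
// Both the ordering and the early stop are assumptions of this sketch.
func (r *registry) execute(t hookType, ctx *hookContext) error {
	for _, fn := range r.hooks[t] {
		if err := fn(ctx); err != nil {
			return err
		}
	}
	return nil
}

var errMissingEmail = errors.New("email is required")

// requireEmail is a hypothetical before-create validation hook.
func requireEmail(ctx *hookContext) error {
	m, ok := ctx.Data.(map[string]interface{})
	if !ok {
		return errMissingEmail
	}
	if s, _ := m["email"].(string); s == "" {
		return errMissingEmail
	}
	return nil
}

func main() {
	r := newRegistry()
	r.register("before_create", requireEmail)

	err := r.execute("before_create", &hookContext{
		Entity: "users",
		Data:   map[string]interface{}{"name": "Ada"},
	})
	fmt.Println(err) // the payload has no email, so the hook rejects it
}
```

With the real package, the equivalent registration would presumably go through `Hooks().Register` or `RegisterBefore`, using the `HookType` and `OperationType` constants listed here.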

type Message

type Message struct {
	// ID is a unique identifier for request/response correlation
	ID string `json:"id,omitempty"`

	// Type is the message type
	Type MessageType `json:"type"`

	// Operation is the operation to perform
	Operation OperationType `json:"operation,omitempty"`

	// Schema is the database schema name
	Schema string `json:"schema,omitempty"`

	// Entity is the table/model name
	Entity string `json:"entity,omitempty"`

	// RecordID is the ID for single-record operations (update, delete, read by ID)
	RecordID string `json:"record_id,omitempty"`

	// Data contains the request/response payload
	Data interface{} `json:"data,omitempty"`

	// Options contains query options (filters, sorting, pagination, etc.)
	Options *common.RequestOptions `json:"options,omitempty"`

	// SubscriptionID is the subscription identifier
	SubscriptionID string `json:"subscription_id,omitempty"`

	// Success indicates if the operation was successful
	Success bool `json:"success,omitempty"`

	// Error contains error information
	Error *ErrorInfo `json:"error,omitempty"`

	// Metadata contains additional response metadata
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// Timestamp is when the message was created
	Timestamp time.Time `json:"timestamp,omitempty"`
}

Message represents a WebSocket message

func ParseMessage

func ParseMessage(data []byte) (*Message, error)

ParseMessage parses a JSON message into a Message struct

func (*Message) IsValid

func (m *Message) IsValid() bool

IsValid checks if a message is valid

func (*Message) ToJSON

func (m *Message) ToJSON() ([]byte, error)

ToJSON converts a message to JSON bytes
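The JSON tags on Message affect what appears on the wire. The sketch below uses a local mirror of a few documented fields (`wireMessage`, a hypothetical name) and plain `encoding/json`; it assumes ToJSON behaves like `json.Marshal`, which the documentation does not state. It shows two standard-library behaviors worth knowing: a `false` value under `omitempty` is dropped, while a zero `time.Time` is still emitted because `omitempty` has no effect on struct values.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)

// wireMessage mirrors a subset of the documented Message fields and JSON tags.
// It is a local stand-in so the example runs without the package.
type wireMessage struct {
	ID        string    `json:"id,omitempty"`
	Type      string    `json:"type"`
	Operation string    `json:"operation,omitempty"`
	Schema    string    `json:"schema,omitempty"`
	Entity    string    `json:"entity,omitempty"`
	Success   bool      `json:"success,omitempty"`
	Timestamp time.Time `json:"timestamp,omitempty"`
}

// encode marshals m and returns the JSON text, or "" on failure.
func encode(m wireMessage) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	out := encode(wireMessage{
		ID:        "req-1",
		Type:      "request",
		Operation: "read",
		Schema:    "public",
		Entity:    "users",
	})
	fmt.Println(out)
	fmt.Println(strings.Contains(out, `"success"`))   // false: omitempty drops a false bool
	fmt.Println(strings.Contains(out, `"timestamp"`)) // true: a zero time.Time is still encoded
}
```

A client that reads such messages should therefore treat a missing `success` field as false and should not assume that a present `timestamp` is meaningful.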

type MessageType

type MessageType string

MessageType represents the type of WebSocket message

const (
	// MessageTypeRequest is a client request message
	MessageTypeRequest MessageType = "request"
	// MessageTypeResponse is a server response message
	MessageTypeResponse MessageType = "response"
	// MessageTypeNotification is a server-initiated notification
	MessageTypeNotification MessageType = "notification"
	// MessageTypeSubscription is a subscription control message
	MessageTypeSubscription MessageType = "subscription"
	// MessageTypeError is an error message
	MessageTypeError MessageType = "error"
	// MessageTypePing is a keepalive ping message
	MessageTypePing MessageType = "ping"
	// MessageTypePong is a keepalive pong response
	MessageTypePong MessageType = "pong"
)
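A receiver typically inspects the `type` field before decoding the rest of a frame. The following sketch shows one way a client might route incoming frames using the type values listed above. The `envelope` type, the `route` function, and the returned labels are hypothetical, and the sample frames are illustrative rather than captured from a server.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope reads only the fields needed to route an incoming frame.
// Field names follow the documented JSON tags.
type envelope struct {
	ID             string `json:"id"`
	Type           string `json:"type"`
	SubscriptionID string `json:"subscription_id"`
}

// route returns a short label describing where a frame should be delivered.
func route(raw []byte) (string, error) {
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return "", fmt.Errorf("decode frame: %w", err)
	}
	switch env.Type {
	case "response":
		return "reply:" + env.ID, nil
	case "notification":
		return "subscription:" + env.SubscriptionID, nil
	case "error":
		return "error", nil
	case "pong":
		return "keepalive", nil
	default:
		return "", fmt.Errorf("unexpected message type %q", env.Type)
	}
}

func main() {
	frames := []string{
		`{"id":"req-1","type":"response","success":true}`,
		`{"type":"notification","subscription_id":"sub-9","operation":"update"}`,
		`{"type":"pong"}`,
	}
	for _, f := range frames {
		dest, err := route([]byte(f))
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(dest)
	}
}
```

Decoding into a small envelope first avoids committing to a payload shape before the message type is known.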

type NotificationMessage

type NotificationMessage struct {
	Type           MessageType   `json:"type"`
	Operation      OperationType `json:"operation"`
	SubscriptionID string        `json:"subscription_id"`
	Schema         string        `json:"schema"`
	Entity         string        `json:"entity"`
	Data           interface{}   `json:"data"`
	Timestamp      time.Time     `json:"timestamp"`
}

NotificationMessage represents a server-initiated notification

func NewNotificationMessage

func NewNotificationMessage(subscriptionID string, operation OperationType, schema, entity string, data interface{}) *NotificationMessage

NewNotificationMessage creates a new notification message

func (*NotificationMessage) ToJSON

func (n *NotificationMessage) ToJSON() ([]byte, error)

ToJSON converts a notification message to JSON bytes

type OperationType

type OperationType string

OperationType represents the operation to perform

const (
	// OperationRead retrieves records
	OperationRead OperationType = "read"
	// OperationCreate creates a new record
	OperationCreate OperationType = "create"
	// OperationUpdate updates an existing record
	OperationUpdate OperationType = "update"
	// OperationDelete deletes a record
	OperationDelete OperationType = "delete"
	// OperationSubscribe subscribes to entity changes
	OperationSubscribe OperationType = "subscribe"
	// OperationUnsubscribe unsubscribes from entity changes
	OperationUnsubscribe OperationType = "unsubscribe"
	// OperationMeta retrieves metadata about an entity
	OperationMeta OperationType = "meta"
)

type RequestMessage

type RequestMessage struct {
	ID        string                 `json:"id"`
	Type      MessageType            `json:"type"`
	Operation OperationType          `json:"operation"`
	Schema    string                 `json:"schema,omitempty"`
	Entity    string                 `json:"entity"`
	RecordID  string                 `json:"record_id,omitempty"`
	Data      interface{}            `json:"data,omitempty"`
	Options   *common.RequestOptions `json:"options,omitempty"`
}

RequestMessage represents a client request

func NewRequestMessage

func NewRequestMessage(id string, operation OperationType, schema, entity string) *RequestMessage

NewRequestMessage creates a new request message

type ResponseMessage

type ResponseMessage struct {
	ID        string                 `json:"id"`
	Type      MessageType            `json:"type"`
	Success   bool                   `json:"success"`
	Data      interface{}            `json:"data,omitempty"`
	Error     *ErrorInfo             `json:"error,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
}

ResponseMessage represents a server response

func NewErrorResponse

func NewErrorResponse(id string, code, message string) *ResponseMessage

NewErrorResponse creates an error response message

func NewResponseMessage

func NewResponseMessage(id string, success bool, data interface{}) *ResponseMessage

NewResponseMessage creates a new response message

func (*ResponseMessage) ToJSON

func (r *ResponseMessage) ToJSON() ([]byte, error)

ToJSON converts a response message to JSON bytes
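Because a response carries the ID of the request it answers, a client can match replies to in-flight requests. The sketch below shows one common way to do that with a map of channels. The `reply` and `pending` types are hypothetical and are not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// reply is a hypothetical, trimmed-down view of a server response.
type reply struct {
	ID      string
	Success bool
}

// pending tracks in-flight requests by message ID.
type pending struct {
	mu      sync.Mutex
	waiters map[string]chan reply
}

func newPending() *pending {
	return &pending{waiters: make(map[string]chan reply)}
}

// expect registers interest in the response for id and returns a channel for it.
func (p *pending) expect(id string) <-chan reply {
	ch := make(chan reply, 1) // buffered so resolve never blocks on a slow reader
	p.mu.Lock()
	p.waiters[id] = ch
	p.mu.Unlock()
	return ch
}

// resolve hands r to the waiter registered under r.ID, removes the waiter,
// and reports whether one existed.
func (p *pending) resolve(r reply) bool {
	p.mu.Lock()
	ch, ok := p.waiters[r.ID]
	delete(p.waiters, r.ID)
	p.mu.Unlock()
	if ok {
		ch <- r
	}
	return ok
}

func main() {
	p := newPending()
	ch := p.expect("req-1")

	fmt.Println(p.resolve(reply{ID: "req-2", Success: true})) // false: nobody is waiting for req-2
	fmt.Println(p.resolve(reply{ID: "req-1", Success: true})) // true: delivered to the waiter
	fmt.Println((<-ch).ID)                                    // req-1
}
```

A production client would usually add a timeout around the channel receive so that a lost response does not block the caller indefinitely.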

type Subscription

type Subscription struct {
	// ID is the unique subscription identifier
	ID string

	// ConnectionID is the ID of the connection that owns this subscription
	ConnectionID string

	// Schema is the database schema
	Schema string

	// Entity is the table/model name
	Entity string

	// Options contains filters and other query options
	Options *common.RequestOptions

	// Active indicates if the subscription is active
	Active bool
}

Subscription represents a subscription to entity changes

func (*Subscription) MatchesFilters

func (s *Subscription) MatchesFilters(data interface{}) bool

MatchesFilters checks if data matches the subscription's filters

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager manages all subscriptions

func NewSubscriptionManager

func NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new subscription manager

func (*SubscriptionManager) Count

func (sm *SubscriptionManager) Count() int

Count returns the total number of active subscriptions

func (*SubscriptionManager) CountForEntity

func (sm *SubscriptionManager) CountForEntity(schema, entity string) int

CountForEntity returns the number of subscriptions for a specific entity

func (*SubscriptionManager) GetSubscription

func (sm *SubscriptionManager) GetSubscription(subID string) (*Subscription, bool)

GetSubscription retrieves a subscription by ID

func (*SubscriptionManager) GetSubscriptionsByConnection

func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription

GetSubscriptionsByConnection retrieves all subscriptions for a connection

func (*SubscriptionManager) GetSubscriptionsByEntity

func (sm *SubscriptionManager) GetSubscriptionsByEntity(schema, entity string) []*Subscription

GetSubscriptionsByEntity retrieves all subscriptions for an entity

func (*SubscriptionManager) Subscribe

func (sm *SubscriptionManager) Subscribe(id, connID, schema, entity string, options *common.RequestOptions) *Subscription

Subscribe creates a new subscription

func (*SubscriptionManager) Unsubscribe

func (sm *SubscriptionManager) Unsubscribe(subID string) bool

Unsubscribe removes a subscription

type SubscriptionMessage

type SubscriptionMessage struct {
	ID             string                 `json:"id"`
	Type           MessageType            `json:"type"`
	Operation      OperationType          `json:"operation"` // subscribe or unsubscribe
	Schema         string                 `json:"schema,omitempty"`
	Entity         string                 `json:"entity"`
	Options        *common.RequestOptions `json:"options,omitempty"`         // Filters for subscription
	SubscriptionID string                 `json:"subscription_id,omitempty"` // For unsubscribe
}

SubscriptionMessage represents a subscription control message
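As an illustration of the wire format implied by these JSON tags, the sketch below encodes a subscribe and an unsubscribe message using a local mirror of the struct (`subscriptionMessage`, a hypothetical name). The `Options` field is left out to keep the example independent of `common.RequestOptions`. Using `"subscription"` as the `type` value is an assumption based on the MessageTypeSubscription constant, and the IDs are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscriptionMessage mirrors the documented JSON tags of SubscriptionMessage,
// minus Options. It is a local stand-in, not the package's type.
type subscriptionMessage struct {
	ID             string `json:"id"`
	Type           string `json:"type"`
	Operation      string `json:"operation"`
	Schema         string `json:"schema,omitempty"`
	Entity         string `json:"entity"`
	SubscriptionID string `json:"subscription_id,omitempty"`
}

// mustJSON marshals m and panics on failure, which cannot happen for this struct.
func mustJSON(m subscriptionMessage) string {
	b, err := json.Marshal(m)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	subscribe := subscriptionMessage{
		ID:        "sub-1",
		Type:      "subscription",
		Operation: "subscribe",
		Schema:    "public",
		Entity:    "users",
	}
	unsubscribe := subscriptionMessage{
		ID:             "sub-2",
		Type:           "subscription",
		Operation:      "unsubscribe",
		Entity:         "users",
		SubscriptionID: "abc123", // hypothetical ID returned by an earlier subscribe
	}
	fmt.Println(mustJSON(subscribe))
	fmt.Println(mustJSON(unsubscribe))
}
```

Because `subscription_id` is tagged `omitempty`, it is absent from the subscribe message and present only when set for an unsubscribe.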
