Documentation
¶
Overview ¶
Package websocketspec provides a WebSocket-based API specification for real-time CRUD operations with bidirectional communication and subscription support.
Key Features ¶
- Real-time bidirectional communication over WebSocket
- CRUD operations (Create, Read, Update, Delete)
- Real-time subscriptions with filtering
- Lifecycle hooks for all operations
- Database-agnostic: Works with GORM and Bun ORM through adapters
- Automatic change notifications to subscribers
- Connection and subscription management
Message Protocol ¶
WebSocketSpec uses JSON messages for communication:
{
"id": "unique-message-id",
"type": "request|response|notification|subscription",
"operation": "read|create|update|delete|subscribe|unsubscribe",
"schema": "public",
"entity": "users",
"data": {...},
"options": {
"filters": [...],
"columns": [...],
"preload": [...],
"sort": [...],
"limit": 10
}
}
Usage Example ¶
// Create handler with GORM
handler := websocketspec.NewHandlerWithGORM(db)
// Register models
handler.Registry.RegisterModel("public.users", &User{})
// Setup WebSocket endpoint
http.HandleFunc("/ws", handler.HandleWebSocket)
// Start server
http.ListenAndServe(":8080", nil)
Client Example ¶
// Connect to WebSocket
ws := new WebSocket("ws://localhost:8080/ws")
// Send read request
ws.send(JSON.stringify({
id: "msg-1",
type: "request",
operation: "read",
entity: "users",
options: {
filters: [{column: "status", operator: "eq", value: "active"}],
limit: 10
}
}))
// Subscribe to changes
ws.send(JSON.stringify({
id: "msg-2",
type: "subscription",
operation: "subscribe",
entity: "users",
options: {
filters: [{column: "status", operator: "eq", value: "active"}]
}
}))
Example (BasicSetup) ¶
Example_basicSetup demonstrates basic WebSocketSpec setup
package main
import (
"log"
"net/http"
"github.com/bitechdev/ResolveSpec/pkg/websocketspec"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// User model example
type User struct {
ID uint `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
Email string `json:"email"`
Status string `json:"status"`
}
// Post model example
type Post struct {
ID uint `json:"id" gorm:"primaryKey"`
Title string `json:"title"`
Content string `json:"content"`
UserID uint `json:"user_id"`
User *User `json:"user,omitempty" gorm:"foreignKey:UserID"`
}
func main() {
// Connect to database
db, err := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
// Create WebSocket handler
handler := websocketspec.NewHandlerWithGORM(db)
// Register models
handler.Registry().RegisterModel("public.users", &User{})
handler.Registry().RegisterModel("public.posts", &Post{})
// Setup WebSocket endpoint
http.HandleFunc("/ws", handler.HandleWebSocket)
// Start server
log.Println("WebSocket server starting on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
Example (ClientSide) ¶
Example_clientSide shows client-side usage example
package main
import (
"fmt"
)
func main() {
// This is JavaScript code for documentation purposes
jsCode := `
// JavaScript WebSocket Client Example
const ws = new WebSocket("ws://localhost:8080/ws");
ws.onopen = () => {
console.log("Connected to WebSocket");
// Read users
ws.send(JSON.stringify({
id: "msg-1",
type: "request",
operation: "read",
schema: "public",
entity: "users",
options: {
filters: [{column: "status", operator: "eq", value: "active"}],
limit: 10
}
}));
// Subscribe to user changes
ws.send(JSON.stringify({
id: "sub-1",
type: "subscription",
operation: "subscribe",
schema: "public",
entity: "users",
options: {
filters: [{column: "status", operator: "eq", value: "active"}]
}
}));
};
ws.onmessage = (event) => {
const message = JSON.parse(event.data);
if (message.type === "response") {
if (message.success) {
console.log("Response:", message.data);
} else {
console.error("Error:", message.error);
}
} else if (message.type === "notification") {
console.log("Notification:", message.operation, message.data);
}
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
};
ws.onclose = () => {
console.log("WebSocket connection closed");
// Implement reconnection logic here
};
`
fmt.Println(jsCode)
}
Example (Monitoring) ¶
Example_monitoring demonstrates monitoring connections and subscriptions
package main
import (
"fmt"
"log"
"net/http"
"github.com/bitechdev/ResolveSpec/pkg/websocketspec"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// User model example
type User struct {
ID uint `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
Email string `json:"email"`
Status string `json:"status"`
}
func main() {
db, _ := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
handler := websocketspec.NewHandlerWithGORM(db)
handler.Registry().RegisterModel("public.users", &User{})
// Add connection tracking
handler.Hooks().Register(websocketspec.AfterConnect, func(ctx *websocketspec.HookContext) error {
count := handler.GetConnectionCount()
log.Printf("Client connected. Total connections: %d", count)
return nil
})
handler.Hooks().Register(websocketspec.AfterDisconnect, func(ctx *websocketspec.HookContext) error {
count := handler.GetConnectionCount()
log.Printf("Client disconnected. Total connections: %d", count)
return nil
})
// Add subscription tracking
handler.Hooks().Register(websocketspec.AfterSubscribe, func(ctx *websocketspec.HookContext) error {
count := handler.GetSubscriptionCount()
log.Printf("New subscription. Total subscriptions: %d", count)
return nil
})
// Monitoring endpoint
http.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Active Connections: %d\n", handler.GetConnectionCount())
fmt.Fprintf(w, "Active Subscriptions: %d\n", handler.GetSubscriptionCount())
})
http.HandleFunc("/ws", handler.HandleWebSocket)
log.Println("Server with monitoring starting on :8080")
http.ListenAndServe(":8080", nil)
}
Example (WithHooks) ¶
Example_withHooks demonstrates using lifecycle hooks
package main
import (
"fmt"
"log"
"net/http"
"github.com/bitechdev/ResolveSpec/pkg/websocketspec"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// User model example
type User struct {
ID uint `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
Email string `json:"email"`
Status string `json:"status"`
}
func main() {
db, _ := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{})
handler := websocketspec.NewHandlerWithGORM(db)
// Register models
handler.Registry().RegisterModel("public.users", &User{})
// Add authentication hook
handler.Hooks().Register(websocketspec.BeforeConnect, func(ctx *websocketspec.HookContext) error {
// Validate authentication token
// (In real implementation, extract from query params or headers)
userID := uint(123) // From token
// Store in connection metadata
ctx.Connection.SetMetadata("user_id", userID)
log.Printf("User %d connected", userID)
return nil
})
// Add authorization hook for read operations
handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error {
userID, ok := ctx.Connection.GetMetadata("user_id")
if !ok {
return fmt.Errorf("unauthorized: not authenticated")
}
log.Printf("User %v reading %s.%s", userID, ctx.Schema, ctx.Entity)
// Add filter to only show user's own records
if ctx.Entity == "posts" {
// ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{
// Column: "user_id",
// Operator: "eq",
// Value: userID,
// })
}
return nil
})
// Add logging hook after create
handler.Hooks().RegisterAfter(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error {
userID, _ := ctx.Connection.GetMetadata("user_id")
log.Printf("User %v created record in %s.%s", userID, ctx.Schema, ctx.Entity)
return nil
})
// Add validation hook before create
handler.Hooks().RegisterBefore(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error {
// Validate required fields
if data, ok := ctx.Data.(map[string]interface{}); ok {
if ctx.Entity == "users" {
if email, exists := data["email"]; !exists || email == "" {
return fmt.Errorf("validation error: email is required")
}
if name, exists := data["name"]; !exists || name == "" {
return fmt.Errorf("validation error: name is required")
}
}
}
return nil
})
// Add limit hook for subscriptions
handler.Hooks().Register(websocketspec.BeforeSubscribe, func(ctx *websocketspec.HookContext) error {
// Limit subscriptions per connection
maxSubscriptions := 10
// Note: In a real implementation, you would count subscriptions using the connection's methods
// currentCount := len(ctx.Connection.subscriptions) // subscriptions is private
// For demonstration purposes, we'll just log
log.Printf("Creating subscription (max: %d)", maxSubscriptions)
return nil
})
http.HandleFunc("/ws", handler.HandleWebSocket)
log.Println("Server with hooks starting on :8080")
http.ListenAndServe(":8080", nil)
}
Index ¶
- func ExampleAuthentication()
- func ExampleCRUDOperations()
- func ExampleWithBun(bunDB *bun.DB)
- func ExampleWithGORM(db *gorm.DB)
- func ExampleWithHooks(db *gorm.DB)
- func ExampleWithSubscriptions()
- type BroadcastMessage
- type Connection
- func (c *Connection) AddSubscription(sub *Subscription)
- func (c *Connection) Close()
- func (c *Connection) GetMetadata(key string) (interface{}, bool)
- func (c *Connection) GetSubscription(subID string) (*Subscription, bool)
- func (c *Connection) ReadPump()
- func (c *Connection) RemoveSubscription(subID string)
- func (c *Connection) Send(message []byte) error
- func (c *Connection) SendJSON(v interface{}) error
- func (c *Connection) SetMetadata(key string, value interface{})
- func (c *Connection) WritePump()
- type ConnectionManager
- func (cm *ConnectionManager) Broadcast(message []byte, filter func(*Connection) bool)
- func (cm *ConnectionManager) Count() int
- func (cm *ConnectionManager) GetConnection(id string) (*Connection, bool)
- func (cm *ConnectionManager) Register(conn *Connection)
- func (cm *ConnectionManager) Run()
- func (cm *ConnectionManager) Shutdown()
- func (cm *ConnectionManager) Unregister(conn *Connection)
- type ErrorInfo
- type Handler
- func (h *Handler) BroadcastMessage(message interface{}, filter func(*Connection) bool) error
- func (h *Handler) GetConnection(id string) (*Connection, bool)
- func (h *Handler) GetConnectionCount() int
- func (h *Handler) GetDatabase() common.Database
- func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo
- func (h *Handler) GetSubscriptionCount() int
- func (h *Handler) HandleMessage(conn *Connection, msg *Message)
- func (h *Handler) HandleWebSocket(w http.ResponseWriter, r *http.Request)
- func (h *Handler) Hooks() *HookRegistry
- func (h *Handler) Registry() common.ModelRegistry
- func (h *Handler) Shutdown()
- type HookContext
- type HookFunc
- type HookRegistry
- func (hr *HookRegistry) Clear(hookType HookType)
- func (hr *HookRegistry) ClearAll()
- func (hr *HookRegistry) Execute(hookType HookType, ctx *HookContext) error
- func (hr *HookRegistry) HasHooks(hookType HookType) bool
- func (hr *HookRegistry) Register(hookType HookType, fn HookFunc)
- func (hr *HookRegistry) RegisterAfter(operation OperationType, fn HookFunc)
- func (hr *HookRegistry) RegisterBefore(operation OperationType, fn HookFunc)
- type HookType
- type Message
- type MessageType
- type NotificationMessage
- type OperationType
- type RequestMessage
- type ResponseMessage
- type Subscription
- type SubscriptionManager
- func (sm *SubscriptionManager) Count() int
- func (sm *SubscriptionManager) CountForEntity(schema, entity string) int
- func (sm *SubscriptionManager) GetSubscription(subID string) (*Subscription, bool)
- func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription
- func (sm *SubscriptionManager) GetSubscriptionsByEntity(schema, entity string) []*Subscription
- func (sm *SubscriptionManager) Subscribe(id, connID, schema, entity string, options *common.RequestOptions) *Subscription
- func (sm *SubscriptionManager) Unsubscribe(subID string) bool
- type SubscriptionMessage
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ExampleAuthentication ¶
func ExampleAuthentication()
ExampleAuthentication shows how to implement authentication
func ExampleCRUDOperations ¶
func ExampleCRUDOperations()
ExampleCRUDOperations shows basic CRUD operations
func ExampleWithBun ¶
ExampleWithBun shows how to use WebSocketSpec with Bun ORM
func ExampleWithGORM ¶
ExampleWithGORM shows how to use WebSocketSpec with GORM
func ExampleWithHooks ¶
ExampleWithHooks shows how to use lifecycle hooks
func ExampleWithSubscriptions ¶
func ExampleWithSubscriptions()
ExampleWithSubscriptions shows subscription usage
Types ¶
type BroadcastMessage ¶
type BroadcastMessage struct {
// Message is the message to broadcast
Message []byte
// Filter is an optional function to filter which connections receive the message
Filter func(*Connection) bool
}
BroadcastMessage represents a message to broadcast to multiple connections
type Connection ¶
type Connection struct {
// ID is a unique identifier for this connection
ID string
// contains filtered or unexported fields
}
Connection rvepresents a WebSocket connection with its state
func NewConnection ¶
func NewConnection(id string, ws *websocket.Conn, handler *Handler) *Connection
NewConnection creates a new WebSocket connection
func (*Connection) AddSubscription ¶
func (c *Connection) AddSubscription(sub *Subscription)
AddSubscription adds a subscription to this connection
func (*Connection) GetMetadata ¶
func (c *Connection) GetMetadata(key string) (interface{}, bool)
GetMetadata retrieves metadata for this connection
func (*Connection) GetSubscription ¶
func (c *Connection) GetSubscription(subID string) (*Subscription, bool)
GetSubscription retrieves a subscription by ID
func (*Connection) ReadPump ¶
func (c *Connection) ReadPump()
ReadPump reads messages from the WebSocket connection
func (*Connection) RemoveSubscription ¶
func (c *Connection) RemoveSubscription(subID string)
RemoveSubscription removes a subscription from this connection
func (*Connection) Send ¶
func (c *Connection) Send(message []byte) error
Send sends a message to this connection
func (*Connection) SendJSON ¶
func (c *Connection) SendJSON(v interface{}) error
SendJSON sends a JSON-encoded message to this connection
func (*Connection) SetMetadata ¶
func (c *Connection) SetMetadata(key string, value interface{})
SetMetadata sets metadata for this connection
func (*Connection) WritePump ¶
func (c *Connection) WritePump()
WritePump writes messages to the WebSocket connection
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
ConnectionManager manages all active WebSocket connections
func NewConnectionManager ¶
func NewConnectionManager(ctx context.Context) *ConnectionManager
NewConnectionManager creates a new connection manager
func (*ConnectionManager) Broadcast ¶
func (cm *ConnectionManager) Broadcast(message []byte, filter func(*Connection) bool)
Broadcast sends a message to all connections matching the filter
func (*ConnectionManager) Count ¶
func (cm *ConnectionManager) Count() int
Count returns the number of active connections
func (*ConnectionManager) GetConnection ¶
func (cm *ConnectionManager) GetConnection(id string) (*Connection, bool)
GetConnection retrieves a connection by ID
func (*ConnectionManager) Register ¶
func (cm *ConnectionManager) Register(conn *Connection)
Register registers a new connection
func (*ConnectionManager) Run ¶
func (cm *ConnectionManager) Run()
Run starts the connection manager event loop
func (*ConnectionManager) Shutdown ¶
func (cm *ConnectionManager) Shutdown()
Shutdown gracefully shuts down the connection manager
func (*ConnectionManager) Unregister ¶
func (cm *ConnectionManager) Unregister(conn *Connection)
Unregister removes a connection
type ErrorInfo ¶
type ErrorInfo struct {
// Code is the error code
Code string `json:"code"`
// Message is a human-readable error message
Message string `json:"message"`
// Details contains additional error context
Details map[string]interface{} `json:"details,omitempty"`
}
ErrorInfo contains error details
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler handles WebSocket connections and messages
func NewHandler ¶
func NewHandler(db common.Database, registry common.ModelRegistry) *Handler
NewHandler creates a new WebSocket handler
func NewHandlerWithBun ¶
NewHandlerWithBun creates a new Handler with Bun adapter
func NewHandlerWithDatabase ¶
func NewHandlerWithDatabase(db common.Database, registry common.ModelRegistry) *Handler
NewHandlerWithDatabase creates a new Handler with a custom database adapter
func NewHandlerWithGORM ¶
NewHandlerWithGORM creates a new Handler with GORM adapter
func (*Handler) BroadcastMessage ¶
func (h *Handler) BroadcastMessage(message interface{}, filter func(*Connection) bool) error
BroadcastMessage sends a message to all connections matching the filter
func (*Handler) GetConnection ¶
func (h *Handler) GetConnection(id string) (*Connection, bool)
GetConnection retrieves a connection by ID
func (*Handler) GetConnectionCount ¶
GetConnectionCount returns the number of active connections
func (*Handler) GetDatabase ¶
GetDatabase returns the underlying database connection Implements common.SpecHandler interface
func (*Handler) GetRelationshipInfo ¶
func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo
GetRelationshipInfo implements the RelationshipInfoProvider interface This is a placeholder implementation - full relationship support can be added later
func (*Handler) GetSubscriptionCount ¶
GetSubscriptionCount returns the number of active subscriptions
func (*Handler) HandleMessage ¶
func (h *Handler) HandleMessage(conn *Connection, msg *Message)
HandleMessage routes incoming messages to appropriate handlers
func (*Handler) HandleWebSocket ¶
func (h *Handler) HandleWebSocket(w http.ResponseWriter, r *http.Request)
HandleWebSocket upgrades HTTP connection to WebSocket
func (*Handler) Hooks ¶
func (h *Handler) Hooks() *HookRegistry
Hooks returns the hook registry for this handler
func (*Handler) Registry ¶
func (h *Handler) Registry() common.ModelRegistry
Registry returns the model registry for this handler
type HookContext ¶
type HookContext struct {
// Context is the request context
Context context.Context
// Handler provides access to the handler, database, and registry
Handler *Handler
// Connection is the WebSocket connection
Connection *Connection
// Message is the original message
Message *Message
// Schema is the database schema
Schema string
// Entity is the table/model name
Entity string
// TableName is the actual database table name
TableName string
// Model is the registered model instance
Model interface{}
// ModelPtr is a pointer to the model for queries
ModelPtr interface{}
// Options contains the parsed request options
Options *common.RequestOptions
// ID is the record ID for single-record operations
ID string
// Data is the request data (for create/update operations)
Data interface{}
// Result is the operation result (for after hooks)
Result interface{}
// Subscription is the subscription being created/removed
Subscription *Subscription
// Error is any error that occurred (for after hooks)
Error error
// Metadata is additional context data
Metadata map[string]interface{}
}
HookContext contains context information for hook execution
type HookRegistry ¶
type HookRegistry struct {
// contains filtered or unexported fields
}
HookRegistry manages lifecycle hooks
func NewHookRegistry ¶
func NewHookRegistry() *HookRegistry
NewHookRegistry creates a new hook registry
func (*HookRegistry) Clear ¶
func (hr *HookRegistry) Clear(hookType HookType)
Clear removes all hooks of a specific type
func (*HookRegistry) ClearAll ¶
func (hr *HookRegistry) ClearAll()
ClearAll removes all registered hooks
func (*HookRegistry) Execute ¶
func (hr *HookRegistry) Execute(hookType HookType, ctx *HookContext) error
Execute runs all hooks for a specific type
func (*HookRegistry) HasHooks ¶
func (hr *HookRegistry) HasHooks(hookType HookType) bool
HasHooks checks if any hooks are registered for a hook type
func (*HookRegistry) Register ¶
func (hr *HookRegistry) Register(hookType HookType, fn HookFunc)
Register registers a hook function for a specific hook type
func (*HookRegistry) RegisterAfter ¶
func (hr *HookRegistry) RegisterAfter(operation OperationType, fn HookFunc)
RegisterAfter registers a hook that runs after an operation Convenience method for AfterRead, AfterCreate, AfterUpdate, AfterDelete
func (*HookRegistry) RegisterBefore ¶
func (hr *HookRegistry) RegisterBefore(operation OperationType, fn HookFunc)
RegisterBefore registers a hook that runs before an operation Convenience method for BeforeRead, BeforeCreate, BeforeUpdate, BeforeDelete
type HookType ¶
type HookType string
HookType represents the type of lifecycle hook
const ( // BeforeRead is called before a read operation BeforeRead HookType = "before_read" // AfterRead is called after a read operation AfterRead HookType = "after_read" // BeforeCreate is called before a create operation BeforeCreate HookType = "before_create" // AfterCreate is called after a create operation AfterCreate HookType = "after_create" // BeforeUpdate is called before an update operation BeforeUpdate HookType = "before_update" // AfterUpdate is called after an update operation AfterUpdate HookType = "after_update" // BeforeDelete is called before a delete operation BeforeDelete HookType = "before_delete" // AfterDelete is called after a delete operation AfterDelete HookType = "after_delete" // BeforeSubscribe is called before creating a subscription BeforeSubscribe HookType = "before_subscribe" // AfterSubscribe is called after creating a subscription AfterSubscribe HookType = "after_subscribe" // BeforeUnsubscribe is called before removing a subscription BeforeUnsubscribe HookType = "before_unsubscribe" // AfterUnsubscribe is called after removing a subscription AfterUnsubscribe HookType = "after_unsubscribe" // BeforeConnect is called when a new connection is established BeforeConnect HookType = "before_connect" // AfterConnect is called after a connection is established AfterConnect HookType = "after_connect" // BeforeDisconnect is called before a connection is closed BeforeDisconnect HookType = "before_disconnect" // AfterDisconnect is called after a connection is closed AfterDisconnect HookType = "after_disconnect" )
type Message ¶
type Message struct {
// ID is a unique identifier for request/response correlation
ID string `json:"id,omitempty"`
// Type is the message type
Type MessageType `json:"type"`
// Operation is the operation to perform
Operation OperationType `json:"operation,omitempty"`
// Schema is the database schema name
Schema string `json:"schema,omitempty"`
// Entity is the table/model name
Entity string `json:"entity,omitempty"`
// RecordID is the ID for single-record operations (update, delete, read by ID)
RecordID string `json:"record_id,omitempty"`
// Data contains the request/response payload
Data interface{} `json:"data,omitempty"`
// Options contains query options (filters, sorting, pagination, etc.)
Options *common.RequestOptions `json:"options,omitempty"`
// SubscriptionID is the subscription identifier
SubscriptionID string `json:"subscription_id,omitempty"`
// Success indicates if the operation was successful
Success bool `json:"success,omitempty"`
// Error contains error information
Error *ErrorInfo `json:"error,omitempty"`
// Metadata contains additional response metadata
Metadata map[string]interface{} `json:"metadata,omitempty"`
// Timestamp is when the message was created
Timestamp time.Time `json:"timestamp,omitempty"`
}
Message represents a WebSocket message
func ParseMessage ¶
ParseMessage parses a JSON message into a Message struct
type MessageType ¶
type MessageType string
MessageType represents the type of WebSocket message
const ( // MessageTypeRequest is a client request message MessageTypeRequest MessageType = "request" // MessageTypeResponse is a server response message MessageTypeResponse MessageType = "response" // MessageTypeNotification is a server-initiated notification MessageTypeNotification MessageType = "notification" // MessageTypeSubscription is a subscription control message MessageTypeSubscription MessageType = "subscription" // MessageTypeError is an error message MessageTypeError MessageType = "error" // MessageTypePing is a keepalive ping message MessageTypePing MessageType = "ping" // MessageTypePong is a keepalive pong response MessageTypePong MessageType = "pong" )
type NotificationMessage ¶
type NotificationMessage struct {
Type MessageType `json:"type"`
Operation OperationType `json:"operation"`
SubscriptionID string `json:"subscription_id"`
Schema string `json:"schema"`
Entity string `json:"entity"`
Data interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
NotificationMessage represents a server-initiated notification
func NewNotificationMessage ¶
func NewNotificationMessage(subscriptionID string, operation OperationType, schema, entity string, data interface{}) *NotificationMessage
NewNotificationMessage creates a new notification message
func (*NotificationMessage) ToJSON ¶
func (n *NotificationMessage) ToJSON() ([]byte, error)
ToJSON converts a notification message to JSON bytes
type OperationType ¶
type OperationType string
OperationType represents the operation to perform
const ( // OperationRead retrieves records OperationRead OperationType = "read" // OperationCreate creates a new record OperationCreate OperationType = "create" // OperationUpdate updates an existing record OperationUpdate OperationType = "update" // OperationDelete deletes a record OperationDelete OperationType = "delete" // OperationSubscribe subscribes to entity changes OperationSubscribe OperationType = "subscribe" // OperationUnsubscribe unsubscribes from entity changes OperationUnsubscribe OperationType = "unsubscribe" // OperationMeta retrieves metadata about an entity OperationMeta OperationType = "meta" )
type RequestMessage ¶
type RequestMessage struct {
ID string `json:"id"`
Type MessageType `json:"type"`
Operation OperationType `json:"operation"`
Schema string `json:"schema,omitempty"`
Entity string `json:"entity"`
RecordID string `json:"record_id,omitempty"`
Data interface{} `json:"data,omitempty"`
Options *common.RequestOptions `json:"options,omitempty"`
}
RequestMessage represents a client request
func NewRequestMessage ¶
func NewRequestMessage(id string, operation OperationType, schema, entity string) *RequestMessage
NewRequestMessage creates a new request message
type ResponseMessage ¶
type ResponseMessage struct {
ID string `json:"id"`
Type MessageType `json:"type"`
Success bool `json:"success"`
Data interface{} `json:"data,omitempty"`
Error *ErrorInfo `json:"error,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ResponseMessage represents a server response
func NewErrorResponse ¶
func NewErrorResponse(id string, code, message string) *ResponseMessage
NewErrorResponse creates an error response message
func NewResponseMessage ¶
func NewResponseMessage(id string, success bool, data interface{}) *ResponseMessage
NewResponseMessage creates a new response message
func (*ResponseMessage) ToJSON ¶
func (r *ResponseMessage) ToJSON() ([]byte, error)
ToJSON converts a response message to JSON bytes
type Subscription ¶
type Subscription struct {
// ID is the unique subscription identifier
ID string
// ConnectionID is the ID of the connection that owns this subscription
ConnectionID string
// Schema is the database schema
Schema string
// Entity is the table/model name
Entity string
// Options contains filters and other query options
Options *common.RequestOptions
// Active indicates if the subscription is active
Active bool
}
Subscription represents a subscription to entity changes
func (*Subscription) MatchesFilters ¶
func (s *Subscription) MatchesFilters(data interface{}) bool
MatchesFilters checks if data matches the subscription's filters
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager manages all subscriptions
func NewSubscriptionManager ¶
func NewSubscriptionManager() *SubscriptionManager
NewSubscriptionManager creates a new subscription manager
func (*SubscriptionManager) Count ¶
func (sm *SubscriptionManager) Count() int
Count returns the total number of active subscriptions
func (*SubscriptionManager) CountForEntity ¶
func (sm *SubscriptionManager) CountForEntity(schema, entity string) int
CountForEntity returns the number of subscriptions for a specific entity
func (*SubscriptionManager) GetSubscription ¶
func (sm *SubscriptionManager) GetSubscription(subID string) (*Subscription, bool)
GetSubscription retrieves a subscription by ID
func (*SubscriptionManager) GetSubscriptionsByConnection ¶
func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription
GetSubscriptionsByConnection retrieves all subscriptions for a connection
func (*SubscriptionManager) GetSubscriptionsByEntity ¶
func (sm *SubscriptionManager) GetSubscriptionsByEntity(schema, entity string) []*Subscription
GetSubscriptionsByEntity retrieves all subscriptions for an entity
func (*SubscriptionManager) Subscribe ¶
func (sm *SubscriptionManager) Subscribe(id, connID, schema, entity string, options *common.RequestOptions) *Subscription
Subscribe creates a new subscription
func (*SubscriptionManager) Unsubscribe ¶
func (sm *SubscriptionManager) Unsubscribe(subID string) bool
Unsubscribe removes a subscription
type SubscriptionMessage ¶
type SubscriptionMessage struct {
ID string `json:"id"`
Type MessageType `json:"type"`
Operation OperationType `json:"operation"` // subscribe or unsubscribe
Schema string `json:"schema,omitempty"`
Entity string `json:"entity"`
Options *common.RequestOptions `json:"options,omitempty"` // Filters for subscription
SubscriptionID string `json:"subscription_id,omitempty"` // For unsubscribe
}
SubscriptionMessage represents a subscription control message