Documentation
¶
Overview ¶
Package realtime provides SSE and WebSocket clients for real-time Spooled events.
Index ¶
- type ConnectionOptions
- func DefaultConnectionOptions() ConnectionOptions
- type ConnectionState
- type Event
- type EventHandler
- type EventType
- type JobEvent
- type JobEventHandler
- type QueueEvent
- type QueueEventHandler
- type RealtimeClient
- type SSEClient
- func NewSSEClient(opts ConnectionOptions) *SSEClient
- func (c *SSEClient) Connect() error
- func (c *SSEClient) ConnectWithFilter(filter *SubscriptionFilter) error
- func (c *SSEClient) Disconnect() error
- func (c *SSEClient) OnEvent(handler EventHandler)
- func (c *SSEClient) OnJobEvent(eventType EventType, handler JobEventHandler)
- func (c *SSEClient) OnQueueEvent(eventType EventType, handler QueueEventHandler)
- func (c *SSEClient) OnStateChange(handler StateChangeHandler)
- func (c *SSEClient) OnWorkerEvent(eventType EventType, handler WorkerEventHandler)
- func (c *SSEClient) State() ConnectionState
- func (c *SSEClient) Subscribe(filter SubscriptionFilter) error
- func (c *SSEClient) Unsubscribe(filter SubscriptionFilter) error
- type StateChangeHandler
- type SubscriptionFilter
- type WebSocketClient
- func NewWebSocketClient(opts ConnectionOptions) *WebSocketClient
- func (c *WebSocketClient) Connect() error
- func (c *WebSocketClient) Disconnect() error
- func (c *WebSocketClient) OnEvent(handler EventHandler)
- func (c *WebSocketClient) OnJobEvent(eventType EventType, handler JobEventHandler)
- func (c *WebSocketClient) OnQueueEvent(eventType EventType, handler QueueEventHandler)
- func (c *WebSocketClient) OnStateChange(handler StateChangeHandler)
- func (c *WebSocketClient) OnWorkerEvent(eventType EventType, handler WorkerEventHandler)
- func (c *WebSocketClient) State() ConnectionState
- func (c *WebSocketClient) Subscribe(filter SubscriptionFilter) error
- func (c *WebSocketClient) Unsubscribe(filter SubscriptionFilter) error
- type WorkerEvent
- type WorkerEventHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConnectionOptions ¶
type ConnectionOptions struct {
// BaseURL is the base API URL (e.g., "https://api.spooled.cloud")
BaseURL string
// WSURL is the WebSocket URL (e.g., "wss://api.spooled.cloud/api/v1/ws")
WSURL string
// Token is the JWT or API key for authentication
Token string
// APIKey is the API key for authentication (alternative to Token)
APIKey string
// AutoReconnect enables automatic reconnection on disconnect
AutoReconnect bool
// MaxReconnectAttempts is the maximum number of reconnect attempts (0 = unlimited)
MaxReconnectAttempts int
// ReconnectDelay is the initial delay between reconnect attempts
ReconnectDelay time.Duration
// MaxReconnectDelay is the maximum delay between reconnect attempts
MaxReconnectDelay time.Duration
// Debug enables debug logging
Debug bool
// Logger is a custom logger function
Logger func(msg string, args ...any)
}
ConnectionOptions configures a realtime connection.
func DefaultConnectionOptions ¶
func DefaultConnectionOptions() ConnectionOptions
DefaultConnectionOptions returns options with sensible defaults.
type ConnectionState ¶
type ConnectionState string
ConnectionState represents the state of a realtime connection.
const (
StateDisconnected ConnectionState = "disconnected"
StateConnecting ConnectionState = "connecting"
StateConnected ConnectionState = "connected"
StateReconnecting ConnectionState = "reconnecting"
)
type Event ¶
type Event struct {
Type EventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
Data json.RawMessage `json:"data"`
}
Event represents a realtime event from the Spooled API.
type EventHandler ¶
type EventHandler func(event *Event)
EventHandler is a callback for handling events.
type EventType ¶
type EventType string
EventType represents the type of realtime event.
const (
EventJobCreated EventType = "job.created"
EventJobStarted EventType = "job.started"
EventJobCompleted EventType = "job.completed"
EventJobFailed EventType = "job.failed"
EventJobRetrying EventType = "job.retrying"
EventJobProgress EventType = "job.progress"
EventQueuePaused EventType = "queue.paused"
EventQueueResumed EventType = "queue.resumed"
EventWorkerJoined EventType = "worker.joined"
EventWorkerLeft EventType = "worker.left"
EventWorkerActive EventType = "worker.active"
EventWorkerInactive EventType = "worker.inactive"
)
type JobEvent ¶
type JobEvent struct {
JobID string `json:"job_id"`
QueueName string `json:"queue_name"`
Status string `json:"status"`
Priority int `json:"priority,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
Error string `json:"error,omitempty"`
Progress float64 `json:"progress,omitempty"`
Result map[string]any `json:"result,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
FailedAt *time.Time `json:"failed_at,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
JobEvent contains data for job-related events.
type JobEventHandler ¶
type JobEventHandler func(event *JobEvent)
JobEventHandler is a callback for handling job events.
type QueueEvent ¶
type QueueEvent struct {
QueueName string `json:"queue_name"`
Reason string `json:"reason,omitempty"`
}
QueueEvent contains data for queue-related events.
type QueueEventHandler ¶
type QueueEventHandler func(event *QueueEvent)
QueueEventHandler is a callback for handling queue events.
type RealtimeClient ¶
type RealtimeClient interface {
// Connect establishes the connection
Connect() error
// Disconnect closes the connection
Disconnect() error
// State returns the current connection state
State() ConnectionState
// Subscribe adds a subscription filter (WebSocket only - SSE uses filter at connect time)
Subscribe(filter SubscriptionFilter) error
// Unsubscribe removes a subscription (WebSocket only)
Unsubscribe(filter SubscriptionFilter) error
// OnEvent registers a handler for all events
OnEvent(handler EventHandler)
// OnJobEvent registers a handler for job events
OnJobEvent(eventType EventType, handler JobEventHandler)
// OnQueueEvent registers a handler for queue events
OnQueueEvent(eventType EventType, handler QueueEventHandler)
// OnWorkerEvent registers a handler for worker events
OnWorkerEvent(eventType EventType, handler WorkerEventHandler)
// OnStateChange registers a handler for state changes
OnStateChange(handler StateChangeHandler)
}
RealtimeClient is the interface for realtime connections.
type SSEClient ¶
type SSEClient struct {
// contains filtered or unexported fields
}
SSEClient implements RealtimeClient using Server-Sent Events. Note: SSE is unidirectional - subscriptions must be set at connection time.
func NewSSEClient ¶
func NewSSEClient(opts ConnectionOptions) *SSEClient
NewSSEClient creates a new SSE realtime client.
func (*SSEClient) Connect ¶
func (c *SSEClient) Connect() error
Connect establishes the SSE connection. For SSE, subscriptions must be provided at connect time via ConnectWithFilter.
func (*SSEClient) ConnectWithFilter ¶
func (c *SSEClient) ConnectWithFilter(filter *SubscriptionFilter) error
ConnectWithFilter establishes the SSE connection with a subscription filter.
func (*SSEClient) Disconnect ¶
func (c *SSEClient) Disconnect() error
Disconnect closes the SSE connection.
func (*SSEClient) OnEvent ¶
func (c *SSEClient) OnEvent(handler EventHandler)
OnEvent registers a handler for all events.
func (*SSEClient) OnJobEvent ¶
func (c *SSEClient) OnJobEvent(eventType EventType, handler JobEventHandler)
OnJobEvent registers a handler for job events.
func (*SSEClient) OnQueueEvent ¶
func (c *SSEClient) OnQueueEvent(eventType EventType, handler QueueEventHandler)
OnQueueEvent registers a handler for queue events.
func (*SSEClient) OnStateChange ¶
func (c *SSEClient) OnStateChange(handler StateChangeHandler)
OnStateChange registers a handler for state changes.
func (*SSEClient) OnWorkerEvent ¶
func (c *SSEClient) OnWorkerEvent(eventType EventType, handler WorkerEventHandler)
OnWorkerEvent registers a handler for worker events.
func (*SSEClient) State ¶
func (c *SSEClient) State() ConnectionState
State returns the current connection state.
func (*SSEClient) Subscribe ¶
func (c *SSEClient) Subscribe(filter SubscriptionFilter) error
Subscribe is not supported for SSE - use ConnectWithFilter instead.
func (*SSEClient) Unsubscribe ¶
func (c *SSEClient) Unsubscribe(filter SubscriptionFilter) error
Unsubscribe is not supported for SSE.
type StateChangeHandler ¶
type StateChangeHandler func(state ConnectionState)
StateChangeHandler is a callback for connection state changes.
type SubscriptionFilter ¶
type SubscriptionFilter struct {
QueueName string `json:"queue_name,omitempty"`
JobID string `json:"job_id,omitempty"`
WorkerID string `json:"worker_id,omitempty"`
Events []string `json:"events,omitempty"`
}
SubscriptionFilter specifies which events to receive.
type WebSocketClient ¶
type WebSocketClient struct {
// contains filtered or unexported fields
}
WebSocketClient implements RealtimeClient using WebSocket.
func NewWebSocketClient ¶
func NewWebSocketClient(opts ConnectionOptions) *WebSocketClient
NewWebSocketClient creates a new WebSocket realtime client.
func (*WebSocketClient) Connect ¶
func (c *WebSocketClient) Connect() error
Connect establishes the WebSocket connection.
func (*WebSocketClient) Disconnect ¶
func (c *WebSocketClient) Disconnect() error
Disconnect closes the WebSocket connection.
func (*WebSocketClient) OnEvent ¶
func (c *WebSocketClient) OnEvent(handler EventHandler)
OnEvent registers a handler for all events.
func (*WebSocketClient) OnJobEvent ¶
func (c *WebSocketClient) OnJobEvent(eventType EventType, handler JobEventHandler)
OnJobEvent registers a handler for job events.
func (*WebSocketClient) OnQueueEvent ¶
func (c *WebSocketClient) OnQueueEvent(eventType EventType, handler QueueEventHandler)
OnQueueEvent registers a handler for queue events.
func (*WebSocketClient) OnStateChange ¶
func (c *WebSocketClient) OnStateChange(handler StateChangeHandler)
OnStateChange registers a handler for state changes.
func (*WebSocketClient) OnWorkerEvent ¶
func (c *WebSocketClient) OnWorkerEvent(eventType EventType, handler WorkerEventHandler)
OnWorkerEvent registers a handler for worker events.
func (*WebSocketClient) State ¶
func (c *WebSocketClient) State() ConnectionState
State returns the current connection state.
func (*WebSocketClient) Subscribe ¶
func (c *WebSocketClient) Subscribe(filter SubscriptionFilter) error
Subscribe adds a subscription filter.
func (*WebSocketClient) Unsubscribe ¶
func (c *WebSocketClient) Unsubscribe(filter SubscriptionFilter) error
Unsubscribe removes a subscription.
type WorkerEvent ¶
type WorkerEvent struct {
WorkerID string `json:"worker_id"`
QueueName string `json:"queue_name"`
Hostname string `json:"hostname,omitempty"`
Version string `json:"version,omitempty"`
LastSeenAt time.Time `json:"last_seen_at,omitempty"`
}
WorkerEvent contains data for worker-related events.
type WorkerEventHandler ¶
type WorkerEventHandler func(event *WorkerEvent)
WorkerEventHandler is a callback for handling worker events.