streaming

package
v1.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Overview

Package streaming provides shared helper functions for WebSocket command handlers

Package streaming defines WebSocket command types and constants

Package streaming provides WebSocket command handling infrastructure

Package streaming provides connection lifecycle management for WebSocket streaming

Package streaming provides error recovery mechanisms for WebSocket connections

Package streaming provides WebSocket event types and data structures for real-time activity updates.

Package streaming provides event constants and builder helpers for real-time streaming

Package streaming provides comprehensive metrics collection for WebSocket connections

Package streaming provides event publishing infrastructure for real-time streaming

Package streaming provides event queueing infrastructure for real-time streaming

Package streaming provides WebSocket streaming and rate limiting

Package streaming provides graceful shutdown and backpressure management for WebSocket connections

Package streaming provides connection state synchronization for multi-instance deployments

Index

Constants

View Source
const (
	StatusProcessing = "processing"
	StatusCompleted  = "completed"
)

Constants for operation statuses

View Source
const (
	// Status/Note Commands
	CmdCreateStatus     = "create_status"
	CmdDeleteStatus     = "delete_status"
	CmdFavoriteStatus   = "favorite_status"
	CmdUnfavoriteStatus = "unfavorite_status"
	CmdReblogStatus     = "reblog_status"
	CmdUnreblogStatus   = "unreblog_status"
	CmdBookmarkStatus   = "bookmark_status"
	CmdUnbookmarkStatus = "unbookmark_status"
	CmdMuteStatus       = "mute_status"
	CmdUnmuteStatus     = "unmute_status"
	CmdPinStatus        = "pin_status"
	CmdUnpinStatus      = "unpin_status"

	// Account/User Commands
	CmdFollowUser        = "follow_user"
	CmdUnfollowUser      = "unfollow_user"
	CmdBlockUser         = "block_user"
	CmdUnblockUser       = "unblock_user"
	CmdMuteUser          = "mute_user"
	CmdUnmuteUser        = "unmute_user"
	CmdUpdateProfile     = "update_profile"
	CmdUpdatePreferences = "update_preferences"

	// Relationship Commands
	CmdAcceptFollowRequest = "accept_follow_request"
	CmdRejectFollowRequest = "reject_follow_request"
	CmdRemoveFollower      = "remove_follower"

	// List Commands
	CmdCreateList     = "create_list"
	CmdUpdateList     = "update_list"
	CmdDeleteList     = "delete_list"
	CmdAddToList      = "add_to_list"
	CmdRemoveFromList = "remove_from_list"

	// Media Commands
	CmdUploadMedia = "upload_media"
	CmdUpdateMedia = "update_media"
	CmdDeleteMedia = "delete_media"

	// Conversation Commands
	CmdMarkConversationRead = "mark_conversation_read"
	CmdDeleteConversation   = "delete_conversation"

	// Notification Commands
	CmdMarkNotificationRead     = "mark_notification_read"
	CmdMarkAllNotificationsRead = "mark_all_notifications_read"
	CmdDismissNotification      = "dismiss_notification"

	// Scheduled Status Commands
	CmdCreateScheduledStatus = "create_scheduled_status"
	CmdUpdateScheduledStatus = "update_scheduled_status"
	CmdDeleteScheduledStatus = "delete_scheduled_status"

	// Poll Commands
	CmdVoteInPoll = "vote_in_poll"

	// Search Commands
	CmdSearchAccounts = "search_accounts"
	CmdSearchStatuses = "search_statuses"
	CmdSearchHashtags = "search_hashtags"

	// Admin Commands (require admin privileges)
	CmdAdminSuspendUser   = "admin_suspend_user"
	CmdAdminUnsuspendUser = "admin_unsuspend_user"
	CmdAdminSilenceUser   = "admin_silence_user"
	CmdAdminUnsilenceUser = "admin_unsilence_user"

	// System Commands
	CmdGetServerInfo       = "get_server_info"
	CmdGetTimeline         = "get_timeline"
	CmdGetNotifications    = "get_notifications"
	CmdSubscribeTimeline   = "subscribe_timeline"
	CmdUnsubscribeTimeline = "unsubscribe_timeline"

	// Bulk Operations Commands
	CmdBulkFollow         = "bulk_follow"
	CmdBulkUnfollow       = "bulk_unfollow"
	CmdBulkMute           = "bulk_mute"
	CmdBulkUnmute         = "bulk_unmute"
	CmdBulkBlock          = "bulk_block"
	CmdBulkUnblock        = "bulk_unblock"
	CmdBulkDeleteStatuses = "bulk_delete_statuses"
	CmdBulkListMembers    = "bulk_list_members"
	CmdGetBulkOperation   = "get_bulk_operation"

	// Bulk Content Management Commands
	CmdBulkDelete  = "bulk_delete"
	CmdBulkArchive = "bulk_archive"
	CmdBulkRestore = "bulk_restore"
	CmdBulkExport  = "bulk_export"

	// Import/Export Commands
	CmdCreateExport = "create_export"
	CmdGetExport    = "get_export"
	CmdListExports  = "list_exports"
	CmdCancelExport = "cancel_export"
	CmdCreateImport = "create_import"
	CmdGetImport    = "get_import"
	CmdListImports  = "list_imports"
)

Command type constants for WebSocket commands

View Source
const (
	CategoryStatus       = "status"
	CategoryAccount      = "account"
	CategoryRelationship = "relationship"
	CategoryList         = "list"
	CategoryMedia        = "media"
	CategoryConversation = "conversation"
	CategoryNotification = "notification"
	CategoryScheduled    = "scheduled"
	CategoryPoll         = "poll"
	CategorySearch       = "search"
	CategoryAdmin        = "admin"
	CategorySystem       = "system"
	CategoryBulk         = "bulk"
	CategoryExport       = "export"
)

Command categories for organization and access control

View Source
const (
	// Status/Note Events
	StatusCreated     = "status.created"     // New status posted
	StatusUpdated     = "status.updated"     // Status edited
	StatusDeleted     = "status.deleted"     // Status deleted
	StatusFavorited   = "status.favorited"   // Status favorited
	StatusUnfavorited = "status.unfavorited" // Status unfavorited
	StatusBoosted     = "status.boosted"     // Status boosted/reblogged
	StatusUnboosted   = "status.unboosted"   // Status unreblogged
	StatusPinned      = "status.pinned"      // Status pinned to profile
	StatusUnpinned    = "status.unpinned"    // Status unpinned from profile

	// Account Events
	AccountUpdated  = "account.updated"  // Profile updated
	AccountFollowed = "account.followed" // Account followed
	AccountBlocked  = "account.blocked"  // Account blocked
	AccountMuted    = "account.muted"    // Account muted

	// Relationship Events
	RelationshipFollowRequested = "relationship.follow_requested" // Follow request sent
	RelationshipFollowAccepted  = "relationship.follow_accepted"  // Follow request accepted
	RelationshipFollowRejected  = "relationship.follow_rejected"  // Follow request rejected
	RelationshipUnfollowed      = "relationship.unfollowed"       // Account unfollowed
	RelationshipBlocked         = "relationship.blocked"          // Account blocked
	RelationshipUnblocked       = "relationship.unblocked"        // Account unblocked
	RelationshipMuted           = "relationship.muted"            // Account muted
	RelationshipUnmuted         = "relationship.unmuted"          // Account unmuted

	// Notification Events
	NotificationCreated = "notification.created" // New notification
	NotificationRead    = "notification.read"    // Notification marked as read
	NotificationCleared = "notification.cleared" // Notifications cleared

	// Conversation Events
	ConversationCreated = "conversation.created" // New conversation started
	ConversationUpdated = "conversation.updated" // Conversation updated (new message, read status)
	ConversationDeleted = "conversation.deleted" // Conversation deleted

	// List Events
	ListCreated       = "list.created"        // List created
	ListUpdated       = "list.updated"        // List updated (title, privacy)
	ListDeleted       = "list.deleted"        // List deleted
	ListMemberAdded   = "list.member_added"   // Account added to list
	ListMemberRemoved = "list.member_removed" // Account removed from list

	// Media Events
	MediaUploaded  = "media.uploaded"  // Media uploaded and processed
	MediaUpdated   = "media.updated"   // Media metadata updated (alt text, focus)
	MediaDeleted   = "media.deleted"   // Media deleted
	MediaProcessed = "media.processed" // Media processing completed

	// Moderation Events
	ModerationReportCreated = "moderation.report_created" // New report submitted
	ModerationReportUpdated = "moderation.report_updated" // Report status updated
	ModerationActionTaken   = "moderation.action_taken"   // Moderation action performed

	// Federation Events
	FederationInstanceBlocked   = "federation.instance_blocked"   // Instance blocked
	FederationInstanceUnblocked = "federation.instance_unblocked" // Instance unblocked
	FederationActorUpdated      = "federation.actor_updated"      // Remote actor profile updated

	// System Events
	SystemAnnouncement = "system.announcement" // System announcement
	SystemMaintenance  = "system.maintenance"  // Maintenance notification

	// Filter Events
	FilterCreated = "filter.created" // Content filter created
	FilterUpdated = "filter.updated" // Content filter updated
	FilterDeleted = "filter.deleted" // Content filter deleted

	// Poll Events
	PollVoted   = "poll.voted"   // Vote cast in poll
	PollClosed  = "poll.closed"  // Poll closed
	PollExpired = "poll.expired" // Poll expired

	// Hashtag Events
	HashtagTrending = "hashtag.trending" // Hashtag is trending
	HashtagFollowed = "hashtag.followed" // Hashtag followed
)

Event type constants following Mastodon streaming API conventions

View Source
const (
	// Public Streams
	PublicStream       = "public"        // All public posts
	PublicLocalStream  = "public:local"  // Local public posts only
	PublicRemoteStream = "public:remote" // Remote public posts only

	// User Streams
	UserStream             = "user"              // User's home timeline and notifications
	UserNotificationStream = "user:notification" // User's notifications only

	// Direct Messages
	DirectStream = "direct" // Direct messages for the user

	// Hashtag Streams (template - append hashtag name)
	HashtagStreamPrefix = "hashtag" // Use as "hashtag:name"

	// List Streams (template - append list ID)
	ListStreamPrefix = "list" // Use as "list:id"

	// System Streams
	SystemStream = "system" // System announcements and maintenance

	// Moderation Streams
	ModerationStream = "moderation" // Moderation events for moderators

	// Admin Streams
	AdminStream = "admin" // Admin-only events
)

Stream name constants following Mastodon streaming API

Variables

View Source
var (
	// Connection errors
	ErrConnectionWriteFailed   = errors.FailedToSave("connection", stdErrors.New("failed to write to connection"))
	ErrConnectionDeleteFailed  = errors.FailedToDelete("connection", stdErrors.New("failed to delete connection"))
	ErrAPIGatewayClientNotInit = errors.ServiceUnavailable("API Gateway client")

	// Message errors
	ErrConfirmationSendFailed = errors.ProcessingFailed("confirmation send", stdErrors.New("failed to send confirmation"))
	ErrPongSendFailed         = errors.ProcessingFailed("pong send", stdErrors.New("failed to send pong"))
	ErrErrorMessageSendFailed = errors.ProcessingFailed("error message send", stdErrors.New("failed to send error message"))

	// Command validation errors
	ErrCommandIDRequired       = errors.ValidationFailedWithField("command id is required")
	ErrCommandIDMustBeString   = errors.ValidationFailedWithField("command id must be a string")
	ErrCommandTypeRequired     = errors.ValidationFailedWithField("command type is required")
	ErrCommandTypeMustBeString = errors.ValidationFailedWithField("command type must be a string")
)

Legacy error variables for backwards compatibility These are now wrappers around the centralized error system

Functions

func ConversationStreamName

func ConversationStreamName(conversationID string) string

ConversationStreamName returns the conversation stream name for a specific conversation

func DirectStreamName

func DirectStreamName(userID string) string

DirectStreamName returns the direct message stream name for a specific user

func GetCommandInfo

func GetCommandInfo() map[string]CommandInfo

GetCommandInfo returns metadata for all supported commands

func GetCommandsByCategory

func GetCommandsByCategory() map[string][]string

GetCommandsByCategory returns commands grouped by category

func GetEventCategory

func GetEventCategory(eventType string) string

GetEventCategory returns the category of an event type

func GetRequiredAuth

func GetRequiredAuth(commandType string) bool

GetRequiredAuth returns whether a command requires authentication

func HashtagStreamName

func HashtagStreamName(hashtag string) string

HashtagStreamName returns the hashtag stream name for a specific hashtag

func IsAdminOnly

func IsAdminOnly(commandType string) bool

IsAdminOnly returns whether a command is admin-only

func IsValidEventType

func IsValidEventType(eventType string) bool

IsValidEventType checks if an event type is valid

func IsValidStreamName

func IsValidStreamName(streamName string) bool

IsValidStreamName checks if a stream name follows valid patterns

func ListStreamName

func ListStreamName(listID string) string

ListStreamName returns the list stream name for a specific list

func UserNotificationStreamName

func UserNotificationStreamName(userID string) string

UserNotificationStreamName returns the notification stream name for a specific user

func UserStreamName

func UserStreamName(userID string) string

UserStreamName returns the user stream name for a specific user

Types

type AIEventPayload

type AIEventPayload struct {
	AnalysisID   string                 `json:"analysis_id"`
	ContentID    string                 `json:"content_id"`
	ContentType  string                 `json:"content_type"`  // status, media, profile
	AnalysisType string                 `json:"analysis_type"` // sentiment, toxicity, classification
	Results      map[string]interface{} `json:"results"`
	Confidence   float64                `json:"confidence"`
	ModelVersion string                 `json:"model_version,omitempty"`
	ProcessedAt  time.Time              `json:"processed_at"`
}

AIEventPayload represents the payload for AI analysis events

type AccountEventPayload

type AccountEventPayload struct {
	AccountID      string    `json:"account_id"`
	Username       string    `json:"username"`
	DisplayName    string    `json:"display_name,omitempty"`
	Avatar         string    `json:"avatar,omitempty"`
	Header         string    `json:"header,omitempty"`
	Bio            string    `json:"bio,omitempty"`
	URL            string    `json:"url,omitempty"`
	FollowersCount int64     `json:"followers_count"`
	FollowingCount int64     `json:"following_count"`
	StatusesCount  int64     `json:"statuses_count"`
	LastStatusAt   time.Time `json:"last_status_at,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at,omitempty"`
}

AccountEventPayload represents the payload for account-related events

type BackpressureAction

type BackpressureAction int

BackpressureAction defines actions to take under backpressure

const (
	// BackpressureAllow allows the message to proceed
	BackpressureAllow BackpressureAction = iota // Allow the message
	// BackpressureDelay delays the message
	BackpressureDelay // Delay the message
	// BackpressureDrop drops the message
	BackpressureDrop // Drop the message
	// BackpressureReject rejects the message with error
	BackpressureReject // Reject the message with error
)

type BackpressureConfig

type BackpressureConfig struct {
	MaxConcurrentMessages int           `json:"max_concurrent_messages"` // Maximum concurrent message processing
	MessageQueueSize      int           `json:"message_queue_size"`      // Maximum queued messages per connection
	ProcessingTimeout     time.Duration `json:"processing_timeout"`      // Timeout for message processing
	DropStrategy          DropStrategy  `json:"drop_strategy"`           // Strategy when queue is full
	EnableAdaptive        bool          `json:"enable_adaptive"`         // Enable adaptive backpressure
}

BackpressureConfig contains configuration for backpressure management

type BaseCommandHandler

type BaseCommandHandler struct {
	// contains filtered or unexported fields
}

BaseCommandHandler provides common functionality for command handlers

func NewBaseCommandHandler

func NewBaseCommandHandler(logger *zap.Logger) *BaseCommandHandler

NewBaseCommandHandler creates a new base command handler

func (*BaseCommandHandler) ConvertToJSON

func (bch *BaseCommandHandler) ConvertToJSON(data interface{}) (map[string]interface{}, error)

ConvertToJSON converts data to JSON for response

func (*BaseCommandHandler) CreateBulkOperationResponse

func (bch *BaseCommandHandler) CreateBulkOperationResponse(
	commandID string,
	operationID string,
	status string,
	total int,
	message string,
) *CommandResponse

CreateBulkOperationResponse creates a standardized response for bulk operations

func (*BaseCommandHandler) CreateBulkServiceResponse

func (bch *BaseCommandHandler) CreateBulkServiceResponse(
	commandID string,
	operation interface{},
	_ string,
	_ string,
) *CommandResponse

CreateBulkServiceResponse creates a response from a bulk service result

func (*BaseCommandHandler) CreateErrorResponse

func (bch *BaseCommandHandler) CreateErrorResponse(commandID string, code string, message string, details string) *CommandResponse

CreateErrorResponse creates an error command response

func (*BaseCommandHandler) CreateSuccessResponse

func (bch *BaseCommandHandler) CreateSuccessResponse(commandID string, data map[string]interface{}) *CommandResponse

CreateSuccessResponse creates a successful command response

func (*BaseCommandHandler) ExecuteStandardCommandFlow

func (bch *BaseCommandHandler) ExecuteStandardCommandFlow(
	ctx context.Context,
	conn *ConnectionInfo,
	cmd *Command,
	config *CommandHandlerConfig,
) (*CommandResponse, error)

ExecuteStandardCommandFlow handles the common command flow pattern: auth → validate → extract → service call → error handling → convert → return

func (*BaseCommandHandler) GetBool

func (bch *BaseCommandHandler) GetBool(payload map[string]interface{}, key string, defaultValue bool) bool

GetBool safely extracts a bool value from the payload

func (*BaseCommandHandler) GetInt

func (bch *BaseCommandHandler) GetInt(payload map[string]interface{}, key string, defaultValue int) int

GetInt safely extracts an int value from the payload

func (*BaseCommandHandler) GetString

func (bch *BaseCommandHandler) GetString(payload map[string]interface{}, key string, defaultValue string) string

GetString safely extracts a string value from the payload

func (*BaseCommandHandler) GetStringSlice

func (bch *BaseCommandHandler) GetStringSlice(payload map[string]interface{}, key string) []string

GetStringSlice safely extracts a string slice from the payload

func (*BaseCommandHandler) Logger

func (bch *BaseCommandHandler) Logger() *zap.Logger

Logger returns the handler logger instance

func (*BaseCommandHandler) RequireAuth

func (bch *BaseCommandHandler) RequireAuth(conn *ConnectionInfo, commandID string) *CommandResponse

RequireAuth checks if the connection is authenticated

func (*BaseCommandHandler) ValidateBulkAccountCommand

func (bch *BaseCommandHandler) ValidateBulkAccountCommand(
	conn *ConnectionInfo,
	cmd *Command,
	config *BulkAccountValidationConfig,
) ([]string, *CommandResponse)

ValidateBulkAccountCommand performs common validation for bulk account operations

func (*BaseCommandHandler) ValidateListCommand

func (bch *BaseCommandHandler) ValidateListCommand(
	conn *ConnectionInfo,
	cmd *Command,
	config *ListCommandValidationConfig,
) *CommandResponse

ValidateListCommand performs common validation for list operations

func (*BaseCommandHandler) ValidatePayload

func (bch *BaseCommandHandler) ValidatePayload(payload map[string]interface{}, required []string, commandID string) *CommandResponse

ValidatePayload validates that required fields are present in the command payload

type BulkAccountValidationConfig

type BulkAccountValidationConfig struct {
	RequiredFields []string
	MaxAccounts    int
	MinAccounts    int
}

BulkAccountValidationConfig holds configuration for bulk account validation

func DefaultBulkAccountConfig

func DefaultBulkAccountConfig() *BulkAccountValidationConfig

DefaultBulkAccountConfig returns default configuration for bulk account operations

type BulkProcessingConfig

type BulkProcessingConfig struct {
	BatchSize        int
	ProgressInterval int // Send progress every N operations
	BatchDelay       time.Duration
}

BulkProcessingConfig holds configuration for bulk processing operations

func DefaultBulkProcessingConfig

func DefaultBulkProcessingConfig() *BulkProcessingConfig

DefaultBulkProcessingConfig returns default configuration for bulk processing

type BulkProcessingTracker

type BulkProcessingTracker struct {
	Total      int
	Processed  int
	Successful int
	Failed     int
	Errors     []string
}

BulkProcessingTracker tracks progress of bulk operations

func NewBulkProcessingTracker

func NewBulkProcessingTracker(total int) *BulkProcessingTracker

NewBulkProcessingTracker creates a new processing tracker

func (*BulkProcessingTracker) AddFailure

func (bpt *BulkProcessingTracker) AddFailure(err error, entityID string)

AddFailure records a failed operation

func (*BulkProcessingTracker) AddSuccess

func (bpt *BulkProcessingTracker) AddSuccess()

AddSuccess records a successful operation

func (*BulkProcessingTracker) GetStatus

func (bpt *BulkProcessingTracker) GetStatus() string

GetStatus returns current processing status

func (*BulkProcessingTracker) ShouldSendProgress

func (bpt *BulkProcessingTracker) ShouldSendProgress(config *BulkProcessingConfig) bool

ShouldSendProgress determines if progress update should be sent

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for connection recovery

func NewCircuitBreaker

func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) CanExecute

func (cb *CircuitBreaker) CanExecute() bool

CanExecute returns whether the circuit breaker allows execution

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current circuit breaker state

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation

type CircuitBreakerState

type CircuitBreakerState int

CircuitBreakerState represents the current state of a circuit breaker

const (
	// CircuitBreakerClosed indicates the circuit breaker is closed (normal operation)
	CircuitBreakerClosed CircuitBreakerState = iota
	// CircuitBreakerOpen indicates the circuit breaker is open (blocking requests)
	CircuitBreakerOpen
	// CircuitBreakerHalfOpen indicates the circuit breaker is testing if service is recovered
	CircuitBreakerHalfOpen
)

type Command

type Command struct {
	ID      string                 `json:"id"`      // Client-provided ID for request/response matching
	Type    string                 `json:"type"`    // Command type (e.g., "create_status", "follow_user")
	Payload map[string]interface{} `json:"payload"` // Command payload data
}

Command represents a WebSocket command message

type CommandError

type CommandError struct {
	Code    string `json:"code"`              // Error code
	Message string `json:"message"`           // Human-readable error message
	Details string `json:"details,omitempty"` // Additional error details
}

CommandError represents an error in command execution

type CommandHandler

type CommandHandler interface {
	// HandleCommand processes a WebSocket command and returns a response
	HandleCommand(ctx context.Context, conn *ConnectionInfo, cmd *Command) (*CommandResponse, error)

	// GetSupportedCommands returns a list of command types this handler supports
	GetSupportedCommands() []string
}

CommandHandler defines the interface for handling WebSocket commands

type CommandHandlerConfig

type CommandHandlerConfig struct {
	RequiredFields  []string                                                                                    // Fields required in payload validation
	ParameterName   string                                                                                      // Name of the parameter to extract (e.g., "operation_id", "id")
	ErrorCodePrefix string                                                                                      // Prefix for error codes (e.g., "GET_OPERATION", "MARK_READ")
	OperationName   string                                                                                      // Name for error messages (e.g., "get bulk operation", "mark notification as read")
	ResultExtractor func(result interface{}) interface{}                                                        // Function to extract data from service result
	ServiceCall     func(ctx context.Context, conn *ConnectionInfo, extractedParam string) (interface{}, error) // Function to call the service
}

CommandHandlerConfig holds configuration for generic command handling

type CommandInfo

type CommandInfo struct {
	Type           string   `json:"type"`
	Category       string   `json:"category"`
	Description    string   `json:"description"`
	RequiresAuth   bool     `json:"requires_auth"`
	AdminOnly      bool     `json:"admin_only"`
	RequiredFields []string `json:"required_fields,omitempty"`
	OptionalFields []string `json:"optional_fields,omitempty"`
}

CommandInfo contains metadata about a command

type CommandRateLimit

type CommandRateLimit struct {
	MaxPerMinute   int
	MaxPerHour     int
	BurstLimit     int
	RequiresAuth   bool
	CostMultiplier float64 // Some commands cost more (e.g., searches)
}

CommandRateLimit defines rate limits for specific WebSocket commands

type CommandResponse

type CommandResponse struct {
	ID      string                 `json:"id"`              // Matching client ID
	Type    string                 `json:"type"`            // Response type (e.g., "command_result", "command_error")
	Success bool                   `json:"success"`         // Whether command succeeded
	Data    map[string]interface{} `json:"data"`            // Response data
	Error   *CommandError          `json:"error,omitempty"` // Error details if failed
}

CommandResponse represents a WebSocket command response

type CommandRouter

type CommandRouter struct {
	// contains filtered or unexported fields
}

CommandRouter routes WebSocket commands to appropriate handlers

func NewCommandRouter

func NewCommandRouter(logger *zap.Logger) *CommandRouter

NewCommandRouter creates a new WebSocket command router

func (*CommandRouter) GetSupportedCommands

func (cr *CommandRouter) GetSupportedCommands() []string

GetSupportedCommands returns a list of all supported command types

func (*CommandRouter) HandleCommand

func (cr *CommandRouter) HandleCommand(ctx context.Context, conn *ConnectionInfo, cmd *Command) (*CommandResponse, error)

HandleCommand routes a command to the appropriate handler

func (*CommandRouter) RegisterHandler

func (cr *CommandRouter) RegisterHandler(handler CommandHandler)

RegisterHandler registers a command handler for specific command types

type ConflictResolver

type ConflictResolver interface {
	ResolveConflict(ctx context.Context, local, remote *models.WebSocketConnection) (*models.WebSocketConnection, error)
}

ConflictResolver defines strategies for resolving state conflicts

type ConnectionInfo

type ConnectionInfo struct {
	ConnectionID    string                 `json:"connection_id"`
	UserID          string                 `json:"user_id"`
	Username        string                 `json:"username"`
	Streams         []string               `json:"streams"`
	IsAuthenticated bool                   `json:"is_authenticated"`
	Metadata        map[string]interface{} `json:"metadata"` // For storing connection-specific data
}

ConnectionInfo contains information about the WebSocket connection

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager manages WebSocket connection lifecycle, health checks, and resource management

func NewConnectionManager

func NewConnectionManager(
	connRepo interfaces.StreamingConnectionRepository,
	apiClient streamer.Client,
	logger *zap.Logger,
	config *ConnectionManagerConfig,
) *ConnectionManager

NewConnectionManager creates a new connection manager

func (*ConnectionManager) ForceCleanup

func (cm *ConnectionManager) ForceCleanup(ctx context.Context) error

ForceCleanup triggers an immediate cleanup

func (*ConnectionManager) ForceHealthCheck

func (cm *ConnectionManager) ForceHealthCheck(ctx context.Context) error

ForceHealthCheck triggers an immediate health check

func (*ConnectionManager) GetConnectionStats

func (cm *ConnectionManager) GetConnectionStats(ctx context.Context) (map[string]interface{}, error)

GetConnectionStats returns current connection statistics

func (*ConnectionManager) IsRunning

func (cm *ConnectionManager) IsRunning() bool

IsRunning returns whether the connection manager is currently running

func (*ConnectionManager) Start

func (cm *ConnectionManager) Start(ctx context.Context) error

Start begins the connection management background tasks

func (*ConnectionManager) Stop

func (cm *ConnectionManager) Stop() error

Stop stops the connection management background tasks

type ConnectionManagerConfig

type ConnectionManagerConfig struct {
	HealthCheckInterval time.Duration // How often to run health checks
	CleanupInterval     time.Duration // How often to run cleanup tasks
	PingTimeout         time.Duration // How long to wait for pong response
	IdleThreshold       time.Duration // When to mark connections as idle
	MaxIdleConnections  int           // Maximum number of idle connections to keep
}

ConnectionManagerConfig contains configuration for the connection manager

func DefaultConnectionManagerConfig

func DefaultConnectionManagerConfig() *ConnectionManagerConfig

DefaultConnectionManagerConfig returns default configuration

type ConnectionMetrics

type ConnectionMetrics struct {
	Timestamp             time.Time        `json:"timestamp"`
	TotalConnections      int64            `json:"total_connections"`
	ActiveConnections     int64            `json:"active_connections"`
	ConnectionsByState    map[string]int64 `json:"connections_by_state"`
	TotalMessages         int64            `json:"total_messages"`
	TotalBytes            int64            `json:"total_bytes"`
	AverageLatency        time.Duration    `json:"average_latency"`
	ErrorsByType          map[string]int64 `json:"errors_by_type"`
	AverageQualityScore   float64          `json:"average_quality_score"`
	MessageRate           float64          `json:"message_rate"`           // messages per second
	ByteRate              float64          `json:"byte_rate"`              // bytes per second
	ConnectionUtilization float64          `json:"connection_utilization"` // percentage of max connections used
}

ConnectionMetrics represents metrics for a specific time period

type ConnectionRateLimit

type ConnectionRateLimit struct {
	ConnectionID string
	UserID       string
	IPAddress    string
	// contains filtered or unexported fields
}

ConnectionRateLimit tracks rate limiting for a single connection

type ConnectionRepository

type ConnectionRepository interface {
	// GetUserConnections returns all active connections for a user
	GetUserConnections(ctx context.Context, userID string) ([]*StreamConnection, error)

	// GetStreamConnections returns all connections subscribed to a stream
	GetStreamConnections(ctx context.Context, streamName string) ([]*StreamConnection, error)

	// GetConversationConnections returns all connections for participants in a conversation
	GetConversationConnections(ctx context.Context, conversationID string) ([]*StreamConnection, error)
}

ConnectionRepository defines methods for accessing WebSocket connection data

type CostEventPayload

type CostEventPayload struct {
	Operation string                 `json:"operation"`
	Service   string                 `json:"service"` // dynamodb, lambda, s3, etc.
	CostUSD   float64                `json:"cost_usd"`
	Units     map[string]interface{} `json:"units,omitempty"` // RCU, WCU, requests, etc.
	UserID    string                 `json:"user_id,omitempty"`
	TenantID  string                 `json:"tenant_id,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
}

CostEventPayload represents the payload for cost tracking events

type DropStrategy

type DropStrategy int

DropStrategy defines how to handle messages when backpressure is applied

const (
	// DropOldest drops oldest messages when queue is full
	DropOldest DropStrategy = iota // Drop oldest messages when queue is full
	// DropNewest drops newest messages when queue is full
	DropNewest // Drop newest messages when queue is full
	// DropRandom drops random messages when queue is full
	DropRandom // Drop random messages when queue is full
	// RejectNew rejects new messages when queue is full
	RejectNew // Reject new messages when queue is full
)

type ErrorRecoveryConfig

type ErrorRecoveryConfig struct {
	MaxRetries     int           // Maximum number of retry attempts
	BaseRetryDelay time.Duration // Base delay between retries
	MaxRetryDelay  time.Duration // Maximum delay between retries
	JitterFactor   float64       // Jitter factor for randomizing delays (0.0-1.0)
	EnableBackoff  bool          // Enable exponential backoff
}

ErrorRecoveryConfig contains configuration for error recovery

func DefaultErrorRecoveryConfig

func DefaultErrorRecoveryConfig() *ErrorRecoveryConfig

DefaultErrorRecoveryConfig returns default error recovery configuration

type ErrorRecoveryManager

type ErrorRecoveryManager struct {
	// contains filtered or unexported fields
}

ErrorRecoveryManager handles connection error recovery and reconnection strategies

func NewErrorRecoveryManager

func NewErrorRecoveryManager(
	connRepo interfaces.StreamingConnectionRepository,
	apiClient streamer.Client,
	jobQueue JobQueue,
	logger *zap.Logger,
	config *ErrorRecoveryConfig,
) *ErrorRecoveryManager

NewErrorRecoveryManager creates a new error recovery manager

func (*ErrorRecoveryManager) GetRecoveryStats

func (erm *ErrorRecoveryManager) GetRecoveryStats() map[string]interface{}

GetRecoveryStats returns current error recovery statistics

func (*ErrorRecoveryManager) HandleConnectionError

func (erm *ErrorRecoveryManager) HandleConnectionError(ctx context.Context, connectionID string, err error) error

HandleConnectionError processes a connection error and determines recovery strategy

func (*ErrorRecoveryManager) PerformHealthCheck

func (erm *ErrorRecoveryManager) PerformHealthCheck(ctx context.Context, connectionID string) (*HealthCheckResult, error)

PerformHealthCheck performs a comprehensive health check on a connection

func (*ErrorRecoveryManager) ProcessRetryJob

func (erm *ErrorRecoveryManager) ProcessRetryJob(ctx context.Context, msg RetryJobMessage) error

ProcessRetryJob processes a retry job from the queue

func (*ErrorRecoveryManager) ResynchronizeConnection

func (erm *ErrorRecoveryManager) ResynchronizeConnection(ctx context.Context, conn *models.WebSocketConnection) error

ResynchronizeConnection attempts to resynchronize a recovered connection

type Event

type Event struct {
	Type      string                 `json:"type"`      // Event type (e.g., "status.created")
	Stream    string                 `json:"stream"`    // Stream name (e.g., "user:alice")
	Payload   map[string]interface{} `json:"payload"`   // Event payload data
	Timestamp time.Time              `json:"timestamp"` // Event timestamp
}

Event represents a streaming event to be published

func NewAccountUpdatedEvent

func NewAccountUpdatedEvent(accountID string, accountData map[string]interface{}) *Event

NewAccountUpdatedEvent creates an account.updated event

func NewBlockEvent

func NewBlockEvent(blockerID, blockedID string) *Event

NewBlockEvent creates a relationship.blocked event

func NewConversationUpdatedEvent

func NewConversationUpdatedEvent(conversationID string, conversationData map[string]interface{}) *Event

NewConversationUpdatedEvent creates a conversation.updated event

func NewFollowEvent

func NewFollowEvent(followerID, followeeID string) *Event

NewFollowEvent creates a relationship.followed event

func NewListUpdatedEvent

func NewListUpdatedEvent(listID, ownerID string, listData map[string]interface{}) *Event

NewListUpdatedEvent creates a list.updated event

func NewMediaUploadedEvent

func NewMediaUploadedEvent(mediaID, ownerID string, mediaData map[string]interface{}) *Event

NewMediaUploadedEvent creates a media.uploaded event

func NewMuteEvent

func NewMuteEvent(muterID, mutedID string, duration *time.Duration) *Event

NewMuteEvent creates a relationship.muted event

func NewNotificationCreatedEvent

func NewNotificationCreatedEvent(notificationID, recipientID, notificationType string, notificationData map[string]interface{}) *Event

NewNotificationCreatedEvent creates a notification.created event

func NewStatusCreatedEvent

func NewStatusCreatedEvent(statusID, authorID string, statusData map[string]interface{}) *Event

NewStatusCreatedEvent creates a status.created event

func NewStatusDeletedEvent

func NewStatusDeletedEvent(statusID, authorID string) *Event

NewStatusDeletedEvent creates a status.deleted event

func NewStatusUpdatedEvent

func NewStatusUpdatedEvent(statusID, authorID string, statusData map[string]interface{}) *Event

NewStatusUpdatedEvent creates a status.updated event

func NewUnfollowEvent

func NewUnfollowEvent(followerID, followeeID string) *Event

NewUnfollowEvent creates a relationship.unfollowed event

type EventAction

type EventAction string

EventAction represents the action that triggered the event

const (
	// ActionCreate represents content creation actions
	ActionCreate EventAction = "create"
	// ActionUpdate represents content update actions
	ActionUpdate EventAction = "update"
	// ActionDelete represents content deletion actions
	ActionDelete EventAction = "delete"
	// ActionRead represents content read actions
	ActionRead EventAction = "read"
	// ActionFollow represents follow actions
	ActionFollow EventAction = "follow"
	// ActionUnfollow represents unfollow actions
	ActionUnfollow EventAction = "unfollow"
	// ActionFavourite represents favorite actions
	ActionFavourite EventAction = "favourite"
	// ActionUnfavourite represents unfavorite actions
	ActionUnfavourite EventAction = "unfavourite"
	// ActionReblog represents reblog actions
	ActionReblog EventAction = "reblog"
	// ActionUnreblog represents unreblog actions
	ActionUnreblog EventAction = "unreblog"
	// ActionFlag represents content flagging actions
	ActionFlag EventAction = "flag"
	// ActionReview represents content review actions
	ActionReview EventAction = "review"
	// ActionApprove represents content approval actions
	ActionApprove EventAction = "approve"
	// ActionReject represents content rejection actions
	ActionReject EventAction = "reject"
)

type EventBuilder

type EventBuilder struct {
	// contains filtered or unexported fields
}

EventBuilder provides a fluent interface for building streaming events

func NewAccountEvent

func NewAccountEvent(eventType, accountID string) *EventBuilder

NewAccountEvent creates an account-related event

func NewConversationEvent

func NewConversationEvent(eventType, conversationID string) *EventBuilder

NewConversationEvent creates a conversation-related event

func NewEvent

func NewEvent(eventType string) *EventBuilder

NewEvent creates a new EventBuilder with the specified type

func NewListEvent

func NewListEvent(eventType, listID, ownerID string) *EventBuilder

NewListEvent creates a list-related event

func NewMediaEvent

func NewMediaEvent(eventType, mediaID, ownerID string) *EventBuilder

NewMediaEvent creates a media-related event

func NewNotificationEvent

func NewNotificationEvent(eventType, notificationID, recipientID string) *EventBuilder

NewNotificationEvent creates a notification-related event

func NewRelationshipEvent

func NewRelationshipEvent(eventType, actorID, targetID string) *EventBuilder

NewRelationshipEvent creates a relationship-related event

func NewStatusEvent

func NewStatusEvent(eventType, statusID, authorID string) *EventBuilder

NewStatusEvent creates a status-related event

func (*EventBuilder) Build

func (eb *EventBuilder) Build() *Event

Build returns the constructed Event

func (*EventBuilder) ForStream

func (eb *EventBuilder) ForStream(stream string) *EventBuilder

ForStream sets the stream name for the event

func (*EventBuilder) WithData

func (eb *EventBuilder) WithData(key string, value interface{}) *EventBuilder

WithData adds a key-value pair to the event payload

func (*EventBuilder) WithPayload

func (eb *EventBuilder) WithPayload(payload map[string]interface{}) *EventBuilder

WithPayload sets the entire payload for the event

func (*EventBuilder) WithTimestamp

func (eb *EventBuilder) WithTimestamp(timestamp time.Time) *EventBuilder

WithTimestamp sets a custom timestamp for the event

type EventFilter

type EventFilter struct {
	Types       []EventType       `json:"types,omitempty"`        // Filter by event types
	Actions     []EventAction     `json:"actions,omitempty"`      // Filter by actions
	ActorID     string            `json:"actor_id,omitempty"`     // Filter by actor
	UserID      string            `json:"user_id,omitempty"`      // Filter by user
	TenantID    string            `json:"tenant_id,omitempty"`    // Filter by tenant
	Streams     []string          `json:"streams,omitempty"`      // Filter by streams
	Metadata    map[string]string `json:"metadata,omitempty"`     // Filter by metadata
	MinPriority EventPriority     `json:"min_priority,omitempty"` // Minimum priority
}

EventFilter represents criteria for filtering events

func (*EventFilter) Matches

func (f *EventFilter) Matches(event *InternalEvent) bool

Matches checks if an event matches the filter criteria

type EventPriority

type EventPriority int

EventPriority represents the priority of an event

const (
	// PriorityLow represents low priority events
	PriorityLow EventPriority = 1
	// PriorityNormal represents normal priority events
	PriorityNormal EventPriority = 2
	// PriorityHigh represents high priority events
	PriorityHigh EventPriority = 3
	// PriorityUrgent represents urgent priority events
	PriorityUrgent EventPriority = 4
)

type EventType

type EventType string

EventType represents the type of internal event

const (
	// EventTypeStatus represents basic status events
	EventTypeStatus EventType = "status"
	// EventTypeStatusUpdate represents status update events
	EventTypeStatusUpdate EventType = "status.update"
	// EventTypeStatusDelete represents status deletion events
	EventTypeStatusDelete EventType = "status.delete"
	// EventTypeStatusFavourite represents status favoriting events
	EventTypeStatusFavourite EventType = "status.favourite"
	// EventTypeStatusReblog represents status reblogging events
	EventTypeStatusReblog EventType = "status.reblog"

	// EventTypeTimelineUpdate represents timeline update events
	EventTypeTimelineUpdate EventType = "timeline.update"
	// EventTypeTimelineRefresh represents timeline refresh events
	EventTypeTimelineRefresh EventType = "timeline.refresh"

	// EventTypeAccountUpdate represents account update events
	EventTypeAccountUpdate EventType = "account.update"
	// EventTypeAccountFollow represents account follow events
	EventTypeAccountFollow EventType = "account.follow"
	// EventTypeAccountUnfollow represents account unfollow events
	EventTypeAccountUnfollow EventType = "account.unfollow"

	// EventTypeNotification represents basic notification events
	EventTypeNotification EventType = "notification"
	// EventTypeNotificationRead represents notification read events
	EventTypeNotificationRead EventType = "notification.read"

	// EventTypeModeration represents basic moderation events
	EventTypeModeration EventType = "moderation"
	// EventTypeModerationFlag represents content flagging events
	EventTypeModerationFlag EventType = "moderation.flag"
	// EventTypeModerationReview represents moderation review events
	EventTypeModerationReview EventType = "moderation.review"

	// EventTypeTrustUpdate represents trust score update events
	EventTypeTrustUpdate EventType = "trust.update"
	// EventTypeReputationUpdate represents reputation score update events
	EventTypeReputationUpdate EventType = "reputation.update"
	// EventTypeVouchUpdate represents vouch update events
	EventTypeVouchUpdate EventType = "vouch.update"

	// EventTypeAIAnalysis represents AI content analysis events
	EventTypeAIAnalysis EventType = "ai.analysis"
	// EventTypeAIClassification represents AI classification events
	EventTypeAIClassification EventType = "ai.classification"
	// EventTypeAIModeration represents AI moderation events
	EventTypeAIModeration EventType = "ai.moderation"

	// EventTypeHashtagTrend represents hashtag trending events
	EventTypeHashtagTrend EventType = "hashtag.trend"
	// EventTypeHashtagUpdate represents hashtag update events
	EventTypeHashtagUpdate EventType = "hashtag.update"

	// EventTypeMediaUpdate represents media update events
	EventTypeMediaUpdate EventType = "media.update"
	// EventTypeMediaProcess represents media processing events
	EventTypeMediaProcess EventType = "media.process"

	// EventTypeCostUpdate represents cost update events
	EventTypeCostUpdate EventType = "cost.update"
	// EventTypeCostAlert represents cost alert events
	EventTypeCostAlert EventType = "cost.alert"

	// EventTypeSystemAlert represents system alert events
	EventTypeSystemAlert EventType = "system.alert"
	// EventTypeHealthCheck represents health check events
	EventTypeHealthCheck EventType = "health.check"

	// EventTypeMetricsUpdate represents real-time metrics update events for GraphQL subscriptions
	EventTypeMetricsUpdate EventType = "metrics.update"

	// EventTypeFederationHealthUpdate represents federation health update events
	EventTypeFederationHealthUpdate EventType = "federation.health.update"
	// EventTypeFederationFailure represents federation failure events
	EventTypeFederationFailure EventType = "federation.failure"
	// EventTypeFederationRecovery represents federation recovery events
	EventTypeFederationRecovery EventType = "federation.recovery"
)

type GlobalRateLimit

type GlobalRateLimit struct {
	Identifier string // user:id or ip:address
	// contains filtered or unexported fields
}

GlobalRateLimit tracks rate limiting across all connections for a user/IP

type HashtagEventPayload

type HashtagEventPayload struct {
	Hashtag       string    `json:"hashtag"`
	Count         int64     `json:"count"`
	PreviousCount int64     `json:"previous_count,omitempty"`
	TrendScore    float64   `json:"trend_score,omitempty"`
	Period        string    `json:"period"` // hour, day, week
	UpdatedAt     time.Time `json:"updated_at"`
}

HashtagEventPayload represents the payload for hashtag/trend events

type HealthBasedResolver

type HealthBasedResolver struct{}

HealthBasedResolver resolves conflicts based on connection health

func (*HealthBasedResolver) ResolveConflict

ResolveConflict implements ConflictResolver by choosing the healthier connection

type HealthCheckResult

type HealthCheckResult struct {
	ConnectionID      string    `json:"connection_id"`
	Timestamp         time.Time `json:"timestamp"`
	IsHealthy         bool      `json:"is_healthy"`
	QualityScore      float64   `json:"quality_score"`
	LatencyMs         int64     `json:"latency_ms"`
	PacketLoss        float64   `json:"packet_loss"`
	RecommendedAction string    `json:"recommended_action"`
}

HealthCheckResult contains the results of a connection health check

type HighestPriorityResolver

type HighestPriorityResolver struct{}

HighestPriorityResolver resolves conflicts based on state priority

func (*HighestPriorityResolver) ResolveConflict

ResolveConflict implements ConflictResolver by choosing the connection with higher priority state

type InternalEvent

type InternalEvent struct {
	// Event identification
	ID     string      `json:"id"`
	Type   EventType   `json:"type"`
	Action EventAction `json:"action"`

	// Event context
	ActorID  string `json:"actor_id,omitempty"`  // Who triggered the event
	TargetID string `json:"target_id,omitempty"` // What was affected
	UserID   string `json:"user_id,omitempty"`   // User context for the event
	TenantID string `json:"tenant_id,omitempty"` // Multi-tenant support

	// Timing
	Timestamp time.Time `json:"timestamp"`

	// Event payload - the actual data
	Data interface{} `json:"data"`

	// Metadata for filtering and routing
	Metadata map[string]string `json:"metadata,omitempty"`

	// Stream context
	Streams []string `json:"streams,omitempty"` // Which streams this event should go to

	// Priority for event processing
	Priority EventPriority `json:"priority"`
}

InternalEvent represents an event in the internal event bus

func CreateEvent

func CreateEvent(eventType EventType, action EventAction, data interface{}) *InternalEvent

CreateEvent creates a new internal event with common defaults

func FromJSON

func FromJSON(data []byte) (*InternalEvent, error)

FromJSON deserializes an event from JSON

func (*InternalEvent) ToJSON

func (e *InternalEvent) ToJSON() ([]byte, error)

ToJSON serializes the event to JSON

func (*InternalEvent) WithActor

func (e *InternalEvent) WithActor(actorID string) *InternalEvent

WithActor sets the actor ID for the event

func (*InternalEvent) WithMetadata

func (e *InternalEvent) WithMetadata(key, value string) *InternalEvent

WithMetadata adds metadata to the event

func (*InternalEvent) WithPriority

func (e *InternalEvent) WithPriority(priority EventPriority) *InternalEvent

WithPriority sets the priority for the event

func (*InternalEvent) WithStreams

func (e *InternalEvent) WithStreams(streams ...string) *InternalEvent

WithStreams sets the streams for the event

func (*InternalEvent) WithTarget

func (e *InternalEvent) WithTarget(targetID string) *InternalEvent

WithTarget sets the target ID for the event

func (*InternalEvent) WithTenant

func (e *InternalEvent) WithTenant(tenantID string) *InternalEvent

WithTenant sets the tenant ID for the event

func (*InternalEvent) WithUser

func (e *InternalEvent) WithUser(userID string) *InternalEvent

WithUser sets the user ID for the event

type JobQueue

type JobQueue interface {
	QueueDelayedJob(ctx context.Context, queueName string, messageBody interface{}, delaySeconds int32) error
}

JobQueue defines the interface for job queue operations to avoid import cycles

type LastWriteWinsResolver

type LastWriteWinsResolver struct{}

LastWriteWinsResolver resolves conflicts by choosing the connection with the most recent update

func (*LastWriteWinsResolver) ResolveConflict

ResolveConflict implements ConflictResolver by choosing the connection with the most recent update

type ListCommandValidationConfig

type ListCommandValidationConfig struct {
	RequiredFields []string
	RequireID      bool
	RequireTitle   bool
}

ListCommandValidationConfig holds configuration for list command validation

type MediaEventPayload

type MediaEventPayload struct {
	MediaID     string                 `json:"media_id"`
	URL         string                 `json:"url"`
	MediaType   string                 `json:"media_type"` // image, video, audio
	Status      string                 `json:"status"`     // processing, ready, error
	Size        int64                  `json:"size,omitempty"`
	Duration    int64                  `json:"duration,omitempty"` // for video/audio
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
	ProcessedAt time.Time              `json:"processed_at,omitempty"`
}

MediaEventPayload represents the payload for media stream events

type MetricsCollector

type MetricsCollector struct {
	// contains filtered or unexported fields
}

MetricsCollector collects and aggregates streaming connection metrics

func NewMetricsCollector

func NewMetricsCollector(
	connRepo interfaces.StreamingConnectionRepository,
	logger *zap.Logger,
	config *MetricsCollectorConfig,
) *MetricsCollector

NewMetricsCollector creates a new metrics collector

func (*MetricsCollector) GetCurrentMetrics

func (mc *MetricsCollector) GetCurrentMetrics() ConnectionMetrics

GetCurrentMetrics returns current connection metrics

func (*MetricsCollector) GetMetricsSummary

func (mc *MetricsCollector) GetMetricsSummary() map[string]interface{}

GetMetricsSummary returns a summary of key metrics

func (*MetricsCollector) GetPerformanceMetrics

func (mc *MetricsCollector) GetPerformanceMetrics() PerformanceMetrics

GetPerformanceMetrics returns detailed performance metrics

func (*MetricsCollector) IsCollecting

func (mc *MetricsCollector) IsCollecting() bool

IsCollecting returns whether metrics collection is active

func (*MetricsCollector) Start

func (mc *MetricsCollector) Start(ctx context.Context) error

Start begins metrics collection

func (*MetricsCollector) Stop

func (mc *MetricsCollector) Stop() error

Stop stops metrics collection

type MetricsCollectorConfig

type MetricsCollectorConfig struct {
	CollectionInterval time.Duration // How often to collect metrics
	RetentionPeriod    time.Duration // How long to retain metrics
}

MetricsCollectorConfig contains configuration for metrics collection

func DefaultMetricsCollectorConfig

func DefaultMetricsCollectorConfig() *MetricsCollectorConfig

DefaultMetricsCollectorConfig returns default configuration

type MetricsEventPayload

type MetricsEventPayload struct {
	MetricID             string            `json:"metric_id"`
	ServiceName          string            `json:"service_name"`
	MetricType           string            `json:"metric_type"`
	SubscriptionCategory string            `json:"subscription_category"` // moderation, security, performance, etc.
	AggregationLevel     string            `json:"aggregation_level"`     // raw, 5min, hourly, daily
	Timestamp            time.Time         `json:"timestamp"`
	Count                int64             `json:"count,omitempty"`
	Sum                  float64           `json:"sum,omitempty"`
	Min                  float64           `json:"min,omitempty"`
	Max                  float64           `json:"max,omitempty"`
	Average              float64           `json:"average,omitempty"`
	P50                  float64           `json:"p50,omitempty"`
	P95                  float64           `json:"p95,omitempty"`
	P99                  float64           `json:"p99,omitempty"`
	Unit                 string            `json:"unit,omitempty"`
	UserCostMicrocents   int64             `json:"user_cost_microcents,omitempty"`
	TotalCostMicrocents  int64             `json:"total_cost_microcents,omitempty"`
	Dimensions           map[string]string `json:"dimensions,omitempty"`
	UserID               string            `json:"user_id,omitempty"`
	TenantID             string            `json:"tenant_id,omitempty"`
	InstanceDomain       string            `json:"instance_domain,omitempty"`
}

MetricsEventPayload represents the payload for metrics update events sent to GraphQL subscriptions

type MockPublishedEvent

type MockPublishedEvent struct {
	Method      string    `json:"method"`    // "PublishToUser", "PublishToStream", "PublishToConversation"
	TargetID    string    `json:"target_id"` // userID, streamName, or conversationID
	Event       *Event    `json:"event"`
	PublishedAt time.Time `json:"published_at"`
}

MockPublishedEvent represents an event that was published via the mock publisher

type ModerationEventPayload

type ModerationEventPayload struct {
	ItemID      string                 `json:"item_id"`
	ItemType    string                 `json:"item_type"` // status, account, media
	Action      string                 `json:"action"`    // flag, review, approve, reject
	ModeratorID string                 `json:"moderator_id,omitempty"`
	Reason      string                 `json:"reason,omitempty"`
	Details     map[string]interface{} `json:"details,omitempty"`
	CreatedAt   time.Time              `json:"created_at"`
}

ModerationEventPayload represents the payload for moderation events

type NotificationEventPayload

type NotificationEventPayload struct {
	NotificationID string    `json:"notification_id"`
	Type           string    `json:"type"` // follow, mention, favourite, reblog, etc.
	RecipientID    string    `json:"recipient_id"`
	ActorID        string    `json:"actor_id"`
	StatusID       string    `json:"status_id,omitempty"`
	Read           bool      `json:"read"`
	CreatedAt      time.Time `json:"created_at"`
}

NotificationEventPayload represents the payload for notification events

type PerformanceMetrics

type PerformanceMetrics struct {
	AverageConnectionDuration time.Duration `json:"average_connection_duration"`
	MedianConnectionDuration  time.Duration `json:"median_connection_duration"`
	P95ConnectionDuration     time.Duration `json:"p95_connection_duration"`
	P99ConnectionDuration     time.Duration `json:"p99_connection_duration"`

	AverageMessageRate float64 `json:"average_message_rate"`
	PeakMessageRate    float64 `json:"peak_message_rate"`

	ConnectionSuccessRate float64 `json:"connection_success_rate"`
	MessageDeliveryRate   float64 `json:"message_delivery_rate"`

	HealthyConnections   int64   `json:"healthy_connections"`
	UnhealthyConnections int64   `json:"unhealthy_connections"`
	AverageQualityScore  float64 `json:"average_quality_score"`
}

PerformanceMetrics represents performance-specific metrics

type ProgressUpdateHelper

type ProgressUpdateHelper struct {
	// contains filtered or unexported fields
}

ProgressUpdateHelper handles WebSocket progress updates for async operations

func NewProgressUpdateHelper

func NewProgressUpdateHelper(publisher Publisher, logger *zap.Logger) *ProgressUpdateHelper

NewProgressUpdateHelper creates a new progress update helper

func (*ProgressUpdateHelper) SendFinalUpdate

func (puh *ProgressUpdateHelper) SendFinalUpdate(
	conn *ConnectionInfo,
	operationID string,
	tracker *BulkProcessingTracker,
	finalMessage string,
)

SendFinalUpdate sends the final completion update with error details

func (*ProgressUpdateHelper) SendProgressUpdate

func (puh *ProgressUpdateHelper) SendProgressUpdate(
	conn *ConnectionInfo,
	operationID string,
	tracker *BulkProcessingTracker,
	message string,
)

SendProgressUpdate sends a progress update via WebSocket

type PublishConnectionHelper

type PublishConnectionHelper struct {
	// contains filtered or unexported fields
}

PublishConnectionHelper handles common connection publishing logic

func NewPublishConnectionHelper

func NewPublishConnectionHelper(logger *zap.Logger) *PublishConnectionHelper

NewPublishConnectionHelper creates a new publish connection helper

func (*PublishConnectionHelper) PublishToConnections

func (pch *PublishConnectionHelper) PublishToConnections(
	ctx context.Context,
	connections []*StreamConnection,
	event *Event,
	publishFunc func(ctx context.Context, connectionID string, event *Event) error,
	logContext map[string]interface{},
) error

PublishToConnections publishes an event to multiple connections with common error handling

type Publisher

type Publisher interface {
	// PublishToUser publishes an event to a specific user's streams
	PublishToUser(ctx context.Context, userID string, event *Event) error

	// PublishToStream publishes an event to all subscribers of a stream
	PublishToStream(ctx context.Context, streamName string, event *Event) error

	// PublishToConversation publishes an event to all participants in a conversation
	PublishToConversation(ctx context.Context, conversationID string, event *Event) error

	// Close closes the publisher and cleans up resources
	Close() error
}

Publisher defines the interface for publishing streaming events

func NewAPIGatewayPublisher

func NewAPIGatewayPublisher(
	client streamer.Client,
	connRepo ConnectionRepository,
	endpoint string,
	logger *zap.Logger,
) Publisher

NewAPIGatewayPublisher creates a new publisher using API Gateway Management API

func NewMockPublisher

func NewMockPublisher() Publisher

NewMockPublisher creates a new mock publisher for testing

func NewNoopPublisher added in v1.1.9

func NewNoopPublisher() Publisher

NewNoopPublisher returns a Publisher implementation that drops all events.

This is used as a safe default when streaming is not configured so API paths never panic when emitting optional real-time events.

func NewQueuePublisher

func NewQueuePublisher(queue StreamQueueService, logger *zap.Logger) Publisher

NewQueuePublisher creates a publisher that enqueues streaming events to the DynamoDB-backed stream queue.

type PublisherConnectionConfig

type PublisherConnectionConfig struct {
	EventType      string
	StreamTemplate string // Template for stream name, e.g., "user:%s"
	Timeout        time.Duration
}

PublisherConnectionConfig holds configuration for publisher connection operations

func DefaultPublisherConnectionConfig

func DefaultPublisherConnectionConfig() *PublisherConnectionConfig

DefaultPublisherConnectionConfig returns default configuration for publisher connections

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements a token bucket rate limiter for backpressure control

func (*RateLimiter) AllowRequest

func (rl *RateLimiter) AllowRequest() bool

AllowRequest checks if a request should be allowed based on rate limiting

type RetryJobMessage

type RetryJobMessage struct {
	ConnectionID  string `json:"connection_id"`
	RetryCount    int    `json:"retry_count"`
	Timestamp     int64  `json:"timestamp"`
	OriginalError string `json:"original_error,omitempty"`
}

RetryJobMessage represents a streaming connection retry job

type ServiceRegistry

type ServiceRegistry interface {
}

ServiceRegistry defines the interface needed by command handlers

type ShutdownManager

type ShutdownManager struct {
	// contains filtered or unexported fields
}

ShutdownManager manages graceful shutdown of WebSocket connections and backpressure control

func NewShutdownManager

func NewShutdownManager(
	connRepo interfaces.StreamingConnectionRepository,
	apiClient streamer.Client,
	logger *zap.Logger,
	config *ShutdownManagerConfig,
) *ShutdownManager

NewShutdownManager creates a new shutdown manager

func (*ShutdownManager) ApplyBackpressure

func (sm *ShutdownManager) ApplyBackpressure(connectionID string, messageSize int64) (BackpressureAction, error)

ApplyBackpressure checks if backpressure should be applied and returns appropriate action

func (*ShutdownManager) GetBackpressureStats

func (sm *ShutdownManager) GetBackpressureStats() map[string]interface{}

GetBackpressureStats returns current backpressure statistics

func (*ShutdownManager) GetShutdownStats

func (sm *ShutdownManager) GetShutdownStats() map[string]interface{}

GetShutdownStats returns current shutdown statistics

func (*ShutdownManager) InitiateGracefulShutdown

func (sm *ShutdownManager) InitiateGracefulShutdown(ctx context.Context) error

InitiateGracefulShutdown begins the graceful shutdown process

func (*ShutdownManager) IsShuttingDown

func (sm *ShutdownManager) IsShuttingDown() bool

IsShuttingDown returns whether a graceful shutdown is in progress

func (*ShutdownManager) WaitForShutdown

func (sm *ShutdownManager) WaitForShutdown() error

WaitForShutdown waits for the graceful shutdown process to complete

type ShutdownManagerConfig

type ShutdownManagerConfig struct {
	ShutdownTimeout time.Duration       // Total time to wait for graceful shutdown
	DrainTimeout    time.Duration       // Time to wait for connection draining
	Backpressure    *BackpressureConfig // Backpressure configuration
}

ShutdownManagerConfig contains configuration for shutdown management

func DefaultShutdownManagerConfig

func DefaultShutdownManagerConfig() *ShutdownManagerConfig

DefaultShutdownManagerConfig returns default configuration

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow implements a time-based sliding window counter

func NewSlidingWindow

func NewSlidingWindow(windowSize time.Duration) *SlidingWindow

NewSlidingWindow creates a new sliding window counter

func (*SlidingWindow) Add

func (sw *SlidingWindow) Add(count int)

Add increments the counter for the current time

func (*SlidingWindow) Count

func (sw *SlidingWindow) Count() int

Count returns the total count in the window

func (*SlidingWindow) CountRecent

func (sw *SlidingWindow) CountRecent(duration time.Duration) int

CountRecent returns the count in a recent time period

type StateSynchronizer

type StateSynchronizer struct {
	// contains filtered or unexported fields
}

StateSynchronizer manages connection state synchronization across multiple instances

func NewStateSynchronizer

func NewStateSynchronizer(
	connRepo interfaces.StreamingConnectionRepository,
	logger *zap.Logger,
	config *StateSynchronizerConfig,
) *StateSynchronizer

NewStateSynchronizer creates a new state synchronizer

func (*StateSynchronizer) ForceSync

func (ss *StateSynchronizer) ForceSync(ctx context.Context) error

ForceSync triggers an immediate synchronization

func (*StateSynchronizer) GetInstanceInfo

func (ss *StateSynchronizer) GetInstanceInfo() map[string]interface{}

GetInstanceInfo returns information about this instance

func (*StateSynchronizer) GetSyncStats

func (ss *StateSynchronizer) GetSyncStats() SyncStats

GetSyncStats returns current synchronization statistics

func (*StateSynchronizer) IsRunning

func (ss *StateSynchronizer) IsRunning() bool

IsRunning returns whether state synchronization is active

func (*StateSynchronizer) Start

func (ss *StateSynchronizer) Start(ctx context.Context) error

Start begins state synchronization

func (*StateSynchronizer) Stop

func (ss *StateSynchronizer) Stop() error

Stop stops state synchronization

type StateSynchronizerConfig

type StateSynchronizerConfig struct {
	InstanceID       string           // Unique instance identifier
	SyncInterval     time.Duration    // How often to sync state
	StaleThreshold   time.Duration    // When to consider connections stale
	ConflictResolver ConflictResolver // Strategy for resolving conflicts
}

StateSynchronizerConfig contains configuration for state synchronization

func DefaultStateSynchronizerConfig

func DefaultStateSynchronizerConfig() *StateSynchronizerConfig

DefaultStateSynchronizerConfig returns default configuration

type StatusEventPayload

type StatusEventPayload struct {
	StatusID       string                 `json:"status_id"`
	AuthorID       string                 `json:"author_id"`
	AuthorUsername string                 `json:"author_username"`
	Content        string                 `json:"content,omitempty"`
	Visibility     string                 `json:"visibility,omitempty"`
	InReplyToID    string                 `json:"in_reply_to_id,omitempty"`
	ReblogOfID     string                 `json:"reblog_of_id,omitempty"`
	Sensitive      bool                   `json:"sensitive"`
	Language       string                 `json:"language,omitempty"`
	Hashtags       []string               `json:"hashtags,omitempty"`
	Mentions       []string               `json:"mentions,omitempty"`
	URL            string                 `json:"url,omitempty"`
	CreatedAt      time.Time              `json:"created_at"`
	UpdatedAt      time.Time              `json:"updated_at,omitempty"`
	ExtraData      map[string]interface{} `json:"extra_data,omitempty"`
}

StatusEventPayload represents the payload for status-related events

type StreamConnection

type StreamConnection struct {
	ConnectionID string    `json:"connection_id"`
	UserID       string    `json:"user_id,omitempty"`
	Username     string    `json:"username,omitempty"`
	Streams      []string  `json:"streams"`
	LastActivity time.Time `json:"last_activity"`
}

StreamConnection represents a WebSocket connection for streaming

type StreamEventLog

type StreamEventLog struct {
	// contains filtered or unexported fields
}

StreamEventLog provides append/query operations for SSE stream event persistence.

func NewStreamEventLog

func NewStreamEventLog(db core.DB, ttl time.Duration) *StreamEventLog

NewStreamEventLog creates a new StreamEventLog with the provided DynamORM DB and TTL.

func (*StreamEventLog) Append

func (l *StreamEventLog) Append(ctx context.Context, streamName, eventType, data string) (string, error)

Append writes a new event into the stream log for the given stream.

func (*StreamEventLog) Enabled

func (l *StreamEventLog) Enabled() bool

Enabled reports whether the StreamEventLog is configured and usable.

func (*StreamEventLog) Query

func (l *StreamEventLog) Query(ctx context.Context, streamName, afterID string, limit int32) ([]StreamEventLogItem, error)

Query returns up to limit events for the stream after afterID, ordered ascending by ID.

type StreamEventLogItem

type StreamEventLogItem struct {
	ID        string
	Event     string
	Data      string
	CreatedAt time.Time
}

StreamEventLogItem represents a single persisted SSE event for a given stream.

type StreamQueueService

type StreamQueueService interface {
	// QueueEventForUser queues an event for a specific user's streams
	QueueEventForUser(ctx context.Context, userID string, eventType string, payload map[string]interface{}) error

	// QueueEventForStream queues an event for all subscribers of a stream
	QueueEventForStream(ctx context.Context, streamName string, eventType string, payload map[string]interface{}) error

	// QueueEventForConversation queues an event for all participants in a conversation
	QueueEventForConversation(ctx context.Context, conversationID string, eventType string, payload map[string]interface{}) error

	// QueueEventForFollowers queues an event for all followers of a user
	QueueEventForFollowers(ctx context.Context, userID string, eventType string, payload map[string]interface{}) error
}

StreamQueueService queues streaming events to DynamoDB for processing by stream-router

func NewDynamoStreamQueue

func NewDynamoStreamQueue(db core.DB, tableName string, logger *zap.Logger) StreamQueueService

NewDynamoStreamQueue creates a new DynamoDB-based stream queue service

type SyncStats

type SyncStats struct {
	TotalSyncs            int64         `json:"total_syncs"`
	SuccessfulSyncs       int64         `json:"successful_syncs"`
	FailedSyncs           int64         `json:"failed_syncs"`
	ConflictsResolved     int64         `json:"conflicts_resolved"`
	StaleConnectionsFound int64         `json:"stale_connections_found"`
	LastSyncTime          time.Time     `json:"last_sync_time"`
	LastSyncDuration      time.Duration `json:"last_sync_duration"`
	AverageSyncDuration   time.Duration `json:"average_sync_duration"`
}

SyncStats tracks synchronization statistics

type TrustEventPayload

type TrustEventPayload struct {
	SubjectID     string                 `json:"subject_id"`
	SubjectType   string                 `json:"subject_type"` // user, content, domain
	Score         float64                `json:"score"`
	PreviousScore float64                `json:"previous_score,omitempty"`
	Reason        string                 `json:"reason,omitempty"`
	Evidence      map[string]interface{} `json:"evidence,omitempty"`
	UpdatedBy     string                 `json:"updated_by,omitempty"`
	UpdatedAt     time.Time              `json:"updated_at"`
}

TrustEventPayload represents the payload for trust/reputation events

type WebSocketRateLimitConfig

type WebSocketRateLimitConfig struct {
	// Connection limits
	MaxConnectionsPerUser int
	MaxConnectionsPerIP   int
	ConnectionWindowSize  time.Duration

	// Command rate limits (per connection)
	MaxCommandsPerMinute int
	MaxCommandsPerSecond int
	CommandWindowSize    time.Duration

	// Command-specific limits
	CommandLimits map[string]CommandRateLimit

	// Progressive delays
	EnableProgressiveDelays bool
	BaseDelayMillis         int
	MaxDelayMillis          int

	// Burst protection
	BurstWindowSize  time.Duration
	MaxBurstCommands int

	// Penalty configuration
	ViolationThreshold int
	PenaltyDuration    time.Duration
	PenaltyMultiplier  float64
}

WebSocketRateLimitConfig defines WebSocket rate limiting configuration

func DefaultWebSocketRateLimitConfig

func DefaultWebSocketRateLimitConfig() *WebSocketRateLimitConfig

DefaultWebSocketRateLimitConfig returns default WebSocket rate limit configuration

type WebSocketRateLimiter

type WebSocketRateLimiter struct {
	// contains filtered or unexported fields
}

WebSocketRateLimiter provides rate limiting for WebSocket connections and commands

func NewWebSocketRateLimiter

func NewWebSocketRateLimiter(config *WebSocketRateLimitConfig, repo interfaces.RateLimitRepository, logger *zap.Logger) *WebSocketRateLimiter

NewWebSocketRateLimiter creates a new WebSocket rate limiter

func (*WebSocketRateLimiter) CheckCommand

func (wrl *WebSocketRateLimiter) CheckCommand(ctx context.Context, connectionID string, command *Command) (bool, time.Duration, error)

CheckCommand checks if a WebSocket command should be allowed

func (*WebSocketRateLimiter) CheckConnection

func (wrl *WebSocketRateLimiter) CheckConnection(ctx context.Context, userID, ipAddress string) (bool, string, error)

CheckConnection checks if a new connection should be allowed

func (*WebSocketRateLimiter) GetConnectionStatus

func (wrl *WebSocketRateLimiter) GetConnectionStatus(connectionID string) (map[string]interface{}, error)

GetConnectionStatus returns the rate limit status for a connection

func (*WebSocketRateLimiter) OnConnect

func (wrl *WebSocketRateLimiter) OnConnect(connectionID, userID, ipAddress string)

OnConnect handles a new WebSocket connection

func (*WebSocketRateLimiter) OnDisconnect

func (wrl *WebSocketRateLimiter) OnDisconnect(connectionID string)

OnDisconnect handles a WebSocket disconnection

func (*WebSocketRateLimiter) ResetConnection

func (wrl *WebSocketRateLimiter) ResetConnection(connectionID string) error

ResetConnection resets rate limits for a specific connection (admin function)

Directories

Path Synopsis
Package handlers provides WebSocket command handlers for different domains
Package handlers provides WebSocket command handlers for different domains

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL