Documentation
¶
Overview ¶
Package streaming provides shared helper functions for WebSocket command handlers
Package streaming defines WebSocket command types and constants ¶
Package streaming provides WebSocket command handling infrastructure ¶
Package streaming provides connection lifecycle management for WebSocket streaming ¶
Package streaming provides error recovery mechanisms for WebSocket connections ¶
Package streaming provides WebSocket event types and data structures for real-time activity updates.
Package streaming provides event constants and builder helpers for real-time streaming ¶
Package streaming provides comprehensive metrics collection for WebSocket connections ¶
Package streaming provides event publishing infrastructure for real-time streaming ¶
Package streaming provides event queueing infrastructure for real-time streaming ¶
Package streaming provides WebSocket streaming and rate limiting ¶
Package streaming provides graceful shutdown and backpressure management for WebSocket connections ¶
Package streaming provides connection state synchronization for multi-instance deployments
Index ¶
- Constants
- Variables
- func ConversationStreamName(conversationID string) string
- func DirectStreamName(userID string) string
- func GetCommandInfo() map[string]CommandInfo
- func GetCommandsByCategory() map[string][]string
- func GetEventCategory(eventType string) string
- func GetRequiredAuth(commandType string) bool
- func HashtagStreamName(hashtag string) string
- func IsAdminOnly(commandType string) bool
- func IsValidEventType(eventType string) bool
- func IsValidStreamName(streamName string) bool
- func ListStreamName(listID string) string
- func UserNotificationStreamName(userID string) string
- func UserStreamName(userID string) string
- type AIEventPayload
- type AccountEventPayload
- type BackpressureAction
- type BackpressureConfig
- type BaseCommandHandler
- func (bch *BaseCommandHandler) ConvertToJSON(data interface{}) (map[string]interface{}, error)
- func (bch *BaseCommandHandler) CreateBulkOperationResponse(commandID string, operationID string, status string, total int, message string) *CommandResponse
- func (bch *BaseCommandHandler) CreateBulkServiceResponse(commandID string, operation interface{}, _ string, _ string) *CommandResponse
- func (bch *BaseCommandHandler) CreateErrorResponse(commandID string, code string, message string, details string) *CommandResponse
- func (bch *BaseCommandHandler) CreateSuccessResponse(commandID string, data map[string]interface{}) *CommandResponse
- func (bch *BaseCommandHandler) ExecuteStandardCommandFlow(ctx context.Context, conn *ConnectionInfo, cmd *Command, ...) (*CommandResponse, error)
- func (bch *BaseCommandHandler) GetBool(payload map[string]interface{}, key string, defaultValue bool) bool
- func (bch *BaseCommandHandler) GetInt(payload map[string]interface{}, key string, defaultValue int) int
- func (bch *BaseCommandHandler) GetString(payload map[string]interface{}, key string, defaultValue string) string
- func (bch *BaseCommandHandler) GetStringSlice(payload map[string]interface{}, key string) []string
- func (bch *BaseCommandHandler) Logger() *zap.Logger
- func (bch *BaseCommandHandler) RequireAuth(conn *ConnectionInfo, commandID string) *CommandResponse
- func (bch *BaseCommandHandler) ValidateBulkAccountCommand(conn *ConnectionInfo, cmd *Command, config *BulkAccountValidationConfig) ([]string, *CommandResponse)
- func (bch *BaseCommandHandler) ValidateListCommand(conn *ConnectionInfo, cmd *Command, config *ListCommandValidationConfig) *CommandResponse
- func (bch *BaseCommandHandler) ValidatePayload(payload map[string]interface{}, required []string, commandID string) *CommandResponse
- type BulkAccountValidationConfig
- type BulkProcessingConfig
- type BulkProcessingTracker
- type CircuitBreaker
- type CircuitBreakerState
- type Command
- type CommandError
- type CommandHandler
- type CommandHandlerConfig
- type CommandInfo
- type CommandRateLimit
- type CommandResponse
- type CommandRouter
- type ConflictResolver
- type ConnectionInfo
- type ConnectionManager
- func (cm *ConnectionManager) ForceCleanup(ctx context.Context) error
- func (cm *ConnectionManager) ForceHealthCheck(ctx context.Context) error
- func (cm *ConnectionManager) GetConnectionStats(ctx context.Context) (map[string]interface{}, error)
- func (cm *ConnectionManager) IsRunning() bool
- func (cm *ConnectionManager) Start(ctx context.Context) error
- func (cm *ConnectionManager) Stop() error
- type ConnectionManagerConfig
- type ConnectionMetrics
- type ConnectionRateLimit
- type ConnectionRepository
- type CostEventPayload
- type DropStrategy
- type ErrorRecoveryConfig
- type ErrorRecoveryManager
- func (erm *ErrorRecoveryManager) GetRecoveryStats() map[string]interface{}
- func (erm *ErrorRecoveryManager) HandleConnectionError(ctx context.Context, connectionID string, err error) error
- func (erm *ErrorRecoveryManager) PerformHealthCheck(ctx context.Context, connectionID string) (*HealthCheckResult, error)
- func (erm *ErrorRecoveryManager) ProcessRetryJob(ctx context.Context, msg RetryJobMessage) error
- func (erm *ErrorRecoveryManager) ResynchronizeConnection(ctx context.Context, conn *models.WebSocketConnection) error
- type Event
- func NewAccountUpdatedEvent(accountID string, accountData map[string]interface{}) *Event
- func NewBlockEvent(blockerID, blockedID string) *Event
- func NewConversationUpdatedEvent(conversationID string, conversationData map[string]interface{}) *Event
- func NewFollowEvent(followerID, followeeID string) *Event
- func NewListUpdatedEvent(listID, ownerID string, listData map[string]interface{}) *Event
- func NewMediaUploadedEvent(mediaID, ownerID string, mediaData map[string]interface{}) *Event
- func NewMuteEvent(muterID, mutedID string, duration *time.Duration) *Event
- func NewNotificationCreatedEvent(notificationID, recipientID, notificationType string, ...) *Event
- func NewStatusCreatedEvent(statusID, authorID string, statusData map[string]interface{}) *Event
- func NewStatusDeletedEvent(statusID, authorID string) *Event
- func NewStatusUpdatedEvent(statusID, authorID string, statusData map[string]interface{}) *Event
- func NewUnfollowEvent(followerID, followeeID string) *Event
- type EventAction
- type EventBuilder
- func NewAccountEvent(eventType, accountID string) *EventBuilder
- func NewConversationEvent(eventType, conversationID string) *EventBuilder
- func NewEvent(eventType string) *EventBuilder
- func NewListEvent(eventType, listID, ownerID string) *EventBuilder
- func NewMediaEvent(eventType, mediaID, ownerID string) *EventBuilder
- func NewNotificationEvent(eventType, notificationID, recipientID string) *EventBuilder
- func NewRelationshipEvent(eventType, actorID, targetID string) *EventBuilder
- func NewStatusEvent(eventType, statusID, authorID string) *EventBuilder
- func (eb *EventBuilder) Build() *Event
- func (eb *EventBuilder) ForStream(stream string) *EventBuilder
- func (eb *EventBuilder) WithData(key string, value interface{}) *EventBuilder
- func (eb *EventBuilder) WithPayload(payload map[string]interface{}) *EventBuilder
- func (eb *EventBuilder) WithTimestamp(timestamp time.Time) *EventBuilder
- type EventFilter
- type EventPriority
- type EventType
- type GlobalRateLimit
- type HashtagEventPayload
- type HealthBasedResolver
- type HealthCheckResult
- type HighestPriorityResolver
- type InternalEvent
- func (e *InternalEvent) ToJSON() ([]byte, error)
- func (e *InternalEvent) WithActor(actorID string) *InternalEvent
- func (e *InternalEvent) WithMetadata(key, value string) *InternalEvent
- func (e *InternalEvent) WithPriority(priority EventPriority) *InternalEvent
- func (e *InternalEvent) WithStreams(streams ...string) *InternalEvent
- func (e *InternalEvent) WithTarget(targetID string) *InternalEvent
- func (e *InternalEvent) WithTenant(tenantID string) *InternalEvent
- func (e *InternalEvent) WithUser(userID string) *InternalEvent
- type JobQueue
- type LastWriteWinsResolver
- type ListCommandValidationConfig
- type MediaEventPayload
- type MetricsCollector
- func (mc *MetricsCollector) GetCurrentMetrics() ConnectionMetrics
- func (mc *MetricsCollector) GetMetricsSummary() map[string]interface{}
- func (mc *MetricsCollector) GetPerformanceMetrics() PerformanceMetrics
- func (mc *MetricsCollector) IsCollecting() bool
- func (mc *MetricsCollector) Start(ctx context.Context) error
- func (mc *MetricsCollector) Stop() error
- type MetricsCollectorConfig
- type MetricsEventPayload
- type MockPublishedEvent
- type ModerationEventPayload
- type NotificationEventPayload
- type PerformanceMetrics
- type ProgressUpdateHelper
- type PublishConnectionHelper
- type Publisher
- type PublisherConnectionConfig
- type RateLimiter
- type RetryJobMessage
- type ServiceRegistry
- type ShutdownManager
- func (sm *ShutdownManager) ApplyBackpressure(connectionID string, messageSize int64) (BackpressureAction, error)
- func (sm *ShutdownManager) GetBackpressureStats() map[string]interface{}
- func (sm *ShutdownManager) GetShutdownStats() map[string]interface{}
- func (sm *ShutdownManager) InitiateGracefulShutdown(ctx context.Context) error
- func (sm *ShutdownManager) IsShuttingDown() bool
- func (sm *ShutdownManager) WaitForShutdown() error
- type ShutdownManagerConfig
- type SlidingWindow
- type StateSynchronizer
- func (ss *StateSynchronizer) ForceSync(ctx context.Context) error
- func (ss *StateSynchronizer) GetInstanceInfo() map[string]interface{}
- func (ss *StateSynchronizer) GetSyncStats() SyncStats
- func (ss *StateSynchronizer) IsRunning() bool
- func (ss *StateSynchronizer) Start(ctx context.Context) error
- func (ss *StateSynchronizer) Stop() error
- type StateSynchronizerConfig
- type StatusEventPayload
- type StreamConnection
- type StreamEventLog
- type StreamEventLogItem
- type StreamQueueService
- type SyncStats
- type TrustEventPayload
- type WebSocketRateLimitConfig
- type WebSocketRateLimiter
- func (wrl *WebSocketRateLimiter) CheckCommand(ctx context.Context, connectionID string, command *Command) (bool, time.Duration, error)
- func (wrl *WebSocketRateLimiter) CheckConnection(ctx context.Context, userID, ipAddress string) (bool, string, error)
- func (wrl *WebSocketRateLimiter) GetConnectionStatus(connectionID string) (map[string]interface{}, error)
- func (wrl *WebSocketRateLimiter) OnConnect(connectionID, userID, ipAddress string)
- func (wrl *WebSocketRateLimiter) OnDisconnect(connectionID string)
- func (wrl *WebSocketRateLimiter) ResetConnection(connectionID string) error
Constants ¶
const (
	StatusProcessing = "processing"
	StatusCompleted = "completed"
)
Constants for operation statuses
const (
	// Status/Note Commands
	CmdCreateStatus = "create_status"
	CmdDeleteStatus = "delete_status"
	CmdFavoriteStatus = "favorite_status"
	CmdUnfavoriteStatus = "unfavorite_status"
	CmdReblogStatus = "reblog_status"
	CmdUnreblogStatus = "unreblog_status"
	CmdBookmarkStatus = "bookmark_status"
	CmdUnbookmarkStatus = "unbookmark_status"
	CmdMuteStatus = "mute_status"
	CmdUnmuteStatus = "unmute_status"
	CmdPinStatus = "pin_status"
	CmdUnpinStatus = "unpin_status"

	// Account/User Commands
	CmdFollowUser = "follow_user"
	CmdUnfollowUser = "unfollow_user"
	CmdBlockUser = "block_user"
	CmdUnblockUser = "unblock_user"
	CmdMuteUser = "mute_user"
	CmdUnmuteUser = "unmute_user"
	CmdUpdateProfile = "update_profile"
	CmdUpdatePreferences = "update_preferences"

	// Relationship Commands
	CmdAcceptFollowRequest = "accept_follow_request"
	CmdRejectFollowRequest = "reject_follow_request"
	CmdRemoveFollower = "remove_follower"

	// List Commands
	CmdCreateList = "create_list"
	CmdUpdateList = "update_list"
	CmdDeleteList = "delete_list"
	CmdAddToList = "add_to_list"
	CmdRemoveFromList = "remove_from_list"

	// Media Commands
	CmdUploadMedia = "upload_media"
	CmdUpdateMedia = "update_media"
	CmdDeleteMedia = "delete_media"

	// Conversation Commands
	CmdMarkConversationRead = "mark_conversation_read"
	CmdDeleteConversation = "delete_conversation"

	// Notification Commands
	CmdMarkNotificationRead = "mark_notification_read"
	CmdMarkAllNotificationsRead = "mark_all_notifications_read"
	CmdDismissNotification = "dismiss_notification"

	// Scheduled Status Commands
	CmdCreateScheduledStatus = "create_scheduled_status"
	CmdUpdateScheduledStatus = "update_scheduled_status"
	CmdDeleteScheduledStatus = "delete_scheduled_status"

	// Poll Commands
	CmdVoteInPoll = "vote_in_poll"

	// Search Commands
	CmdSearchAccounts = "search_accounts"
	CmdSearchStatuses = "search_statuses"

	// Admin Commands (require admin privileges)
	CmdAdminSuspendUser = "admin_suspend_user"
	CmdAdminUnsuspendUser = "admin_unsuspend_user"
	CmdAdminSilenceUser = "admin_silence_user"
	CmdAdminUnsilenceUser = "admin_unsilence_user"

	// System Commands
	CmdGetServerInfo = "get_server_info"
	CmdGetTimeline = "get_timeline"
	CmdGetNotifications = "get_notifications"
	CmdSubscribeTimeline = "subscribe_timeline"
	CmdUnsubscribeTimeline = "unsubscribe_timeline"

	// Bulk Operations Commands
	CmdBulkFollow = "bulk_follow"
	CmdBulkUnfollow = "bulk_unfollow"
	CmdBulkMute = "bulk_mute"
	CmdBulkUnmute = "bulk_unmute"
	CmdBulkBlock = "bulk_block"
	CmdBulkUnblock = "bulk_unblock"
	CmdBulkDeleteStatuses = "bulk_delete_statuses"
	CmdBulkListMembers = "bulk_list_members"
	CmdGetBulkOperation = "get_bulk_operation"

	// Bulk Content Management Commands
	CmdBulkDelete = "bulk_delete"
	CmdBulkArchive = "bulk_archive"
	CmdBulkRestore = "bulk_restore"
	CmdBulkExport = "bulk_export"

	// Import/Export Commands
	CmdCreateExport = "create_export"
	CmdGetExport = "get_export"
	CmdListExports = "list_exports"
	CmdCancelExport = "cancel_export"
	CmdCreateImport = "create_import"
	CmdGetImport = "get_import"
	CmdListImports = "list_imports"
)
Command type constants for WebSocket commands
const (
	CategoryStatus = "status"
	CategoryAccount = "account"
	CategoryRelationship = "relationship"
	CategoryList = "list"
	CategoryMedia = "media"
	CategoryConversation = "conversation"
	CategoryNotification = "notification"
	CategoryScheduled = "scheduled"
	CategoryPoll = "poll"
	CategorySearch = "search"
	CategoryAdmin = "admin"
	CategorySystem = "system"
	CategoryBulk = "bulk"
	CategoryExport = "export"
)
Command categories for organization and access control
const (
	// Status/Note Events
	StatusCreated = "status.created" // New status posted
	StatusUpdated = "status.updated" // Status edited
	StatusDeleted = "status.deleted" // Status deleted
	StatusFavorited = "status.favorited" // Status favorited
	StatusUnfavorited = "status.unfavorited" // Status unfavorited
	StatusBoosted = "status.boosted" // Status boosted/reblogged
	StatusUnboosted = "status.unboosted" // Status unreblogged
	StatusPinned = "status.pinned" // Status pinned to profile
	StatusUnpinned = "status.unpinned" // Status unpinned from profile

	// Account Events
	AccountUpdated = "account.updated" // Profile updated
	AccountFollowed = "account.followed" // Account followed
	AccountBlocked = "account.blocked" // Account blocked
	AccountMuted = "account.muted" // Account muted

	// Relationship Events
	RelationshipFollowRequested = "relationship.follow_requested" // Follow request sent
	RelationshipFollowAccepted = "relationship.follow_accepted" // Follow request accepted
	RelationshipFollowRejected = "relationship.follow_rejected" // Follow request rejected
	RelationshipUnfollowed = "relationship.unfollowed" // Account unfollowed
	RelationshipBlocked = "relationship.blocked" // Account blocked
	RelationshipUnblocked = "relationship.unblocked" // Account unblocked
	RelationshipMuted = "relationship.muted" // Account muted
	RelationshipUnmuted = "relationship.unmuted" // Account unmuted

	// Notification Events
	NotificationCreated = "notification.created" // New notification
	NotificationRead = "notification.read" // Notification marked as read
	NotificationCleared = "notification.cleared" // Notifications cleared

	// Conversation Events
	ConversationCreated = "conversation.created" // New conversation started
	ConversationUpdated = "conversation.updated" // Conversation updated (new message, read status)
	ConversationDeleted = "conversation.deleted" // Conversation deleted

	// List Events
	ListCreated = "list.created" // List created
	ListUpdated = "list.updated" // List updated (title, privacy)
	ListDeleted = "list.deleted" // List deleted
	ListMemberAdded = "list.member_added" // Account added to list
	ListMemberRemoved = "list.member_removed" // Account removed from list

	// Media Events
	MediaUploaded = "media.uploaded" // Media uploaded and processed
	MediaUpdated = "media.updated" // Media metadata updated (alt text, focus)
	MediaDeleted = "media.deleted" // Media deleted
	MediaProcessed = "media.processed" // Media processing completed

	// Moderation Events
	ModerationReportCreated = "moderation.report_created" // New report submitted
	ModerationReportUpdated = "moderation.report_updated" // Report status updated
	ModerationActionTaken = "moderation.action_taken" // Moderation action performed

	// Federation Events
	FederationInstanceBlocked = "federation.instance_blocked" // Instance blocked
	FederationInstanceUnblocked = "federation.instance_unblocked" // Instance unblocked
	FederationActorUpdated = "federation.actor_updated" // Remote actor profile updated

	// System Events
	SystemAnnouncement = "system.announcement" // System announcement
	SystemMaintenance = "system.maintenance" // Maintenance notification

	// Filter Events
	FilterCreated = "filter.created" // Content filter created
	FilterUpdated = "filter.updated" // Content filter updated
	FilterDeleted = "filter.deleted" // Content filter deleted

	// Poll Events
	PollVoted = "poll.voted" // Vote cast in poll
	PollClosed = "poll.closed" // Poll closed
	PollExpired = "poll.expired" // Poll expired

	// Hashtag Events
	HashtagTrending = "hashtag.trending" // Hashtag is trending
	HashtagFollowed = "hashtag.followed" // Hashtag followed
)
Event type constants following Mastodon streaming API conventions
const (
	// Public Streams
	PublicStream = "public" // All public posts
	PublicLocalStream = "public:local" // Local public posts only
	PublicRemoteStream = "public:remote" // Remote public posts only

	// User Streams
	UserStream = "user" // User's home timeline and notifications
	UserNotificationStream = "user:notification" // User's notifications only

	// Direct Messages
	DirectStream = "direct" // Direct messages for the user

	// Hashtag Streams (template - append hashtag name)
	HashtagStreamPrefix = "hashtag" // Use as "hashtag:name"

	// List Streams (template - append list ID)
	ListStreamPrefix = "list" // Use as "list:id"

	// System Streams
	SystemStream = "system" // System announcements and maintenance

	// Moderation Streams
	ModerationStream = "moderation" // Moderation events for moderators

	// Admin Streams
	AdminStream = "admin" // Admin-only events
)
Stream name constants following Mastodon streaming API
Variables ¶
var (
	// Connection errors
	ErrConnectionWriteFailed = errors.FailedToSave("connection", stdErrors.New("failed to write to connection"))
	ErrConnectionDeleteFailed = errors.FailedToDelete("connection", stdErrors.New("failed to delete connection"))
	ErrAPIGatewayClientNotInit = errors.ServiceUnavailable("API Gateway client")

	// Message errors
	ErrConfirmationSendFailed = errors.ProcessingFailed("confirmation send", stdErrors.New("failed to send confirmation"))
	ErrPongSendFailed = errors.ProcessingFailed("pong send", stdErrors.New("failed to send pong"))
	ErrErrorMessageSendFailed = errors.ProcessingFailed("error message send", stdErrors.New("failed to send error message"))

	// Command validation errors
	ErrCommandIDRequired = errors.ValidationFailedWithField("command id is required")
	ErrCommandIDMustBeString = errors.ValidationFailedWithField("command id must be a string")
	ErrCommandTypeRequired = errors.ValidationFailedWithField("command type is required")
	ErrCommandTypeMustBeString = errors.ValidationFailedWithField("command type must be a string")
)
Legacy error variables for backwards compatibility. These are now wrappers around the centralized error system.
Functions ¶
func ConversationStreamName ¶
ConversationStreamName returns the conversation stream name for a specific conversation
func DirectStreamName ¶
DirectStreamName returns the direct message stream name for a specific user
func GetCommandInfo ¶
func GetCommandInfo() map[string]CommandInfo
GetCommandInfo returns metadata for all supported commands
func GetCommandsByCategory ¶
GetCommandsByCategory returns commands grouped by category
func GetEventCategory ¶
GetEventCategory returns the category of an event type
func GetRequiredAuth ¶
GetRequiredAuth returns whether a command requires authentication
func HashtagStreamName ¶
HashtagStreamName returns the hashtag stream name for a specific hashtag
func IsAdminOnly ¶
IsAdminOnly returns whether a command is admin-only
func IsValidEventType ¶
IsValidEventType checks if an event type is valid
func IsValidStreamName ¶
IsValidStreamName checks if a stream name follows valid patterns
func ListStreamName ¶
ListStreamName returns the list stream name for a specific list
func UserNotificationStreamName ¶
UserNotificationStreamName returns the notification stream name for a specific user
func UserStreamName ¶
UserStreamName returns the user stream name for a specific user
Types ¶
type AIEventPayload ¶
type AIEventPayload struct {
AnalysisID string `json:"analysis_id"`
ContentID string `json:"content_id"`
ContentType string `json:"content_type"` // status, media, profile
AnalysisType string `json:"analysis_type"` // sentiment, toxicity, classification
Results map[string]interface{} `json:"results"`
Confidence float64 `json:"confidence"`
ModelVersion string `json:"model_version,omitempty"`
ProcessedAt time.Time `json:"processed_at"`
}
AIEventPayload represents the payload for AI analysis events
type AccountEventPayload ¶
type AccountEventPayload struct {
AccountID string `json:"account_id"`
Username string `json:"username"`
DisplayName string `json:"display_name,omitempty"`
Avatar string `json:"avatar,omitempty"`
Header string `json:"header,omitempty"`
Bio string `json:"bio,omitempty"`
URL string `json:"url,omitempty"`
FollowersCount int64 `json:"followers_count"`
FollowingCount int64 `json:"following_count"`
StatusesCount int64 `json:"statuses_count"`
LastStatusAt time.Time `json:"last_status_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
}
AccountEventPayload represents the payload for account-related events
type BackpressureAction ¶
type BackpressureAction int
BackpressureAction defines actions to take under backpressure
const (
	// BackpressureAllow allows the message to proceed
	BackpressureAllow BackpressureAction = iota // Allow the message
	// BackpressureDelay delays the message
	BackpressureDelay // Delay the message
	// BackpressureDrop drops the message
	BackpressureDrop // Drop the message
	// BackpressureReject rejects the message with error
	BackpressureReject // Reject the message with error
)
type BackpressureConfig ¶
type BackpressureConfig struct {
MaxConcurrentMessages int `json:"max_concurrent_messages"` // Maximum concurrent message processing
MessageQueueSize int `json:"message_queue_size"` // Maximum queued messages per connection
ProcessingTimeout time.Duration `json:"processing_timeout"` // Timeout for message processing
DropStrategy DropStrategy `json:"drop_strategy"` // Strategy when queue is full
EnableAdaptive bool `json:"enable_adaptive"` // Enable adaptive backpressure
}
BackpressureConfig contains configuration for backpressure management
type BaseCommandHandler ¶
type BaseCommandHandler struct {
// contains filtered or unexported fields
}
BaseCommandHandler provides common functionality for command handlers
func NewBaseCommandHandler ¶
func NewBaseCommandHandler(logger *zap.Logger) *BaseCommandHandler
NewBaseCommandHandler creates a new base command handler
func (*BaseCommandHandler) ConvertToJSON ¶
func (bch *BaseCommandHandler) ConvertToJSON(data interface{}) (map[string]interface{}, error)
ConvertToJSON converts data to JSON for response
func (*BaseCommandHandler) CreateBulkOperationResponse ¶
func (bch *BaseCommandHandler) CreateBulkOperationResponse( commandID string, operationID string, status string, total int, message string, ) *CommandResponse
CreateBulkOperationResponse creates a standardized response for bulk operations
func (*BaseCommandHandler) CreateBulkServiceResponse ¶
func (bch *BaseCommandHandler) CreateBulkServiceResponse( commandID string, operation interface{}, _ string, _ string, ) *CommandResponse
CreateBulkServiceResponse creates a response from a bulk service result
func (*BaseCommandHandler) CreateErrorResponse ¶
func (bch *BaseCommandHandler) CreateErrorResponse(commandID string, code string, message string, details string) *CommandResponse
CreateErrorResponse creates an error command response
func (*BaseCommandHandler) CreateSuccessResponse ¶
func (bch *BaseCommandHandler) CreateSuccessResponse(commandID string, data map[string]interface{}) *CommandResponse
CreateSuccessResponse creates a successful command response
func (*BaseCommandHandler) ExecuteStandardCommandFlow ¶
func (bch *BaseCommandHandler) ExecuteStandardCommandFlow( ctx context.Context, conn *ConnectionInfo, cmd *Command, config *CommandHandlerConfig, ) (*CommandResponse, error)
ExecuteStandardCommandFlow handles the common command flow pattern: auth → validate → extract → service call → error handling → convert → return
func (*BaseCommandHandler) GetBool ¶
func (bch *BaseCommandHandler) GetBool(payload map[string]interface{}, key string, defaultValue bool) bool
GetBool safely extracts a bool value from the payload
func (*BaseCommandHandler) GetInt ¶
func (bch *BaseCommandHandler) GetInt(payload map[string]interface{}, key string, defaultValue int) int
GetInt safely extracts an int value from the payload
func (*BaseCommandHandler) GetString ¶
func (bch *BaseCommandHandler) GetString(payload map[string]interface{}, key string, defaultValue string) string
GetString safely extracts a string value from the payload
func (*BaseCommandHandler) GetStringSlice ¶
func (bch *BaseCommandHandler) GetStringSlice(payload map[string]interface{}, key string) []string
GetStringSlice safely extracts a string slice from the payload
func (*BaseCommandHandler) Logger ¶
func (bch *BaseCommandHandler) Logger() *zap.Logger
Logger returns the handler logger instance
func (*BaseCommandHandler) RequireAuth ¶
func (bch *BaseCommandHandler) RequireAuth(conn *ConnectionInfo, commandID string) *CommandResponse
RequireAuth checks if the connection is authenticated
func (*BaseCommandHandler) ValidateBulkAccountCommand ¶
func (bch *BaseCommandHandler) ValidateBulkAccountCommand( conn *ConnectionInfo, cmd *Command, config *BulkAccountValidationConfig, ) ([]string, *CommandResponse)
ValidateBulkAccountCommand performs common validation for bulk account operations
func (*BaseCommandHandler) ValidateListCommand ¶
func (bch *BaseCommandHandler) ValidateListCommand( conn *ConnectionInfo, cmd *Command, config *ListCommandValidationConfig, ) *CommandResponse
ValidateListCommand performs common validation for list operations
func (*BaseCommandHandler) ValidatePayload ¶
func (bch *BaseCommandHandler) ValidatePayload(payload map[string]interface{}, required []string, commandID string) *CommandResponse
ValidatePayload validates that required fields are present in the command payload
type BulkAccountValidationConfig ¶
BulkAccountValidationConfig holds configuration for bulk account validation
func DefaultBulkAccountConfig ¶
func DefaultBulkAccountConfig() *BulkAccountValidationConfig
DefaultBulkAccountConfig returns default configuration for bulk account operations
type BulkProcessingConfig ¶
type BulkProcessingConfig struct {
BatchSize int
ProgressInterval int // Send progress every N operations
BatchDelay time.Duration
}
BulkProcessingConfig holds configuration for bulk processing operations
func DefaultBulkProcessingConfig ¶
func DefaultBulkProcessingConfig() *BulkProcessingConfig
DefaultBulkProcessingConfig returns default configuration for bulk processing
type BulkProcessingTracker ¶
type BulkProcessingTracker struct {
Total int
Processed int
Successful int
Failed int
Errors []string
}
BulkProcessingTracker tracks progress of bulk operations
func NewBulkProcessingTracker ¶
func NewBulkProcessingTracker(total int) *BulkProcessingTracker
NewBulkProcessingTracker creates a new processing tracker
func (*BulkProcessingTracker) AddFailure ¶
func (bpt *BulkProcessingTracker) AddFailure(err error, entityID string)
AddFailure records a failed operation
func (*BulkProcessingTracker) AddSuccess ¶
func (bpt *BulkProcessingTracker) AddSuccess()
AddSuccess records a successful operation
func (*BulkProcessingTracker) GetStatus ¶
func (bpt *BulkProcessingTracker) GetStatus() string
GetStatus returns current processing status
func (*BulkProcessingTracker) ShouldSendProgress ¶
func (bpt *BulkProcessingTracker) ShouldSendProgress(config *BulkProcessingConfig) bool
ShouldSendProgress determines if progress update should be sent
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for connection recovery
func NewCircuitBreaker ¶
func NewCircuitBreaker(maxFailures int, timeout time.Duration) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
func (*CircuitBreaker) CanExecute ¶
func (cb *CircuitBreaker) CanExecute() bool
CanExecute returns whether the circuit breaker allows execution
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current circuit breaker state
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation
type CircuitBreakerState ¶
type CircuitBreakerState int
CircuitBreakerState represents the current state of a circuit breaker
const (
	// CircuitBreakerClosed indicates the circuit breaker is closed (normal operation)
	CircuitBreakerClosed CircuitBreakerState = iota
	// CircuitBreakerOpen indicates the circuit breaker is open (blocking requests)
	CircuitBreakerOpen
	// CircuitBreakerHalfOpen indicates the circuit breaker is testing if service is recovered
	CircuitBreakerHalfOpen
)
type Command ¶
type Command struct {
ID string `json:"id"` // Client-provided ID for request/response matching
Type string `json:"type"` // Command type (e.g., "create_status", "follow_user")
Payload map[string]interface{} `json:"payload"` // Command payload data
}
Command represents a WebSocket command message
type CommandError ¶
type CommandError struct {
Code string `json:"code"` // Error code
Message string `json:"message"` // Human-readable error message
Details string `json:"details,omitempty"` // Additional error details
}
CommandError represents an error in command execution
type CommandHandler ¶
type CommandHandler interface {
// HandleCommand processes a WebSocket command and returns a response
HandleCommand(ctx context.Context, conn *ConnectionInfo, cmd *Command) (*CommandResponse, error)
// GetSupportedCommands returns a list of command types this handler supports
GetSupportedCommands() []string
}
CommandHandler defines the interface for handling WebSocket commands
type CommandHandlerConfig ¶
type CommandHandlerConfig struct {
RequiredFields []string // Fields required in payload validation
ParameterName string // Name of the parameter to extract (e.g., "operation_id", "id")
ErrorCodePrefix string // Prefix for error codes (e.g., "GET_OPERATION", "MARK_READ")
OperationName string // Name for error messages (e.g., "get bulk operation", "mark notification as read")
ResultExtractor func(result interface{}) interface{} // Function to extract data from service result
ServiceCall func(ctx context.Context, conn *ConnectionInfo, extractedParam string) (interface{}, error) // Function to call the service
}
CommandHandlerConfig holds configuration for generic command handling
type CommandInfo ¶
type CommandInfo struct {
Type string `json:"type"`
Category string `json:"category"`
Description string `json:"description"`
RequiresAuth bool `json:"requires_auth"`
AdminOnly bool `json:"admin_only"`
RequiredFields []string `json:"required_fields,omitempty"`
OptionalFields []string `json:"optional_fields,omitempty"`
}
CommandInfo contains metadata about a command
type CommandRateLimit ¶
type CommandRateLimit struct {
MaxPerMinute int
MaxPerHour int
BurstLimit int
RequiresAuth bool
CostMultiplier float64 // Some commands cost more (e.g., searches)
}
CommandRateLimit defines rate limits for specific WebSocket commands
type CommandResponse ¶
// CommandResponse is the JSON reply to a Command. Its ID is tagged as the
// matching client ID, which lets a client pair a reply with the request it sent.
type CommandResponse struct {
ID string `json:"id"` // Matching client ID
Type string `json:"type"` // Response type (e.g., "command_result", "command_error")
Success bool `json:"success"` // Whether command succeeded
// Data has no omitempty tag, so encoding/json writes a nil map as "data": null.
Data map[string]interface{} `json:"data"` // Response data
// Error is a pointer tagged omitempty: the "error" key is left out when it is nil.
Error *CommandError `json:"error,omitempty"` // Error details if failed
}
CommandResponse represents a WebSocket command response
type CommandRouter ¶
type CommandRouter struct {
// contains filtered or unexported fields
}
CommandRouter routes WebSocket commands to appropriate handlers
func NewCommandRouter ¶
func NewCommandRouter(logger *zap.Logger) *CommandRouter
NewCommandRouter creates a new WebSocket command router
func (*CommandRouter) GetSupportedCommands ¶
func (cr *CommandRouter) GetSupportedCommands() []string
GetSupportedCommands returns a list of all supported command types
func (*CommandRouter) HandleCommand ¶
func (cr *CommandRouter) HandleCommand(ctx context.Context, conn *ConnectionInfo, cmd *Command) (*CommandResponse, error)
HandleCommand routes a command to the appropriate handler
func (*CommandRouter) RegisterHandler ¶
func (cr *CommandRouter) RegisterHandler(handler CommandHandler)
RegisterHandler registers a command handler for specific command types
type ConflictResolver ¶
// ConflictResolver decides which connection record to keep when a local and a
// remote copy of a WebSocket connection are in conflict. HealthBasedResolver,
// HighestPriorityResolver and LastWriteWinsResolver in this package are each
// documented as implementing it.
type ConflictResolver interface {
// ResolveConflict takes both versions and returns the resolved connection or an error.
// NOTE(review): whether the result is always one of the two inputs or may be a
// newly merged value is not visible here — confirm per implementation.
ResolveConflict(ctx context.Context, local, remote *models.WebSocketConnection) (*models.WebSocketConnection, error)
}
ConflictResolver defines strategies for resolving state conflicts
type ConnectionInfo ¶
type ConnectionInfo struct {
ConnectionID string `json:"connection_id"`
UserID string `json:"user_id"`
Username string `json:"username"`
Streams []string `json:"streams"`
IsAuthenticated bool `json:"is_authenticated"`
Metadata map[string]interface{} `json:"metadata"` // For storing connection-specific data
}
ConnectionInfo contains information about the WebSocket connection
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
ConnectionManager manages WebSocket connection lifecycle, health checks, and resource management
func NewConnectionManager ¶
func NewConnectionManager( connRepo interfaces.StreamingConnectionRepository, apiClient streamer.Client, logger *zap.Logger, config *ConnectionManagerConfig, ) *ConnectionManager
NewConnectionManager creates a new connection manager
func (*ConnectionManager) ForceCleanup ¶
func (cm *ConnectionManager) ForceCleanup(ctx context.Context) error
ForceCleanup triggers an immediate cleanup
func (*ConnectionManager) ForceHealthCheck ¶
func (cm *ConnectionManager) ForceHealthCheck(ctx context.Context) error
ForceHealthCheck triggers an immediate health check
func (*ConnectionManager) GetConnectionStats ¶
func (cm *ConnectionManager) GetConnectionStats(ctx context.Context) (map[string]interface{}, error)
GetConnectionStats returns current connection statistics
func (*ConnectionManager) IsRunning ¶
func (cm *ConnectionManager) IsRunning() bool
IsRunning returns whether the connection manager is currently running
func (*ConnectionManager) Start ¶
func (cm *ConnectionManager) Start(ctx context.Context) error
Start begins the connection management background tasks
func (*ConnectionManager) Stop ¶
func (cm *ConnectionManager) Stop() error
Stop stops the connection management background tasks
type ConnectionManagerConfig ¶
type ConnectionManagerConfig struct {
HealthCheckInterval time.Duration // How often to run health checks
CleanupInterval time.Duration // How often to run cleanup tasks
PingTimeout time.Duration // How long to wait for pong response
IdleThreshold time.Duration // When to mark connections as idle
MaxIdleConnections int // Maximum number of idle connections to keep
}
ConnectionManagerConfig contains configuration for the connection manager
func DefaultConnectionManagerConfig ¶
func DefaultConnectionManagerConfig() *ConnectionManagerConfig
DefaultConnectionManagerConfig returns default configuration
type ConnectionMetrics ¶
// ConnectionMetrics is a timestamped set of connection counters and rates; it
// is the value returned by MetricsCollector.GetCurrentMetrics. No field is
// tagged omitempty, so every key is always present in the JSON form and nil
// maps are written as null.
type ConnectionMetrics struct {
Timestamp time.Time `json:"timestamp"`
TotalConnections int64 `json:"total_connections"`
ActiveConnections int64 `json:"active_connections"`
ConnectionsByState map[string]int64 `json:"connections_by_state"`
TotalMessages int64 `json:"total_messages"`
TotalBytes int64 `json:"total_bytes"`
// AverageLatency is a time.Duration, which encoding/json writes as an integer
// number of nanoseconds (not a string such as "15ms").
AverageLatency time.Duration `json:"average_latency"`
ErrorsByType map[string]int64 `json:"errors_by_type"`
AverageQualityScore float64 `json:"average_quality_score"`
MessageRate float64 `json:"message_rate"` // messages per second
ByteRate float64 `json:"byte_rate"` // bytes per second
ConnectionUtilization float64 `json:"connection_utilization"` // percentage of max connections used
}
ConnectionMetrics represents metrics for a specific time period
type ConnectionRateLimit ¶
type ConnectionRateLimit struct {
ConnectionID string
UserID string
IPAddress string
// contains filtered or unexported fields
}
ConnectionRateLimit tracks rate limiting for a single connection
type ConnectionRepository ¶
// ConnectionRepository is a lookup-only view of WebSocket connections: all
// three methods fetch StreamConnection values, keyed by user, by stream name,
// or by conversation. NewAPIGatewayPublisher takes one as its connRepo argument.
type ConnectionRepository interface {
// GetUserConnections returns all active connections for a user
GetUserConnections(ctx context.Context, userID string) ([]*StreamConnection, error)
// GetStreamConnections returns all connections subscribed to a stream
GetStreamConnections(ctx context.Context, streamName string) ([]*StreamConnection, error)
// GetConversationConnections returns all connections for participants in a conversation
GetConversationConnections(ctx context.Context, conversationID string) ([]*StreamConnection, error)
}
ConnectionRepository defines methods for accessing WebSocket connection data
type CostEventPayload ¶
type CostEventPayload struct {
Operation string `json:"operation"`
Service string `json:"service"` // dynamodb, lambda, s3, etc.
CostUSD float64 `json:"cost_usd"`
Units map[string]interface{} `json:"units,omitempty"` // RCU, WCU, requests, etc.
UserID string `json:"user_id,omitempty"`
TenantID string `json:"tenant_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
CostEventPayload represents the payload for cost tracking events
type DropStrategy ¶
type DropStrategy int
DropStrategy defines how to handle messages when backpressure is applied
const ( // DropOldest drops oldest messages when queue is full DropOldest DropStrategy = iota // Drop oldest messages when queue is full // DropNewest drops newest messages when queue is full DropNewest // Drop newest messages when queue is full // DropRandom drops random messages when queue is full DropRandom // Drop random messages when queue is full // RejectNew rejects new messages when queue is full RejectNew // Reject new messages when queue is full )
type ErrorRecoveryConfig ¶
type ErrorRecoveryConfig struct {
MaxRetries int // Maximum number of retry attempts
BaseRetryDelay time.Duration // Base delay between retries
MaxRetryDelay time.Duration // Maximum delay between retries
JitterFactor float64 // Jitter factor for randomizing delays (0.0-1.0)
EnableBackoff bool // Enable exponential backoff
}
ErrorRecoveryConfig contains configuration for error recovery
func DefaultErrorRecoveryConfig ¶
func DefaultErrorRecoveryConfig() *ErrorRecoveryConfig
DefaultErrorRecoveryConfig returns default error recovery configuration
type ErrorRecoveryManager ¶
type ErrorRecoveryManager struct {
// contains filtered or unexported fields
}
ErrorRecoveryManager handles connection error recovery and reconnection strategies
func NewErrorRecoveryManager ¶
func NewErrorRecoveryManager( connRepo interfaces.StreamingConnectionRepository, apiClient streamer.Client, jobQueue JobQueue, logger *zap.Logger, config *ErrorRecoveryConfig, ) *ErrorRecoveryManager
NewErrorRecoveryManager creates a new error recovery manager
func (*ErrorRecoveryManager) GetRecoveryStats ¶
func (erm *ErrorRecoveryManager) GetRecoveryStats() map[string]interface{}
GetRecoveryStats returns current error recovery statistics
func (*ErrorRecoveryManager) HandleConnectionError ¶
func (erm *ErrorRecoveryManager) HandleConnectionError(ctx context.Context, connectionID string, err error) error
HandleConnectionError processes a connection error and determines recovery strategy
func (*ErrorRecoveryManager) PerformHealthCheck ¶
func (erm *ErrorRecoveryManager) PerformHealthCheck(ctx context.Context, connectionID string) (*HealthCheckResult, error)
PerformHealthCheck performs a comprehensive health check on a connection
func (*ErrorRecoveryManager) ProcessRetryJob ¶
func (erm *ErrorRecoveryManager) ProcessRetryJob(ctx context.Context, msg RetryJobMessage) error
ProcessRetryJob processes a retry job from the queue
func (*ErrorRecoveryManager) ResynchronizeConnection ¶
func (erm *ErrorRecoveryManager) ResynchronizeConnection(ctx context.Context, conn *models.WebSocketConnection) error
ResynchronizeConnection attempts to resynchronize a recovered connection
type Event ¶
type Event struct {
Type string `json:"type"` // Event type (e.g., "status.created")
Stream string `json:"stream"` // Stream name (e.g., "user:alice")
Payload map[string]interface{} `json:"payload"` // Event payload data
Timestamp time.Time `json:"timestamp"` // Event timestamp
}
Event represents a streaming event to be published
func NewAccountUpdatedEvent ¶
NewAccountUpdatedEvent creates an account.updated event
func NewBlockEvent ¶
NewBlockEvent creates a relationship.blocked event
func NewConversationUpdatedEvent ¶
func NewConversationUpdatedEvent(conversationID string, conversationData map[string]interface{}) *Event
NewConversationUpdatedEvent creates a conversation.updated event
func NewFollowEvent ¶
NewFollowEvent creates a relationship.followed event
func NewListUpdatedEvent ¶
NewListUpdatedEvent creates a list.updated event
func NewMediaUploadedEvent ¶
NewMediaUploadedEvent creates a media.uploaded event
func NewMuteEvent ¶
NewMuteEvent creates a relationship.muted event
func NewNotificationCreatedEvent ¶
func NewNotificationCreatedEvent(notificationID, recipientID, notificationType string, notificationData map[string]interface{}) *Event
NewNotificationCreatedEvent creates a notification.created event
func NewStatusCreatedEvent ¶
NewStatusCreatedEvent creates a status.created event
func NewStatusDeletedEvent ¶
NewStatusDeletedEvent creates a status.deleted event
func NewStatusUpdatedEvent ¶
NewStatusUpdatedEvent creates a status.updated event
func NewUnfollowEvent ¶
NewUnfollowEvent creates a relationship.unfollowed event
type EventAction ¶
type EventAction string
EventAction represents the action that triggered the event
const ( // ActionCreate represents content creation actions ActionCreate EventAction = "create" // ActionUpdate represents content update actions ActionUpdate EventAction = "update" // ActionDelete represents content deletion actions ActionDelete EventAction = "delete" // ActionRead represents content read actions ActionRead EventAction = "read" // ActionFollow represents follow actions ActionFollow EventAction = "follow" // ActionUnfollow represents unfollow actions ActionUnfollow EventAction = "unfollow" // ActionFavourite represents favorite actions ActionFavourite EventAction = "favourite" // ActionUnfavourite represents unfavorite actions ActionUnfavourite EventAction = "unfavourite" // ActionReblog represents reblog actions ActionReblog EventAction = "reblog" // ActionUnreblog represents unreblog actions ActionUnreblog EventAction = "unreblog" // ActionFlag represents content flagging actions ActionFlag EventAction = "flag" // ActionReview represents content review actions ActionReview EventAction = "review" // ActionApprove represents content approval actions ActionApprove EventAction = "approve" // ActionReject represents content rejection actions ActionReject EventAction = "reject" )
type EventBuilder ¶
type EventBuilder struct {
// contains filtered or unexported fields
}
EventBuilder provides a fluent interface for building streaming events
func NewAccountEvent ¶
func NewAccountEvent(eventType, accountID string) *EventBuilder
NewAccountEvent creates an account-related event
func NewConversationEvent ¶
func NewConversationEvent(eventType, conversationID string) *EventBuilder
NewConversationEvent creates a conversation-related event
func NewEvent ¶
func NewEvent(eventType string) *EventBuilder
NewEvent creates a new EventBuilder with the specified type
func NewListEvent ¶
func NewListEvent(eventType, listID, ownerID string) *EventBuilder
NewListEvent creates a list-related event
func NewMediaEvent ¶
func NewMediaEvent(eventType, mediaID, ownerID string) *EventBuilder
NewMediaEvent creates a media-related event
func NewNotificationEvent ¶
func NewNotificationEvent(eventType, notificationID, recipientID string) *EventBuilder
NewNotificationEvent creates a notification-related event
func NewRelationshipEvent ¶
func NewRelationshipEvent(eventType, actorID, targetID string) *EventBuilder
NewRelationshipEvent creates a relationship-related event
func NewStatusEvent ¶
func NewStatusEvent(eventType, statusID, authorID string) *EventBuilder
NewStatusEvent creates a status-related event
func (*EventBuilder) Build ¶
func (eb *EventBuilder) Build() *Event
Build returns the constructed Event
func (*EventBuilder) ForStream ¶
func (eb *EventBuilder) ForStream(stream string) *EventBuilder
ForStream sets the stream name for the event
func (*EventBuilder) WithData ¶
func (eb *EventBuilder) WithData(key string, value interface{}) *EventBuilder
WithData adds a key-value pair to the event payload
func (*EventBuilder) WithPayload ¶
func (eb *EventBuilder) WithPayload(payload map[string]interface{}) *EventBuilder
WithPayload sets the entire payload for the event
func (*EventBuilder) WithTimestamp ¶
func (eb *EventBuilder) WithTimestamp(timestamp time.Time) *EventBuilder
WithTimestamp sets a custom timestamp for the event
type EventFilter ¶
// EventFilter holds the criteria that InternalEvent values are tested against
// through Matches. Every field is tagged omitempty, so criteria left at their
// zero value are dropped from the JSON form.
// NOTE(review): presumably an unset field means "no constraint"; the Matches
// implementation is not visible here — confirm.
type EventFilter struct {
Types []EventType `json:"types,omitempty"` // Filter by event types
Actions []EventAction `json:"actions,omitempty"` // Filter by actions
ActorID string `json:"actor_id,omitempty"` // Filter by actor
UserID string `json:"user_id,omitempty"` // Filter by user
TenantID string `json:"tenant_id,omitempty"` // Filter by tenant
Streams []string `json:"streams,omitempty"` // Filter by streams
Metadata map[string]string `json:"metadata,omitempty"` // Filter by metadata
// The zero value of MinPriority is below PriorityLow (1), the lowest named priority.
MinPriority EventPriority `json:"min_priority,omitempty"` // Minimum priority
}
EventFilter represents criteria for filtering events
func (*EventFilter) Matches ¶
func (f *EventFilter) Matches(event *InternalEvent) bool
Matches checks if an event matches the filter criteria
type EventPriority ¶
type EventPriority int
EventPriority represents the priority of an event
const ( // PriorityLow represents low priority events PriorityLow EventPriority = 1 // PriorityNormal represents normal priority events PriorityNormal EventPriority = 2 // PriorityHigh represents high priority events PriorityHigh EventPriority = 3 // PriorityUrgent represents urgent priority events PriorityUrgent EventPriority = 4 )
type EventType ¶
type EventType string
EventType represents the type of internal event
const ( // EventTypeStatus represents basic status events EventTypeStatus EventType = "status" // EventTypeStatusUpdate represents status update events EventTypeStatusUpdate EventType = "status.update" // EventTypeStatusDelete represents status deletion events EventTypeStatusDelete EventType = "status.delete" // EventTypeStatusFavourite represents status favoriting events EventTypeStatusFavourite EventType = "status.favourite" // EventTypeStatusReblog represents status reblogging events EventTypeStatusReblog EventType = "status.reblog" // EventTypeTimelineUpdate represents timeline update events EventTypeTimelineUpdate EventType = "timeline.update" // EventTypeTimelineRefresh represents timeline refresh events EventTypeTimelineRefresh EventType = "timeline.refresh" // EventTypeAccountUpdate represents account update events EventTypeAccountUpdate EventType = "account.update" // EventTypeAccountFollow represents account follow events EventTypeAccountFollow EventType = "account.follow" // EventTypeAccountUnfollow represents account unfollow events EventTypeAccountUnfollow EventType = "account.unfollow" // EventTypeNotification represents basic notification events EventTypeNotification EventType = "notification" // EventTypeNotificationRead represents notification read events EventTypeNotificationRead EventType = "notification.read" // EventTypeModeration represents basic moderation events EventTypeModeration EventType = "moderation" // EventTypeModerationFlag represents content flagging events EventTypeModerationFlag EventType = "moderation.flag" // EventTypeModerationReview represents moderation review events EventTypeModerationReview EventType = "moderation.review" // EventTypeTrustUpdate represents trust score update events EventTypeTrustUpdate EventType = "trust.update" // EventTypeReputationUpdate represents reputation score update events EventTypeReputationUpdate EventType = "reputation.update" // EventTypeVouchUpdate represents vouch update events 
EventTypeVouchUpdate EventType = "vouch.update" // EventTypeAIAnalysis represents AI content analysis events EventTypeAIAnalysis EventType = "ai.analysis" // EventTypeAIClassification represents AI classification events EventTypeAIClassification EventType = "ai.classification" // EventTypeAIModeration represents AI moderation events EventTypeAIModeration EventType = "ai.moderation" // EventTypeHashtagTrend represents hashtag trending events EventTypeHashtagTrend EventType = "hashtag.trend" // EventTypeHashtagUpdate represents hashtag update events EventTypeHashtagUpdate EventType = "hashtag.update" // EventTypeMediaUpdate represents media update events EventTypeMediaUpdate EventType = "media.update" // EventTypeMediaProcess represents media processing events EventTypeMediaProcess EventType = "media.process" // EventTypeCostUpdate represents cost update events EventTypeCostUpdate EventType = "cost.update" // EventTypeCostAlert represents cost alert events EventTypeCostAlert EventType = "cost.alert" // EventTypeSystemAlert represents system alert events EventTypeSystemAlert EventType = "system.alert" // EventTypeHealthCheck represents health check events EventTypeHealthCheck EventType = "health.check" // EventTypeMetricsUpdate represents real-time metrics update events for GraphQL subscriptions EventTypeMetricsUpdate EventType = "metrics.update" // EventTypeFederationHealthUpdate represents federation health update events EventTypeFederationHealthUpdate EventType = "federation.health.update" // EventTypeFederationFailure represents federation failure events EventTypeFederationFailure EventType = "federation.failure" // EventTypeFederationRecovery represents federation recovery events EventTypeFederationRecovery EventType = "federation.recovery" )
type GlobalRateLimit ¶
type GlobalRateLimit struct {
Identifier string // user:id or ip:address
// contains filtered or unexported fields
}
GlobalRateLimit tracks rate limiting across all connections for a user/IP
type HashtagEventPayload ¶
// HashtagEventPayload is the JSON payload for hashtag/trend events.
type HashtagEventPayload struct {
Hashtag string `json:"hashtag"`
Count int64 `json:"count"`
// PreviousCount and TrendScore are tagged omitempty, so a genuine value of 0
// is omitted from the JSON and cannot be told apart from "not set".
PreviousCount int64 `json:"previous_count,omitempty"`
TrendScore float64 `json:"trend_score,omitempty"`
Period string `json:"period"` // hour, day, week
UpdatedAt time.Time `json:"updated_at"`
}
HashtagEventPayload represents the payload for hashtag/trend events
type HealthBasedResolver ¶
type HealthBasedResolver struct{}
HealthBasedResolver resolves conflicts based on connection health
func (*HealthBasedResolver) ResolveConflict ¶
func (r *HealthBasedResolver) ResolveConflict(_ context.Context, local, remote *models.WebSocketConnection) (*models.WebSocketConnection, error)
ResolveConflict implements ConflictResolver by choosing the healthier connection
type HealthCheckResult ¶
type HealthCheckResult struct {
ConnectionID string `json:"connection_id"`
Timestamp time.Time `json:"timestamp"`
IsHealthy bool `json:"is_healthy"`
QualityScore float64 `json:"quality_score"`
LatencyMs int64 `json:"latency_ms"`
PacketLoss float64 `json:"packet_loss"`
RecommendedAction string `json:"recommended_action"`
}
HealthCheckResult contains the results of a connection health check
type HighestPriorityResolver ¶
type HighestPriorityResolver struct{}
HighestPriorityResolver resolves conflicts based on state priority
func (*HighestPriorityResolver) ResolveConflict ¶
func (r *HighestPriorityResolver) ResolveConflict(_ context.Context, local, remote *models.WebSocketConnection) (*models.WebSocketConnection, error)
ResolveConflict implements ConflictResolver by choosing the connection with higher priority state
type InternalEvent ¶
// InternalEvent is the envelope carried on the internal event bus. CreateEvent
// builds one from a type, an action and a data value; each With* method
// returns *InternalEvent, so the setters can be chained. ToJSON and FromJSON
// convert to and from the JSON form described by the field tags.
type InternalEvent struct {
// Event identification
ID string `json:"id"`
Type EventType `json:"type"`
Action EventAction `json:"action"`
// Event context
ActorID string `json:"actor_id,omitempty"` // Who triggered the event
TargetID string `json:"target_id,omitempty"` // What was affected
UserID string `json:"user_id,omitempty"` // User context for the event
TenantID string `json:"tenant_id,omitempty"` // Multi-tenant support
// Timing
Timestamp time.Time `json:"timestamp"`
// Event payload - the actual data
// NOTE(review): Data is untyped. If FromJSON decodes with plain encoding/json,
// Data comes back as generic values (map[string]interface{}, []interface{},
// float64, ...) rather than the original payload struct — confirm.
Data interface{} `json:"data"`
// Metadata for filtering and routing
Metadata map[string]string `json:"metadata,omitempty"`
// Stream context
Streams []string `json:"streams,omitempty"` // Which streams this event should go to
// Priority for event processing
// Priority has no omitempty tag, so it is always written, including the zero value.
Priority EventPriority `json:"priority"`
}
InternalEvent represents an event in the internal event bus
func CreateEvent ¶
func CreateEvent(eventType EventType, action EventAction, data interface{}) *InternalEvent
CreateEvent creates a new internal event with common defaults
func FromJSON ¶
func FromJSON(data []byte) (*InternalEvent, error)
FromJSON deserializes an event from JSON
func (*InternalEvent) ToJSON ¶
func (e *InternalEvent) ToJSON() ([]byte, error)
ToJSON serializes the event to JSON
func (*InternalEvent) WithActor ¶
func (e *InternalEvent) WithActor(actorID string) *InternalEvent
WithActor sets the actor ID for the event
func (*InternalEvent) WithMetadata ¶
func (e *InternalEvent) WithMetadata(key, value string) *InternalEvent
WithMetadata adds metadata to the event
func (*InternalEvent) WithPriority ¶
func (e *InternalEvent) WithPriority(priority EventPriority) *InternalEvent
WithPriority sets the priority for the event
func (*InternalEvent) WithStreams ¶
func (e *InternalEvent) WithStreams(streams ...string) *InternalEvent
WithStreams sets the streams for the event
func (*InternalEvent) WithTarget ¶
func (e *InternalEvent) WithTarget(targetID string) *InternalEvent
WithTarget sets the target ID for the event
func (*InternalEvent) WithTenant ¶
func (e *InternalEvent) WithTenant(tenantID string) *InternalEvent
WithTenant sets the tenant ID for the event
func (*InternalEvent) WithUser ¶
func (e *InternalEvent) WithUser(userID string) *InternalEvent
WithUser sets the user ID for the event
type JobQueue ¶
// JobQueue is the single-method queue dependency accepted by
// NewErrorRecoveryManager.
type JobQueue interface {
// QueueDelayedJob submits messageBody to the queue named queueName with a
// delay expressed in seconds (delaySeconds).
// NOTE(review): how messageBody is serialized is up to the implementation and
// is not visible here — confirm.
QueueDelayedJob(ctx context.Context, queueName string, messageBody interface{}, delaySeconds int32) error
}
JobQueue defines the interface for job queue operations to avoid import cycles
type LastWriteWinsResolver ¶
type LastWriteWinsResolver struct{}
LastWriteWinsResolver resolves conflicts by choosing the connection with the most recent update
func (*LastWriteWinsResolver) ResolveConflict ¶
func (r *LastWriteWinsResolver) ResolveConflict(_ context.Context, local, remote *models.WebSocketConnection) (*models.WebSocketConnection, error)
ResolveConflict implements ConflictResolver by choosing the connection with the most recent update
type ListCommandValidationConfig ¶
type ListCommandValidationConfig struct {
RequiredFields []string
RequireID bool
RequireTitle bool
}
ListCommandValidationConfig holds configuration for list command validation
type MediaEventPayload ¶
// MediaEventPayload is the JSON payload for media stream events.
type MediaEventPayload struct {
MediaID string `json:"media_id"`
URL string `json:"url"`
MediaType string `json:"media_type"` // image, video, audio
Status string `json:"status"` // processing, ready, error
Size int64 `json:"size,omitempty"`
// NOTE(review): the unit of Duration (seconds, milliseconds, ...) is not
// stated anywhere visible — confirm.
Duration int64 `json:"duration,omitempty"` // for video/audio
Metadata map[string]interface{} `json:"metadata,omitempty"`
// omitempty has no effect on a time.Time value: encoding/json never treats a
// struct as empty, so a zero ProcessedAt is still written (as
// "0001-01-01T00:00:00Z") rather than omitted.
ProcessedAt time.Time `json:"processed_at,omitempty"`
}
MediaEventPayload represents the payload for media stream events
type MetricsCollector ¶
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and aggregates streaming connection metrics
func NewMetricsCollector ¶
func NewMetricsCollector( connRepo interfaces.StreamingConnectionRepository, logger *zap.Logger, config *MetricsCollectorConfig, ) *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) GetCurrentMetrics ¶
func (mc *MetricsCollector) GetCurrentMetrics() ConnectionMetrics
GetCurrentMetrics returns current connection metrics
func (*MetricsCollector) GetMetricsSummary ¶
func (mc *MetricsCollector) GetMetricsSummary() map[string]interface{}
GetMetricsSummary returns a summary of key metrics
func (*MetricsCollector) GetPerformanceMetrics ¶
func (mc *MetricsCollector) GetPerformanceMetrics() PerformanceMetrics
GetPerformanceMetrics returns detailed performance metrics
func (*MetricsCollector) IsCollecting ¶
func (mc *MetricsCollector) IsCollecting() bool
IsCollecting returns whether metrics collection is active
func (*MetricsCollector) Start ¶
func (mc *MetricsCollector) Start(ctx context.Context) error
Start begins metrics collection
func (*MetricsCollector) Stop ¶
func (mc *MetricsCollector) Stop() error
Stop stops metrics collection
type MetricsCollectorConfig ¶
type MetricsCollectorConfig struct {
CollectionInterval time.Duration // How often to collect metrics
RetentionPeriod time.Duration // How long to retain metrics
}
MetricsCollectorConfig contains configuration for metrics collection
func DefaultMetricsCollectorConfig ¶
func DefaultMetricsCollectorConfig() *MetricsCollectorConfig
DefaultMetricsCollectorConfig returns default configuration
type MetricsEventPayload ¶
// MetricsEventPayload is the JSON payload for metrics update events sent to
// GraphQL subscriptions.
//
// All numeric statistics and cost fields are tagged omitempty, so a genuine
// value of 0 (for example Min == 0) is omitted from the JSON and cannot be
// told apart from "not set" by a consumer.
type MetricsEventPayload struct {
MetricID string `json:"metric_id"`
ServiceName string `json:"service_name"`
MetricType string `json:"metric_type"`
SubscriptionCategory string `json:"subscription_category"` // moderation, security, performance, etc.
AggregationLevel string `json:"aggregation_level"` // raw, 5min, hourly, daily
Timestamp time.Time `json:"timestamp"`
Count int64 `json:"count,omitempty"`
Sum float64 `json:"sum,omitempty"`
Min float64 `json:"min,omitempty"`
Max float64 `json:"max,omitempty"`
Average float64 `json:"average,omitempty"`
P50 float64 `json:"p50,omitempty"`
P95 float64 `json:"p95,omitempty"`
P99 float64 `json:"p99,omitempty"`
Unit string `json:"unit,omitempty"`
UserCostMicrocents int64 `json:"user_cost_microcents,omitempty"`
TotalCostMicrocents int64 `json:"total_cost_microcents,omitempty"`
Dimensions map[string]string `json:"dimensions,omitempty"`
UserID string `json:"user_id,omitempty"`
TenantID string `json:"tenant_id,omitempty"`
InstanceDomain string `json:"instance_domain,omitempty"`
}
MetricsEventPayload represents the payload for metrics update events sent to GraphQL subscriptions
type MockPublishedEvent ¶
// MockPublishedEvent records one publish call made through the mock
// publisher. The Method values listed below are the names of the three publish
// methods of the Publisher interface.
type MockPublishedEvent struct {
Method string `json:"method"` // "PublishToUser", "PublishToStream", "PublishToConversation"
TargetID string `json:"target_id"` // userID, streamName, or conversationID
// Event is a pointer without omitempty, so a nil Event is written as "event": null.
Event *Event `json:"event"`
PublishedAt time.Time `json:"published_at"`
}
MockPublishedEvent represents an event that was published via the mock publisher
type ModerationEventPayload ¶
type ModerationEventPayload struct {
ItemID string `json:"item_id"`
ItemType string `json:"item_type"` // status, account, media
Action string `json:"action"` // flag, review, approve, reject
ModeratorID string `json:"moderator_id,omitempty"`
Reason string `json:"reason,omitempty"`
Details map[string]interface{} `json:"details,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
ModerationEventPayload represents the payload for moderation events
type NotificationEventPayload ¶
type NotificationEventPayload struct {
NotificationID string `json:"notification_id"`
Type string `json:"type"` // follow, mention, favourite, reblog, etc.
RecipientID string `json:"recipient_id"`
ActorID string `json:"actor_id"`
StatusID string `json:"status_id,omitempty"`
Read bool `json:"read"`
CreatedAt time.Time `json:"created_at"`
}
NotificationEventPayload represents the payload for notification events
type PerformanceMetrics ¶
// PerformanceMetrics groups performance-oriented figures and is the value
// returned by MetricsCollector.GetPerformanceMetrics. No field is tagged
// omitempty, so every key is always present in the JSON form.
type PerformanceMetrics struct {
// The four *ConnectionDuration fields are time.Duration values, which
// encoding/json writes as integer nanosecond counts.
AverageConnectionDuration time.Duration `json:"average_connection_duration"`
MedianConnectionDuration time.Duration `json:"median_connection_duration"`
P95ConnectionDuration time.Duration `json:"p95_connection_duration"`
P99ConnectionDuration time.Duration `json:"p99_connection_duration"`
AverageMessageRate float64 `json:"average_message_rate"`
PeakMessageRate float64 `json:"peak_message_rate"`
// NOTE(review): whether the two *Rate fields below are fractions (0-1) or
// percentages (0-100) is not visible here — confirm.
ConnectionSuccessRate float64 `json:"connection_success_rate"`
MessageDeliveryRate float64 `json:"message_delivery_rate"`
HealthyConnections int64 `json:"healthy_connections"`
UnhealthyConnections int64 `json:"unhealthy_connections"`
AverageQualityScore float64 `json:"average_quality_score"`
}
PerformanceMetrics represents performance-specific metrics
type ProgressUpdateHelper ¶
type ProgressUpdateHelper struct {
// contains filtered or unexported fields
}
ProgressUpdateHelper handles WebSocket progress updates for async operations
func NewProgressUpdateHelper ¶
func NewProgressUpdateHelper(publisher Publisher, logger *zap.Logger) *ProgressUpdateHelper
NewProgressUpdateHelper creates a new progress update helper
func (*ProgressUpdateHelper) SendFinalUpdate ¶
func (puh *ProgressUpdateHelper) SendFinalUpdate( conn *ConnectionInfo, operationID string, tracker *BulkProcessingTracker, finalMessage string, )
SendFinalUpdate sends the final completion update with error details
func (*ProgressUpdateHelper) SendProgressUpdate ¶
func (puh *ProgressUpdateHelper) SendProgressUpdate( conn *ConnectionInfo, operationID string, tracker *BulkProcessingTracker, message string, )
SendProgressUpdate sends a progress update via WebSocket
type PublishConnectionHelper ¶
type PublishConnectionHelper struct {
// contains filtered or unexported fields
}
PublishConnectionHelper handles common connection publishing logic
func NewPublishConnectionHelper ¶
func NewPublishConnectionHelper(logger *zap.Logger) *PublishConnectionHelper
NewPublishConnectionHelper creates a new publish connection helper
func (*PublishConnectionHelper) PublishToConnections ¶
func (pch *PublishConnectionHelper) PublishToConnections( ctx context.Context, connections []*StreamConnection, event *Event, publishFunc func(ctx context.Context, connectionID string, event *Event) error, logContext map[string]interface{}, ) error
PublishToConnections publishes an event to multiple connections with common error handling
type Publisher ¶
// Publisher is the outbound interface for streaming events: three publish
// methods that differ only in how the audience is addressed (user ID, stream
// name, conversation ID), plus Close. NewAPIGatewayPublisher, NewMockPublisher
// and NewQueuePublisher each return a Publisher.
type Publisher interface {
// PublishToUser publishes an event to a specific user's streams
PublishToUser(ctx context.Context, userID string, event *Event) error
// PublishToStream publishes an event to all subscribers of a stream
PublishToStream(ctx context.Context, streamName string, event *Event) error
// PublishToConversation publishes an event to all participants in a conversation
PublishToConversation(ctx context.Context, conversationID string, event *Event) error
// Close closes the publisher and cleans up resources
Close() error
}
Publisher defines the interface for publishing streaming events
func NewAPIGatewayPublisher ¶
func NewAPIGatewayPublisher( client streamer.Client, connRepo ConnectionRepository, endpoint string, logger *zap.Logger, ) Publisher
NewAPIGatewayPublisher creates a new publisher using API Gateway Management API
func NewMockPublisher ¶
func NewMockPublisher() Publisher
NewMockPublisher creates a new mock publisher for testing
func NewQueuePublisher ¶
func NewQueuePublisher(queue StreamQueueService, logger *zap.Logger) Publisher
NewQueuePublisher creates a publisher that enqueues streaming events to the DynamoDB-backed stream queue.
type PublisherConnectionConfig ¶
type PublisherConnectionConfig struct {
EventType string
StreamTemplate string // Template for stream name, e.g., "user:%s"
Timeout time.Duration
}
PublisherConnectionConfig holds configuration for publisher connection operations
func DefaultPublisherConnectionConfig ¶
func DefaultPublisherConnectionConfig() *PublisherConnectionConfig
DefaultPublisherConnectionConfig returns default configuration for publisher connections
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements a token bucket rate limiter for backpressure control
func (*RateLimiter) AllowRequest ¶
func (rl *RateLimiter) AllowRequest() bool
AllowRequest checks if a request should be allowed based on rate limiting
type RetryJobMessage ¶
type RetryJobMessage struct {
ConnectionID string `json:"connection_id"`
RetryCount int `json:"retry_count"`
Timestamp int64 `json:"timestamp"`
OriginalError string `json:"original_error,omitempty"`
}
RetryJobMessage represents a streaming connection retry job
type ServiceRegistry ¶
type ServiceRegistry interface {
}
ServiceRegistry defines the interface needed by command handlers; as declared here it has no methods
type ShutdownManager ¶
type ShutdownManager struct {
// contains filtered or unexported fields
}
ShutdownManager manages graceful shutdown of WebSocket connections and backpressure control
func NewShutdownManager ¶
func NewShutdownManager( connRepo interfaces.StreamingConnectionRepository, apiClient streamer.Client, logger *zap.Logger, config *ShutdownManagerConfig, ) *ShutdownManager
NewShutdownManager creates a new shutdown manager
func (*ShutdownManager) ApplyBackpressure ¶
func (sm *ShutdownManager) ApplyBackpressure(connectionID string, messageSize int64) (BackpressureAction, error)
ApplyBackpressure checks whether backpressure should be applied and returns the appropriate action
func (*ShutdownManager) GetBackpressureStats ¶
func (sm *ShutdownManager) GetBackpressureStats() map[string]interface{}
GetBackpressureStats returns current backpressure statistics
func (*ShutdownManager) GetShutdownStats ¶
func (sm *ShutdownManager) GetShutdownStats() map[string]interface{}
GetShutdownStats returns current shutdown statistics
func (*ShutdownManager) InitiateGracefulShutdown ¶
func (sm *ShutdownManager) InitiateGracefulShutdown(ctx context.Context) error
InitiateGracefulShutdown begins the graceful shutdown process
func (*ShutdownManager) IsShuttingDown ¶
func (sm *ShutdownManager) IsShuttingDown() bool
IsShuttingDown returns whether a graceful shutdown is in progress
func (*ShutdownManager) WaitForShutdown ¶
func (sm *ShutdownManager) WaitForShutdown() error
WaitForShutdown waits for the graceful shutdown process to complete
type ShutdownManagerConfig ¶
type ShutdownManagerConfig struct {
ShutdownTimeout time.Duration // Total time to wait for graceful shutdown
DrainTimeout time.Duration // Time to wait for connection draining
Backpressure *BackpressureConfig // Backpressure configuration
}
ShutdownManagerConfig contains configuration for shutdown management
func DefaultShutdownManagerConfig ¶
func DefaultShutdownManagerConfig() *ShutdownManagerConfig
DefaultShutdownManagerConfig returns the default configuration for shutdown management
type SlidingWindow ¶
type SlidingWindow struct {
// contains filtered or unexported fields
}
SlidingWindow implements a time-based sliding window counter
func NewSlidingWindow ¶
func NewSlidingWindow(windowSize time.Duration) *SlidingWindow
NewSlidingWindow creates a new sliding window counter
func (*SlidingWindow) Add ¶
func (sw *SlidingWindow) Add(count int)
Add adds count to the counter for the current time
func (*SlidingWindow) Count ¶
func (sw *SlidingWindow) Count() int
Count returns the total count in the window
func (*SlidingWindow) CountRecent ¶
func (sw *SlidingWindow) CountRecent(duration time.Duration) int
CountRecent returns the count within the most recent duration
type StateSynchronizer ¶
type StateSynchronizer struct {
// contains filtered or unexported fields
}
StateSynchronizer manages connection state synchronization across multiple instances
func NewStateSynchronizer ¶
func NewStateSynchronizer( connRepo interfaces.StreamingConnectionRepository, logger *zap.Logger, config *StateSynchronizerConfig, ) *StateSynchronizer
NewStateSynchronizer creates a new state synchronizer
func (*StateSynchronizer) ForceSync ¶
func (ss *StateSynchronizer) ForceSync(ctx context.Context) error
ForceSync triggers an immediate synchronization
func (*StateSynchronizer) GetInstanceInfo ¶
func (ss *StateSynchronizer) GetInstanceInfo() map[string]interface{}
GetInstanceInfo returns information about this instance
func (*StateSynchronizer) GetSyncStats ¶
func (ss *StateSynchronizer) GetSyncStats() SyncStats
GetSyncStats returns current synchronization statistics
func (*StateSynchronizer) IsRunning ¶
func (ss *StateSynchronizer) IsRunning() bool
IsRunning returns whether state synchronization is active
func (*StateSynchronizer) Start ¶
func (ss *StateSynchronizer) Start(ctx context.Context) error
Start begins state synchronization
func (*StateSynchronizer) Stop ¶
func (ss *StateSynchronizer) Stop() error
Stop stops state synchronization
type StateSynchronizerConfig ¶
type StateSynchronizerConfig struct {
InstanceID string // Unique instance identifier
SyncInterval time.Duration // How often to sync state
StaleThreshold time.Duration // When to consider connections stale
ConflictResolver ConflictResolver // Strategy for resolving conflicts
}
StateSynchronizerConfig contains configuration for state synchronization
func DefaultStateSynchronizerConfig ¶
func DefaultStateSynchronizerConfig() *StateSynchronizerConfig
DefaultStateSynchronizerConfig returns the default configuration for state synchronization
type StatusEventPayload ¶
type StatusEventPayload struct {
StatusID string `json:"status_id"`
AuthorID string `json:"author_id"`
AuthorUsername string `json:"author_username"`
Content string `json:"content,omitempty"`
Visibility string `json:"visibility,omitempty"`
InReplyToID string `json:"in_reply_to_id,omitempty"`
ReblogOfID string `json:"reblog_of_id,omitempty"`
Sensitive bool `json:"sensitive"`
Language string `json:"language,omitempty"`
Mentions []string `json:"mentions,omitempty"`
URL string `json:"url,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
ExtraData map[string]interface{} `json:"extra_data,omitempty"`
}
StatusEventPayload represents the payload for status-related events
type StreamConnection ¶
type StreamConnection struct {
ConnectionID string `json:"connection_id"`
UserID string `json:"user_id,omitempty"`
Username string `json:"username,omitempty"`
Streams []string `json:"streams"`
LastActivity time.Time `json:"last_activity"`
}
StreamConnection represents a WebSocket connection for streaming
type StreamEventLog ¶
type StreamEventLog struct {
// contains filtered or unexported fields
}
StreamEventLog provides append/query operations for SSE stream event persistence.
func NewStreamEventLog ¶
func NewStreamEventLog(db core.DB, ttl time.Duration) *StreamEventLog
NewStreamEventLog creates a new StreamEventLog with the provided DynamORM DB and TTL.
func (*StreamEventLog) Append ¶
func (l *StreamEventLog) Append(ctx context.Context, streamName, eventType, data string) (string, error)
Append writes a new event into the stream log for the given stream.
func (*StreamEventLog) Enabled ¶
func (l *StreamEventLog) Enabled() bool
Enabled reports whether the StreamEventLog is configured and usable.
func (*StreamEventLog) Query ¶
func (l *StreamEventLog) Query(ctx context.Context, streamName, afterID string, limit int32) ([]StreamEventLogItem, error)
Query returns up to limit events for the stream after afterID, ordered ascending by ID.
type StreamEventLogItem ¶
StreamEventLogItem represents a single persisted SSE event for a given stream.
type StreamQueueService ¶
type StreamQueueService interface {
// QueueEventForUser queues an event for a specific user's streams
QueueEventForUser(ctx context.Context, userID string, eventType string, payload map[string]interface{}) error
// QueueEventForStream queues an event for all subscribers of a stream
QueueEventForStream(ctx context.Context, streamName string, eventType string, payload map[string]interface{}) error
// QueueEventForConversation queues an event for all participants in a conversation
QueueEventForConversation(ctx context.Context, conversationID string, eventType string, payload map[string]interface{}) error
// QueueEventForFollowers queues an event for all followers of a user
QueueEventForFollowers(ctx context.Context, userID string, eventType string, payload map[string]interface{}) error
}
StreamQueueService queues streaming events to DynamoDB for processing by stream-router
func NewDynamoStreamQueue ¶
NewDynamoStreamQueue creates a new DynamoDB-based stream queue service
type SyncStats ¶
type SyncStats struct {
TotalSyncs int64 `json:"total_syncs"`
SuccessfulSyncs int64 `json:"successful_syncs"`
FailedSyncs int64 `json:"failed_syncs"`
ConflictsResolved int64 `json:"conflicts_resolved"`
StaleConnectionsFound int64 `json:"stale_connections_found"`
LastSyncTime time.Time `json:"last_sync_time"`
LastSyncDuration time.Duration `json:"last_sync_duration"`
AverageSyncDuration time.Duration `json:"average_sync_duration"`
}
SyncStats tracks synchronization statistics
type TrustEventPayload ¶
type TrustEventPayload struct {
SubjectID string `json:"subject_id"`
SubjectType string `json:"subject_type"` // user, content, domain
Score float64 `json:"score"`
PreviousScore float64 `json:"previous_score,omitempty"`
Reason string `json:"reason,omitempty"`
Evidence map[string]interface{} `json:"evidence,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
}
TrustEventPayload represents the payload for trust/reputation events
type WebSocketRateLimitConfig ¶
type WebSocketRateLimitConfig struct {
// Connection limits
MaxConnectionsPerUser int
MaxConnectionsPerIP int
ConnectionWindowSize time.Duration
// Command rate limits (per connection)
MaxCommandsPerMinute int
MaxCommandsPerSecond int
CommandWindowSize time.Duration
// Command-specific limits
CommandLimits map[string]CommandRateLimit
// Progressive delays
EnableProgressiveDelays bool
BaseDelayMillis int
MaxDelayMillis int
// Burst protection
BurstWindowSize time.Duration
MaxBurstCommands int
// Penalty configuration
ViolationThreshold int
PenaltyDuration time.Duration
PenaltyMultiplier float64
}
WebSocketRateLimitConfig defines WebSocket rate limiting configuration
func DefaultWebSocketRateLimitConfig ¶
func DefaultWebSocketRateLimitConfig() *WebSocketRateLimitConfig
DefaultWebSocketRateLimitConfig returns default WebSocket rate limit configuration
type WebSocketRateLimiter ¶
type WebSocketRateLimiter struct {
// contains filtered or unexported fields
}
WebSocketRateLimiter provides rate limiting for WebSocket connections and commands
func NewWebSocketRateLimiter ¶
func NewWebSocketRateLimiter(config *WebSocketRateLimitConfig, repo interfaces.RateLimitRepository, logger *zap.Logger) *WebSocketRateLimiter
NewWebSocketRateLimiter creates a new WebSocket rate limiter
func (*WebSocketRateLimiter) CheckCommand ¶
func (wrl *WebSocketRateLimiter) CheckCommand(ctx context.Context, connectionID string, command *Command) (bool, time.Duration, error)
CheckCommand checks if a WebSocket command should be allowed
func (*WebSocketRateLimiter) CheckConnection ¶
func (wrl *WebSocketRateLimiter) CheckConnection(ctx context.Context, userID, ipAddress string) (bool, string, error)
CheckConnection checks if a new connection should be allowed
func (*WebSocketRateLimiter) GetConnectionStatus ¶
func (wrl *WebSocketRateLimiter) GetConnectionStatus(connectionID string) (map[string]interface{}, error)
GetConnectionStatus returns the rate limit status for a connection
func (*WebSocketRateLimiter) OnConnect ¶
func (wrl *WebSocketRateLimiter) OnConnect(connectionID, userID, ipAddress string)
OnConnect handles a new WebSocket connection
func (*WebSocketRateLimiter) OnDisconnect ¶
func (wrl *WebSocketRateLimiter) OnDisconnect(connectionID string)
OnDisconnect handles a WebSocket disconnection
func (*WebSocketRateLimiter) ResetConnection ¶
func (wrl *WebSocketRateLimiter) ResetConnection(connectionID string) error
ResetConnection resets rate limits for a specific connection (admin function)