Documentation
¶
Index ¶
- Variables
- func GetAwaitResponse(ctx context.Context) (string, bool)
- func GetConnection(addr string, config TLSConfig) (net.Conn, error)
- func GetConsumerID(ctx context.Context) (string, bool)
- func GetContentType(ctx context.Context) (string, bool)
- func GetHeader(ctx context.Context, key string) (string, bool)
- func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)
- func GetPublisherID(ctx context.Context) (string, bool)
- func GetQueue(ctx context.Context) (string, bool)
- func GetTriggerNode(ctx context.Context) (string, bool)
- func HeadersWithConsumerID(ctx context.Context, id string) map[string]string
- func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string
- func IsClosed(conn net.Conn) bool
- func NewID() string
- func RecoverPanic(labelGenerator func() string)
- func RecoverTitle() string
- func SetHeaders(ctx context.Context, headers map[string]string) context.Context
- func WithHeaders(ctx context.Context, headers map[string]string) map[string]string
- func WrapError(err error, msg, op string) error
- type ActiveAlert
- type AlertManager
- type AlertNotifier
- type AlertRule
- type AlertStatus
- type Broker
- func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string
- func (b *Broker) AdjustConsumerWorkers(noOfWorkers int, consumerID ...string)
- func (b *Broker) Authenticate(ctx context.Context, credentials map[string]string) error
- func (b *Broker) Authorize(ctx context.Context, role string, action string) error
- func (b *Broker) Close() error
- func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)
- func (b *Broker) NewQueue(name string) *Queue
- func (b *Broker) NewQueueWithConfig(name string, opts ...QueueOption) *Queue
- func (b *Broker) NewQueueWithOrdering(name string) *Queue
- func (b *Broker) NotifyHandler() func(context.Context, Result) error
- func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error
- func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message)
- func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)
- func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (b *Broker) Options() *Options
- func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error
- func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) RemoveConsumer(consumerID string, queues ...string)
- func (b *Broker) ReprocessDLQ(queueName string) error
- func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) SetNotifyHandler(callback Callback)
- func (b *Broker) SetURL(url string)
- func (b *Broker) Start(ctx context.Context) error
- func (b *Broker) StartEnhanced(ctx context.Context) error
- func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) StopEnhanced() error
- func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) SyncMode() bool
- func (b *Broker) TLSConfig() TLSConfig
- func (b *Broker) URL() string
- func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error
- type BrokerConfig
- type BrokerConnection
- type Callback
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitState
- type ClusteringConfig
- type CompletionCallback
- type ConfigManager
- func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)
- func (cm *ConfigManager) GetConfig() *ProductionConfig
- func (cm *ConfigManager) LoadConfig() error
- func (cm *ConfigManager) RemoveWatcher(watcher ConfigWatcher)
- func (cm *ConfigManager) SaveConfig() error
- func (cm *ConfigManager) StartWatching(ctx context.Context, interval time.Duration)
- func (cm *ConfigManager) UpdateConfig(newConfig *ProductionConfig) error
- type ConfigWatcher
- type ConnectionPool
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Conn() net.Conn
- func (c *Consumer) Consume(ctx context.Context) error
- func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (c *Consumer) GetKey() string
- func (c *Consumer) GetType() string
- func (c *Consumer) Metrics() Metrics
- func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error
- func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)
- func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error
- func (c *Consumer) OnResponse(ctx context.Context, result Result) error
- func (c *Consumer) Pause(ctx context.Context) error
- func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result
- func (c *Consumer) Resume(ctx context.Context) error
- func (c *Consumer) SetKey(key string)
- func (c *Consumer) StartHTTPAPI() (int, error)
- func (c *Consumer) Stop(ctx context.Context) error
- func (c *Consumer) Update(ctx context.Context, payload []byte) error
- type ConsumerConfig
- type CronSchedule
- type DeadLetterQueue
- type DefaultPlugin
- type DetailedMetricsRegistry
- func (mr *DetailedMetricsRegistry) GetAllMetrics() map[string]*TimeSeries
- func (mr *DetailedMetricsRegistry) GetMetric(name string) (*TimeSeries, bool)
- func (mr *DetailedMetricsRegistry) RecordValue(name string, value float64)
- func (mr *DetailedMetricsRegistry) RegisterMetric(name string, metricType MetricType, description string, ...)
- type DiskSpaceHealthCheck
- type DistributedLocker
- type DynamicConfig
- type EnhancedCircuitBreaker
- type ExecutionHistory
- type FormattedMetrics
- type GoRoutineHealthCheck
- type Handler
- type HealthCheck
- type HealthCheckResult
- type HealthChecker
- type HealthStatus
- type HealthThresholds
- type InMemoryMessageStore
- func (ims *InMemoryMessageStore) Cleanup(olderThan time.Time) error
- func (ims *InMemoryMessageStore) Count(queue string) (int64, error)
- func (ims *InMemoryMessageStore) Delete(id string) error
- func (ims *InMemoryMessageStore) List(queue string, limit int, offset int) ([]*StoredMessage, error)
- func (ims *InMemoryMessageStore) Retrieve(id string) (*StoredMessage, error)
- func (ims *InMemoryMessageStore) Store(msg *StoredMessage) error
- type InMemoryMetricsRegistry
- type LockEntry
- type LogNotifier
- type MemoryHealthCheck
- type MemoryTaskStorage
- func (m *MemoryTaskStorage) CleanupExpiredTasks() error
- func (m *MemoryTaskStorage) DeleteTask(taskID string) error
- func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
- func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
- func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
- func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
- type MessageStore
- type Metric
- type MetricType
- type Metrics
- type MetricsCollector
- type MetricsRegistry
- type MetricsServer
- type MonitoringConfig
- type Option
- func DisableBrokerRateLimit() Option
- func DisableConsumerRateLimit() Option
- func WithBrokerRateLimiter(rate int, burst int) Option
- func WithBrokerURL(url string) Option
- func WithCAPath(caPath string) Option
- func WithCallback(val ...func(context.Context, Result) Result) Option
- func WithCleanTaskOnComplete() Option
- func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerRateLimiter(rate int, burst int) Option
- func WithConsumerTimeout(timeout time.Duration) Option
- func WithHTTPApi(flag bool) Option
- func WithInitialDelay(val time.Duration) Option
- func WithJitterPercent(val float64) Option
- func WithLogger(log logger.Logger) Option
- func WithMaxBackoff(val time.Duration) Option
- func WithMaxRetries(val int) Option
- func WithNotifyResponse(callback Callback) Option
- func WithRespondPendingResult(mode bool) Option
- func WithSyncMode(mode bool) Option
- func WithTLS(enableTLS bool, certPath, keyPath string) Option
- func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option
- type Options
- func (o *Options) BrokerAddr() string
- func (o *Options) CleanTaskOnComplete() bool
- func (o *Options) ConsumerTimeout() time.Duration
- func (o *Options) HTTPApi() bool
- func (o *Options) Logger() logger.Logger
- func (o *Options) MaxMemoryLoad() int64
- func (o *Options) NumOfWorkers() int
- func (o *Options) QueueSize() int
- func (o *Options) SetSyncMode(sync bool)
- func (o *Options) Storage() TaskStorage
- type PersistenceConfig
- type Plugin
- type Pool
- func (wp *Pool) AddScheduledMetrics(total int)
- func (wp *Pool) AdjustWorkerCount(newWorkerCount int)
- func (wp *Pool) DLQ() *DeadLetterQueue
- func (wp *Pool) Dispatch(event func())
- func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error
- func (wp *Pool) FormattedMetrics() FormattedMetrics
- func (wp *Pool) Init()
- func (wp *Pool) Metrics() Metrics
- func (wp *Pool) Pause()
- func (wp *Pool) Resume()
- func (wp *Pool) SetBatchSize(size int)
- func (wp *Pool) Start(numWorkers int)
- func (wp *Pool) Stop()
- func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error
- type PoolConfig
- type PoolOption
- func WithBatchSize(batchSize int) PoolOption
- func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption
- func WithCompletionCallback(callback func()) PoolOption
- func WithDiagnostics(enabled bool) PoolOption
- func WithGracefulShutdown(timeout time.Duration) PoolOption
- func WithHandler(handler Handler) PoolOption
- func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
- func WithMetricsRegistry(registry MetricsRegistry) PoolOption
- func WithPlugin(plugin Plugin) PoolOption
- func WithPoolCallback(callback Callback) PoolOption
- func WithTaskQueueSize(size int) PoolOption
- func WithTaskStorage(storage TaskStorage) PoolOption
- func WithTaskTimeout(t time.Duration) PoolOption
- func WithWarningThresholds(thresholds ThresholdConfig) PoolOption
- type PriorityQueue
- type Processor
- type ProductionConfig
- type Publisher
- type PublisherConfig
- type Queue
- type QueueConfig
- type QueueMetrics
- type QueueOption
- type QueueTask
- type QueuedTask
- type RateLimitConfig
- type RateLimiter
- type Result
- type Schedule
- type ScheduleOptions
- type ScheduledTask
- type Scheduler
- func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption) string
- func (s *Scheduler) Close() error
- func (s *Scheduler) ListScheduledTasks() []TaskInfo
- func (s *Scheduler) PrintAllTasks()
- func (s *Scheduler) PrintExecutionHistory(id string) error
- func (s *Scheduler) RemoveTask(id string) error
- func (s *Scheduler) Start()
- func (s *Scheduler) UpdateTask(id string, newSched *Schedule) error
- type SchedulerConfig
- type SchedulerOpt
- type SchedulerOption
- func WithInterval(interval time.Duration) SchedulerOption
- func WithOverlap() SchedulerOption
- func WithRecurring() SchedulerOption
- func WithScheduleSpec(spec string) SchedulerOption
- func WithSchedulerCallback(callback Callback) SchedulerOption
- func WithSchedulerHandler(handler Handler) SchedulerOption
- type SecurityConfig
- type Status
- type StoredMessage
- type SystemHealthChecker
- type TLSConfig
- type Task
- type TaskInfo
- type TaskOption
- func WithDAG(dag any) TaskOption
- func WithDedupKey(key string) TaskOption
- func WithExpiry(expiry time.Time) TaskOption
- func WithPriority(priority int) TaskOption
- func WithTTL(ttl time.Duration) TaskOption
- func WithTags(tags map[string]string) TaskOption
- func WithTaskHeaders(headers map[string]string) TaskOption
- func WithTaskMaxRetries(maxRetries int) TaskOption
- func WithTraceID(traceID string) TaskOption
- type TaskStorage
- type ThresholdConfig
- type TimeSeries
- type TimeSeriesPoint
- type WarningThresholds
Constants ¶
This section is empty.
Variables ¶
var BrokerAddr string
var Config = &DynamicConfig{ Timeout: 10 * time.Second, BatchSize: 1, MaxMemoryLoad: 100 * 1024 * 1024, IdleTimeout: 5 * time.Minute, BackoffDuration: 2 * time.Second, MaxRetries: 3, ReloadInterval: 30 * time.Second, WarningThreshold: WarningThresholds{ HighMemory: 1 * 1024 * 1024, LongExecution: 2 * time.Second, }, NumberOfWorkers: 5, }
var Logger = log.DefaultLogger
Functions ¶
func GetConnection ¶
Modified GetConnection: reuse existing connection if valid.
func HeadersWithConsumerID ¶
func RecoverPanic ¶
func RecoverPanic(labelGenerator func() string)
func RecoverTitle ¶
func RecoverTitle() string
func WithHeaders ¶
Types ¶
type ActiveAlert ¶ added in v0.0.16
type ActiveAlert struct {
Rule AlertRule `json:"rule"`
Value float64 `json:"value"`
StartsAt time.Time `json:"starts_at"`
EndsAt *time.Time `json:"ends_at,omitempty"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Status AlertStatus `json:"status"`
}
ActiveAlert represents an active alert
type AlertManager ¶ added in v0.0.16
type AlertManager struct {
// contains filtered or unexported fields
}
AlertManager manages alerts and notifications
func NewAlertManager ¶ added in v0.0.16
func NewAlertManager(logger logger.Logger) *AlertManager
NewAlertManager creates a new alert manager
func (*AlertManager) AddNotifier ¶ added in v0.0.16
func (am *AlertManager) AddNotifier(notifier AlertNotifier)
AddNotifier adds an alert notifier
func (*AlertManager) AddRule ¶ added in v0.0.16
func (am *AlertManager) AddRule(rule AlertRule)
AddRule adds an alert rule
func (*AlertManager) EvaluateRules ¶ added in v0.0.16
func (am *AlertManager) EvaluateRules(registry *DetailedMetricsRegistry)
EvaluateRules evaluates all alert rules against current metrics
type AlertNotifier ¶ added in v0.0.16
type AlertNotifier interface {
Notify(ctx context.Context, alert ActiveAlert) error
Name() string
}
AlertNotifier interface for alert notifications
type AlertRule ¶ added in v0.0.16
type AlertRule struct {
Name string `json:"name"`
Metric string `json:"metric"`
Condition string `json:"condition"` // "gt", "lt", "eq", "gte", "lte"
Threshold float64 `json:"threshold"`
Duration time.Duration `json:"duration"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Enabled bool `json:"enabled"`
}
AlertRule defines conditions for triggering alerts
type AlertStatus ¶ added in v0.0.16
type AlertStatus string
AlertStatus represents the status of an alert
const ( AlertStatusFiring AlertStatus = "firing" AlertStatusResolved AlertStatus = "resolved" AlertStatusSilenced AlertStatus = "silenced" )
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) AddConsumer ¶
func (*Broker) AdjustConsumerWorkers ¶ added in v0.0.11
func (*Broker) Authenticate ¶ added in v0.0.16
Add authentication and authorization for publishers and consumers
func (*Broker) HandleCallback ¶
func (*Broker) MessageResponseHandler ¶
func (*Broker) NewQueueWithConfig ¶ added in v0.0.16
func (b *Broker) NewQueueWithConfig(name string, opts ...QueueOption) *Queue
NewQueueWithConfig creates a queue with specific configuration
func (*Broker) NewQueueWithOrdering ¶ added in v0.0.16
Ensure message ordering in task queues
func (*Broker) OnConsumerPause ¶
func (*Broker) OnConsumerResume ¶
func (*Broker) OnConsumerStop ¶
func (*Broker) OnConsumerUpdated ¶ added in v0.0.11
func (*Broker) PauseConsumer ¶
func (*Broker) PublishHandler ¶
func (*Broker) RemoveConsumer ¶
func (*Broker) ReprocessDLQ ¶ added in v0.0.16
Add advanced dead-letter queue management
func (*Broker) ResumeConsumer ¶
func (*Broker) SetNotifyHandler ¶
func (*Broker) StartEnhanced ¶ added in v0.0.16
Enhanced Start method with production features
func (*Broker) StopConsumer ¶
func (*Broker) StopEnhanced ¶ added in v0.0.16
Enhanced Stop method with graceful shutdown
func (*Broker) SubscribeHandler ¶
func (*Broker) UpdateConsumer ¶ added in v0.0.11
type BrokerConfig ¶ added in v0.0.16
type BrokerConfig struct {
Address string `json:"address"`
Port int `json:"port"`
MaxConnections int `json:"max_connections"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
ReadTimeout time.Duration `json:"read_timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
IdleTimeout time.Duration `json:"idle_timeout"`
KeepAlive bool `json:"keep_alive"`
KeepAlivePeriod time.Duration `json:"keep_alive_period"`
MaxQueueDepth int `json:"max_queue_depth"`
EnableDeadLetter bool `json:"enable_dead_letter"`
DeadLetterMaxRetries int `json:"dead_letter_max_retries"`
EnableMetrics bool `json:"enable_metrics"`
MetricsInterval time.Duration `json:"metrics_interval"`
GracefulShutdown time.Duration `json:"graceful_shutdown"`
MessageTTL time.Duration `json:"message_ttl"`
Headers map[string]string `json:"headers"`
}
BrokerConfig contains broker-specific configuration
func (*BrokerConfig) UnmarshalJSON ¶ added in v0.0.16
func (b *BrokerConfig) UnmarshalJSON(data []byte) error
type BrokerConnection ¶ added in v0.0.16
type BrokerConnection struct {
// contains filtered or unexported fields
}
BrokerConnection represents a single broker connection
type CircuitBreaker ¶ added in v0.0.16
type CircuitBreakerConfig ¶ added in v0.0.11
type CircuitState ¶ added in v0.0.16
type CircuitState int
CircuitState represents the state of a circuit breaker
const ( CircuitClosed CircuitState = iota CircuitOpen CircuitHalfOpen )
type ClusteringConfig ¶ added in v0.0.16
type ClusteringConfig struct {
EnableClustering bool `json:"enable_clustering"`
NodeID string `json:"node_id"`
ClusterNodes []string `json:"cluster_nodes"`
DiscoveryMethod string `json:"discovery_method"` // "static", "consul", "etcd", "k8s"
DiscoveryEndpoint string `json:"discovery_endpoint"`
HeartbeatInterval time.Duration `json:"heartbeat_interval"`
ElectionTimeout time.Duration `json:"election_timeout"`
EnableLoadBalancing bool `json:"enable_load_balancing"`
LoadBalancingStrategy string `json:"load_balancing_strategy"` // "round_robin", "least_connections", "hash"
EnableFailover bool `json:"enable_failover"`
FailoverTimeout time.Duration `json:"failover_timeout"`
EnableReplication bool `json:"enable_replication"`
ReplicationFactor int `json:"replication_factor"`
ConsistencyLevel string `json:"consistency_level"` // "weak", "strong", "eventual"
}
ClusteringConfig contains clustering configuration
func (*ClusteringConfig) UnmarshalJSON ¶ added in v0.0.16
func (c *ClusteringConfig) UnmarshalJSON(data []byte) error
type CompletionCallback ¶
type CompletionCallback func()
CompletionCallback is called when the pool completes a graceful shutdown.
type ConfigManager ¶ added in v0.0.16
type ConfigManager struct {
// contains filtered or unexported fields
}
ConfigManager handles dynamic configuration management
func NewConfigManager ¶ added in v0.0.16
func NewConfigManager(configFile string, logger logger.Logger) *ConfigManager
NewConfigManager creates a new configuration manager
func (*ConfigManager) AddWatcher ¶ added in v0.0.16
func (cm *ConfigManager) AddWatcher(watcher ConfigWatcher)
AddWatcher adds a configuration watcher
func (*ConfigManager) GetConfig ¶ added in v0.0.16
func (cm *ConfigManager) GetConfig() *ProductionConfig
GetConfig returns a copy of the current configuration
func (*ConfigManager) LoadConfig ¶ added in v0.0.16
func (cm *ConfigManager) LoadConfig() error
LoadConfig loads configuration from file
func (*ConfigManager) RemoveWatcher ¶ added in v0.0.16
func (cm *ConfigManager) RemoveWatcher(watcher ConfigWatcher)
RemoveWatcher removes a configuration watcher
func (*ConfigManager) SaveConfig ¶ added in v0.0.16
func (cm *ConfigManager) SaveConfig() error
SaveConfig saves current configuration to file
func (*ConfigManager) StartWatching ¶ added in v0.0.16
func (cm *ConfigManager) StartWatching(ctx context.Context, interval time.Duration)
StartWatching starts watching for configuration changes
func (*ConfigManager) UpdateConfig ¶ added in v0.0.16
func (cm *ConfigManager) UpdateConfig(newConfig *ProductionConfig) error
UpdateConfig updates the configuration
type ConfigWatcher ¶ added in v0.0.16
type ConfigWatcher interface {
OnConfigChange(oldConfig, newConfig *ProductionConfig) error
}
ConfigWatcher interface for configuration change notifications
type ConnectionPool ¶ added in v0.0.16
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool manages a pool of broker connections
func NewConnectionPool ¶ added in v0.0.16
func NewConnectionPool(maxConns int) *ConnectionPool
NewConnectionPool creates a new connection pool
func (*ConnectionPool) AddConnection ¶ added in v0.0.16
AddConnection adds a connection to the pool
func (*ConnectionPool) CleanupIdleConnections ¶ added in v0.0.16
func (cp *ConnectionPool) CleanupIdleConnections(idleTimeout time.Duration)
CleanupIdleConnections removes idle connections
func (*ConnectionPool) GetActiveConnections ¶ added in v0.0.16
func (cp *ConnectionPool) GetActiveConnections() int64
GetActiveConnections returns the number of active connections
func (*ConnectionPool) RemoveConnection ¶ added in v0.0.16
func (cp *ConnectionPool) RemoveConnection(id string)
RemoveConnection removes a connection from the pool
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) ConsumeMessage ¶
func (*Consumer) OnResponse ¶
func (*Consumer) ProcessTask ¶
func (*Consumer) StartHTTPAPI ¶ added in v0.0.12
StartHTTPAPI starts an HTTP server on a random available port and registers API endpoints. It returns the port number the server is listening on.
type ConsumerConfig ¶ added in v0.0.16
type ConsumerConfig struct {
MaxRetries int `json:"max_retries"`
InitialDelay time.Duration `json:"initial_delay"`
MaxBackoff time.Duration `json:"max_backoff"`
JitterPercent float64 `json:"jitter_percent"`
EnableReconnect bool `json:"enable_reconnect"`
ReconnectInterval time.Duration `json:"reconnect_interval"`
HealthCheckInterval time.Duration `json:"health_check_interval"`
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
TaskTimeout time.Duration `json:"task_timeout"`
EnableDeduplication bool `json:"enable_deduplication"`
DeduplicationWindow time.Duration `json:"deduplication_window"`
EnablePriorityQueue bool `json:"enable_priority_queue"`
EnableHTTPAPI bool `json:"enable_http_api"`
HTTPAPIPort int `json:"http_api_port"`
EnableCircuitBreaker bool `json:"enable_circuit_breaker"`
CircuitBreakerThreshold int `json:"circuit_breaker_threshold"`
CircuitBreakerTimeout time.Duration `json:"circuit_breaker_timeout"`
}
ConsumerConfig contains consumer-specific configuration
func (*ConsumerConfig) UnmarshalJSON ¶ added in v0.0.16
func (c *ConsumerConfig) UnmarshalJSON(data []byte) error
type CronSchedule ¶
type CronSchedule struct {
Seconds string
Minute string
Hour string
DayOfMonth string
Month string
DayOfWeek string
}
func (CronSchedule) String ¶
func (c CronSchedule) String() string
type DeadLetterQueue ¶ added in v0.0.11
type DeadLetterQueue struct {
// contains filtered or unexported fields
}
DeadLetterQueue stores tasks that have permanently failed.
func NewDeadLetterQueue ¶ added in v0.0.11
func NewDeadLetterQueue() *DeadLetterQueue
func (*DeadLetterQueue) Add ¶ added in v0.0.11
func (dlq *DeadLetterQueue) Add(task *QueueTask)
func (*DeadLetterQueue) Tasks ¶ added in v0.0.11
func (dlq *DeadLetterQueue) Tasks() []*QueueTask
type DefaultPlugin ¶ added in v0.0.11
type DefaultPlugin struct{}
DefaultPlugin is a no-op implementation of Plugin.
func (*DefaultPlugin) AfterTask ¶ added in v0.0.11
func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result)
func (*DefaultPlugin) BeforeTask ¶ added in v0.0.11
func (dp *DefaultPlugin) BeforeTask(task *QueueTask)
func (*DefaultPlugin) Initialize ¶ added in v0.0.11
func (dp *DefaultPlugin) Initialize(config interface{}) error
type DetailedMetricsRegistry ¶ added in v0.0.16
type DetailedMetricsRegistry struct {
// contains filtered or unexported fields
}
DetailedMetricsRegistry stores and manages metrics with enhanced features
func NewDetailedMetricsRegistry ¶ added in v0.0.16
func NewDetailedMetricsRegistry() *DetailedMetricsRegistry
NewMetricsRegistry creates a new metrics registry
func (*DetailedMetricsRegistry) GetAllMetrics ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) GetAllMetrics() map[string]*TimeSeries
GetAllMetrics returns all metrics
func (*DetailedMetricsRegistry) GetMetric ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) GetMetric(name string) (*TimeSeries, bool)
GetMetric returns a metric by name
func (*DetailedMetricsRegistry) RecordValue ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) RecordValue(name string, value float64)
RecordValue records a value for a metric
func (*DetailedMetricsRegistry) RegisterMetric ¶ added in v0.0.16
func (mr *DetailedMetricsRegistry) RegisterMetric(name string, metricType MetricType, description string, labels map[string]string)
RegisterMetric registers a new metric
type DiskSpaceHealthCheck ¶ added in v0.0.16
type DiskSpaceHealthCheck struct{}
DiskSpaceHealthCheck checks available disk space
func (*DiskSpaceHealthCheck) Check ¶ added in v0.0.16
func (dshc *DiskSpaceHealthCheck) Check(ctx context.Context) *HealthCheckResult
func (*DiskSpaceHealthCheck) Name ¶ added in v0.0.16
func (dshc *DiskSpaceHealthCheck) Name() string
func (*DiskSpaceHealthCheck) Timeout ¶ added in v0.0.16
func (dshc *DiskSpaceHealthCheck) Timeout() time.Duration
type DistributedLocker ¶ added in v0.0.16
type DynamicConfig ¶ added in v0.0.11
type DynamicConfig struct {
Timeout time.Duration
BatchSize int
MaxMemoryLoad int64
IdleTimeout time.Duration
BackoffDuration time.Duration
MaxRetries int
ReloadInterval time.Duration
WarningThreshold WarningThresholds
NumberOfWorkers int // new field for worker count
}
DynamicConfig holds runtime configuration values.
type EnhancedCircuitBreaker ¶ added in v0.0.16
type EnhancedCircuitBreaker struct {
// contains filtered or unexported fields
}
EnhancedCircuitBreaker provides circuit breaker functionality
func NewEnhancedCircuitBreaker ¶ added in v0.0.16
func NewEnhancedCircuitBreaker(threshold int64, timeout time.Duration) *EnhancedCircuitBreaker
NewEnhancedCircuitBreaker creates a new circuit breaker
func (*EnhancedCircuitBreaker) Call ¶ added in v0.0.16
func (cb *EnhancedCircuitBreaker) Call(fn func() error) error
Call executes a function with circuit breaker protection
type ExecutionHistory ¶
type FormattedMetrics ¶ added in v0.0.12
type FormattedMetrics struct {
TotalTasks int64 `json:"total_tasks"`
CompletedTasks int64 `json:"completed_tasks"`
ErrorCount int64 `json:"error_count"`
CurrentMemoryUsed string `json:"current_memory_used"`
CumulativeMemoryUsed string `json:"cumulative_memory_used"`
TotalScheduled int64 `json:"total_scheduled"`
CumulativeExecution string `json:"cumulative_execution"`
AverageExecution string `json:"average_execution"`
}
FormattedMetrics is a helper struct to present human-readable metrics.
type GoRoutineHealthCheck ¶ added in v0.0.16
type GoRoutineHealthCheck struct{}
GoRoutineHealthCheck checks goroutine count
func (*GoRoutineHealthCheck) Check ¶ added in v0.0.16
func (ghc *GoRoutineHealthCheck) Check(ctx context.Context) *HealthCheckResult
func (*GoRoutineHealthCheck) Name ¶ added in v0.0.16
func (ghc *GoRoutineHealthCheck) Name() string
func (*GoRoutineHealthCheck) Timeout ¶ added in v0.0.16
func (ghc *GoRoutineHealthCheck) Timeout() time.Duration
type HealthCheck ¶ added in v0.0.16
type HealthCheck interface {
Name() string
Check(ctx context.Context) *HealthCheckResult
Timeout() time.Duration
}
HealthCheck interface for health checks
type HealthCheckResult ¶ added in v0.0.16
type HealthCheckResult struct {
Name string `json:"name"`
Status HealthStatus `json:"status"`
Message string `json:"message"`
Duration time.Duration `json:"duration"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
HealthCheckResult represents the result of a health check
type HealthChecker ¶ added in v0.0.16
type HealthChecker struct {
// contains filtered or unexported fields
}
HealthChecker monitors broker health
func NewHealthChecker ¶ added in v0.0.16
func NewHealthChecker() *HealthChecker
NewHealthChecker creates a new health checker
func (*HealthChecker) Start ¶ added in v0.0.16
func (hc *HealthChecker) Start()
Start starts the health checker
func (*HealthChecker) Stop ¶ added in v0.0.16
func (hc *HealthChecker) Stop()
Stop stops the health checker
type HealthStatus ¶ added in v0.0.16
type HealthStatus string
HealthStatus represents the health status
const ( HealthStatusHealthy HealthStatus = "healthy" HealthStatusUnhealthy HealthStatus = "unhealthy" HealthStatusWarning HealthStatus = "warning" HealthStatusUnknown HealthStatus = "unknown" )
type HealthThresholds ¶ added in v0.0.16
type HealthThresholds struct {
MaxMemoryUsage int64
MaxCPUUsage float64
MaxConnections int
MaxQueueDepth int
MaxResponseTime time.Duration
MinFreeMemory int64
}
HealthThresholds defines health check thresholds
type InMemoryMessageStore ¶ added in v0.0.16
type InMemoryMessageStore struct {
// contains filtered or unexported fields
}
InMemoryMessageStore implements MessageStore in memory
func NewInMemoryMessageStore ¶ added in v0.0.16
func NewInMemoryMessageStore() *InMemoryMessageStore
NewInMemoryMessageStore creates a new in-memory message store
func (*InMemoryMessageStore) Cleanup ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Cleanup(olderThan time.Time) error
Cleanup removes old messages
func (*InMemoryMessageStore) Count ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Count(queue string) (int64, error)
Count counts messages in a queue
func (*InMemoryMessageStore) Delete ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Delete(id string) error
Delete deletes a message
func (*InMemoryMessageStore) List ¶ added in v0.0.16
func (ims *InMemoryMessageStore) List(queue string, limit int, offset int) ([]*StoredMessage, error)
List lists messages for a queue
func (*InMemoryMessageStore) Retrieve ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Retrieve(id string) (*StoredMessage, error)
Retrieve retrieves a message by ID
func (*InMemoryMessageStore) Store ¶ added in v0.0.16
func (ims *InMemoryMessageStore) Store(msg *StoredMessage) error
Store stores a message
type InMemoryMetricsRegistry ¶ added in v0.0.11
type InMemoryMetricsRegistry struct {
// contains filtered or unexported fields
}
InMemoryMetricsRegistry stores metrics in memory.
func NewInMemoryMetricsRegistry ¶ added in v0.0.11
func NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry
func (*InMemoryMetricsRegistry) Get ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Get(metricName string) interface{}
func (*InMemoryMetricsRegistry) Increment ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Increment(metricName string)
func (*InMemoryMetricsRegistry) Register ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{})
type LockEntry ¶ added in v0.0.16
type LockEntry struct {
// contains filtered or unexported fields
}
type LogNotifier ¶ added in v0.0.16
type LogNotifier struct {
// contains filtered or unexported fields
}
LogNotifier sends alerts to logs
func NewLogNotifier ¶ added in v0.0.16
func NewLogNotifier(logger logger.Logger) *LogNotifier
func (*LogNotifier) Name ¶ added in v0.0.16
func (ln *LogNotifier) Name() string
func (*LogNotifier) Notify ¶ added in v0.0.16
func (ln *LogNotifier) Notify(ctx context.Context, alert ActiveAlert) error
type MemoryHealthCheck ¶ added in v0.0.16
type MemoryHealthCheck struct{}
MemoryHealthCheck checks memory usage
func (*MemoryHealthCheck) Check ¶ added in v0.0.16
func (mhc *MemoryHealthCheck) Check(ctx context.Context) *HealthCheckResult
func (*MemoryHealthCheck) Name ¶ added in v0.0.16
func (mhc *MemoryHealthCheck) Name() string
func (*MemoryHealthCheck) Timeout ¶ added in v0.0.16
func (mhc *MemoryHealthCheck) Timeout() time.Duration
type MemoryTaskStorage ¶
type MemoryTaskStorage struct {
// contains filtered or unexported fields
}
func NewMemoryTaskStorage ¶
func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage
func (*MemoryTaskStorage) CleanupExpiredTasks ¶
func (m *MemoryTaskStorage) CleanupExpiredTasks() error
func (*MemoryTaskStorage) DeleteTask ¶
func (m *MemoryTaskStorage) DeleteTask(taskID string) error
func (*MemoryTaskStorage) FetchNextTask ¶
func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
func (*MemoryTaskStorage) GetAllTasks ¶
func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
func (*MemoryTaskStorage) GetTask ¶
func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
func (*MemoryTaskStorage) SaveTask ¶
func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
type MessageStore ¶ added in v0.0.16
type MessageStore interface {
Store(msg *StoredMessage) error
Retrieve(id string) (*StoredMessage, error)
Delete(id string) error
List(queue string, limit int, offset int) ([]*StoredMessage, error)
Count(queue string) (int64, error)
Cleanup(olderThan time.Time) error
}
MessageStore interface for storing messages
type Metric ¶ added in v0.0.16
type Metric struct {
Name string `json:"name"`
Value float64 `json:"value"`
Timestamp time.Time `json:"timestamp"`
Tags map[string]string `json:"tags,omitempty"`
}
Metric represents a single metric
type MetricType ¶ added in v0.0.16
type MetricType string
MetricType represents the type of metric
const ( MetricTypeCounter MetricType = "counter" MetricTypeGauge MetricType = "gauge" MetricTypeHistogram MetricType = "histogram" MetricTypeSummary MetricType = "summary" )
type Metrics ¶
type Metrics struct {
TotalTasks int64 // total number of tasks processed
CompletedTasks int64 // number of successfully processed tasks
ErrorCount int64 // number of tasks that resulted in error
TotalMemoryUsed int64 // current memory used (in bytes) by tasks in flight
TotalScheduled int64 // number of tasks scheduled
ExecutionTime int64 // cumulative execution time in milliseconds
CumulativeMemoryUsed int64 // cumulative memory used (sum of all task sizes) in bytes
}
Metrics holds cumulative pool metrics.
type MetricsCollector ¶ added in v0.0.16
type MetricsCollector struct {
// contains filtered or unexported fields
}
MetricsCollector collects and stores metrics
func NewMetricsCollector ¶ added in v0.0.16
func NewMetricsCollector() *MetricsCollector
NewMetricsCollector creates a new metrics collector
func (*MetricsCollector) RecordMetric ¶ added in v0.0.16
func (mc *MetricsCollector) RecordMetric(name string, value float64, tags map[string]string)
RecordMetric records a metric
type MetricsRegistry ¶ added in v0.0.11
type MetricsServer ¶ added in v0.0.16
type MetricsServer struct {
// contains filtered or unexported fields
}
MetricsServer provides comprehensive monitoring and metrics
func NewMetricsServer ¶ added in v0.0.16
func NewMetricsServer(broker *Broker, config *MonitoringConfig, logger logger.Logger) *MetricsServer
NewMetricsServer creates a new metrics server
func (*MetricsServer) AddAlertNotifier ¶ added in v0.0.16
func (ms *MetricsServer) AddAlertNotifier(notifier AlertNotifier)
AddAlertNotifier adds an alert notifier to the metrics server
func (*MetricsServer) AddAlertRule ¶ added in v0.0.16
func (ms *MetricsServer) AddAlertRule(rule AlertRule)
AddAlertRule adds an alert rule to the metrics server
func (*MetricsServer) Start ¶ added in v0.0.16
func (ms *MetricsServer) Start(ctx context.Context) error
Start starts the metrics server
func (*MetricsServer) Stop ¶ added in v0.0.16
func (ms *MetricsServer) Stop() error
Stop stops the metrics server
type MonitoringConfig ¶ added in v0.0.16
type MonitoringConfig struct {
EnableMetrics bool `json:"enable_metrics"`
MetricsPort int `json:"metrics_port"`
MetricsPath string `json:"metrics_path"`
EnableHealthCheck bool `json:"enable_health_check"`
HealthCheckPort int `json:"health_check_port"`
HealthCheckPath string `json:"health_check_path"`
HealthCheckInterval time.Duration `json:"health_check_interval"`
EnableTracing bool `json:"enable_tracing"`
TracingEndpoint string `json:"tracing_endpoint"`
TracingSampleRate float64 `json:"tracing_sample_rate"`
EnableLogging bool `json:"enable_logging"`
LogLevel string `json:"log_level"`
LogFormat string `json:"log_format"` // "json", "text"
LogOutput string `json:"log_output"` // "stdout", "file", "syslog"
LogFilePath string `json:"log_file_path"`
LogMaxSize int `json:"log_max_size"` // MB
LogMaxBackups int `json:"log_max_backups"`
LogMaxAge int `json:"log_max_age"` // days
EnableProfiling bool `json:"enable_profiling"`
ProfilingPort int `json:"profiling_port"`
}
MonitoringConfig contains monitoring and observability configuration
func (*MonitoringConfig) UnmarshalJSON ¶ added in v0.0.16
func (m *MonitoringConfig) UnmarshalJSON(data []byte) error
type Option ¶
type Option func(*Options)
func DisableBrokerRateLimit ¶ added in v0.0.11
func DisableBrokerRateLimit() Option
func DisableConsumerRateLimit ¶ added in v0.0.11
func DisableConsumerRateLimit() Option
func WithBrokerRateLimiter ¶ added in v0.0.11
func WithCallback ¶
WithCallback -
func WithConsumerOnClose ¶
func WithConsumerOnSubscribe ¶
func WithConsumerRateLimiter ¶ added in v0.0.11
func WithConsumerTimeout ¶ added in v0.0.16
func WithHTTPApi ¶ added in v0.0.12
WithHTTPApi - Option to enable/disable TLS
func WithLogger ¶ added in v0.0.10
func WithNotifyResponse ¶
func WithRespondPendingResult ¶
WithRespondPendingResult -
func WithWorkerPool ¶
type Options ¶
type Options struct {
BrokerRateLimiter *RateLimiter // new field for broker rate limiting
ConsumerRateLimiter *RateLimiter // new field for consumer rate limiting
// contains filtered or unexported fields
}
func SetupOptions ¶
func (*Options) BrokerAddr ¶ added in v0.0.10
func (*Options) CleanTaskOnComplete ¶
func (*Options) ConsumerTimeout ¶ added in v0.0.16
func (*Options) MaxMemoryLoad ¶
func (*Options) NumOfWorkers ¶
func (*Options) SetSyncMode ¶
func (*Options) Storage ¶
func (o *Options) Storage() TaskStorage
type PersistenceConfig ¶ added in v0.0.16
type PersistenceConfig struct {
EnablePersistence bool `json:"enable_persistence"`
StorageType string `json:"storage_type"` // "memory", "file", "redis", "postgres", "mysql"
ConnectionString string `json:"connection_string"`
MaxConnections int `json:"max_connections"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
RetentionPeriod time.Duration `json:"retention_period"`
CleanupInterval time.Duration `json:"cleanup_interval"`
BackupEnabled bool `json:"backup_enabled"`
BackupInterval time.Duration `json:"backup_interval"`
BackupPath string `json:"backup_path"`
CompressionEnabled bool `json:"compression_enabled"`
EncryptionEnabled bool `json:"encryption_enabled"`
ReplicationEnabled bool `json:"replication_enabled"`
ReplicationNodes []string `json:"replication_nodes"`
}
PersistenceConfig contains data persistence configuration
func (*PersistenceConfig) UnmarshalJSON ¶ added in v0.0.16
func (p *PersistenceConfig) UnmarshalJSON(data []byte) error
type Plugin ¶ added in v0.0.11
type Plugin interface {
Initialize(config interface{}) error
BeforeTask(task *QueueTask)
AfterTask(task *QueueTask, result Result)
}
Plugin is used to inject custom behavior before or after task processing.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents the worker pool processing tasks.
func NewPool ¶
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool
NewPool creates and starts a new pool with the given number of workers.
func (*Pool) AddScheduledMetrics ¶ added in v0.0.16
func (*Pool) AdjustWorkerCount ¶
func (*Pool) DLQ ¶ added in v0.0.11
func (wp *Pool) DLQ() *DeadLetterQueue
func (*Pool) EnqueueTask ¶
func (*Pool) FormattedMetrics ¶ added in v0.0.12
func (wp *Pool) FormattedMetrics() FormattedMetrics
FormattedMetrics returns a formatted version of the pool metrics.
func (*Pool) SetBatchSize ¶
func (*Pool) UpdateConfig ¶ added in v0.0.11
func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error
UpdateConfig updates pool configuration via a POOL_UPDATE command.
type PoolConfig ¶ added in v0.0.16
type PoolConfig struct {
MinWorkers int `json:"min_workers"`
MaxWorkers int `json:"max_workers"`
QueueSize int `json:"queue_size"`
MaxMemoryLoad int64 `json:"max_memory_load"`
TaskTimeout time.Duration `json:"task_timeout"`
IdleWorkerTimeout time.Duration `json:"idle_worker_timeout"`
EnableDynamicScaling bool `json:"enable_dynamic_scaling"`
ScalingFactor float64 `json:"scaling_factor"`
ScalingInterval time.Duration `json:"scaling_interval"`
MaxQueueWaitTime time.Duration `json:"max_queue_wait_time"`
EnableWorkStealing bool `json:"enable_work_stealing"`
EnablePriorityScheduling bool `json:"enable_priority_scheduling"`
GracefulShutdownTimeout time.Duration `json:"graceful_shutdown_timeout"`
}
PoolConfig contains worker pool configuration
func (*PoolConfig) UnmarshalJSON ¶ added in v0.0.16
func (p *PoolConfig) UnmarshalJSON(data []byte) error
type PoolOption ¶
type PoolOption func(*Pool)
func WithBatchSize ¶
func WithBatchSize(batchSize int) PoolOption
func WithCircuitBreaker ¶ added in v0.0.11
func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption
func WithCompletionCallback ¶
func WithCompletionCallback(callback func()) PoolOption
func WithDiagnostics ¶ added in v0.0.11
func WithDiagnostics(enabled bool) PoolOption
func WithGracefulShutdown ¶ added in v0.0.11
func WithGracefulShutdown(timeout time.Duration) PoolOption
func WithHandler ¶
func WithHandler(handler Handler) PoolOption
func WithMaxMemoryLoad ¶
func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
func WithMetricsRegistry ¶ added in v0.0.11
func WithMetricsRegistry(registry MetricsRegistry) PoolOption
func WithPlugin ¶ added in v0.0.11
func WithPlugin(plugin Plugin) PoolOption
func WithPoolCallback ¶
func WithPoolCallback(callback Callback) PoolOption
func WithTaskQueueSize ¶
func WithTaskQueueSize(size int) PoolOption
func WithTaskStorage ¶
func WithTaskStorage(storage TaskStorage) PoolOption
func WithTaskTimeout ¶
func WithTaskTimeout(t time.Duration) PoolOption
func WithWarningThresholds ¶ added in v0.0.11
func WithWarningThresholds(thresholds ThresholdConfig) PoolOption
type PriorityQueue ¶
type PriorityQueue []*QueueTask
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type ProductionConfig ¶ added in v0.0.16
type ProductionConfig struct {
Broker BrokerConfig `json:"broker"`
Consumer ConsumerConfig `json:"consumer"`
Publisher PublisherConfig `json:"publisher"`
Pool PoolConfig `json:"pool"`
Security SecurityConfig `json:"security"`
Monitoring MonitoringConfig `json:"monitoring"`
Persistence PersistenceConfig `json:"persistence"`
Clustering ClusteringConfig `json:"clustering"`
RateLimit RateLimitConfig `json:"rate_limit"`
LastUpdated time.Time `json:"last_updated"`
}
ProductionConfig contains all production configuration
func DefaultProductionConfig ¶ added in v0.0.16
func DefaultProductionConfig() *ProductionConfig
DefaultProductionConfig returns default production configuration
func (*ProductionConfig) UnmarshalJSON ¶ added in v0.0.16
func (c *ProductionConfig) UnmarshalJSON(data []byte) error
Custom unmarshaling to handle duration strings
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
type PublisherConfig ¶ added in v0.0.16
type PublisherConfig struct {
MaxRetries int `json:"max_retries"`
InitialDelay time.Duration `json:"initial_delay"`
MaxBackoff time.Duration `json:"max_backoff"`
JitterPercent float64 `json:"jitter_percent"`
ConnectionPoolSize int `json:"connection_pool_size"`
PublishTimeout time.Duration `json:"publish_timeout"`
EnableBatching bool `json:"enable_batching"`
BatchSize int `json:"batch_size"`
BatchTimeout time.Duration `json:"batch_timeout"`
EnableCompression bool `json:"enable_compression"`
CompressionLevel int `json:"compression_level"`
EnableAsync bool `json:"enable_async"`
AsyncBufferSize int `json:"async_buffer_size"`
EnableOrderedDelivery bool `json:"enable_ordered_delivery"`
}
PublisherConfig contains publisher-specific configuration
func (*PublisherConfig) UnmarshalJSON ¶ added in v0.0.16
func (p *PublisherConfig) UnmarshalJSON(data []byte) error
type QueueConfig ¶ added in v0.0.16
type QueueConfig struct {
MaxDepth int `json:"max_depth"`
MaxRetries int `json:"max_retries"`
MessageTTL time.Duration `json:"message_ttl"`
DeadLetter bool `json:"dead_letter"`
Persistent bool `json:"persistent"`
BatchSize int `json:"batch_size"`
Priority int `json:"priority"`
OrderedMode bool `json:"ordered_mode"`
Throttling bool `json:"throttling"`
ThrottleRate int `json:"throttle_rate"`
ThrottleBurst int `json:"throttle_burst"`
CompactionMode bool `json:"compaction_mode"`
}
QueueConfig holds configuration for a specific queue
type QueueMetrics ¶ added in v0.0.16
type QueueMetrics struct {
MessagesReceived int64 `json:"messages_received"`
MessagesProcessed int64 `json:"messages_processed"`
MessagesFailed int64 `json:"messages_failed"`
CurrentDepth int64 `json:"current_depth"`
AverageLatency time.Duration `json:"average_latency"`
LastActivity time.Time `json:"last_activity"`
}
QueueMetrics holds metrics for a specific queue
type QueueOption ¶ added in v0.0.16
type QueueOption func(*QueueConfig)
QueueOption defines options for queue configuration
func WithDeadLetter ¶ added in v0.0.16
func WithDeadLetter() QueueOption
WithDeadLetter enables dead letter queue for failed messages
func WithPersistent ¶ added in v0.0.16
func WithPersistent() QueueOption
WithPersistent enables message persistence
func WithQueueMaxDepth ¶ added in v0.0.16
func WithQueueMaxDepth(maxDepth int) QueueOption
WithQueueMaxDepth sets the maximum queue depth
func WithQueueMaxRetries ¶ added in v0.0.16
func WithQueueMaxRetries(maxRetries int) QueueOption
WithQueueMaxRetries sets the maximum retries for queue messages
func WithQueueOption ¶ added in v0.0.16
func WithQueueOption(config QueueConfig) QueueOption
WithQueueOption creates a queue with specific configuration
func WithQueueTTL ¶ added in v0.0.16
func WithQueueTTL(ttl time.Duration) QueueOption
WithQueueTTL sets the message TTL for the queue
type QueuedTask ¶
type RateLimitConfig ¶ added in v0.0.16
type RateLimitConfig struct {
EnableBrokerRateLimit bool `json:"enable_broker_rate_limit"`
BrokerRate int `json:"broker_rate"` // requests per second
BrokerBurst int `json:"broker_burst"`
EnableConsumerRateLimit bool `json:"enable_consumer_rate_limit"`
ConsumerRate int `json:"consumer_rate"`
ConsumerBurst int `json:"consumer_burst"`
EnablePublisherRateLimit bool `json:"enable_publisher_rate_limit"`
PublisherRate int `json:"publisher_rate"`
PublisherBurst int `json:"publisher_burst"`
EnablePerQueueRateLimit bool `json:"enable_per_queue_rate_limit"`
PerQueueRate int `json:"per_queue_rate"`
PerQueueBurst int `json:"per_queue_burst"`
}
RateLimitConfig contains rate limiting configuration
type RateLimiter ¶ added in v0.0.11
type RateLimiter struct {
C chan struct{}
// contains filtered or unexported fields
}
RateLimiter implementation
func NewRateLimiter ¶ added in v0.0.11
func NewRateLimiter(rate int, burst int) *RateLimiter
NewRateLimiter creates a new RateLimiter with the specified rate and burst.
func (*RateLimiter) Stop ¶ added in v0.0.12
func (rl *RateLimiter) Stop()
Stop terminates the rate limiter's internal goroutine.
func (*RateLimiter) Update ¶ added in v0.0.12
func (rl *RateLimiter) Update(newRate, newBurst int)
Update allows dynamic adjustment of rate and burst at runtime. It immediately applies the new settings.
func (*RateLimiter) Wait ¶ added in v0.0.11
func (rl *RateLimiter) Wait()
Wait blocks until a token is available.
type Result ¶
type Result struct {
CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at,omitempty"`
Latency string `json:"latency"`
Error error `json:"-"` // Keep error as an error type
Topic string `json:"topic"`
TaskID string `json:"task_id"`
Status Status `json:"status"`
ConditionStatus string `json:"condition_status"`
Ctx context.Context `json:"-"`
Payload json.RawMessage `json:"payload"`
Last bool
}
func (Result) MarshalJSON ¶
func (*Result) UnmarshalJSON ¶
type Schedule ¶
type Schedule struct {
TimeOfDay time.Time
CronSpec string
DayOfWeek []time.Weekday
DayOfMonth []int
Interval time.Duration
Recurring bool
}
func (*Schedule) ToHumanReadable ¶
type ScheduleOptions ¶
type ScheduledTask ¶
type ScheduledTask struct {
// contains filtered or unexported fields
}
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(pool *Pool, opts ...SchedulerOpt) *Scheduler
func (*Scheduler) ListScheduledTasks ¶ added in v0.0.16
func (*Scheduler) PrintAllTasks ¶
func (s *Scheduler) PrintAllTasks()
func (*Scheduler) PrintExecutionHistory ¶
func (*Scheduler) RemoveTask ¶
type SchedulerConfig ¶
type SchedulerOpt ¶ added in v0.0.16
type SchedulerOpt func(*Scheduler)
func WithStorage ¶ added in v0.0.16
func WithStorage(sm storage.IMap[string, *ScheduledTask]) SchedulerOpt
type SchedulerOption ¶
type SchedulerOption func(*ScheduleOptions)
func WithInterval ¶
func WithInterval(interval time.Duration) SchedulerOption
func WithOverlap ¶
func WithOverlap() SchedulerOption
func WithRecurring ¶
func WithRecurring() SchedulerOption
func WithScheduleSpec ¶ added in v0.0.16
func WithScheduleSpec(spec string) SchedulerOption
func WithSchedulerCallback ¶
func WithSchedulerCallback(callback Callback) SchedulerOption
func WithSchedulerHandler ¶
func WithSchedulerHandler(handler Handler) SchedulerOption
type SecurityConfig ¶ added in v0.0.16
type SecurityConfig struct {
EnableTLS bool `json:"enable_tls"`
TLSCertPath string `json:"tls_cert_path"`
TLSKeyPath string `json:"tls_key_path"`
TLSCAPath string `json:"tls_ca_path"`
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
EnableAuthentication bool `json:"enable_authentication"`
AuthenticationMethod string `json:"authentication_method"` // "basic", "jwt", "oauth"
EnableAuthorization bool `json:"enable_authorization"`
EnableEncryption bool `json:"enable_encryption"`
EncryptionKey string `json:"encryption_key"`
EnableAuditLog bool `json:"enable_audit_log"`
AuditLogPath string `json:"audit_log_path"`
SessionTimeout time.Duration `json:"session_timeout"`
MaxLoginAttempts int `json:"max_login_attempts"`
LockoutDuration time.Duration `json:"lockout_duration"`
}
SecurityConfig contains security-related configuration
func (*SecurityConfig) UnmarshalJSON ¶ added in v0.0.16
func (s *SecurityConfig) UnmarshalJSON(data []byte) error
type StoredMessage ¶ added in v0.0.16
type StoredMessage struct {
ID string `json:"id"`
Queue string `json:"queue"`
Payload []byte `json:"payload"`
Headers map[string]string `json:"headers,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Priority int `json:"priority"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
Attempts int `json:"attempts"`
}
StoredMessage represents a message stored in the message store
type SystemHealthChecker ¶ added in v0.0.16
type SystemHealthChecker struct {
// contains filtered or unexported fields
}
SystemHealthChecker monitors system health
func NewSystemHealthChecker ¶ added in v0.0.16
func NewSystemHealthChecker(logger logger.Logger) *SystemHealthChecker
NewSystemHealthChecker creates a new system health checker
func (*SystemHealthChecker) GetOverallHealth ¶ added in v0.0.16
func (shc *SystemHealthChecker) GetOverallHealth() HealthStatus
GetOverallHealth returns the overall system health
func (*SystemHealthChecker) RegisterCheck ¶ added in v0.0.16
func (shc *SystemHealthChecker) RegisterCheck(check HealthCheck)
RegisterCheck registers a health check
func (*SystemHealthChecker) RunChecks ¶ added in v0.0.16
func (shc *SystemHealthChecker) RunChecks(ctx context.Context) map[string]*HealthCheckResult
RunChecks runs all health checks
type Task ¶
type Task struct {
CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at"`
Expiry time.Time `json:"expiry"`
Error error `json:"-"` // Don't serialize errors directly
ErrorMsg string `json:"error,omitempty"` // Serialize error message if present
ID string `json:"id"`
Topic string `json:"topic"`
Status Status `json:"status"` // Use Status type instead of string
Payload json.RawMessage `json:"payload"`
Priority int `json:"priority,omitempty"`
Retries int `json:"retries,omitempty"`
MaxRetries int `json:"max_retries,omitempty"`
// Enhanced deduplication and tracing
DedupKey string `json:"dedup_key,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanID string `json:"span_id,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
// contains filtered or unexported fields
}
func NewTask ¶
func NewTask(id string, payload json.RawMessage, nodeKey string, opts ...TaskOption) *Task
func (*Task) IncrementRetry ¶ added in v0.0.16
func (t *Task) IncrementRetry()
IncrementRetry increments the retry count
type TaskOption ¶ added in v0.0.10
type TaskOption func(*Task)
TaskOption defines a function type for setting options.
func WithDAG ¶ added in v0.0.10
func WithDAG(dag any) TaskOption
func WithDedupKey ¶ added in v0.0.16
func WithDedupKey(key string) TaskOption
new TaskOption for deduplication:
func WithExpiry ¶ added in v0.0.16
func WithExpiry(expiry time.Time) TaskOption
TaskOption for setting expiry time
func WithPriority ¶ added in v0.0.16
func WithPriority(priority int) TaskOption
TaskOption for setting priority
func WithTTL ¶ added in v0.0.16
func WithTTL(ttl time.Duration) TaskOption
TaskOption for setting TTL (time to live)
func WithTags ¶ added in v0.0.16
func WithTags(tags map[string]string) TaskOption
TaskOption for adding tags
func WithTaskHeaders ¶ added in v0.0.16
func WithTaskHeaders(headers map[string]string) TaskOption
TaskOption for adding headers
func WithTaskMaxRetries ¶ added in v0.0.16
func WithTaskMaxRetries(maxRetries int) TaskOption
TaskOption for setting max retries
func WithTraceID ¶ added in v0.0.16
func WithTraceID(traceID string) TaskOption
TaskOption for setting trace ID
type TaskStorage ¶
type ThresholdConfig ¶ added in v0.0.11
type TimeSeries ¶ added in v0.0.16
type TimeSeries struct {
Name string `json:"name"`
Type MetricType `json:"type"`
Description string `json:"description"`
Labels map[string]string `json:"labels"`
Values []TimeSeriesPoint `json:"values"`
MaxPoints int `json:"max_points"`
// contains filtered or unexported fields
}
TimeSeries represents a time series metric
type TimeSeriesPoint ¶ added in v0.0.16
TimeSeriesPoint represents a single point in a time series
type WarningThresholds ¶ added in v0.0.11
type WarningThresholds struct {
HighMemory int64 // in bytes
LongExecution time.Duration // threshold duration
}
WarningThresholds defines thresholds for warnings.