Documentation
¶
Index ¶
- Variables
- func GetAwaitResponse(ctx context.Context) (string, bool)
- func GetConnection(addr string, config TLSConfig) (net.Conn, error)
- func GetConsumerID(ctx context.Context) (string, bool)
- func GetContentType(ctx context.Context) (string, bool)
- func GetHeader(ctx context.Context, key string) (string, bool)
- func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)
- func GetPublisherID(ctx context.Context) (string, bool)
- func GetQueue(ctx context.Context) (string, bool)
- func GetTriggerNode(ctx context.Context) (string, bool)
- func HeadersWithConsumerID(ctx context.Context, id string) map[string]string
- func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string
- func IsClosed(conn net.Conn) bool
- func NewID() string
- func RecoverPanic(labelGenerator func() string)
- func RecoverTitle() string
- func SetHeaders(ctx context.Context, headers map[string]string) context.Context
- func WithHeaders(ctx context.Context, headers map[string]string) map[string]string
- func WrapError(err error, msg, op string) error
- type Broker
- func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string
- func (b *Broker) AdjustConsumerWorkers(noOfWorkers int, consumerID ...string)
- func (b *Broker) Close() error
- func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)
- func (b *Broker) NewQueue(name string) *Queue
- func (b *Broker) NotifyHandler() func(context.Context, Result) error
- func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error
- func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message)
- func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)
- func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (b *Broker) Options() *Options
- func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error
- func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) RemoveConsumer(consumerID string, queues ...string)
- func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) SetNotifyHandler(callback Callback)
- func (b *Broker) SetURL(url string)
- func (b *Broker) Start(ctx context.Context) error
- func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) SyncMode() bool
- func (b *Broker) TLSConfig() TLSConfig
- func (b *Broker) URL() string
- func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error
- type Callback
- type CircuitBreakerConfig
- type CompletionCallback
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Conn() net.Conn
- func (c *Consumer) Consume(ctx context.Context) error
- func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (c *Consumer) GetKey() string
- func (c *Consumer) GetType() string
- func (c *Consumer) Metrics() Metrics
- func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error
- func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)
- func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error
- func (c *Consumer) OnResponse(ctx context.Context, result Result) error
- func (c *Consumer) Pause(ctx context.Context) error
- func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result
- func (c *Consumer) Resume(ctx context.Context) error
- func (c *Consumer) SetKey(key string)
- func (c *Consumer) Stop(ctx context.Context) error
- func (c *Consumer) Update(ctx context.Context, payload []byte) error
- type CronSchedule
- type DeadLetterQueue
- type DefaultPlugin
- type DynamicConfig
- type ExecutionHistory
- type Handler
- type InMemoryMetricsRegistry
- type MemoryTaskStorage
- func (m *MemoryTaskStorage) CleanupExpiredTasks() error
- func (m *MemoryTaskStorage) DeleteTask(taskID string) error
- func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
- func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
- func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
- func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
- type Metrics
- type MetricsRegistry
- type Option
- func DisableBrokerRateLimit() Option
- func DisableConsumerRateLimit() Option
- func WithBrokerRateLimiter(rate int, burst int) Option
- func WithBrokerURL(url string) Option
- func WithCAPath(caPath string) Option
- func WithCallback(val ...func(context.Context, Result) Result) Option
- func WithCleanTaskOnComplete() Option
- func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerRateLimiter(rate int, burst int) Option
- func WithInitialDelay(val time.Duration) Option
- func WithJitterPercent(val float64) Option
- func WithLogger(log logger.Logger) Option
- func WithMaxBackoff(val time.Duration) Option
- func WithMaxRetries(val int) Option
- func WithNotifyResponse(callback Callback) Option
- func WithRespondPendingResult(mode bool) Option
- func WithSyncMode(mode bool) Option
- func WithTLS(enableTLS bool, certPath, keyPath string) Option
- func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option
- type Options
- func (o *Options) BrokerAddr() string
- func (o *Options) CleanTaskOnComplete() bool
- func (o *Options) Logger() logger.Logger
- func (o *Options) MaxMemoryLoad() int64
- func (o *Options) NumOfWorkers() int
- func (o *Options) QueueSize() int
- func (o *Options) SetSyncMode(sync bool)
- func (o *Options) Storage() TaskStorage
- type Plugin
- type Pool
- func (wp *Pool) AdjustWorkerCount(newWorkerCount int)
- func (wp *Pool) DLQ() *DeadLetterQueue
- func (wp *Pool) Dispatch(event func())
- func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error
- func (wp *Pool) Metrics() Metrics
- func (wp *Pool) Pause()
- func (wp *Pool) Resume()
- func (wp *Pool) Scheduler() *Scheduler
- func (wp *Pool) SetBatchSize(size int)
- func (wp *Pool) Start(numWorkers int)
- func (wp *Pool) Stop()
- func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error
- type PoolOption
- func WithBatchSize(batchSize int) PoolOption
- func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption
- func WithCompletionCallback(callback func()) PoolOption
- func WithDiagnostics(enabled bool) PoolOption
- func WithGracefulShutdown(timeout time.Duration) PoolOption
- func WithHandler(handler Handler) PoolOption
- func WithHealthServicePort(port int) PoolOption
- func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
- func WithMetricsRegistry(registry MetricsRegistry) PoolOption
- func WithPlugin(plugin Plugin) PoolOption
- func WithPoolCallback(callback Callback) PoolOption
- func WithTaskQueueSize(size int) PoolOption
- func WithTaskStorage(storage TaskStorage) PoolOption
- func WithTaskTimeout(t time.Duration) PoolOption
- func WithWarningThresholds(thresholds ThresholdConfig) PoolOption
- type PriorityQueue
- type Processor
- type Publisher
- type Queue
- type QueueTask
- type QueuedTask
- type RateLimiter
- type Result
- type Schedule
- type ScheduleOptions
- type ScheduledTask
- type Scheduler
- type SchedulerConfig
- type SchedulerOption
- type Status
- type TLSConfig
- type Task
- type TaskOption
- type TaskStorage
- type ThresholdConfig
- type WarningThresholds
Constants ¶
This section is empty.
Variables ¶
View Source
var Config = &DynamicConfig{ Timeout: 10 * time.Second, BatchSize: 1, MaxMemoryLoad: 100 * 1024 * 1024, IdleTimeout: 5 * time.Minute, BackoffDuration: 2 * time.Second, MaxRetries: 3, ReloadInterval: 30 * time.Second, WarningThreshold: WarningThresholds{ HighMemory: 1 * 1024 * 1024, LongExecution: 2 * time.Second, }, NumberOfWorkers: 5, }
View Source
var Logger = log.DefaultLogger
Functions ¶
func GetConnection ¶
GetConnection returns a connection to addr, reusing an existing connection if it is still valid.
func HeadersWithConsumerID ¶
func RecoverPanic ¶
func RecoverPanic(labelGenerator func() string)
func RecoverTitle ¶
func RecoverTitle() string
func WithHeaders ¶
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) AddConsumer ¶
func (*Broker) AdjustConsumerWorkers ¶ added in v0.0.11
func (*Broker) HandleCallback ¶
func (*Broker) MessageResponseHandler ¶
func (*Broker) OnConsumerPause ¶
func (*Broker) OnConsumerResume ¶
func (*Broker) OnConsumerStop ¶
func (*Broker) OnConsumerUpdated ¶ added in v0.0.11
func (*Broker) PauseConsumer ¶
func (*Broker) PublishHandler ¶
func (*Broker) RemoveConsumer ¶
func (*Broker) ResumeConsumer ¶
func (*Broker) SetNotifyHandler ¶
func (*Broker) StopConsumer ¶
func (*Broker) SubscribeHandler ¶
func (*Broker) UpdateConsumer ¶ added in v0.0.11
type CircuitBreakerConfig ¶ added in v0.0.11
type CompletionCallback ¶
type CompletionCallback func()
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) ConsumeMessage ¶
func (*Consumer) OnResponse ¶
func (*Consumer) ProcessTask ¶
type CronSchedule ¶
type CronSchedule struct {
Minute string
Hour string
DayOfMonth string
Month string
DayOfWeek string
}
func (CronSchedule) String ¶
func (c CronSchedule) String() string
type DeadLetterQueue ¶ added in v0.0.11
type DeadLetterQueue struct {
// contains filtered or unexported fields
}
func NewDeadLetterQueue ¶ added in v0.0.11
func NewDeadLetterQueue() *DeadLetterQueue
func (*DeadLetterQueue) Add ¶ added in v0.0.11
func (dlq *DeadLetterQueue) Add(task *QueueTask)
func (*DeadLetterQueue) Tasks ¶ added in v0.0.11
func (dlq *DeadLetterQueue) Tasks() []*QueueTask
type DefaultPlugin ¶ added in v0.0.11
type DefaultPlugin struct{}
func (*DefaultPlugin) AfterTask ¶ added in v0.0.11
func (dp *DefaultPlugin) AfterTask(task *QueueTask, result Result)
func (*DefaultPlugin) BeforeTask ¶ added in v0.0.11
func (dp *DefaultPlugin) BeforeTask(task *QueueTask)
func (*DefaultPlugin) Initialize ¶ added in v0.0.11
func (dp *DefaultPlugin) Initialize(config interface{}) error
type DynamicConfig ¶ added in v0.0.11
type ExecutionHistory ¶
type InMemoryMetricsRegistry ¶ added in v0.0.11
type InMemoryMetricsRegistry struct {
// contains filtered or unexported fields
}
func NewInMemoryMetricsRegistry ¶ added in v0.0.11
func NewInMemoryMetricsRegistry() *InMemoryMetricsRegistry
func (*InMemoryMetricsRegistry) Get ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Get(metricName string) interface{}
func (*InMemoryMetricsRegistry) Increment ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Increment(metricName string)
func (*InMemoryMetricsRegistry) Register ¶ added in v0.0.11
func (m *InMemoryMetricsRegistry) Register(metricName string, value interface{})
type MemoryTaskStorage ¶
type MemoryTaskStorage struct {
// contains filtered or unexported fields
}
func NewMemoryTaskStorage ¶
func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage
func (*MemoryTaskStorage) CleanupExpiredTasks ¶
func (m *MemoryTaskStorage) CleanupExpiredTasks() error
func (*MemoryTaskStorage) DeleteTask ¶
func (m *MemoryTaskStorage) DeleteTask(taskID string) error
func (*MemoryTaskStorage) FetchNextTask ¶
func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
func (*MemoryTaskStorage) GetAllTasks ¶
func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
func (*MemoryTaskStorage) GetTask ¶
func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
func (*MemoryTaskStorage) SaveTask ¶
func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
type MetricsRegistry ¶ added in v0.0.11
type Option ¶
type Option func(*Options)
func DisableBrokerRateLimit ¶ added in v0.0.11
func DisableBrokerRateLimit() Option
func DisableConsumerRateLimit ¶ added in v0.0.11
func DisableConsumerRateLimit() Option
func WithBrokerRateLimiter ¶ added in v0.0.11
func WithCallback ¶
WithCallback returns an Option configured with the given callback functions.
func WithConsumerOnClose ¶
func WithConsumerOnSubscribe ¶
func WithConsumerRateLimiter ¶ added in v0.0.11
func WithLogger ¶ added in v0.0.10
func WithNotifyResponse ¶
func WithRespondPendingResult ¶
WithRespondPendingResult returns an Option that sets the respond-pending-result mode.
func WithWorkerPool ¶
type Options ¶
type Options struct {
BrokerRateLimiter *RateLimiter // new field for broker rate limiting
ConsumerRateLimiter *RateLimiter // new field for consumer rate limiting
// contains filtered or unexported fields
}
func SetupOptions ¶
func (*Options) BrokerAddr ¶ added in v0.0.10
func (*Options) CleanTaskOnComplete ¶
func (*Options) MaxMemoryLoad ¶
func (*Options) NumOfWorkers ¶
func (*Options) SetSyncMode ¶
func (*Options) Storage ¶
func (o *Options) Storage() TaskStorage
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
func NewPool ¶
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool
func (*Pool) AdjustWorkerCount ¶
func (*Pool) DLQ ¶ added in v0.0.11
func (wp *Pool) DLQ() *DeadLetterQueue
func (*Pool) EnqueueTask ¶
func (*Pool) SetBatchSize ¶
func (*Pool) UpdateConfig ¶ added in v0.0.11
func (wp *Pool) UpdateConfig(newConfig *DynamicConfig) error
UpdateConfig updates the pool configuration via the POOL_UPDATE command.
type PoolOption ¶
type PoolOption func(*Pool)
func WithBatchSize ¶
func WithBatchSize(batchSize int) PoolOption
func WithCircuitBreaker ¶ added in v0.0.11
func WithCircuitBreaker(config CircuitBreakerConfig) PoolOption
func WithCompletionCallback ¶
func WithCompletionCallback(callback func()) PoolOption
func WithDiagnostics ¶ added in v0.0.11
func WithDiagnostics(enabled bool) PoolOption
func WithGracefulShutdown ¶ added in v0.0.11
func WithGracefulShutdown(timeout time.Duration) PoolOption
func WithHandler ¶
func WithHandler(handler Handler) PoolOption
func WithHealthServicePort ¶ added in v0.0.11
func WithHealthServicePort(port int) PoolOption
func WithMaxMemoryLoad ¶
func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
func WithMetricsRegistry ¶ added in v0.0.11
func WithMetricsRegistry(registry MetricsRegistry) PoolOption
func WithPlugin ¶ added in v0.0.11
func WithPlugin(plugin Plugin) PoolOption
func WithPoolCallback ¶
func WithPoolCallback(callback Callback) PoolOption
func WithTaskQueueSize ¶
func WithTaskQueueSize(size int) PoolOption
func WithTaskStorage ¶
func WithTaskStorage(storage TaskStorage) PoolOption
func WithTaskTimeout ¶
func WithTaskTimeout(t time.Duration) PoolOption
func WithWarningThresholds ¶ added in v0.0.11
func WithWarningThresholds(thresholds ThresholdConfig) PoolOption
type PriorityQueue ¶
type PriorityQueue []*QueueTask
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
type QueuedTask ¶
type RateLimiter ¶ added in v0.0.11
type RateLimiter struct {
C chan struct{}
}
RateLimiter is a channel-based rate limiter implementation.
func NewRateLimiter ¶ added in v0.0.11
func NewRateLimiter(rate int, burst int) *RateLimiter
NewRateLimiter creates a RateLimiter; it uses a blocking send to avoid discarding tokens.
func (*RateLimiter) Wait ¶ added in v0.0.11
func (rl *RateLimiter) Wait()
type Result ¶
type Result struct {
CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at,omitempty"`
Latency string `json:"latency"`
Error error `json:"-"` // Keep error as an error type
Topic string `json:"topic"`
TaskID string `json:"task_id"`
Status Status `json:"status"`
ConditionStatus string `json:"condition_status"`
Ctx context.Context `json:"-"`
Payload json.RawMessage `json:"payload"`
Last bool
}
func (Result) MarshalJSON ¶
func (*Result) UnmarshalJSON ¶
type Schedule ¶
type Schedule struct {
TimeOfDay time.Time
CronSpec string
DayOfWeek []time.Weekday
DayOfMonth []int
Interval time.Duration
Recurring bool
}
func (*Schedule) ToHumanReadable ¶
type ScheduleOptions ¶
type ScheduledTask ¶
type ScheduledTask struct {
// contains filtered or unexported fields
}
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func (*Scheduler) AddTask ¶
func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption)
func (*Scheduler) PrintAllTasks ¶
func (s *Scheduler) PrintAllTasks()
func (*Scheduler) PrintExecutionHistory ¶
func (*Scheduler) RemoveTask ¶
type SchedulerConfig ¶
type SchedulerOption ¶
type SchedulerOption func(*ScheduleOptions)
func WithInterval ¶
func WithInterval(interval time.Duration) SchedulerOption
func WithOverlap ¶
func WithOverlap() SchedulerOption
func WithRecurring ¶
func WithRecurring() SchedulerOption
func WithSchedulerCallback ¶
func WithSchedulerCallback(callback Callback) SchedulerOption
func WithSchedulerHandler ¶
func WithSchedulerHandler(handler Handler) SchedulerOption
WithSchedulerHandler is one of the helper functions used to create SchedulerOption values.
type Task ¶
type Task struct {
CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at"`
Expiry time.Time `json:"expiry"`
Error error `json:"error"`
ID string `json:"id"`
Topic string `json:"topic"`
Status string `json:"status"`
Payload json.RawMessage `json:"payload"`
// contains filtered or unexported fields
}
func NewTask ¶
func NewTask(id string, payload json.RawMessage, nodeKey string, opts ...TaskOption) *Task
type TaskOption ¶ added in v0.0.10
type TaskOption func(*Task)
TaskOption defines a function type for setting options.
func WithDAG ¶ added in v0.0.10
func WithDAG(dag any) TaskOption
type TaskStorage ¶
type ThresholdConfig ¶ added in v0.0.11
type WarningThresholds ¶ added in v0.0.11
Source Files
¶
Click to show internal directories.
Click to hide internal directories.