Documentation ¶
Index ¶
- func GetAwaitResponse(ctx context.Context) (string, bool)
- func GetConnection(addr string, config TLSConfig) (net.Conn, error)
- func GetConsumerID(ctx context.Context) (string, bool)
- func GetContentType(ctx context.Context) (string, bool)
- func GetHeader(ctx context.Context, key string) (string, bool)
- func GetHeaders(ctx context.Context) (storage.IMap[string, string], bool)
- func GetPublisherID(ctx context.Context) (string, bool)
- func GetQueue(ctx context.Context) (string, bool)
- func GetTriggerNode(ctx context.Context) (string, bool)
- func HeadersWithConsumerID(ctx context.Context, id string) map[string]string
- func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string
- func IsClosed(conn net.Conn) bool
- func NewID() string
- func RecoverPanic(labelGenerator func() string)
- func RecoverTitle() string
- func SetHeaders(ctx context.Context, headers map[string]string) context.Context
- func WithHeaders(ctx context.Context, headers map[string]string) map[string]string
- func WrapError(err error, msg, op string) error
- type Broker
- func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string
- func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message)
- func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message)
- func (b *Broker) NewQueue(name string) *Queue
- func (b *Broker) NotifyHandler() func(context.Context, Result) error
- func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error
- func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message)
- func (b *Broker) OnError(_ context.Context, conn net.Conn, err error)
- func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (b *Broker) Options() *Options
- func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error
- func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) RemoveConsumer(consumerID string, queues ...string)
- func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) SetNotifyHandler(callback Callback)
- func (b *Broker) Start(ctx context.Context) error
- func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string)
- func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message)
- func (b *Broker) SyncMode() bool
- func (b *Broker) TLSConfig() TLSConfig
- func (b *Broker) URL() string
- type Callback
- type CompletionCallback
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Conn() net.Conn
- func (c *Consumer) Consume(ctx context.Context) error
- func (c *Consumer) ConsumeMessage(ctx context.Context, msg *codec.Message, conn net.Conn)
- func (c *Consumer) GetKey() string
- func (c *Consumer) GetType() string
- func (c *Consumer) Metrics() Metrics
- func (c *Consumer) OnClose(_ context.Context, _ net.Conn) error
- func (c *Consumer) OnError(_ context.Context, conn net.Conn, err error)
- func (c *Consumer) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) error
- func (c *Consumer) OnResponse(ctx context.Context, result Result) error
- func (c *Consumer) Pause(ctx context.Context) error
- func (c *Consumer) ProcessTask(ctx context.Context, msg *Task) Result
- func (c *Consumer) Resume(ctx context.Context) error
- func (c *Consumer) SetKey(key string)
- func (c *Consumer) Stop(ctx context.Context) error
- type CronSchedule
- type ExecutionHistory
- type Handler
- type MemoryTaskStorage
- func (m *MemoryTaskStorage) CleanupExpiredTasks() error
- func (m *MemoryTaskStorage) DeleteTask(taskID string) error
- func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
- func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
- func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
- func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
- type Metrics
- type Option
- func WithBrokerURL(url string) Option
- func WithCAPath(caPath string) Option
- func WithCallback(val ...func(context.Context, Result) Result) Option
- func WithCleanTaskOnComplete() Option
- func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithConsumerOnSubscribe(handler func(ctx context.Context, topic, consumerName string)) Option
- func WithInitialDelay(val time.Duration) Option
- func WithJitterPercent(val float64) Option
- func WithMaxBackoff(val time.Duration) Option
- func WithMaxRetries(val int) Option
- func WithNotifyResponse(callback Callback) Option
- func WithRespondPendingResult(mode bool) Option
- func WithSyncMode(mode bool) Option
- func WithTLS(enableTLS bool, certPath, keyPath string) Option
- func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option
- type Options
- type Pool
- func (wp *Pool) AdjustWorkerCount(newWorkerCount int)
- func (wp *Pool) Dispatch(event func())
- func (wp *Pool) EnqueueTask(ctx context.Context, payload *Task, priority int) error
- func (wp *Pool) Metrics() Metrics
- func (wp *Pool) Pause()
- func (wp *Pool) Resume()
- func (wp *Pool) Scheduler() *Scheduler
- func (wp *Pool) SetBatchSize(size int)
- func (wp *Pool) Start(numWorkers int)
- func (wp *Pool) Stop()
- type PoolOption
- func WithBatchSize(batchSize int) PoolOption
- func WithCompletionCallback(callback func()) PoolOption
- func WithHandler(handler Handler) PoolOption
- func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
- func WithPoolCallback(callback Callback) PoolOption
- func WithTaskQueueSize(size int) PoolOption
- func WithTaskStorage(storage TaskStorage) PoolOption
- func WithTaskTimeout(t time.Duration) PoolOption
- type PriorityQueue
- type Processor
- type Publisher
- type Queue
- type QueueTask
- type QueuedTask
- type Result
- type Schedule
- type ScheduleOptions
- type ScheduledTask
- type Scheduler
- type SchedulerConfig
- type SchedulerOption
- type TLSConfig
- type Task
- type TaskStorage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HeadersWithConsumerID ¶
func RecoverPanic ¶
func RecoverPanic(labelGenerator func() string)
func RecoverTitle ¶
func RecoverTitle() string
func WithHeaders ¶
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func (*Broker) AddConsumer ¶
func (*Broker) HandleCallback ¶
func (*Broker) MessageResponseHandler ¶
func (*Broker) OnConsumerPause ¶
func (*Broker) OnConsumerResume ¶
func (*Broker) OnConsumerStop ¶
func (*Broker) PauseConsumer ¶
func (*Broker) PublishHandler ¶
func (*Broker) RemoveConsumer ¶
func (*Broker) ResumeConsumer ¶
func (*Broker) SetNotifyHandler ¶
func (*Broker) StopConsumer ¶
func (*Broker) SubscribeHandler ¶
type CompletionCallback ¶
type CompletionCallback func()
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func (*Consumer) ConsumeMessage ¶
func (*Consumer) OnResponse ¶
func (*Consumer) ProcessTask ¶
type CronSchedule ¶
type CronSchedule struct {
Minute string
Hour string
DayOfMonth string
Month string
DayOfWeek string
}
func (CronSchedule) String ¶
func (c CronSchedule) String() string
type ExecutionHistory ¶
type MemoryTaskStorage ¶
type MemoryTaskStorage struct {
// contains filtered or unexported fields
}
func NewMemoryTaskStorage ¶
func NewMemoryTaskStorage(expiryTime time.Duration) *MemoryTaskStorage
func (*MemoryTaskStorage) CleanupExpiredTasks ¶
func (m *MemoryTaskStorage) CleanupExpiredTasks() error
func (*MemoryTaskStorage) DeleteTask ¶
func (m *MemoryTaskStorage) DeleteTask(taskID string) error
func (*MemoryTaskStorage) FetchNextTask ¶
func (m *MemoryTaskStorage) FetchNextTask() (*QueueTask, error)
func (*MemoryTaskStorage) GetAllTasks ¶
func (m *MemoryTaskStorage) GetAllTasks() ([]*QueueTask, error)
func (*MemoryTaskStorage) GetTask ¶
func (m *MemoryTaskStorage) GetTask(taskID string) (*QueueTask, error)
func (*MemoryTaskStorage) SaveTask ¶
func (m *MemoryTaskStorage) SaveTask(task *QueueTask) error
type Option ¶
type Option func(*Options)
Option defines a function type for setting options.
func WithCallback ¶
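WithCallback returns an Option that supplies one or more callbacks, each of the form func(context.Context, Result) Result.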
func WithConsumerOnClose ¶
func WithConsumerOnSubscribe ¶
func WithNotifyResponse ¶
func WithRespondPendingResult ¶
WithRespondPendingResult returns an Option that sets the respond-pending-result mode to the given boolean.
func WithWorkerPool ¶
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
func SetupOptions ¶
func (*Options) CleanTaskOnComplete ¶
func (*Options) MaxMemoryLoad ¶
func (*Options) NumOfWorkers ¶
func (*Options) SetSyncMode ¶
func (*Options) Storage ¶
func (o *Options) Storage() TaskStorage
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
func NewPool ¶
func NewPool(numOfWorkers int, opts ...PoolOption) *Pool
func (*Pool) AdjustWorkerCount ¶
func (*Pool) EnqueueTask ¶
func (*Pool) SetBatchSize ¶
type PoolOption ¶
type PoolOption func(*Pool)
func WithBatchSize ¶
func WithBatchSize(batchSize int) PoolOption
func WithCompletionCallback ¶
func WithCompletionCallback(callback func()) PoolOption
func WithHandler ¶
func WithHandler(handler Handler) PoolOption
func WithMaxMemoryLoad ¶
func WithMaxMemoryLoad(maxMemoryLoad int64) PoolOption
func WithPoolCallback ¶
func WithPoolCallback(callback Callback) PoolOption
func WithTaskQueueSize ¶
func WithTaskQueueSize(size int) PoolOption
func WithTaskStorage ¶
func WithTaskStorage(storage TaskStorage) PoolOption
func WithTaskTimeout ¶
func WithTaskTimeout(t time.Duration) PoolOption
type PriorityQueue ¶
type PriorityQueue []*QueueTask
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
type QueuedTask ¶
type Result ¶
type Result struct {
CreatedAt time.Time `json:"created_at"`
ProcessedAt time.Time `json:"processed_at,omitempty"`
Latency string `json:"latency"`
Error error `json:"-"` // Keep error as an error type
Topic string `json:"topic"`
TaskID string `json:"task_id"`
Status string `json:"status"`
ConditionStatus string `json:"condition_status"`
Ctx context.Context `json:"-"`
Payload json.RawMessage `json:"payload"`
}
func (Result) MarshalJSON ¶
func (*Result) UnmarshalJSON ¶
type Schedule ¶
type Schedule struct {
Interval time.Duration
DayOfWeek []time.Weekday
DayOfMonth []int
TimeOfDay time.Time
Recurring bool
CronSpec string
}
func (*Schedule) ToHumanReadable ¶
type ScheduleOptions ¶
type ScheduledTask ¶
type ScheduledTask struct {
// contains filtered or unexported fields
}
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func (*Scheduler) AddTask ¶
func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...SchedulerOption)
func (*Scheduler) PrintAllTasks ¶
func (s *Scheduler) PrintAllTasks()
func (*Scheduler) PrintExecutionHistory ¶
func (*Scheduler) RemoveTask ¶
type SchedulerConfig ¶
type SchedulerOption ¶
type SchedulerOption func(*ScheduleOptions)
func WithInterval ¶
func WithInterval(interval time.Duration) SchedulerOption
func WithOverlap ¶
func WithOverlap() SchedulerOption
func WithRecurring ¶
func WithRecurring() SchedulerOption
func WithSchedulerCallback ¶
func WithSchedulerCallback(callback Callback) SchedulerOption
func WithSchedulerHandler ¶
func WithSchedulerHandler(handler Handler) SchedulerOption
Helper functions to create SchedulerOption values.
Source Files ¶
Click to show internal directories.
Click to hide internal directories.