Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster[T any] struct {
// contains filtered or unexported fields
}
func NewBroadcaster ¶
func NewBroadcaster[T any](name string) *Broadcaster[T]
NewBroadcaster creates a new Broadcaster instance for the specified type T. It initializes a map to hold the listeners. The name parameter is used to identify the broadcaster. It returns a pointer to the new Broadcaster instance.
func (*Broadcaster[T]) Broadcast ¶
func (b *Broadcaster[T]) Broadcast(msg T)
Broadcast sends a message of type T to all subscribed listeners. It logs the number of listeners and the name of the broadcaster. If a listener's channel is full, it skips sending the message to avoid blocking. This is a non-blocking send operation.
func (*Broadcaster[T]) Subscribe ¶
func (b *Broadcaster[T]) Subscribe() chan T
Subscribe adds a new listener channel to the broadcaster. It returns a channel of type T that can be used to receive messages. The channel is buffered with a capacity of 100 so that Broadcast can send to it without blocking.
func (*Broadcaster[T]) Unsubscribe ¶
func (b *Broadcaster[T]) Unsubscribe(ch chan T)
Unsubscribe removes a listener channel from the broadcaster. It closes the channel to signal that no more messages will be sent. If the channel does not exist in the listeners map, it does nothing.
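The subscribe, broadcast, and unsubscribe behavior described above can be sketched with a map of buffered channels and a select with a default case. This is a minimal illustration under stated assumptions, not the package's implementation: miniBroadcaster and its methods are hypothetical names, and the real Broadcaster's unexported fields are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBroadcaster is a hypothetical stand-in for Broadcaster[T].
type miniBroadcaster[T any] struct {
	mu        sync.RWMutex
	listeners map[chan T]struct{}
}

func newMiniBroadcaster[T any]() *miniBroadcaster[T] {
	return &miniBroadcaster[T]{listeners: make(map[chan T]struct{})}
}

// subscribe registers a buffered channel and returns it to the caller.
func (b *miniBroadcaster[T]) subscribe(buffer int) chan T {
	ch := make(chan T, buffer)
	b.mu.Lock()
	b.listeners[ch] = struct{}{}
	b.mu.Unlock()
	return ch
}

// broadcast attempts a non-blocking send to every listener and reports how
// many listeners were skipped because their buffer was full.
func (b *miniBroadcaster[T]) broadcast(msg T) (skipped int) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for ch := range b.listeners {
		select {
		case ch <- msg:
		default: // buffer full: drop the message for this listener
			skipped++
		}
	}
	return skipped
}

// unsubscribe removes and closes the channel; unknown channels are ignored.
func (b *miniBroadcaster[T]) unsubscribe(ch chan T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.listeners[ch]; ok {
		delete(b.listeners, ch)
		close(ch)
	}
}

func main() {
	b := newMiniBroadcaster[string]()
	ch := b.subscribe(1)
	fmt.Println("skipped:", b.broadcast("first"))  // buffer has room
	fmt.Println("skipped:", b.broadcast("second")) // buffer is full
	b.unsubscribe(ch)
	for msg := range ch { // drains buffered messages, then ends on close
		fmt.Println("received:", msg)
	}
}
```

Closing the channel under the write lock ensures that no broadcast is sending on it at the same time. A consequence of the non-blocking send is that slow listeners silently miss messages, so callers that need every message must drain their channel promptly.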
type Listener ¶
type Listener[T any] struct {
// contains filtered or unexported fields
}
func NewListener ¶
func NewListener[T any](broadcaster *Broadcaster[T]) (*Listener[T], error)
NewListener creates a new Listener instance for the specified type T. It stores the given broadcaster and initializes a WaitGroup to manage concurrent processing of notifications. It returns a pointer to the new Listener instance, or an error.
func (*Listener[T]) Listen ¶
Listen listens for notifications on the broadcaster's channel. It takes a context for cancellation, a ready channel to signal readiness, and a notifyFunction that will be called with the data received. The listener will process notifications in a separate goroutine to avoid blocking. If the context is done, it will stop listening and return. The ready channel is closed in the first for loop iteration to signal that the listener is ready.
func (*Listener[T]) Notify ¶
func (l *Listener[T]) Notify(data T)
Notify sends a notification with the provided data to all listeners. It uses the broadcaster to broadcast the data to all subscribed channels. This method is typically called when an event occurs that needs to be communicated to all listeners. Because Broadcast is non-blocking, Notify returns immediately and does not wait for listeners to process the notification.
func (*Listener[T]) WaitForNotificationsProcessed ¶
func (l *Listener[T]) WaitForNotificationsProcessed()
WaitForNotificationsProcessed waits for all notifications to be processed. It blocks until all goroutines that were started to process notifications have completed. This is useful to ensure that all notifications have been handled before proceeding with further operations. It is typically called after calling Listen and Notify to ensure that all processing is complete.
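The interplay of listening, per-notification goroutines, and waiting can be sketched with a sync.WaitGroup. The following is an illustration only: listenAndProcess and sumViaListener are hypothetical helpers, the ready channel is closed just before the loop rather than inside it, and the real Listen signature is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// listenAndProcess reads from ch until ctx is done or ch is closed and
// handles every value in its own goroutine, tracked by wg.
func listenAndProcess[T any](ctx context.Context, ch <-chan T, ready chan<- struct{}, wg *sync.WaitGroup, handle func(T)) {
	close(ready) // signal that the listener is about to receive
	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-ch:
			if !ok {
				return
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				handle(v)
			}()
		}
	}
}

// sumViaListener sends values through a listener and waits until every
// handler goroutine has finished before returning the sum.
func sumViaListener(values []int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan int, len(values))
	ready := make(chan struct{})
	stopped := make(chan struct{})
	var wg sync.WaitGroup
	var sum atomic.Int64

	go func() {
		defer close(stopped)
		listenAndProcess[int](ctx, ch, ready, &wg, func(v int) { sum.Add(int64(v)) })
	}()

	<-ready
	for _, v := range values {
		ch <- v
	}
	close(ch)
	<-stopped // every wg.Add has happened once the listener has returned
	wg.Wait() // blocks until all handler goroutines are done
	return sum.Load()
}

func main() {
	fmt.Println(sumViaListener([]int{1, 2, 3}))
}
```

Waiting only makes sense once the notifications have actually been received: a wait that starts before the listener has picked up a message can return early, which is why the sketch waits for the listener to stop first.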
type Retryer ¶
type Retryer struct {
// contains filtered or unexported fields
}
func NewRetryer ¶
NewRetryer creates a new Retryer instance. It initializes the retryer with a function to execute, a sleep duration for retries, and options for retry behavior. It returns a pointer to the new Retryer instance or an error if the options are invalid.
func (*Retryer) Retry ¶
Retry attempts to execute the function up to MaxRetries times. It sleeps for the specified duration between retries. The retry behavior is determined by the RetryBackoff option. If the function returns an error, it will retry according to the specified backoff strategy. If all retries fail, it returns the last error encountered.
The function is executed in a loop until it succeeds or the maximum number of retries is reached. If the function succeeds, Retry returns nil.
The backoff strategies are:
- RETRY_BACKOFF_NONE: No backoff, retries immediately.
- RETRY_BACKOFF_LINEAR: Increases the sleep duration linearly by the initial delay.
- RETRY_BACKOFF_EXPONENTIAL: Doubles the sleep duration after each retry.
type Runner ¶
type Runner struct {
Options *model.Options
Task interface{}
Parameters model.Parameters
// Result channel to return results
ResultsChannel chan []interface{}
ErrorChannel chan error
// contains filtered or unexported fields
}
func NewRunner ¶
func NewRunner(options *model.Options, task interface{}, parameters ...interface{}) (*Runner, error)
NewRunner creates a new Runner instance for the specified task and parameters. It checks if the task and parameters are valid and returns a pointer to the new Runner instance. It returns an error if the task or parameters are invalid.
func NewRunnerFromJob ¶
NewRunnerFromJob creates a new Runner instance from a job. It initializes the runner with the job's task and parameters. It returns a pointer to the new Runner instance or an error if the job is invalid.
func (*Runner) Run ¶
Run executes the task with the provided parameters. It will return results on ResultsChannel and errors on ErrorChannel. If the task panics, it will send the panic value to ErrorChannel. The main intended use of this function is to run the task in a separate goroutine with panic recovery. It uses a context to manage cancellation and timeout. If the context is done, it will cancel the task and return an error. The context's timeout is set based on the OnError options if provided, otherwise it uses a cancelable context.
type Scheduler ¶
type Scheduler struct {
Task interface{}
Parameters model.Parameters
StartTime *time.Time
}
func NewScheduler ¶
func NewScheduler(startTime *time.Time, task interface{}, parameters ...interface{}) (*Scheduler, error)
NewScheduler creates a new Scheduler instance for the specified task and parameters. It checks if the task and parameters are valid and returns a pointer to the new Scheduler instance. It returns an error if the task or parameters are invalid.
func (*Scheduler) Go ¶
Go starts the scheduler to run the task at the specified start time. It creates a new Runner instance and runs the task once the start time is reached. If the start time is nil, it runs the task immediately. It uses a context to manage cancellation and timeout. If the context is done, it will cancel the task and return an error. The context's timeout is set based on the OnError options if provided, otherwise it uses a cancelable context.
type Ticker ¶
type Ticker struct {
// contains filtered or unexported fields
}
Ticker represents a recurring task runner.
func NewTicker ¶
func NewTicker(interval time.Duration, task interface{}, parameters ...interface{}) (*Ticker, error)
NewTicker creates and returns a new Ticker instance. It initializes the ticker with a specified interval and a task to run. The task must be valid and compatible with the provided parameters. It returns a pointer to the new Ticker instance or an error if the interval, task or parameters are invalid.
func (*Ticker) Go ¶
Go starts the Ticker. It runs the task at the specified interval until the provided context is cancelled. It uses a ticker to trigger the task execution at the specified interval. If the context is done, it will stop the ticker and return. The task is run in a separate goroutine to avoid blocking the ticker. If the task returns an error, it will log the error. The ticker will continue to run until the context is cancelled or an error occurs.