core

package
v1.45.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster[T any] struct {
	// contains filtered or unexported fields
}

func NewBroadcaster

func NewBroadcaster[T any](name string) *Broadcaster[T]

NewBroadcaster creates a new Broadcaster instance for the specified type T. It initializes a map to hold the listeners. The name parameter is used to identify the broadcaster. It returns a pointer to the new Broadcaster instance.

func (*Broadcaster[T]) Broadcast

func (b *Broadcaster[T]) Broadcast(msg T)

Broadcast sends a message of type T to all subscribed listeners. It logs the number of listeners and the name of the broadcaster. If a listener's channel is full, it skips sending the message to avoid blocking. This is a non-blocking send operation.

func (*Broadcaster[T]) Subscribe

func (b *Broadcaster[T]) Subscribe() chan T

Subscribe adds a new listener channel to the broadcaster. It returns a channel of type T that can be used to receive messages. The channel is buffered (100) to allow for non-blocking sends.

func (*Broadcaster[T]) Unsubscribe

func (b *Broadcaster[T]) Unsubscribe(ch chan T)

Unsubscribe removes a listener channel from the broadcaster. It closes the channel to signal that no more messages will be sent. If the channel does not exist in the listeners map, it does nothing.

type Listener

type Listener[T any] struct {
	// contains filtered or unexported fields
}

func NewListener

func NewListener[T any](broadcaster *Broadcaster[T]) (*Listener[T], error)

NewListener creates a new Listener instance for the specified type T. It initializes a broadcaster and a waitgroup to manage concurrent processing of notifications. It returns a pointer to the new Listener instance.

func (*Listener[T]) Listen

func (l *Listener[T]) Listen(ctx context.Context, ready chan struct{}, notifyFunction func(data T))

Listen listens for notifications on the broadcaster's channel. It takes a context for cancellation, a ready channel to signal readiness, and a notifyFunction that will be called with the data received. The listener will process notifications in a separate goroutine to avoid blocking. If the context is done, it will stop listening and return. The ready channel is closed in the first for loop iteration to signal that the listener is ready.

func (*Listener[T]) Notify

func (l *Listener[T]) Notify(data T)

Notify sends a notification with the provided data to all listeners. It uses the broadcaster to broadcast the data to all subscribed channels. This method is typically called when an event occurs that needs to be communicated to all listeners. As Broadcast is not blocking it does not block and will not wait for listeners to process the notification.

func (*Listener[T]) WaitForNotificationsProcessed

func (l *Listener[T]) WaitForNotificationsProcessed()

WaitForNotificationsProcessed waits for all notifications to be processed. It blocks until all goroutines that were started to process notifications have completed. This is useful to ensure that all notifications have been handled before proceeding with further operations. It is typically called after calling Listen and Notify to ensure that all processing is complete.

type Retryer

type Retryer struct {
	// contains filtered or unexported fields
}

func NewRetryer

func NewRetryer(function func() error, options *model.OnError) (*Retryer, error)

NewRetryer creates a new Retryer instance. It initializes the retryer with a function to execute, a sleep duration for retries, and options for retry behavior. It returns a pointer to the new Retryer instance or an error if the options are invalid.

func (*Retryer) Retry

func (r *Retryer) Retry() (err error)

Retry attempts to execute the function up to MaxRetries times. It sleeps for the specified duration between retries. The retry behavior is determined by the RetryBackoff option. If the function returns an error, it will retry according to the specified backoff strategy. If all retries fail, it returns the last error encountered.

The function is executed in a loop until it succeeds or the maximum number of retries is reached If the function succeeds, it returns nil.

The backoff strategies are: - RETRY_BACKOFF_NONE: No backoff, retries immediately. - RETRY_BACKOFF_LINEAR: Increases the sleep duration linearly by the initial delay. - RETRY_BACKOFF_EXPONENTIAL: Doubles the sleep duration after each retry.

type Runner

type Runner struct {
	Options    *model.Options
	Task       interface{}
	Parameters model.Parameters
	// Result channel to return results
	ResultsChannel chan []interface{}
	ErrorChannel   chan error
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(options *model.Options, task interface{}, parameters ...interface{}) (*Runner, error)

NewRunner creates a new Runner instance for the specified task and parameters. It checks if the task and parameters are valid and returns a pointer to the new Runner instance. It returns an error if the task or parameters are invalid.

func NewRunnerFromJob

func NewRunnerFromJob(task *model.Task, job *model.Job) (*Runner, error)

NewRunnerFromJob creates a new Runner instance from a job. It initializes the runner with the job's task and parameters. It returns a pointer to the new Runner instance or an error if the job is invalid.

func (*Runner) Cancel

func (r *Runner) Cancel(onCancel ...func())

func (*Runner) Run

func (r *Runner) Run(ctx context.Context)

Run executes the task with the provided parameters. It will return results on ResultsChannel and errors on ErrorChannel. If the task panics, it will send the panic value to ErrorChannel. The main intended use of this function is to run the task in a separate goroutine with panic recovery. It uses a context to manage cancellation and timeout. If the context is done, it will cancel the task and return an error. The context's timeout is set based on the OnError options if provided, otherwise it uses a cancelable context.

type Scheduler

type Scheduler struct {
	Task       interface{}
	Parameters model.Parameters
	StartTime  *time.Time
}

func NewScheduler

func NewScheduler(startTime *time.Time, task interface{}, parameters ...interface{}) (*Scheduler, error)

NewScheduler creates a new Scheduler instance for the specified task and parameters. It checks if the task and parameters are valid and returns a pointer to the new Scheduler instance. It returns an error if the task or parameters are invalid.

func (*Scheduler) Go

func (s *Scheduler) Go(ctx context.Context)

Go starts the scheduler to run the task at the specified start time. It creates a new Runner instance and runs the task after the specified duration. If the start time is nil, it runs the task immediately. It uses a context to manage cancellation and timeout. If the context is done, it will cancel the task and return an error. The context's timeout is set based on the OnError options if provided, otherwise it uses a cancelable context.

type Ticker

type Ticker struct {
	// contains filtered or unexported fields
}

Ticker represents a recurring task runner.

func NewTicker

func NewTicker(interval time.Duration, task interface{}, parameters ...interface{}) (*Ticker, error)

NewTicker creates and returns a new Ticker instance. It initializes the ticker with a specified interval and a task to run. The task must be valid and compatible with the provided parameters. It returns a pointer to the new Ticker instance or an error if the interval, task or parameters are invalid.

func (*Ticker) Go

func (t *Ticker) Go(ctx context.Context)

Go starts the Ticker. It runs the task at the specified interval until the provided context is cancelled. It uses a ticker to trigger the task execution at the specified interval. If the context is done, it will stop the ticker and return. The task is run in a separate goroutine to avoid blocking the ticker. If the task returns an error, it will log the error. The ticker will continue to run until the context is cancelled or an error occurs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL