runner

package
v1.1.0-alpha.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultInterruptDuration is the default value for the `WithInterruptDuration`
	// for the "regular" runners. This global value can be adjusted if needed.
	DefaultInterruptDuration = 10 * time.Second
	// DefaultGroupInterruptDuration is the default value for the `WithInterruptDuration`
	// for the group runners. This global value can be adjusted if needed.
	DefaultGroupInterruptDuration = 15 * time.Second
)

Functions

This section is empty.

Types

type GroupRunner

type GroupRunner struct {
	// contains filtered or unexported fields
}

GroupRunner represent a group of tasks that need to run together. The expectation is that all the tasks will run at the same time, and when one of them stops, the rest will also stop.

The GroupRunner is intended to be used to run multiple services, which are more or less independent from eachother, but at the same time it doesn't make sense to have any of them stopped while the rest are running. Basically, either all of them run, or none of them. For example, you can have a GRPC and HTTP servers running, each of them providing a piece of functionality, however, if any of them fails, the feature provided by them would be incomplete or broken.

The interrupt duration for the group can be set through the `WithInterruptDuration` option. If the option isn't supplied, the default value (15 secs) will be used.

It's recommended that the timeouts are handled by each runner individually, meaning that each runner's timeout should be less than the group runner's timeout. This way, we can know which runner timed out. If the group timeout is reached, the remaining results will have the runner's id as "_unknown_".

Note that, as services, the task aren't expected to stop by default. This means that, if a task finishes naturally, the rest of the task will asked to stop as well.

func NewGroup

func NewGroup(opts ...Option) *GroupRunner

NewGroup will create a GroupRunner

func (*GroupRunner) Add

func (gr *GroupRunner) Add(r *Runner)

Add will add a runner to the group.

It's mandatory that each runner in the group has an unique id, otherwise there will be issues Adding new runners once the group starts will cause a panic

func (*GroupRunner) Interrupt

func (gr *GroupRunner) Interrupt()

Interrupt will execute the stopper function of ALL the tasks, which should notify the tasks in order for them to finish. The stoppers will be called immediately but sequentially. This means that the second stopper won't be called until the first one has returned. This usually isn't a problem because the service `Stop`'s methods either don't take a long time to return, or they run asynchronously in another goroutine.

As said, this will affect ALL the tasks in the group. It isn't possible to try to stop just one task. If a task has finished, the corresponding stopper won't be called

The interrupt timeout for the group will start after all the runners in the group have been notified. Note that, if the task's stopper for a runner takes a lot of time to return, it will delay the timeout's start, so it's advised that the stopper either returns fast or is run asynchronously.

func (*GroupRunner) Run

func (gr *GroupRunner) Run(ctx context.Context) []*Result

Run will execute all the tasks in the group at the same time.

Similarly to the "regular" runner's `Run` method, the execution thread will be blocked here until all tasks are completed, and their results will be available (each result will have the runner's id so it's easy to find which one failed). Note that there is no guarantee about the result's order, so the first result in the slice might or might not be the first result to be obtained.

When the context is marked as done, the groupRunner will call all the stoppers for each runner to notify each task to stop. Note that the tasks might still take a while to complete.

If a task finishes naturally (with the context still "alive"), it will also cause the groupRunner to call the stoppers of the rest of the tasks. So if a task finishes, the rest will also finish. Note that it is NOT expected for the finished task's stopper to be called in this case.

func (*GroupRunner) RunAsync

func (gr *GroupRunner) RunAsync(ch chan<- *Result)

RunAsync will execute the tasks in the group asynchronously. The result of each task will be placed in the provided channel as soon as it's available. Note that this method will finish as soon as all the tasks are running.

type Option

type Option func(o *Options)

Option defines a single option function.

func WithInterruptDuration

func WithInterruptDuration(val time.Duration) Option

WithInterruptDuration provides a function to set the interrupt duration option.

type Options

type Options struct {
	InterruptDuration time.Duration
}

Options defines the available options for this package.

type Result

type Result struct {
	RunnerID    string
	RunnerError error
}

Result represents the result of a runner. The result contains the provided runner's id (for easier identification in case of multiple results) and the runner's error, which is the result of the Runable function (might be nil if no error happened)

type Runable

type Runable func() error

Runable represent a task that can be executed by the Runner. It expected to be a long running task with an indefinite execution time, so it's suitable for servers or services. The task can eventually return an error, or nil if the execution finishes without errors

type Runner

type Runner struct {
	ID string
	// contains filtered or unexported fields
}

Runner represents the one executing a long running task, such as a server or a service. The ID of the runner is public to make identification easier, and the Result that it will generated will contain the same ID, so we can know which runner provided which result.

Runners are intended to be used only once. Reusing them isn't possible. You'd need to create a new runner if you want to rerun the same task.

func New

func New(id string, fn Runable, interrupt Stopper, opts ...Option) *Runner

New will create a new runner. The runner will be created with the provided id (the id must be unique, otherwise undefined behavior might occur), and will run the provided runable task, using the "interrupt" function to stop that task if needed.

The interrupt duration, which can be set through the `WithInterruptDuration` option, will be used to ensure the runner doesn't block forever. If the option isn't supplied, the default value (10 secs) will be used. The interrupt duration will be used to start a timeout when the runner gets interrupted (either the context of the `Run` method is done or this runner's `Interrupt` method is called). If the timeout is reached, a timeout result will be returned instead of whatever result the task should be returning.

Note that it's your responsibility to provide a proper stopper for the task. The runner will just call that method assuming it will be enough to eventually stop the task at some point.

func (*Runner) Finished

func (r *Runner) Finished() <-chan struct{}

Finished will return a receive-only channel that can be used to know when the task has finished but the result hasn't been made available yet. The channel will be closed (without sending any message) when the task has finished. This can be used specially with the `RunAsync` method when multiple runners use the same channel: results could be waiting on your side of the channel

func (*Runner) Interrupt

func (r *Runner) Interrupt()

Interrupt will execute the stopper function, which should notify the task in order for it to finish. The stopper will be called immediately, although it's expected the consequences to take a while (task might need a while to stop) A timeout will start using the provided "interrupt duration". Once that timeout is reached, the task must provide a result with a timeout error. Note that, even after returning the timeout result, the task could still be being executed and consuming resource. This method will be called only once. Further calls won't do anything

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) *Result

Run will execute the task associated to this runner in a synchronous way. The task will be spawned in a new goroutine, and the current thread will wait until the task finishes.

The task will finish "naturally". The stopper will be called in the following ways: - Manually calling this runner's `Interrupt` method - When the provided context is done As said, it's expected that calling the provided stopper will be enough to make the task to eventually complete.

Once the task finishes, the result will be returned. When the context is done, or if the runner is interrupted, a timeout will start using the provided "interrupt duration". If this timeout is reached, a timeout result will be returned instead of the one from the task. This is intended to prevent blocking the main thread indefinitely. A suitable duration should be used depending on the task, usually 5, 10 or 30 secs

Some nice things you can do: - Use signal.NotifyContext(...) to call the stopper and provide a clean shutdown procedure when an OS signal is received - Use context.WithDeadline(...) or context.WithTimeout(...) to run the task for a limited time

func (*Runner) RunAsync

func (r *Runner) RunAsync(ch chan<- *Result)

RunAsync will execute the task associated to this runner asynchronously. The task will be spawned in a new goroutine and this method will finish. The task's result will be written in the provided channel when it's available, so you can wait for it if needed. It's up to you to decide to use a blocking or non-blocking channel, but the task will always finish before writing in the channel.

To interrupt the running task, the only option is to call the `Interrupt` method at some point.

type Stopper

type Stopper func()

Stopper represent a function that will stop the Runable. The stopper acts as a notification to the runable to know that the task needs to be finished now.

The stopper won't need to crash the runable or force the runable to stop, instead, it will let the runable to know it has to stop and let it finish. This means that the runable might still run for a while.

It's recommended the stopper to run asynchronously. This means that the stopper might need to spawn a goroutine. The intention is avoid blocking the running thread.

Usually, the stoppers are the servers's `Shutdown()` or `Close()` methods, that will cause the server to start its shutdown procedure. As said, there is no need to force the shutdown, so graceful shutdowns are preferred if they're available

type TimeoutError

type TimeoutError struct {
	RunnerID string
	Duration time.Duration
}

TimeoutError is an error that should be used for timeouts. It implements the `error` interface

func NewGroupTimeoutError

func NewGroupTimeoutError(duration time.Duration) *TimeoutError

NewGroupTimeoutError creates a new timeout error. This is intended to be used for group runners when the timeout of the group is reached. The runner id will be set to "_unknown_" because we don't know which is the id of the missing runner.

func NewTimeoutError

func NewTimeoutError(runnerID string, duration time.Duration) *TimeoutError

NewTimeoutError creates a new timeout error. Both runnerID and duration will be used in the error message

func (*TimeoutError) Error

func (te *TimeoutError) Error() string

Error generates the message for this particular error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL