srv

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2023 License: Apache-2.0 Imports: 13 Imported by: 12

Documentation

Overview

Package srv provides a framework and toolkit for service orchestration.

srv contains three basic components: a Service type for managing the lifecylce of specific services, an Orchestrator for process-level management of services, and a collection of tools

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrServiceAlreadyStarted = errors.New("service already started")
	ErrServiceReturned       = errors.New("service returned")
	ErrServiceNotStarted     = errors.New("service not started")
)

Functions

func GetBaseContext added in v0.6.0

func GetBaseContext(ctx context.Context) context.Context

GetBaseContext gets a base context attached with SetBaseContext from the current context. If the base context is not attached, GetBaseContext panics with a fun.ErrInvariantViolation error.

Use this context to start background services that should respect global shutdown and have access to the process' context, in the scope of a request that has a context that will be canceled early.

func GetShutdown added in v0.6.0

func GetShutdown(ctx context.Context) context.CancelFunc

GetShutdown returns the previously attached cancellation function (via SetShutdown) for this context chain. To cancel all contexts that descend from the context created by SetShutdown callers must call the cancelfunction returned by GetShutdown.

If a shutdown function was not set, GetShutdown panics with a fun.ErrInvariantViolation error.

func RunWait added in v0.6.3

func RunWait(s *Service) fun.WaitFunc

RunWait produces a fun.WaitFunc that runs the service, ignoring any error from the Service.

func RunWaitCollect added in v0.6.3

func RunWaitCollect(ec *erc.Collector, s *Service) fun.WaitFunc

RunWaitCollect produces a fun.WaitFunc that runs the service and adds any errors produced to the provided collector.

func RunWaitObserve added in v0.6.3

func RunWaitObserve(observe func(error), s *Service) fun.WaitFunc

RunWaitObserve returns a fun.WaitFunc that runs the service, passing the error that the Service's Wait() method to the observer.

func SetBaseContext added in v0.6.0

func SetBaseContext(ctx context.Context) context.Context

SetBaseContext attaches the current context as an accessible value from the returned context. Once you attach core services to a base context (e.g. loggers, orchestrators, etc.) call SetBaseContext to make that context accessible later. This base context is useful for starting background services or dispatching other asynchronous work in the context of request driven work which can be canceled early.

Because context's values work like a stack, it's possible to attach more than one base to a single context chain, which would be very confusing and callers should avoid doing this.

func SetOrchestrator added in v0.6.0

func SetOrchestrator(ctx context.Context, or *Orchestrator) context.Context

SetOrchestrator attaches an orchestrator to a context.

func SetShutdown added in v0.6.0

func SetShutdown(ctx context.Context) context.Context

SetShutdown attaches a context.CancelFunc for the current context to that context, which can be accesed with the GetShutdown function to make it possible to trigger a safe and clean shutdown in functions that have access to the context but that.

Because context's values work like a stack, it's possible to attach more than one shutdown to a single context chain, which would be very confusing and callers should avoid doing this.

func WithOrchestrator added in v0.6.0

func WithOrchestrator(ctx context.Context) context.Context

WithOrchestrator creates a new *Orchestrator, starts the associated service, and attaches it to the returned context. You should also, wait on the orchestrator's service to return before your process exits, as in:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = srv.WithOrchestrator(ctx)
defer srv.GetOrchestrator(ctx).Service().Wait()

There are two flaws with this example: nothing calls cancel on the orchestrators context, and nothing observes the error from Wait(). The base context passed to the orchestrator could easily be a singal.NotifyContext() so that the context is eventually canceled, or the caller should call cancel explicitly. The alternate implementation, that resolves these issues:

ctx, cancel := signal.NotifyContext(context.Background(), )
defer cancel()
ctx = srv.WithOrchestrator(ctx)
defer fun.InvariantCheck(srv.GetOrchestrator(ctx).Service().Wait)

InvariantCheck will run Wait at defer time, and raise an ErrInvariantViolation panic with the contents of Wait's error.

Types

type Orchestrator

type Orchestrator struct {
	// Name is used in the string format of the Orchestrator
	// object and also propagated to the name of services.
	Name string
	// contains filtered or unexported fields
}

Orchestrator manages groups of services and makes it possible to add services to a running system, but with coordinated shutdown mechanisms of normal services.

Use the Service() method to generate a service that will wait for new services to added to the orchestrator and start them (if needed.)

func GetOrchestrator added in v0.6.0

func GetOrchestrator(ctx context.Context) *Orchestrator

GetOrchestrator resolves the Orchestrator attached to the context or panics if there is no orchestrator attached to the context.

func (*Orchestrator) Add

func (or *Orchestrator) Add(s *Service) error

Add sends a *Service to the orchestrator. If the service is running, it will be started after all of the other services have started. There is no limit on the number of services an orchestrator can manage, and you can add services before starting the orchestrator's service. Services are started in the order they're added.

Nil services are ignored without an error.

Once the orchestrator's service is closed, or its context is canceled, all successive calls to Add will return an error.

func (*Orchestrator) Service

func (or *Orchestrator) Service() *Service

Service returns a service that runs all of the constituent services of the orchestrator. The service must be started by the caller.

Constituent Services are started in the order they are passed to Add.

The orchestrator's service is blocking and will wait until it's closed or its context is canceled.

When called more than once, Service will return the same object. However, if you call Add or Service to an orchestrator whose Service has already completed and return, the Orchestrator will reset and a new service will be created. The new service must be started, but this can allow the orchestrator to drop all references to completed services that would otherwise remain.

func (*Orchestrator) Start added in v0.6.5

func (or *Orchestrator) Start(ctx context.Context) error

Start is a convenience function that run's the service's start function.

func (*Orchestrator) String added in v0.6.0

func (or *Orchestrator) String() string

String implements fmt.Stringer and returns the type name and

func (*Orchestrator) Wait added in v0.6.5

func (or *Orchestrator) Wait() error

Wait is a convenience function that blocks until the Orchestrator's service completes.

type Service

type Service struct {
	// Name is a human-readable name for the service, and is used
	// in the String() method, and to annotate errors.
	Name string

	// Run is a blocking function that does the main action of the
	// service. When the Run function returns the shutdown
	// function is called if it exists. The error produced by the
	// Run function is propagated to callers via the results of
	// the Wait method. Panics in the Run method are converted to
	// errors and propagated to the Wait method.
	//
	// Implementations are responsible for returning promptly when
	// the context is canceled.
	Run func(context.Context) error

	// Cleanup is optional, but if defined is always called after the
	// Run function returned. Cleanup operations should all return
	// relatively quickly and be used for releasing state rather
	// than doing potentially blocking work.
	Cleanup func() error

	// Shutdown is optional, but provides a hook that
	// implementations can be used to trigger a shutdown when the
	// context passed to Start is canceled but before the Run
	// function returns. The shutdown function, when defined,
	// must return before the Cleanup function runs.
	Shutdown func() error
	// contains filtered or unexported fields
}

Service defines a background operation. The behavior of the service is defined by the Run, Shutdown, and Cleanup functions defined in the Service structure, with the service lifecycle managed by the context passed to start.

Applications often consist of a number of services, and the Group function amalgamates a number of services into a single service, while the Orchestrator provides a more dynamic mechanism for managing services during an application's lifetime.

There are no special construction requirements for Services and implementations must define Run methods, with the other options optional. The service can only be run once; however, callers should avoid mutating the service after calling Start.

func Broker added in v0.6.6

func Broker[T any](broker *pubsub.Broker[T]) *Service

Broker creates a Service implementation that wraps a pubsub.Broker[T] implementation for integration into service orchestration frameworks.

func Group

func Group(services fun.Iterator[*Service]) *Service

Group makes it possible to have a collection of services, provided via an iterator, that behave as a single service. All services are started concurrently (and without order) and shutdown concurrently (and without order). The Service produced by group, has the same semantics as any other Service.

Use Group(itertool.Slice([]*Service)) to produce a group from a slice of *Services,

func HTTP

func HTTP(name string, shutdownTimeout time.Duration, hs *http.Server) *Service

HTTP wraps an http.Server object in a Service, both for common use and convenience, and as an example for implementing arbitrary services.

If the http.Server's BaseContext method is not set, it is overridden with a function that provides a copy of the context passed to the service's Start method: this ensures that requests have access to the same underlying context as the service.

func ProcessIterator added in v0.6.6

func ProcessIterator[T any](
	iter fun.Iterator[T],
	mapper func(context.Context, T) error,
	opts itertool.Options,
) *Service

ProcessIterator runs an itertool.ParallelForEach operation as a *Service. For a long running service, use an iterator that is blocking (e.g. based on a pubsub queue/deque or a channel.)

func Wait added in v0.6.3

func Wait(iter fun.Iterator[fun.WaitFunc]) *Service

Wait creates a service that runs until *both* all wait functions have returned *and* the iterator is exhausted. The Service's wait function returns an error that aggregates all errors encountered by the constituent wait functions.

Wait produces a service that fills the same role as the fun.WaitMerge function, but that can be more easily integrated into existing orchestration tools.

When the service returns all worker Goroutines will have returned.

func (*Service) Close

func (s *Service) Close()

Close forceably shuts down the service, causing the background process to terminate and wait to return. If the service hasn't started or isn't running this has no effect. Close is safe to call multiple times.

func (*Service) Running

func (s *Service) Running() bool

Running returns true as soon as start returns, and returns false after close is called.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start launches the configured service and tracks its lifecycle. If the context is canceled, the service returns, and any errors collected during the service's lifecycle are returned by the close (or wait) methods.

If the service is running or is finished, Start returns the appropriate sentinel error.

func (*Service) String

func (s *Service) String() string

String implements fmt.Stringer and includes value of s.Name.

func (*Service) Wait

func (s *Service) Wait() error

Wait blocks until the service returns or the service's start context is canceled. If the service hasn't been started Wait returns ErrServiceNotStarted. It is safe to call wait multiple times. While wait does not accept a context, it respects the context passed to start. Wait returns nil if the service's background operation encountered no errors, and otherwise returns an erc.ErrorStack object that contains the errors encountered when running the service, the shutdown hook, and any panics encountered during the service's execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL