rt

package
v0.11.1
Published: Aug 12, 2021 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Constants

const (
	MsgTypeReactrJobErr    = "reactr.joberr" // any kind of error from a job run
	MsgTypeReactrRunErr    = "reactr.runerr" // specifically a RunErr returned from a Wasm Runnable
	MsgTypeReactrResult    = "reactr.result"
	MsgTypeReactrNilResult = "reactr.nil"
)

MsgTypeReactrJobErr and others are Grav message types used for Reactr jobs

Variables

var ErrCapabilityNotAvailable = errors.New("capability not available")
var (
	ErrJobTimeout = errors.New("job timeout")
)

ErrJobTimeout and others are errors related to workers

Functions

This section is empty.

Types

type Capabilities added in v0.11.0

type Capabilities struct {
	Auth          rcap.AuthCapability
	LoggerSource  rcap.LoggerCapability
	HTTPClient    rcap.HTTPCapability
	GraphQLClient rcap.GraphQLCapability
	FileSource    rcap.FileCapability
	Cache         rcap.CacheCapability

	// RequestHandler and doFunc are special because they are more
	// sensitive; they could cause memory leaks or expose internal state,
	// so they cannot be swapped out for a different implementation.
	RequestHandler rcap.RequestHandlerCapability
	// contains filtered or unexported fields
}

Capabilities defines the capabilities available to a Runnable

func CapabilitiesFromConfig added in v0.11.1

func CapabilitiesFromConfig(config rcap.CapabilityConfig) Capabilities

func DefaultCapabilities added in v0.11.1

func DefaultCapabilities(logger *vlog.Logger) Capabilities

DefaultCapabilities returns the default capabilities with the provided Logger

func (Capabilities) Config added in v0.11.1

func (c Capabilities) Config() rcap.CapabilityConfig

Config returns the configuration that was used to create the Capabilities. The config cannot be changed, but it can be used to determine what was previously set, so that the original config (such as enabled settings) can be respected.

type ChangeEvent

type ChangeEvent int

ChangeEvent represents a change relevant to a worker

const (
	ChangeTypeStart ChangeEvent = iota
)

ChangeTypeStart and others represent types of changes

type Ctx

type Ctx struct {
	*Capabilities
}

Ctx is a Job context

func (*Ctx) Do

func (c *Ctx) Do(job Job) *Result

Do runs a new job

func (*Ctx) UseRequest added in v0.11.0

func (c *Ctx) UseRequest(req *request.CoordinatedRequest)

UseRequest sets a CoordinatedRequest to be used by the capabilities

type DoFunc

type DoFunc func(Job) *Result

DoFunc describes a function to schedule work

type Group

type Group struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Group represents a group of job results

func NewGroup

func NewGroup() *Group

NewGroup creates a new Group

func (*Group) Add

func (g *Group) Add(result *Result)

Add adds a job result to the group

func (*Group) Wait

func (g *Group) Wait() error

Wait waits for all results to come in and returns an error if any arise
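
The Add/Wait pattern described above can be illustrated with a small stand-alone sketch. It does not use the package's own types: `group`, `result`, `runAsync`, and `errJobFailed` are hypothetical stand-ins, and returning the first error in the order results were added is an assumption, since the documentation only says that Wait returns an error if any arise.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errJobFailed is a hypothetical error used only by this sketch.
var errJobFailed = errors.New("job failed")

// result is a stand-in for *Result: a channel that delivers one error (or nil).
type result struct {
	errChan chan error
}

// group is a hypothetical analogue of Group; like Group, it embeds sync.Mutex.
type group struct {
	sync.Mutex
	results []*result
}

// Add records a pending result under the lock.
func (g *group) Add(r *result) {
	g.Lock()
	defer g.Unlock()
	g.results = append(g.results, r)
}

// Wait blocks until every added result has reported and returns the
// first error encountered in the order results were added (an assumption).
func (g *group) Wait() error {
	g.Lock()
	pending := g.results
	g.results = nil
	g.Unlock()

	var firstErr error
	for _, r := range pending {
		if err := <-r.errChan; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// runAsync is a hypothetical helper that runs fn in its own goroutine.
func runAsync(fn func() error) *result {
	r := &result{errChan: make(chan error, 1)}
	go func() { r.errChan <- fn() }()
	return r
}

func main() {
	g := &group{}
	g.Add(runAsync(func() error { return nil }))
	g.Add(runAsync(func() error { return errJobFailed }))
	fmt.Println(g.Wait())
}
```

Buffering each channel with capacity one lets a job finish even if nobody ever calls Wait, which avoids leaking goroutines in this sketch.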

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job describes a job to be done

func NewJob

func NewJob(jobType string, data interface{}) Job

NewJob creates a new job

func (Job) Bytes

func (j Job) Bytes() []byte

Bytes returns the []byte value of the job's data

func (Job) Data

func (j Job) Data() interface{}

Data returns the "raw" data for the job

func (Job) Int

func (j Job) Int() int

Int returns the int value of the job's data

func (Job) Req added in v0.11.0

func (j Job) Req() *request.CoordinatedRequest

Req returns the Coordinated request attached to the Job

func (Job) String

func (j Job) String() string

String returns the string value of a job's data

func (Job) UUID added in v0.10.0

func (j Job) UUID() string

func (Job) Unmarshal

func (j Job) Unmarshal(target interface{}) error

Unmarshal unmarshals the job's data into a struct

type JobFunc

type JobFunc func(interface{}) *Result

JobFunc is a function that runs a job of a predetermined type

type Option

type Option func(workerOpts) workerOpts

Option is a function that modifies workerOpts

func MaxRetries

func MaxRetries(count int) Option

MaxRetries returns an Option to set the worker maximum retry count

func PoolSize

func PoolSize(size int) Option

PoolSize returns an Option to set the worker pool size

func PreWarm

func PreWarm() Option

PreWarm returns an Option that sets the worker to pre-warm itself to minimize cold start time. If not enabled, the worker will "warm up" when it receives its first job.

func RetrySeconds

func RetrySeconds(secs int) Option

RetrySeconds returns an Option to set the worker retry seconds

func TimeoutSeconds

func TimeoutSeconds(timeout int) Option

TimeoutSeconds returns an Option with the job timeout seconds set
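
Since Option is declared as `func(workerOpts) workerOpts`, the option constructors above follow the functional options pattern. The sketch below shows how such options might be folded over a set of defaults. The `workerOpts` struct is unexported in the package, so its field names here, the default values, and the `applyOptions` helper are all assumptions made for illustration.

```go
package main

import "fmt"

// workerOpts is a stand-in for the package's unexported options struct.
// The field names are assumptions, not the package's actual fields.
type workerOpts struct {
	poolSize       int
	timeoutSeconds int
	preWarm        bool
}

// Option mirrors the documented signature: it receives the current
// options by value and returns a modified copy.
type Option func(workerOpts) workerOpts

// PoolSize returns an Option that sets the pool size.
func PoolSize(size int) Option {
	return func(o workerOpts) workerOpts {
		o.poolSize = size
		return o
	}
}

// TimeoutSeconds returns an Option that sets the job timeout.
func TimeoutSeconds(timeout int) Option {
	return func(o workerOpts) workerOpts {
		o.timeoutSeconds = timeout
		return o
	}
}

// PreWarm returns an Option that enables pre-warming.
func PreWarm() Option {
	return func(o workerOpts) workerOpts {
		o.preWarm = true
		return o
	}
}

// applyOptions is a hypothetical helper: it starts from assumed
// defaults and applies each Option in order.
func applyOptions(options ...Option) workerOpts {
	opts := workerOpts{poolSize: 1}
	for _, opt := range options {
		opts = opt(opts)
	}
	return opts
}

func main() {
	opts := applyOptions(PoolSize(4), TimeoutSeconds(30))
	fmt.Printf("%+v\n", opts)
}
```

Because each Option works on a copy, options applied later override earlier ones without mutating shared state.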

type Reactr

type Reactr struct {
	// contains filtered or unexported fields
}

Reactr represents the main control object

func New

func New() *Reactr

New returns a Reactr ready to accept Jobs

func NewWithConfig added in v0.11.1

func NewWithConfig(config rcap.CapabilityConfig) *Reactr

NewWithConfig returns a Reactr with custom capability config

func (*Reactr) DefaultCaps added in v0.11.0

func (r *Reactr) DefaultCaps() Capabilities

DefaultCaps returns this instance's Capabilities object

func (*Reactr) Do

func (r *Reactr) Do(job Job) *Result

Do schedules a job to be worked on and returns a result object

func (*Reactr) DoWithCaps added in v0.11.0

func (r *Reactr) DoWithCaps(job Job, caps Capabilities) *Result

DoWithCaps schedules a job with a custom Capabilities set. Use Do() to run the job with the default capability set for its worker.

func (*Reactr) IsRegistered added in v0.10.0

func (r *Reactr) IsRegistered(jobType string) bool

IsRegistered returns true if the instance has a worker registered for the given jobType

func (*Reactr) Job

func (r *Reactr) Job(jobType string, data interface{}) Job

Job is a shorter alias for NewJob

func (*Reactr) Listen

func (r *Reactr) Listen(pod *grav.Pod, msgType string)

Listen causes Reactr to listen for messages of the given type and trigger the job of the same type. The message's data is passed to the runnable as the job data. The job's result is then emitted as a message. If an error occurs, it is logged and an error is sent. If the result is nil, nothing is sent.

func (*Reactr) Register added in v0.10.0

func (r *Reactr) Register(jobType string, runner Runnable, options ...Option) JobFunc

Register registers a Runnable with the Reactr and returns a shortcut function to run those jobs

func (*Reactr) RegisterWithCaps added in v0.11.0

func (r *Reactr) RegisterWithCaps(jobType string, runner Runnable, caps Capabilities, options ...Option)

RegisterWithCaps registers a Runnable with the provided Capabilities. When building your capabilities, you should call r.DefaultCaps() and then copy individual capability objects so that they remain shared with other workers.

func (*Reactr) Schedule added in v0.7.0

func (r *Reactr) Schedule(s Schedule)

Schedule adds a new Schedule to the instance. Reactr will 'watch' the Schedule and Do any jobs when the Schedule indicates it's needed.

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result describes a result

func (*Result) Discard

func (r *Result) Discard()

Discard returns immediately and discards the eventual result, so that its memory does not hang around

func (*Result) Then

func (r *Result) Then() (interface{}, error)

Then returns the result or error from a Result

func (*Result) ThenDo

func (r *Result) ThenDo(do ResultFunc)

ThenDo accepts a callback function to be called asynchronously when the result completes.

func (*Result) ThenInt

func (r *Result) ThenInt() (int, error)

ThenInt returns the result as an int, or the error, from a Result

func (*Result) ThenJSON

func (r *Result) ThenJSON(out interface{}) error

ThenJSON unmarshals the result or returns the error from a Result

func (*Result) UUID

func (r *Result) UUID() string

UUID returns the result/job's UUID
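
The Then, ThenInt, and ThenDo methods above behave like accessors on a future. The following is a minimal sketch of that idea built on channels. The `result` type, its fields, `newResult`, and `errNotInt` are hypothetical stand-ins, and nothing here is taken from the package's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotInt is a hypothetical error for a failed int conversion.
var errNotInt = errors.New("result is not an int")

// result is a stand-in for Result: exactly one of the two channels
// is expected to receive a value.
type result struct {
	resultChan chan interface{}
	errChan    chan error
}

// newResult is a hypothetical constructor with buffered channels so
// that producers never block.
func newResult() *result {
	return &result{
		resultChan: make(chan interface{}, 1),
		errChan:    make(chan error, 1),
	}
}

// Then blocks until either a value or an error arrives.
func (r *result) Then() (interface{}, error) {
	select {
	case v := <-r.resultChan:
		return v, nil
	case err := <-r.errChan:
		return nil, err
	}
}

// ThenInt waits like Then and converts the value to an int.
func (r *result) ThenInt() (int, error) {
	v, err := r.Then()
	if err != nil {
		return 0, err
	}
	n, ok := v.(int)
	if !ok {
		return 0, errNotInt
	}
	return n, nil
}

// ThenDo waits in a separate goroutine and then invokes the callback.
func (r *result) ThenDo(do func(interface{}, error)) {
	go func() { do(r.Then()) }()
}

func main() {
	r := newResult()
	go func() { r.resultChan <- 42 }()
	n, err := r.ThenInt()
	fmt.Println(n, err)
}
```

The blocking accessors suit sequential code, while the callback form lets the caller continue immediately.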

type ResultFunc

type ResultFunc func(interface{}, error)

ResultFunc is a result callback function.

type RunErr added in v0.8.0

type RunErr struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

RunErr represents an error returned from a Wasm Runnable. It lives in the rt package to avoid import cycles.

func (RunErr) Error added in v0.8.0

func (r RunErr) Error() string

Error returns the stringified JSON representation of the error

func (RunErr) ToVKErr added in v0.8.0

func (r RunErr) ToVKErr() vk.Error

ToVKErr converts a RunErr to a vk.Error
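
To make the "stringified JSON" behavior of Error concrete, here is a sketch that redeclares the struct locally with the documented fields and tags. The method body is an assumed implementation based on the description above, not the package's source, and the fallback to `Message` on a marshalling failure is likewise an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// RunErr is a local copy of the documented struct, including its JSON tags.
type RunErr struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// Error returns the JSON encoding of the error (assumed implementation).
// If marshalling fails, it falls back to the plain message.
func (r RunErr) Error() string {
	b, err := json.Marshal(r)
	if err != nil {
		return r.Message
	}
	return string(b)
}

func main() {
	var err error = RunErr{Code: 404, Message: "not found"}
	fmt.Println(err.Error())

	// Callers can recover the structured fields with errors.As.
	var runErr RunErr
	if errors.As(err, &runErr) {
		fmt.Println("code:", runErr.Code)
	}
}
```

Encoding the error as JSON keeps the code and message together when the error crosses a boundary that only carries a string.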

type Runnable

type Runnable interface {
	// Run is the entrypoint for jobs handled by a Runnable
	Run(Job, *Ctx) (interface{}, error)

	// OnChange is called when the worker using the Runnable instance is going to change.
	// OnChange will be called for things like startup and shutdown.
	OnChange(ChangeEvent) error
}

Runnable describes something that is runnable
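
As an illustration of the interface's shape, the sketch below implements both methods on a small type. To stay self-contained it redeclares simplified stand-ins for `Job`, `Ctx`, and `ChangeEvent` rather than importing the package, and the `upper` type and `errEmptyInput` error are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errEmptyInput is a hypothetical error used by this sketch.
var errEmptyInput = errors.New("empty input")

// Simplified stand-ins for the package's types.
type ChangeEvent int

const ChangeTypeStart ChangeEvent = iota

type Job struct{ data interface{} }

// String returns the job data as a string, or "" if it is not one.
func (j Job) String() string {
	s, _ := j.data.(string)
	return s
}

type Ctx struct{}

// Runnable mirrors the documented interface.
type Runnable interface {
	Run(Job, *Ctx) (interface{}, error)
	OnChange(ChangeEvent) error
}

// upper is a hypothetical Runnable that upper-cases its input.
type upper struct{ started bool }

// Run is the entrypoint for each job.
func (u *upper) Run(job Job, ctx *Ctx) (interface{}, error) {
	in := job.String()
	if in == "" {
		return nil, errEmptyInput
	}
	return strings.ToUpper(in), nil
}

// OnChange records that the worker has started.
func (u *upper) OnChange(evt ChangeEvent) error {
	if evt == ChangeTypeStart {
		u.started = true
	}
	return nil
}

func main() {
	var r Runnable = &upper{}
	_ = r.OnChange(ChangeTypeStart)
	out, err := r.Run(Job{data: "hello"}, &Ctx{})
	fmt.Println(out, err)
}
```

With the real package, a type like this would be passed to Register along with a job type name.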

type Schedule added in v0.7.0

type Schedule interface {
	Check() *Job
	Done() bool
}

Schedule is a type that returns an *optional* job if there is something that should be scheduled. Reactr will poll the Check() method at regular intervals to see if work is available.

func After added in v0.7.0

func After(seconds int, jobFunc func() Job) Schedule

After returns a Schedule that will schedule the job provided by jobFunc once, the given number of seconds after creation

func Every added in v0.7.0

func Every(seconds int, jobFunc func() Job) Schedule

Every returns a Schedule that will schedule the job provided by jobFunc repeatedly, at an interval of the given number of seconds
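
The polling contract described for Schedule (Check returns an optional job, Done reports completion) can be sketched with a one-shot schedule in the spirit of After. This is not the package's implementation: `Job` is a simplified stand-in, and `afterSchedule`, the lower-case `after` constructor, and the polling loop in `main` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// Job is a simplified stand-in for the package's Job type.
type Job struct {
	jobType string
	data    interface{}
}

// Schedule mirrors the documented interface.
type Schedule interface {
	Check() *Job
	Done() bool
}

// afterSchedule is a hypothetical one-shot schedule.
type afterSchedule struct {
	due     time.Time
	jobFunc func() Job
	done    bool
}

// after builds a schedule that becomes due the given number of seconds from now.
func after(seconds int, jobFunc func() Job) Schedule {
	return &afterSchedule{
		due:     time.Now().Add(time.Duration(seconds) * time.Second),
		jobFunc: jobFunc,
	}
}

// Check returns nil until the schedule is due, then returns the job
// exactly once and marks the schedule as done.
func (a *afterSchedule) Check() *Job {
	if a.done || time.Now().Before(a.due) {
		return nil
	}
	a.done = true
	job := a.jobFunc()
	return &job
}

// Done reports whether the job has already been handed out.
func (a *afterSchedule) Done() bool { return a.done }

func main() {
	s := after(0, func() Job { return Job{jobType: "cleanup"} })

	// A poller in the style described above: check at intervals until done.
	for !s.Done() {
		if job := s.Check(); job != nil {
			fmt.Println("scheduling", job.jobType)
		}
		time.Sleep(10 * time.Millisecond)
	}
}
```

A repeating schedule in the style of Every would instead advance its due time after each job and never report Done.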
