rt

package
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MsgTypeReactrJobErr    = "reactr.joberr" // any kind of error from a job run
	MsgTypeReactrRunErr    = "reactr.runerr" // specifically a RunErr returned from a Wasm Runnable
	MsgTypeReactrResult    = "reactr.result"
	MsgTypeReactrNilResult = "reactr.nil"
)

MsgTypeReactrJobErr and others are Grav message types used for Reactr job

Variables

View Source
var ErrCacheKeyNotFound = errors.New("key not found")

ErrCacheKeyNotFound is returned when a non-existent cache key is requested

View Source
var (
	ErrJobNotFound = errors.New("job not found in storage")
)

ErrJobNotFound and others are storage realated errors

View Source
var (
	ErrJobTimeout = errors.New("job timeout")
)

ErrJobTimeout and others are errors related to workers

Functions

This section is empty.

Types

type Cache

type Cache interface {
	Set(key string, val []byte, ttl int) error
	Get(key string) ([]byte, error)
	Delete(key string) error
}

Cache represents access to a persistent cache

type ChangeEvent

type ChangeEvent int

ChangeEvent represents a change relevant to a worker

const (
	ChangeTypeStart ChangeEvent = iota
)

ChangeTypeStart and others represent types of changes

type Ctx

type Ctx struct {
	Cache Cache
	// contains filtered or unexported fields
}

Ctx is a Job context

func (*Ctx) Do

func (c *Ctx) Do(job Job) *Result

Do runs a new job

type DoFunc

type DoFunc func(Job) *Result

DoFunc describes a function to schedule work

type Group

type Group struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Group represents a group of job results

func NewGroup

func NewGroup() *Group

NewGroup creates a new Group

func (*Group) Add

func (g *Group) Add(result *Result)

Add adds a job result to the group

func (*Group) Wait

func (g *Group) Wait() error

Wait waits for all results to come in and returns an error if any arise

type Job

type Job struct {
	JobReference
	// contains filtered or unexported fields
}

Job describes a job to be done

func NewJob

func NewJob(jobType string, data interface{}) Job

NewJob creates a new job

func (Job) Bytes

func (j Job) Bytes() []byte

Bytes returns the []byte value of the job's data

func (Job) Data

func (j Job) Data() interface{}

Data returns the "raw" data for the job

func (Job) Int

func (j Job) Int() int

Int returns the int value of the job's data

func (Job) Reference

func (j Job) Reference() JobReference

Reference returns a reference to the Job

func (Job) String

func (j Job) String() string

String returns the string value of a job's data

func (Job) Unmarshal

func (j Job) Unmarshal(target interface{}) error

Unmarshal unmarshals the job's data into a struct

type JobFunc

type JobFunc func(interface{}) *Result

JobFunc is a function that runs a job of a predetermined type

type JobReference

type JobReference struct {
	// contains filtered or unexported fields
}

JobReference is a lightweight reference to a Job

func (JobReference) UUID

func (j JobReference) UUID() string

UUID returns the Job's UUID

type MemoryStorage

type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage is the default in-memory storage driver for Reactr

func (*MemoryStorage) Add

func (m *MemoryStorage) Add(job Job) error

Add adds a Job to storage

func (*MemoryStorage) AddResult

func (m *MemoryStorage) AddResult(uuid string, data interface{}, err error) error

AddResult adds a Job result to storage

func (*MemoryStorage) Get

func (m *MemoryStorage) Get(uuid string) (Job, error)

Get loads a Job and any of its results from storage

func (*MemoryStorage) Remove

func (m *MemoryStorage) Remove(uuid string) error

Remove removes a Job and its data from storage

type Option

type Option func(workerOpts) workerOpts

Option is a function that modifies workerOpts

func MaxRetries

func MaxRetries(count int) Option

MaxRetries returns an Option to set the worker maximum retry count

func PoolSize

func PoolSize(size int) Option

PoolSize returns an Option to set the worker pool size

func PreWarm

func PreWarm() Option

PreWarm sets the worker to pre-warm itself to minimize cold start time. if not enabled, worker will "warm up" when it receives its first job.

func RetrySeconds

func RetrySeconds(secs int) Option

RetrySeconds returns an Option to set the worker retry seconds

func TimeoutSeconds

func TimeoutSeconds(timeout int) Option

TimeoutSeconds returns an Option with the job timeout seconds set

type Reactr

type Reactr struct {
	// contains filtered or unexported fields
}

Reactr represents the main control object

func New

func New() *Reactr

New returns a Reactr ready to accept Jobs

func (*Reactr) Do

func (h *Reactr) Do(job Job) *Result

Do schedules a job to be worked on and returns a result object

func (*Reactr) Handle

func (h *Reactr) Handle(jobType string, runner Runnable, options ...Option) JobFunc

Handle registers a Runnable with the Reactr and returns a shortcut function to run those jobs

func (*Reactr) HandleMsg

func (h *Reactr) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options ...Option)

HandleMsg registers a Runnable with the Reactr and triggers that job whenever the provided Grav pod receives a message of a particular type.

func (*Reactr) Job

func (h *Reactr) Job(jobType string, data interface{}) Job

Job is a shorter alias for NewJob

func (*Reactr) Listen

func (h *Reactr) Listen(pod *grav.Pod, msgType string)

Listen causes Reactr to listen for messages of the given type and trigger the job of the same type. The message's data is passed to the runnable as the job data. The job's result is then emitted as a message. If an error occurs, it is logged and an error is sent. If the result is nil, nothing is sent.

func (*Reactr) Schedule added in v0.7.0

func (h *Reactr) Schedule(s Schedule)

Schedule adds a new Schedule to the instance, Reactr will 'watch' the Schedule and Do any jobs when the Schedule indicates it's needed

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result describes a result

func (*Result) Discard

func (r *Result) Discard()

Discard returns immediately and discards the eventual results and thus prevents the memory from hanging around

func (*Result) Then

func (r *Result) Then() (interface{}, error)

Then returns the result or error from a Result

func (*Result) ThenDo

func (r *Result) ThenDo(do ResultFunc)

ThenDo accepts a callback function to be called asynchronously when the result completes.

func (*Result) ThenInt

func (r *Result) ThenInt() (int, error)

ThenInt returns the result or error from a Result

func (*Result) ThenJSON

func (r *Result) ThenJSON(out interface{}) error

ThenJSON unmarshals the result or returns the error from a Result

func (*Result) UUID

func (r *Result) UUID() string

UUID returns the result/job's UUID

type ResultFunc

type ResultFunc func(interface{}, error)

ResultFunc is a result callback function.

type RunErr added in v0.8.0

type RunErr struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

RunErr represents an error returned from a Wasm Runnable it lives in the rt package to avoid import cycles

func (RunErr) Error added in v0.8.0

func (r RunErr) Error() string

Error returns the stringified JSON representation of the error

func (RunErr) ToVKErr added in v0.8.0

func (r RunErr) ToVKErr() vk.Error

ToVKErr converts a RunErr to a VKError

type Runnable

type Runnable interface {
	// Run is the entrypoint for jobs handled by a Runnable
	Run(Job, *Ctx) (interface{}, error)

	// OnChange is called when the worker using the Runnable instance is going to change.
	// OnChange will be called for things like startup and shutdown.
	OnChange(ChangeEvent) error
}

Runnable describes something that is runnable

type Schedule added in v0.7.0

type Schedule interface {
	Check() *Job
	Done() bool
}

Schedule is a type that returns an *optional* job if there is something that should be scheduled. Reactr will poll the Check() method at regular intervals to see if work is available.

func After added in v0.7.0

func After(seconds int, jobFunc func() Job) Schedule

After returns a schedule that will schedule the job provided by jobFunc one time x seconds after creation

func Every added in v0.7.0

func Every(seconds int, jobFunc func() Job) Schedule

Every returns a Schedule that will schedule the job provided by jobFunc every x seconds

type Storage

type Storage interface {
	Add(Job) error
	AddResult(string, interface{}, error) error
	Get(string) (Job, error)
	Remove(string) error
}

Storage represents a storage driver for Reactr

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL