Documentation
¶
Index ¶
- func At(t ...time.Time) scheduled
- func Cron(c string) cron
- func Crons(c ...string) cronArr
- func Event(e string) event
- func Events(events ...string) eventsArr
- func IsNonRetryableError(err error) bool
- func NewNonRetryableError(err error) error
- func NoTrigger() noTrigger
- type Action
- type ActionMap
- type ActionPayload
- type ActionRegistry
- type ActionWithCompute
- type DurableHatchetContext
- type ErrMarshalKeyNotFound
- type GetWorkflowConcurrencyGroupFn
- type HatchetContext
- type HatchetWorkerContext
- type HealthCheckResponse
- type JobRunLookupData
- type ManagedCompute
- type MiddlewareFunc
- type NonRetryableError
- type RateLimit
- type RegisterActionOpt
- type RegisterWebhookWorkerOpts
- type Service
- func (s *Service) Call(verb string) *WorkflowStep
- func (s *Service) On(t triggerConverter, workflow workflowConverter) errordeprecated
- func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error
- func (s *Service) RegisterWorkflow(workflow workflowConverter) error
- func (s *Service) Use(mws ...MiddlewareFunc)
- type SingleWaitResult
- type SpawnWorkflowOpts
- type SpawnWorkflowsOpts
- type Step
- type StepData
- type StepRunData
- type TriggeredBy
- type WaitResult
- type WebhookHandlerOptions
- type WebhookWorkerOpts
- type Worker
- func (w *Worker) Call(action string) *WorkflowStep
- func (w *Worker) ID() *string
- func (w *Worker) Logger() *zerolog.Logger
- func (w *Worker) NewService(name string) *Service
- func (w *Worker) On(t triggerConverter, workflow workflowConverter) errordeprecated
- func (w *Worker) RegisterAction(actionId string, method any) error
- func (w *Worker) RegisterWebhook(ww RegisterWebhookWorkerOpts) error
- func (w *Worker) RegisterWorkflow(workflow workflowConverter) error
- func (w *Worker) RegisterWorkflowV1(workflow *contracts.CreateWorkflowVersionRequest) error
- func (w *Worker) Run(ctx context.Context) error
- func (w *Worker) SetPanicHandler(panicHandler func(ctx HatchetContext, recovered any))
- func (w *Worker) Start() (func() error, error)
- func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error)
- func (w *Worker) Use(mws ...MiddlewareFunc)
- func (w *Worker) WebhookHttpHandler(opts WebhookHandlerOptions, workflows ...workflowConverter) http.HandlerFunc
- type WorkerOpt
- func WithClient(client client.Client) WorkerOpt
- func WithErrorAlerter(alerter errors.Alerter) WorkerOpt
- func WithIntegration(integration integrations.Integration) WorkerOpt
- func WithInternalData(actions []string) WorkerOpt
- func WithLabels(labels map[string]interface{}) WorkerOpt
- func WithLogLevel(lvl string) WorkerOpt
- func WithLogger(l *zerolog.Logger) WorkerOpt
- func WithMaxRuns(maxRuns int) WorkerOpt
- func WithName(name string) WorkerOpt
- type WorkerOpts
- type Workflow
- type WorkflowConcurrency
- type WorkflowJob
- type WorkflowStep
- func (w *WorkflowStep) AddParents(parents ...string) *WorkflowStep
- func (w *WorkflowStep) GetActionId(svcName string, index int) string
- func (w *WorkflowStep) GetStepId(index int) string
- func (w *WorkflowStep) SetCompute(compute *compute.Compute) *WorkflowStep
- func (w *WorkflowStep) SetDesiredLabels(labels map[string]*types.DesiredWorkerLabel) *WorkflowStep
- func (w *WorkflowStep) SetName(name string) *WorkflowStep
- func (w *WorkflowStep) SetRateLimit(rateLimit RateLimit) *WorkflowStep
- func (w *WorkflowStep) SetRetries(retries int) *WorkflowStep
- func (w *WorkflowStep) SetRetryBackoffFactor(retryBackoffFactor float32) *WorkflowStep
- func (w *WorkflowStep) SetRetryMaxBackoffSeconds(retryMaxBackoffSeconds int32) *WorkflowStep
- func (w *WorkflowStep) SetTimeout(timeout string) *WorkflowStep
- func (w *WorkflowStep) ToActionMap(svcName string) ActionMap
- func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workflow
- func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error)
- func (w *WorkflowStep) ToWorkflowTrigger() triggerConverter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsNonRetryableError ¶
func NewNonRetryableError ¶
Types ¶
type Action ¶
type Action interface {
// Name returns the name of the action
Name() string
// Run runs the action
Run(args ...any) []any
MethodFn() any
ConcurrencyFn() GetWorkflowConcurrencyGroupFn
// Service returns the service that the action belongs to
Service() string
Compute() *compute.Compute
}
Action is an individual action that can be run by the worker.
type ActionMap ¶
type ActionMap map[string]ActionWithCompute
type ActionPayload ¶
type ActionRegistry ¶
type ActionWithCompute ¶
type ActionWithCompute struct {
// contains filtered or unexported fields
}
type DurableHatchetContext ¶
type DurableHatchetContext interface {
HatchetContext
// SleepFor pauses execution for the specified duration and returns after that time has elapsed.
// Duration is "global" meaning it will wait in real time regardless of transient failures
// like worker restarts.
// Example: "10s" for 10 seconds, "1m" for 1 minute, etc.
SleepFor(duration time.Duration) (*SingleWaitResult, error)
// TODO: docs
WaitForEvent(eventKey, expression string) (*SingleWaitResult, error)
// WaitFor pauses execution until the specified conditions are met.
// Conditions are "global" meaning they will wait in real time regardless of transient failures
// like worker restarts.
WaitFor(conditions condition.Condition) (*WaitResult, error)
}
DurableHatchetContext extends HatchetContext with methods for durable tasks.
func NewDurableHatchetContext ¶
func NewDurableHatchetContext(ctx HatchetContext) DurableHatchetContext
NewDurableHatchetContext creates a DurableHatchetContext from a HatchetContext.
type ErrMarshalKeyNotFound ¶
type ErrMarshalKeyNotFound struct {
Key string
}
func (ErrMarshalKeyNotFound) Error ¶
func (e ErrMarshalKeyNotFound) Error() string
type GetWorkflowConcurrencyGroupFn ¶
type GetWorkflowConcurrencyGroupFn func(ctx HatchetContext) (string, error)
type HatchetContext ¶
type HatchetContext interface {
context.Context
SetContext(ctx context.Context)
GetContext() context.Context
Worker() HatchetWorkerContext
StepOutput(step string, target interface{}) error
TriggerDataKeys() []string
TriggerData(key string, target interface{}) error
StepRunErrors() map[string]string
TriggeredByEvent() bool
WorkflowInput(target interface{}) error
UserData(target interface{}) error
AdditionalMetadata() map[string]string
StepName() string
StepRunId() string
StepId() string
WorkflowRunId() string
WorkflowId() *string
WorkflowVersionId() *string
Log(message string)
StreamEvent(message []byte)
PutStream(message string)
SpawnWorkflow(workflowName string, input any, opts *SpawnWorkflowOpts) (*client.Workflow, error)
SpawnWorkflows(childWorkflows []*SpawnWorkflowsOpts) ([]*client.Workflow, error)
ReleaseSlot() error
RefreshTimeout(incrementTimeoutBy string) error
RetryCount() int
ParentOutput(parent create.NamedTask, output interface{}) error
CurChildIndex() int
IncChildIndex()
Priority() int32
FilterPayload() map[string]interface{}
// contains filtered or unexported methods
}
type HatchetWorkerContext ¶
type HealthCheckResponse ¶
type HealthCheckResponse struct {
Actions []string `json:"actions"`
}
type JobRunLookupData ¶
type JobRunLookupData struct {
Input map[string]interface{} `json:"input"`
TriggeredBy TriggeredBy `json:"triggered_by"`
Steps map[string]StepData `json:"steps,omitempty"`
}
type ManagedCompute ¶
type ManagedCompute struct {
ActionRegistry *ActionRegistry
Client client.Client
MaxRuns int
RuntimeConfigs []rest.CreateManagedWorkerRuntimeConfigRequest
CloudRegisterID *string
Logger *zerolog.Logger
}
func NewManagedCompute ¶
func NewManagedCompute(actionRegistry *ActionRegistry, client client.Client, maxRuns int) *ManagedCompute
func (*ManagedCompute) CloudRegister ¶
func (mc *ManagedCompute) CloudRegister(ctx context.Context)
type MiddlewareFunc ¶
type MiddlewareFunc func(ctx HatchetContext, next func(HatchetContext) error) error
type NonRetryableError ¶
type NonRetryableError struct {
// contains filtered or unexported fields
}
func (*NonRetryableError) Error ¶
func (e *NonRetryableError) Error() string
type RateLimit ¶
type RateLimit struct {
// Key is the rate limit key
Key string `yaml:"key,omitempty"`
KeyExpr *string `yaml:"keyExpr,omitempty"`
// Units is the amount of units this step consumes
Units *int `yaml:"units,omitempty"`
UnitsExpr *string `yaml:"unitsExpr,omitempty"`
LimitValueExpr *string `yaml:"limitValueExpr,omitempty"`
// Duration is the duration of the rate limit
Duration *types.RateLimitDuration `yaml:"duration,omitempty"`
}
type RegisterActionOpt ¶
type RegisterActionOpt func(*registerActionOpts)
func WithActionName ¶
func WithActionName(name string) RegisterActionOpt
func WithCompute ¶
func WithCompute(compute *compute.Compute) RegisterActionOpt
type Service ¶
type Service struct {
Name string
// contains filtered or unexported fields
}
func (*Service) Call ¶
func (s *Service) Call(verb string) *WorkflowStep
func (*Service) RegisterAction ¶
func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error
func (*Service) RegisterWorkflow ¶
func (*Service) Use ¶
func (s *Service) Use(mws ...MiddlewareFunc)
type SingleWaitResult ¶
type SingleWaitResult struct {
*WaitResult
// contains filtered or unexported fields
}
func (*SingleWaitResult) Unmarshal ¶
func (w *SingleWaitResult) Unmarshal(in interface{}) error
type SpawnWorkflowOpts ¶
type SpawnWorkflowsOpts ¶
type StepRunData ¶
type StepRunData struct {
Input map[string]interface{} `json:"input"`
TriggeredBy TriggeredBy `json:"triggered_by"`
Parents map[string]StepData `json:"parents"`
Triggers map[string]map[string]interface{} `json:"triggers,omitempty"`
AdditionalMetadata map[string]string `json:"additional_metadata"`
UserData map[string]interface{} `json:"user_data"`
StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}
type TriggeredBy ¶
type TriggeredBy string
const ( TriggeredByEvent TriggeredBy = "event" TriggeredByCron TriggeredBy = "cron" TriggeredBySchedule TriggeredBy = "schedule" )
type WaitResult ¶
type WaitResult struct {
// contains filtered or unexported fields
}
func (*WaitResult) Keys ¶
func (w *WaitResult) Keys() []string
func (*WaitResult) Unmarshal ¶
func (w *WaitResult) Unmarshal(key string, in interface{}) error
type WebhookHandlerOptions ¶
type WebhookHandlerOptions struct {
Secret string
}
type WebhookWorkerOpts ¶
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func (*Worker) Call ¶
func (w *Worker) Call(action string) *WorkflowStep
func (*Worker) NewService ¶
func (*Worker) RegisterAction ¶
RegisterAction can be used to register a single action which can be reused across multiple workflows.
An action should be of the format <service>:<verb>, for example slack:create-channel.
The method must match the following signatures: - func(ctx context.Context) error - func(ctx context.Context, input *Input) error - func(ctx context.Context, input *Input) (*Output, error) - func(ctx context.Context) (*Output, error)
func (*Worker) RegisterWebhook ¶
func (w *Worker) RegisterWebhook(ww RegisterWebhookWorkerOpts) error
func (*Worker) RegisterWorkflow ¶
func (*Worker) RegisterWorkflowV1 ¶
func (w *Worker) RegisterWorkflowV1(workflow *contracts.CreateWorkflowVersionRequest) error
func (*Worker) Run ¶
Run starts the worker in blocking fashion, returning an error if the worker could not be started or if the worker stopped due to a networking issue.
func (*Worker) SetPanicHandler ¶ added in v0.73.0
func (w *Worker) SetPanicHandler(panicHandler func(ctx HatchetContext, recovered any))
func (*Worker) Start ¶
Start starts the worker in non-blocking fashion, returning a cleanup function and an error if the worker could not be started.
func (*Worker) StartWebhook ¶
func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error)
FIXME do not expose this to the end-user client somehow
func (*Worker) Use ¶
func (w *Worker) Use(mws ...MiddlewareFunc)
func (*Worker) WebhookHttpHandler ¶
func (w *Worker) WebhookHttpHandler(opts WebhookHandlerOptions, workflows ...workflowConverter) http.HandlerFunc
type WorkerOpt ¶
type WorkerOpt func(*WorkerOpts)
func WithClient ¶
func WithErrorAlerter ¶
func WithIntegration ¶
func WithIntegration(integration integrations.Integration) WorkerOpt
func WithInternalData ¶
func WithLabels ¶
func WithLogLevel ¶
func WithLogger ¶
func WithMaxRuns ¶
type WorkerOpts ¶
type WorkerOpts struct {
// contains filtered or unexported fields
}
type Workflow ¶
type Workflow struct {
Jobs []WorkflowJob
}
type WorkflowConcurrency ¶
type WorkflowConcurrency struct {
// contains filtered or unexported fields
}
func Concurrency ¶
func Concurrency(fn GetWorkflowConcurrencyGroupFn) *WorkflowConcurrency
func Expression ¶
func Expression(expr string) *WorkflowConcurrency
func (*WorkflowConcurrency) LimitStrategy ¶
func (c *WorkflowConcurrency) LimitStrategy(limitStrategy types.WorkflowConcurrencyLimitStrategy) *WorkflowConcurrency
func (*WorkflowConcurrency) MaxRuns ¶
func (c *WorkflowConcurrency) MaxRuns(maxRuns int32) *WorkflowConcurrency
type WorkflowJob ¶
type WorkflowJob struct {
// The name of the job
Name string
Description string
On triggerConverter
Concurrency *WorkflowConcurrency
// The steps that are run in the job
Steps []*WorkflowStep
OnFailure *WorkflowJob
ScheduleTimeout string
StickyStrategy *types.StickyStrategy
}
func (*WorkflowJob) ToActionMap ¶
func (j *WorkflowJob) ToActionMap(svcName string) ActionMap
func (*WorkflowJob) ToWorkflow ¶
func (j *WorkflowJob) ToWorkflow(svcName string, namespace string) types.Workflow
func (*WorkflowJob) ToWorkflowJob ¶
func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.WorkflowJob, error)
func (*WorkflowJob) ToWorkflowTrigger ¶
func (j *WorkflowJob) ToWorkflowTrigger() triggerConverter
type WorkflowStep ¶
type WorkflowStep struct {
// The step timeout
Timeout string
// The executed function
Function any
// The step id/name. If not set, one will be generated from the function name
Name string
// The ids of the parents
Parents []string
Retries int
RetryBackoffFactor *float32
RetryMaxBackoffSeconds *int32
RateLimit []RateLimit
DesiredLabels map[string]*types.DesiredWorkerLabel
Compute *compute.Compute
}
func Fn ¶
func Fn(f any) *WorkflowStep
func (*WorkflowStep) AddParents ¶
func (w *WorkflowStep) AddParents(parents ...string) *WorkflowStep
func (*WorkflowStep) GetActionId ¶
func (w *WorkflowStep) GetActionId(svcName string, index int) string
func (*WorkflowStep) GetStepId ¶
func (w *WorkflowStep) GetStepId(index int) string
func (*WorkflowStep) SetCompute ¶
func (w *WorkflowStep) SetCompute(compute *compute.Compute) *WorkflowStep
func (*WorkflowStep) SetDesiredLabels ¶
func (w *WorkflowStep) SetDesiredLabels(labels map[string]*types.DesiredWorkerLabel) *WorkflowStep
func (*WorkflowStep) SetName ¶
func (w *WorkflowStep) SetName(name string) *WorkflowStep
func (*WorkflowStep) SetRateLimit ¶
func (w *WorkflowStep) SetRateLimit(rateLimit RateLimit) *WorkflowStep
func (*WorkflowStep) SetRetries ¶
func (w *WorkflowStep) SetRetries(retries int) *WorkflowStep
func (*WorkflowStep) SetRetryBackoffFactor ¶
func (w *WorkflowStep) SetRetryBackoffFactor(retryBackoffFactor float32) *WorkflowStep
func (*WorkflowStep) SetRetryMaxBackoffSeconds ¶
func (w *WorkflowStep) SetRetryMaxBackoffSeconds(retryMaxBackoffSeconds int32) *WorkflowStep
func (*WorkflowStep) SetTimeout ¶
func (w *WorkflowStep) SetTimeout(timeout string) *WorkflowStep
func (*WorkflowStep) ToActionMap ¶
func (w *WorkflowStep) ToActionMap(svcName string) ActionMap
func (*WorkflowStep) ToWorkflow ¶
func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workflow
func (*WorkflowStep) ToWorkflowStep ¶
func (*WorkflowStep) ToWorkflowTrigger ¶
func (w *WorkflowStep) ToWorkflowTrigger() triggerConverter