Documentation
¶
Index ¶
- Constants
- type Action
- type ActionEvent
- type ActionEventResponse
- type ActionEventType
- type ActionPayload
- type ActionType
- type AdminClient
- type BulkPushOpFunc
- type ChildWorkflowOpts
- type Client
- type ClientEventListener
- type ClientOpt
- func InitWorkflows() ClientOpt
- func WithHostPort(host string, port int) ClientOpt
- func WithLogLevel(lvl string) ClientOptdeprecated
- func WithLogger(l *zerolog.Logger) ClientOpt
- func WithNamespace(namespace string) ClientOpt
- func WithSharedMeta(meta map[string]string) ClientOpt
- func WithTenantId(tenantId string) ClientOpt
- func WithToken(token string) ClientOpt
- type ClientOpts
- type CronClient
- type CronOpts
- type DedupeViolationErr
- type DispatcherClient
- type DurableEvent
- type DurableEventHandler
- type DurableEventsListener
- type EventClient
- type EventWithAdditionalMetadata
- type GetActionListenerRequest
- type ListenerStrategy
- type PushOpFunc
- type PutOptFunc
- type RunChildWorkflowsOpts
- type RunHandler
- type RunOptFunc
- type ScheduleClient
- type ScheduleOptFunc
- type ScheduleOpts
- type StreamEvent
- type StreamHandler
- type SubscribeClient
- type WorkerActionListener
- type Workflow
- type WorkflowEvent
- type WorkflowResult
- type WorkflowRun
- type WorkflowRunEvent
- type WorkflowRunEventHandler
- type WorkflowRunsListener
- func (l *WorkflowRunsListener) AddWorkflowRun(workflowRunId, sessionId string, handler WorkflowRunEventHandler) error
- func (l *WorkflowRunsListener) Close() error
- func (l *WorkflowRunsListener) Listen(ctx context.Context) error
- func (l *WorkflowRunsListener) RemoveWorkflowRun(workflowRunId, sessionId string)
Constants ¶
View Source
const ( DefaultActionListenerRetryInterval = 5 * time.Second DefaultActionListenerRetryCount = 5 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶
type Action struct {
// the worker id
WorkerId string `json:"workerId"`
// the tenant id
TenantId string `json:"tenantId"`
// the workflow run id
WorkflowRunId string `json:"workflowRunId"`
// the get group key run id
GetGroupKeyRunId string `json:"getGroupKeyRunId"`
// the job id
JobId string `json:"jobId"`
// the job name
JobName string `json:"jobName"`
// the job run id
JobRunId string `json:"jobRunId"`
// the step id
StepId string `json:"stepId"`
// the step name
StepName string `json:"stepName"`
// the step run id
StepRunId string `json:"stepRunId"`
// the action id
ActionId string `json:"actionId"`
// the action payload
ActionPayload []byte `json:"actionPayload"`
// the action type
ActionType ActionType `json:"actionType"`
// the count of the retry attempt
RetryCount int32 `json:"retryCount"`
// the additional metadata for the workflow run
AdditionalMetadata map[string]string
// the child index for the workflow run
ChildIndex *int32
// the child key for the workflow run
ChildKey *string
// the parent workflow run id
ParentWorkflowRunId *string
Priority int32 `json:"priority,omitempty"`
WorkflowId *string `json:"workflowId,omitempty"`
WorkflowVersionId *string `json:"workflowVersionId,omitempty"`
}
type ActionEvent ¶
type ActionEvent struct {
*Action
// the event timestamp
EventTimestamp *time.Time
// the step event type
EventType ActionEventType
// The event payload. This must be JSON-compatible as it gets marshalled to a JSON string.
EventPayload interface{}
// If this is an error, whether to retry on failure
ShouldNotRetry *bool
}
type ActionEventResponse ¶
type ActionEventType ¶
type ActionEventType string
const ( ActionEventTypeUnknown ActionEventType = "STEP_EVENT_TYPE_UNKNOWN" ActionEventTypeStarted ActionEventType = "STEP_EVENT_TYPE_STARTED" ActionEventTypeCompleted ActionEventType = "STEP_EVENT_TYPE_COMPLETED" ActionEventTypeFailed ActionEventType = "STEP_EVENT_TYPE_FAILED" )
type ActionPayload ¶
type ActionPayload func(target interface{}) error
ActionPayload unmarshals the action payload into the target. It also validates the resulting target.
type ActionType ¶
type ActionType string
const ( ActionTypeStartStepRun ActionType = "START_STEP_RUN" ActionTypeCancelStepRun ActionType = "CANCEL_STEP_RUN" ActionTypeStartGetGroupKey ActionType = "START_GET_GROUP_KEY" )
type AdminClient ¶
type AdminClient interface {
PutWorkflow(workflow *types.Workflow, opts ...PutOptFunc) error
PutWorkflowV1(workflow *v1contracts.CreateWorkflowVersionRequest, opts ...PutOptFunc) error
ScheduleWorkflow(workflowName string, opts ...ScheduleOptFunc) error
// RunWorkflow triggers a workflow run and returns the run id
RunWorkflow(workflowName string, input interface{}, opts ...RunOptFunc) (*Workflow, error)
BulkRunWorkflow(workflows []*WorkflowRun) ([]string, error)
RunChildWorkflow(workflowName string, input interface{}, opts *ChildWorkflowOpts) (string, error)
RunChildWorkflows(workflows []*RunChildWorkflowsOpts) ([]string, error)
PutRateLimit(key string, opts *types.RateLimitOpts) error
}
type BulkPushOpFunc ¶
type BulkPushOpFunc func(*eventcontracts.BulkPushEventRequest) error
type ChildWorkflowOpts ¶
type Client ¶
type Client interface {
Admin() AdminClient
Cron() CronClient
Schedule() ScheduleClient
Dispatcher() DispatcherClient
Event() EventClient
Subscribe() SubscribeClient
API() *rest.ClientWithResponses
CloudAPI() *cloudrest.ClientWithResponses
Logger() *zerolog.Logger
TenantId() string
Namespace() string
CloudRegisterID() *string
RunnableActions() []string
}
func NewFromConfigFile ¶
func NewFromConfigFile(cf *client.ClientConfigFile, fs ...ClientOpt) (Client, error)
type ClientEventListener ¶
type ClientEventListener interface {
OnWorkflowEvent(ctx context.Context, event *WorkflowEvent) error
}
type ClientOpt ¶
type ClientOpt func(*ClientOpts)
func InitWorkflows ¶
func InitWorkflows() ClientOpt
func WithHostPort ¶
func WithLogLevel
deprecated
func WithLogger ¶
func WithNamespace ¶
func WithSharedMeta ¶
func WithTenantId ¶
type ClientOpts ¶
type ClientOpts struct {
// contains filtered or unexported fields
}
type CronClient ¶
type CronClient interface {
// Create creates a new cron trigger
Create(ctx context.Context, workflow string, opts *CronOpts) (*gen.CronWorkflows, error)
// Delete deletes a cron trigger
Delete(ctx context.Context, id string) error
// List lists all cron triggers
List(ctx context.Context) (*gen.CronWorkflowsList, error)
}
func NewCronClient ¶
func NewCronClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (CronClient, error)
type CronOpts ¶
type CronOpts struct {
// Name is the user-friendly name for the cron trigger
Name string
// Expression is the cron expression for the trigger
Expression string
// Input is the input to the workflow
Input map[string]interface{}
// AdditionalMetadata is additional metadata to be stored with the cron trigger
AdditionalMetadata map[string]string
// Priority is the priority of the run triggered by the cron
Priority *int32
}
type DedupeViolationErr ¶
type DedupeViolationErr struct {
// contains filtered or unexported fields
}
func (*DedupeViolationErr) Error ¶
func (d *DedupeViolationErr) Error() string
type DispatcherClient ¶
type DispatcherClient interface {
GetActionListener(ctx context.Context, req *GetActionListenerRequest) (WorkerActionListener, *string, error)
SendStepActionEvent(ctx context.Context, in *ActionEvent) (*ActionEventResponse, error)
SendGroupKeyActionEvent(ctx context.Context, in *ActionEvent) (*ActionEventResponse, error)
ReleaseSlot(ctx context.Context, stepRunId string) error
RefreshTimeout(ctx context.Context, stepRunId string, incrementTimeoutBy string) error
UpsertWorkerLabels(ctx context.Context, workerId string, labels map[string]interface{}) error
RegisterDurableEvent(ctx context.Context, req *sharedcontracts.RegisterDurableEventRequest) (*sharedcontracts.RegisterDurableEventResponse, error)
}
type DurableEvent ¶
type DurableEvent *contracts.DurableEvent
type DurableEventHandler ¶
type DurableEventHandler func(e DurableEvent) error
type DurableEventsListener ¶
type DurableEventsListener struct {
// contains filtered or unexported fields
}
func (*DurableEventsListener) AddSignal ¶
func (l *DurableEventsListener) AddSignal( taskId string, signalKey string, handler DurableEventHandler, ) error
func (*DurableEventsListener) Close ¶
func (l *DurableEventsListener) Close() error
type EventClient ¶
type EventClient interface {
Push(ctx context.Context, eventKey string, payload interface{}, options ...PushOpFunc) error
BulkPush(ctx context.Context, payloads []EventWithAdditionalMetadata, options ...BulkPushOpFunc) error
PutLog(ctx context.Context, stepRunId, msg string) error
PutStreamEvent(ctx context.Context, stepRunId string, message []byte) error
}
type ListenerStrategy ¶
type ListenerStrategy string
const ( ListenerStrategyV1 ListenerStrategy = "v1" ListenerStrategyV2 ListenerStrategy = "v2" )
type PushOpFunc ¶
type PushOpFunc func(*pushOpt) error
func WithEventMetadata ¶
func WithEventMetadata(metadata map[string]string) PushOpFunc
func WithEventPriority ¶
func WithEventPriority(priority *int32) PushOpFunc
func WithFilterScope ¶
func WithFilterScope(scope *string) PushOpFunc
type PutOptFunc ¶
type PutOptFunc func(*putOpts)
type RunChildWorkflowsOpts ¶
type RunChildWorkflowsOpts struct {
WorkflowName string
Input interface{}
Opts *ChildWorkflowOpts
}
type RunHandler ¶
type RunHandler func(event WorkflowEvent) error
type RunOptFunc ¶
type RunOptFunc func(*admincontracts.TriggerWorkflowRequest) error
func WithPriority ¶
func WithPriority(priority int32) RunOptFunc
func WithRunMetadata ¶
func WithRunMetadata(metadata interface{}) RunOptFunc
type ScheduleClient ¶
type ScheduleClient interface {
// Create creates a new scheduled workflow run
Create(ctx context.Context, workflow string, opts *ScheduleOpts) (*gen.ScheduledWorkflows, error)
// Delete deletes a scheduled workflow run
Delete(ctx context.Context, id string) error
// List lists all scheduled workflow runs
List(ctx context.Context) (*gen.ScheduledWorkflowsList, error)
}
func NewScheduleClient ¶
func NewScheduleClient(restClient *rest.ClientWithResponses, l *zerolog.Logger, v validator.Validator, tenantId, namespace string) (ScheduleClient, error)
type ScheduleOptFunc ¶
type ScheduleOptFunc func(*scheduleOpts)
func WithInput ¶
func WithInput(input any) ScheduleOptFunc
func WithSchedules ¶
func WithSchedules(schedules ...time.Time) ScheduleOptFunc
type ScheduleOpts ¶
type ScheduleOpts struct {
// TriggerAt is the time at which the scheduled run should be triggered
TriggerAt time.Time
// Input is the input to the workflow
Input map[string]interface{}
// AdditionalMetadata is additional metadata to be stored with the cron trigger
AdditionalMetadata map[string]string
Priority *int32 `json:"priority,omitempty"`
}
type StreamEvent ¶
type StreamEvent struct {
Message []byte
}
type StreamHandler ¶
type StreamHandler func(event StreamEvent) error
type SubscribeClient ¶
type SubscribeClient interface {
On(ctx context.Context, workflowRunId string, handler RunHandler) error
Stream(ctx context.Context, workflowRunId string, handler StreamHandler) error
StreamByAdditionalMetadata(ctx context.Context, key string, value string, handler StreamHandler) error
SubscribeToWorkflowRunEvents(ctx context.Context) (*WorkflowRunsListener, error)
ListenForDurableEvents(ctx context.Context) (*DurableEventsListener, error)
}
type WorkerActionListener ¶
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
func NewWorkflow ¶
func NewWorkflow( workflowRunId string, listener *WorkflowRunsListener, ) *Workflow
func (*Workflow) Result ¶
func (c *Workflow) Result() (*WorkflowResult, error)
func (*Workflow) WorkflowRunId
deprecated
type WorkflowEvent ¶
type WorkflowEvent *dispatchercontracts.WorkflowEvent
type WorkflowResult ¶
type WorkflowResult struct {
// contains filtered or unexported fields
}
func (*WorkflowResult) Results ¶
func (r *WorkflowResult) Results() (interface{}, error)
func (*WorkflowResult) StepOutput ¶
func (r *WorkflowResult) StepOutput(key string, v interface{}) error
type WorkflowRun ¶
type WorkflowRun struct {
Name string
Input interface{}
Options []RunOptFunc
}
type WorkflowRunEvent ¶
type WorkflowRunEvent *dispatchercontracts.WorkflowRunEvent
type WorkflowRunEventHandler ¶
type WorkflowRunEventHandler func(event WorkflowRunEvent) error
type WorkflowRunsListener ¶
type WorkflowRunsListener struct {
// contains filtered or unexported fields
}
func (*WorkflowRunsListener) AddWorkflowRun ¶
func (l *WorkflowRunsListener) AddWorkflowRun( workflowRunId, sessionId string, handler WorkflowRunEventHandler, ) error
func (*WorkflowRunsListener) Close ¶
func (l *WorkflowRunsListener) Close() error
func (*WorkflowRunsListener) Listen ¶
func (l *WorkflowRunsListener) Listen(ctx context.Context) error
func (*WorkflowRunsListener) RemoveWorkflowRun ¶
func (l *WorkflowRunsListener) RemoveWorkflowRun( workflowRunId, sessionId string, )
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cloud
|
|
|
rest
Package rest provides primitives to interact with the openapi HTTP API.
|
Package rest provides primitives to interact with the openapi HTTP API. |
|
Package rest provides primitives to interact with the openapi HTTP API.
|
Package rest provides primitives to interact with the openapi HTTP API. |
Click to show internal directories.
Click to hide internal directories.