v1

package
v0.73.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2025 License: MIT Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const MAX_TENANT_RATE_LIMITS = 10000
View Source
const NUM_PARTITIONS = 4

TODO: make this dynamic for the instance

View Source
const PARENT_STRATEGY_LOCK_OFFSET = 1000000000000 // 1 trillion

Variables

View Source
var ErrDagParentNotFound = errors.New("dag parent not found")

Functions

This section is empty.

Types

type APIKeyAuthCredentials added in v0.70.0

type APIKeyAuthCredentials struct {
	HeaderName   string `json:"header_name" validate:"required"`
	EncryptedKey []byte `json:"key" validate:"required"`
}

type AssignResults

type AssignResults struct {
	Assigned           []*AssignedItem
	Unassigned         []*sqlcv1.V1QueueItem
	SchedulingTimedOut []*sqlcv1.V1QueueItem
	RateLimited        []*RateLimitResult
	RateLimitedToMove  []*RateLimitResult
}

type AssignedItem

type AssignedItem struct {
	WorkerId pgtype.UUID

	QueueItem *sqlcv1.V1QueueItem
}

type AssignmentRepository

type AssignmentRepository interface {
	ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
	ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}

type AuthConfig added in v0.70.0

type AuthConfig struct {
	Type       sqlcv1.V1IncomingWebhookAuthType `json:"type" validate:"required"`
	BasicAuth  *BasicAuthCredentials            `json:"basic_auth,omitempty"`
	APIKeyAuth *APIKeyAuthCredentials           `json:"api_key_auth,omitempty"`
	HMACAuth   *HMACAuthCredentials             `json:"hmac_auth,omitempty"`
}

func (*AuthConfig) Validate added in v0.70.0

func (ac *AuthConfig) Validate() error

type BasicAuthCredentials added in v0.70.0

type BasicAuthCredentials struct {
	Username          string `json:"username" validate:"required"`
	EncryptedPassword []byte `json:"password" validate:"required"`
}

type BulkRetrievePayloadOpts added in v0.73.0

type BulkRetrievePayloadOpts struct {
	Keys     []ExternalPayloadLocationKey
	TenantId string
}

type CELEvaluationFailure added in v0.70.0

type CELEvaluationFailure struct {
	Source       sqlcv1.V1CelEvaluationFailureSource `json:"source"`
	ErrorMessage string                              `json:"error_message"`
}

type CandidateEventMatch

type CandidateEventMatch struct {
	// A UUID for the event
	ID string

	// A timestamp for the event
	EventTimestamp time.Time

	// Key for the event
	Key string

	// Resource hint for the event
	ResourceHint *string

	// Data for the event
	Data []byte
}

type ChildWorkflowSignalCreatedData

type ChildWorkflowSignalCreatedData struct {
	// The external id of the target child task
	ChildExternalId string `json:"external_id"`

	// The external id of the parent task
	ParentExternalId string `json:"parent_external_id"`

	// The index of the child task
	ChildIndex int64 `json:"child_index"`

	// The key of the child task
	ChildKey *string `json:"child_key"`
}

func (*ChildWorkflowSignalCreatedData) Bytes

func (c *ChildWorkflowSignalCreatedData) Bytes() []byte

type CompleteTaskOpts

type CompleteTaskOpts struct {
	*TaskIdInsertedAtRetryCount

	// (required) the output bytes for the task
	Output []byte
}

type ConcurrencyRepository

type ConcurrencyRepository interface {
	// Checks whether the concurrency strategy is active, and if not, sets is_active=False
	UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error

	RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}

type ConcurrencyRepositoryImpl

type ConcurrencyRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (ConcurrencyRepositoryImpl) DesiredWorkerId

func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow

func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy

func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(
	ctx context.Context,
	tenantId pgtype.UUID,
	strategy *sqlcv1.V1StepConcurrency,
) (res *RunConcurrencyResult, err error)

func (ConcurrencyRepositoryImpl) ToV1StepRunData

func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive

func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(
	ctx context.Context,
	tenantId pgtype.UUID,
	strategy *sqlcv1.V1StepConcurrency,
) error

type CreateConcurrencyOpts

type CreateConcurrencyOpts struct {
	// (optional) the maximum number of concurrent workflow runs, default 1
	MaxRuns *int32

	// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
	LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`

	// (required) a concurrency expression for evaluating the concurrency key
	Expression string `validate:"celworkflowrunstr"`
}

type CreateCronWorkflowTriggerOpts

type CreateCronWorkflowTriggerOpts struct {
	// (required) the workflow id
	WorkflowId string `validate:"required,uuid"`

	// (required) the workflow name
	Name string `validate:"required"`

	Cron string `validate:"required,cron"`

	Input              map[string]interface{}
	AdditionalMetadata map[string]interface{}
}

type CreateExternalSignalConditionKind

type CreateExternalSignalConditionKind string
const (
	CreateExternalSignalConditionKindSLEEP     CreateExternalSignalConditionKind = "SLEEP"
	CreateExternalSignalConditionKindUSEREVENT CreateExternalSignalConditionKind = "USER_EVENT"
)

type CreateExternalSignalConditionOpt

type CreateExternalSignalConditionOpt struct {
	Kind CreateExternalSignalConditionKind `validate:"required, oneof=SLEEP USER_EVENT"`

	ReadableDataKey string `validate:"required"`

	OrGroupId string `validate:"required,uuid"`

	UserEventKey *string

	SleepFor *string `validate:"omitempty,duration"`

	Expression string
}

type CreateFilterOpts

type CreateFilterOpts struct {
	Workflowid    pgtype.UUID `json:"workflowid" validate:"required,uuid"`
	Scope         string      `json:"scope" validate:"required"`
	Expression    string      `json:"expression" validate:"required"`
	Payload       []byte      `json:"payload"`
	IsDeclarative bool        `json:"is_declarative"`
}

type CreateIncomingWebhookFailureLogOpts added in v0.70.0

type CreateIncomingWebhookFailureLogOpts struct {
	WebhookName string
	ErrorText   string
}

type CreateLogLineOpts

type CreateLogLineOpts struct {
	TaskId int64

	TaskInsertedAt pgtype.Timestamptz

	// (optional) The time when the log line was created.
	CreatedAt *time.Time

	// (required) The message of the log line.
	Message string `validate:"required,min=1,max=10000"`

	// (optional) The level of the log line.
	Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`

	// (optional) The metadata of the log line.
	Metadata []byte

	// The retry count of the log line.
	RetryCount int
}

type CreateMatchOpts

type CreateMatchOpts struct {
	Kind sqlcv1.V1MatchKind

	ExistingMatchData []byte

	Conditions []GroupMatchCondition

	TriggerDAGId *int64

	TriggerDAGInsertedAt pgtype.Timestamptz

	TriggerExternalId *string

	TriggerWorkflowRunId *string

	TriggerStepId *string

	TriggerStepIndex pgtype.Int8

	TriggerExistingTaskId *int64

	TriggerExistingTaskInsertedAt pgtype.Timestamptz

	TriggerParentTaskExternalId pgtype.UUID

	TriggerParentTaskId pgtype.Int8

	TriggerParentTaskInsertedAt pgtype.Timestamptz

	TriggerChildIndex pgtype.Int8

	TriggerChildKey pgtype.Text

	TriggerPriority pgtype.Int4

	SignalTaskId *int64

	SignalTaskInsertedAt pgtype.Timestamptz

	SignalExternalId *string

	SignalKey *string
}

type CreateStepMatchConditionOpt

type CreateStepMatchConditionOpt struct {
	// (required) the type of match condition for triggering the step
	MatchConditionKind string `validate:"required,oneof=PARENT_OVERRIDE USER_EVENT SLEEP"`

	// (required) the key for the event data when the workflow is triggered
	ReadableDataKey string `validate:"required"`

	// (required) the initial state for the task when the match condition is satisfied
	Action string `validate:"required,oneof=QUEUE CANCEL SKIP"`

	// (required) the or group id for the match condition
	// we ignore the JSON field here because it's randomly generated by the client, and we don't want to
	// publish a new workflow version just because this UUID is different. we use the `OrGroupHashIndex` instead.
	OrGroupId string `json:"-" validate:"required,uuid"`

	// NOTE: should not be set by the caller. This is populated by this package before creating the checksum.
	OrGroupIdIndex int32

	// (optional) the expression for the match condition
	Expression string `validate:"omitempty"`

	// (optional) the sleep duration for the match condition, only set if this is a SLEEP
	SleepDuration *string `validate:"omitempty,duration"`

	// (optional) the event key for the match condition, only set if this is a USER_EVENT
	EventKey *string `validate:"omitempty"`

	// (optional) if this is a PARENT_OVERRIDE condition, this will be set to the parent readable_id for
	// the parent whose trigger behavior we're overriding
	ParentReadableId *string `validate:"omitempty"`
}

type CreateStepOpts

type CreateStepOpts struct {
	// (required) the task name
	ReadableId string `validate:"hatchetName"`

	// (required) the task action id
	Action string `validate:"required,actionId"`

	// (optional) the task timeout
	Timeout *string `validate:"omitnil,duration"`

	// (optional) the task scheduling timeout
	ScheduleTimeout *string `validate:"omitnil,duration"`

	// (optional) the parents that this step depends on
	Parents []string `validate:"dive,hatchetName"`

	// (optional) the step retry max
	Retries *int `validate:"omitempty,min=0"`

	// (optional) rate limits for this step
	RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`

	// (optional) desired worker affinity state for this step
	DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`

	// (optional) the step retry backoff factor
	RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`

	// (optional) the step retry backoff max seconds (can't be greater than 86400)
	RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`

	// (optional) a list of additional trigger conditions
	TriggerConditions []CreateStepMatchConditionOpt `validate:"omitempty,dive"`

	// (optional) the step concurrency options
	Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`
}

type CreateTaskOpts

type CreateTaskOpts struct {
	// (required) the external id
	ExternalId string `validate:"required,uuid"`

	// (required) the workflow run id. note this may be the same as the external id if this is a
	// single-task workflow, otherwise it represents the external id of the DAG.
	WorkflowRunId string `validate:"required,uuid"`

	// (required) the step id
	StepId string `validate:"required,uuid"`

	// (required) the input bytes to the task
	Input *TaskInput

	FilterPayload []byte

	// (required) the step index for the task
	StepIndex int

	// (optional) the additional metadata for the task
	AdditionalMetadata []byte

	// (optional) the desired worker id
	DesiredWorkerId *string

	// (optional) the DAG id for the task
	DagId *int64

	// (optional) the DAG inserted at for the task
	DagInsertedAt pgtype.Timestamptz

	// (required) the initial state for the task
	InitialState sqlcv1.V1TaskInitialState

	// (optional) the parent task external id
	ParentTaskExternalId *string

	// (optional) the parent task id
	ParentTaskId *int64

	// (optional) the parent task inserted at
	ParentTaskInsertedAt *time.Time

	// (optional) The priority of a task, between 1 and 3
	Priority *int32

	// (optional) the child index for the task
	ChildIndex *int64

	// (optional) the child key for the task
	ChildKey *string
}

type CreateWebhookOpts added in v0.70.0

type CreateWebhookOpts struct {
	Tenantid           pgtype.UUID                        `json:"tenantid"`
	Sourcename         sqlcv1.V1IncomingWebhookSourceName `json:"sourcename"`
	Name               string                             `json:"name" validate:"required"`
	Eventkeyexpression string                             `json:"eventkeyexpression"`
	AuthConfig         AuthConfig                         `json:"auth_config,omitempty"`
}

type CreateWorkflowStepRateLimitOpts

type CreateWorkflowStepRateLimitOpts struct {
	// (required) the rate limit key
	Key string `validate:"required"`

	// (optional) a CEL expression for the rate limit key
	KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`

	// (optional) the rate limit units to consume
	Units *int `validate:"omitnil,required_without=UnitsExpr"`

	// (optional) a CEL expression for the rate limit units
	UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`

	// (optional) a CEL expression for a dynamic limit value for the rate limit
	LimitExpr *string `validate:"omitnil,celsteprunstr"`

	// (optional) the rate limit duration, defaults to MINUTE
	Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}

type CreateWorkflowVersionOpts

type CreateWorkflowVersionOpts struct {
	// (required) the workflow name
	Name string `validate:"required,hatchetName"`

	// (optional) the workflow description
	Description *string `json:"description,omitempty"`

	// (optional) event triggers for the workflow
	EventTriggers []string

	// (optional) cron triggers for the workflow
	CronTriggers []string `validate:"dive,cron"`

	// (optional) the input bytes for the cron triggers
	CronInput []byte

	// (required) the tasks in the workflow
	Tasks []CreateStepOpts `validate:"required,min=1,dive"`

	OnFailure *CreateStepOpts `json:"onFailureJob,omitempty" validate:"omitempty"`

	// (optional) the workflow concurrency groups
	Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitempty,dive"`

	// (optional) sticky strategy
	Sticky *string `validate:"omitempty,oneof=SOFT HARD"`

	DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`

	DefaultFilters []types.DefaultFilter `json:"defaultFilters,omitempty" validate:"omitempty,dive"`
}

type DAGWithData

type DAGWithData struct {
	*sqlcv1.V1Dag

	Input []byte

	AdditionalMetadata []byte

	ParentTaskExternalID *pgtype.UUID

	TotalTasks int
}

type DesiredWorkerLabelOpts

type DesiredWorkerLabelOpts struct {
	// (required) the label key
	Key string `validate:"required"`

	// (required if StringValue is nil) the label integer value
	IntValue *int32 `validate:"omitnil,required_without=StrValue"`

	// (required if StrValue is nil) the label string value
	StrValue *string `validate:"omitnil,required_without=IntValue"`

	// (optional) if the label is required
	Required *bool `validate:"omitempty"`

	// (optional) the weight of the label for scheduling (default: 100)
	Weight *int32 `validate:"omitempty"`

	// (optional) the label comparator for scheduling (default: EQUAL)
	Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}

type ErrNamesNotFound

type ErrNamesNotFound struct {
	Names []string
}

func (*ErrNamesNotFound) Error

func (e *ErrNamesNotFound) Error() string

type EventExternalIdFilterId

type EventExternalIdFilterId struct {
	ExternalId string
	FilterId   *string
}

type EventMatchResults

type EventMatchResults struct {
	// The list of tasks which were created from the matches
	CreatedTasks []*V1TaskWithPayload

	// The list of tasks which were replayed from the matches
	ReplayedTasks []*V1TaskWithPayload
}

type EventTriggerOpts

type EventTriggerOpts struct {
	ExternalId string

	Key string

	Data []byte

	AdditionalMetadata []byte

	Priority *int32

	Scope *string

	TriggeringWebhookName *string
}

type EventTriggersFromExternalId

type EventTriggersFromExternalId struct {
	RunID           int64              `json:"run_id"`
	RunInsertedAt   pgtype.Timestamptz `json:"run_inserted_at"`
	EventExternalId pgtype.UUID        `json:"event_external_id"`
	EventSeenAt     pgtype.Timestamptz `json:"event_seen_at"`
	FilterId        pgtype.UUID        `json:"filter_id"`
}

type EventType

type EventType string
const (
	EVENT_TYPE_REQUEUED_NO_WORKER   EventType = "REQUEUED_NO_WORKER"
	EVENT_TYPE_REQUEUED_RATE_LIMIT  EventType = "REQUEUED_RATE_LIMIT"
	EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT"
	EVENT_TYPE_ASSIGNED             EventType = "ASSIGNED"
	EVENT_TYPE_STARTED              EventType = "STARTED"
	EVENT_TYPE_FINISHED             EventType = "FINISHED"
	EVENT_TYPE_FAILED               EventType = "FAILED"
	EVENT_TYPE_RETRYING             EventType = "RETRYING"
	EVENT_TYPE_CANCELLED            EventType = "CANCELLED"
	EVENT_TYPE_TIMED_OUT            EventType = "TIMED_OUT"
	EVENT_TYPE_REASSIGNED           EventType = "REASSIGNED"
	EVENT_TYPE_SLOT_RELEASED        EventType = "SLOT_RELEASED"
	EVENT_TYPE_TIMEOUT_REFRESHED    EventType = "TIMEOUT_REFRESHED"
	EVENT_TYPE_RETRIED_BY_USER      EventType = "RETRIED_BY_USER"
	EVENT_TYPE_SENT_TO_WORKER       EventType = "SENT_TO_WORKER"
	EVENT_TYPE_RATE_LIMIT_ERROR     EventType = "RATE_LIMIT_ERROR"
	EVENT_TYPE_ACKNOWLEDGED         EventType = "ACKNOWLEDGED"
	EVENT_TYPE_CREATED              EventType = "CREATED"
	EVENT_TYPE_QUEUED               EventType = "QUEUED"
)

type ExternalCreateSignalMatchOpts

type ExternalCreateSignalMatchOpts struct {
	Conditions []CreateExternalSignalConditionOpt `validate:"required,min=1,dive"`

	SignalTaskId int64 `validate:"required,gt=0"`

	SignalTaskInsertedAt pgtype.Timestamptz

	SignalExternalId string `validate:"required,uuid"`

	SignalKey string `validate:"required"`
}

type ExternalPayloadLocationKey added in v0.73.0

type ExternalPayloadLocationKey string

type ExternalStore added in v0.73.0

type ExternalStore interface {
	Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error)
	BulkRetrieve(ctx context.Context, opts ...BulkRetrievePayloadOpts) (map[ExternalPayloadLocationKey][]byte, error)
}

type FailTaskOpts

type FailTaskOpts struct {
	*TaskIdInsertedAtRetryCount

	// (required) whether this is an application-level error or an internal error on the Hatchet side
	IsAppError bool

	// (optional) the error message for the task
	ErrorMessage string

	// (optional) A boolean flag to indicate whether the error is non-retryable, meaning it should _not_ be retried. Defaults to false.
	IsNonRetryable bool
}

type FailTasksResponse

type FailTasksResponse struct {
	*FinalizedTaskResponse

	RetriedTasks []RetriedTask
}

type FilterRepository

type FilterRepository interface {
	CreateFilter(ctx context.Context, tenantId string, params CreateFilterOpts) (*sqlcv1.V1Filter, error)
	ListFilters(ctx context.Context, tenantId string, params ListFiltersOpts) ([]*sqlcv1.V1Filter, int64, error)
	DeleteFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
	GetFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
	UpdateFilter(ctx context.Context, tenantId string, filterId string, opts UpdateFilterOpts) (*sqlcv1.V1Filter, error)
}

type FinalizedTaskResponse

type FinalizedTaskResponse struct {
	ReleasedTasks []*sqlcv1.ReleaseTasksRow

	InternalEvents []InternalTaskEvent
}

type GroupMatchCondition

type GroupMatchCondition struct {
	GroupId string `validate:"required,uuid"`

	EventType sqlcv1.V1EventType

	EventKey string

	// (optional) a hint for querying the event data
	EventResourceHint *string

	// the data key which gets inserted into the returned data from a satisfied match condition
	ReadableDataKey string

	Expression string

	Action sqlcv1.V1MatchConditionAction

	// (optional) the data which was used to satisfy the condition (relevant for replays)
	Data []byte
}

type HMACAuthCredentials added in v0.70.0

type HMACAuthCredentials struct {
	Algorithm                     sqlcv1.V1IncomingWebhookHmacAlgorithm `json:"algorithm" validate:"required"`
	Encoding                      sqlcv1.V1IncomingWebhookHmacEncoding  `json:"encoding" validate:"required"`
	SignatureHeaderName           string                                `json:"signature_header_name" validate:"required"`
	EncryptedWebhookSigningSecret []byte                                `json:"webhook_signing_secret" validate:"required"`
}

type IdInsertedAt added in v0.71.2

type IdInsertedAt struct {
	ID         int64              `json:"id"`
	InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}

type IdempotencyKey added in v0.73.0

type IdempotencyKey string

type IdempotencyRepository added in v0.73.0

type IdempotencyRepository interface {
	CreateIdempotencyKey(context context.Context, tenantId, key string, expiresAt pgtype.Timestamptz) error
	EvictExpiredIdempotencyKeys(context context.Context, tenantId pgtype.UUID) error
}

type InternalTaskEvent

type InternalTaskEvent struct {
	TenantID       string                 `json:"tenant_id"`
	TaskID         int64                  `json:"task_id"`
	TaskExternalID string                 `json:"task_external_id"`
	RetryCount     int32                  `json:"retry_count"`
	EventType      sqlcv1.V1TaskEventType `json:"event_type"`
	EventKey       string                 `json:"event_key"`
	Data           []byte                 `json:"data"`
}

InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.

type JobRunHasCycleError

type JobRunHasCycleError struct {
	JobName string
}

func (*JobRunHasCycleError) Error

func (e *JobRunHasCycleError) Error() string

type KeyClaimantPair added in v0.73.0

type KeyClaimantPair struct {
	IdempotencyKey      IdempotencyKey
	ClaimedByExternalId pgtype.UUID
}

type LeaseRepository

type LeaseRepository interface {
	ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
	ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
	ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)

	AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
	ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}

type ListActiveWorkersResult

type ListActiveWorkersResult struct {
	ID      string
	MaxRuns int
	Name    string
	Labels  []*sqlcv1.ListManyWorkerLabelsRow
}

type ListEventsRow

type ListEventsRow struct {
	TenantID                pgtype.UUID        `json:"tenant_id"`
	EventID                 int64              `json:"event_id"`
	EventExternalID         pgtype.UUID        `json:"event_external_id"`
	EventSeenAt             pgtype.Timestamptz `json:"event_seen_at"`
	EventKey                string             `json:"event_key"`
	EventPayload            []byte             `json:"event_payload"`
	EventAdditionalMetadata []byte             `json:"event_additional_metadata"`
	EventScope              string             `json:"event_scope"`
	QueuedCount             int64              `json:"queued_count"`
	RunningCount            int64              `json:"running_count"`
	CompletedCount          int64              `json:"completed_count"`
	CancelledCount          int64              `json:"cancelled_count"`
	FailedCount             int64              `json:"failed_count"`
	TriggeredRuns           []byte             `json:"triggered_runs"`
	TriggeringWebhookName   *string            `json:"triggering_webhook_name,omitempty"`
}

type ListFiltersOpts

type ListFiltersOpts struct {
	WorkflowIds []pgtype.UUID `json:"workflow_ids"`
	Scopes      []string      `json:"scopes"`
	Limit       int64         `json:"limit" validate:"omitnil,min=1"`
	Offset      int64         `json:"offset" validate:"omitnil,min=0"`
}

type ListFinalizedWorkflowRunsResponse

type ListFinalizedWorkflowRunsResponse struct {
	WorkflowRunId string

	OutputEvents []*TaskOutputEvent
}

type ListLogsOpts

type ListLogsOpts struct {
	// (optional) number of logs to skip
	Offset *int

	// (optional) number of logs to return
	Limit *int `validate:"omitnil,min=1,max=10000"`

	// (optional) a list of log levels to filter by
	Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`

	// (optional) a search query
	Search *string
}

type ListTaskRunOpts

type ListTaskRunOpts struct {
	CreatedAfter time.Time

	Statuses []sqlcv1.V1ReadableStatusOlap

	WorkflowIds []uuid.UUID

	WorkerId *uuid.UUID

	StartedAfter time.Time

	FinishedBefore *time.Time

	AdditionalMetadata map[string]interface{}

	TriggeringEventExternalId *uuid.UUID

	Limit int64

	Offset int64

	IncludePayloads bool
}

type ListWebhooksOpts added in v0.70.0

type ListWebhooksOpts struct {
	WebhookNames       []string                             `json:"webhook_names"`
	WebhookSourceNames []sqlcv1.V1IncomingWebhookSourceName `json:"webhook_source_names"`
	Limit              *int64                               `json:"limit" validate:"omitnil,min=1"`
	Offset             *int64                               `json:"offset" validate:"omitnil,min=0"`
}

type ListWorkflowRunOpts

type ListWorkflowRunOpts struct {
	CreatedAfter time.Time

	Statuses []sqlcv1.V1ReadableStatusOlap

	WorkflowIds []uuid.UUID

	StartedAfter time.Time

	FinishedBefore *time.Time

	AdditionalMetadata map[string]interface{}

	Limit int64

	Offset int64

	ParentTaskExternalId *pgtype.UUID

	TriggeringEventExternalId *pgtype.UUID

	IncludePayloads bool
}

type LogLineRepository

type LogLineRepository interface {
	ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)

	PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}

type MatchData

type MatchData struct {
	// contains filtered or unexported fields
}

parses match aggregated data

func NewMatchData

func NewMatchData(mcAggregatedData []byte) (*MatchData, error)

func (*MatchData) Action

func (*MatchData) DataKeys

func (m *MatchData) DataKeys() []string

func (*MatchData) DataValueAsTaskOutputEvent

func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent

Helper function for internal events

func (*MatchData) TriggerDataKeys

func (m *MatchData) TriggerDataKeys() []string

func (*MatchData) TriggerDataValue

func (m *MatchData) TriggerDataValue(key string) map[string]interface{}

type MatchRepository

type MatchRepository interface {
	RegisterSignalMatchConditions(ctx context.Context, tenantId string, eventMatches []ExternalCreateSignalMatchOpts) error

	ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
	ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
}

type MatchRepositoryImpl

type MatchRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (MatchRepositoryImpl) DesiredWorkerId

func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow

func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*MatchRepositoryImpl) ProcessInternalEventMatches

func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)

ProcessInternalEventMatches processes a list of internal events

func (*MatchRepositoryImpl) ProcessUserEventMatches

func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)

ProcessUserEventMatches processes a list of user events

func (*MatchRepositoryImpl) RegisterSignalMatchConditions

func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, signalMatches []ExternalCreateSignalMatchOpts) error

func (MatchRepositoryImpl) ToV1StepRunData

func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

type NoOpExternalStore added in v0.73.0

type NoOpExternalStore struct{}

func (*NoOpExternalStore) BulkRetrieve added in v0.73.0

func (*NoOpExternalStore) Store added in v0.73.0

type OLAPRepository

type OLAPRepository interface {
	UpdateTablePartitions(ctx context.Context) error
	SetReadReplicaPool(pool *pgxpool.Pool)

	ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
	ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
	ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)

	ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
	ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
	ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
	ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
	ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
	ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
	CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
	CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
	CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
	GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
	UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
	UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
	ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
	ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID, includePayloads bool) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
	ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)

	// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
	// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
	ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)

	GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
	BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
	ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*ListEventsRow, *int64, error)
	ListEventKeys(ctx context.Context, tenantId string) ([]string, error)

	GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)
	GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)

	CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error
	StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error

	AnalyzeOLAPTables(ctx context.Context) error
}

func NewOLAPRepositoryFromPool

func NewOLAPRepositoryFromPool(pool *pgxpool.Pool, l *zerolog.Logger, olapRetentionPeriod time.Duration, entitlements repository.EntitlementsRepository, shouldPartitionEventsTables, enablePayloadDualWrites bool) (OLAPRepository, func() error)

type OLAPRepositoryImpl

type OLAPRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (*OLAPRepositoryImpl) AnalyzeOLAPTables added in v0.71.4

func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error

func (*OLAPRepositoryImpl) BulkCreateEventsAndTriggers

func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error

func (*OLAPRepositoryImpl) CreateDAGs

func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error

func (*OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs added in v0.70.0

func (r *OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error

func (*OLAPRepositoryImpl) CreateTaskEvents

func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error

func (*OLAPRepositoryImpl) CreateTasks

func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error

func (OLAPRepositoryImpl) DesiredWorkerId

func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (*OLAPRepositoryImpl) GetDAGDurations added in v0.70.6

func (r *OLAPRepositoryImpl) GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)

func (*OLAPRepositoryImpl) GetTaskDurationsByTaskIds

func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)

func (*OLAPRepositoryImpl) GetTaskPointMetrics

func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)

func (*OLAPRepositoryImpl) GetTaskTimings

func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)

func (*OLAPRepositoryImpl) ListEventKeys

func (r *OLAPRepositoryImpl) ListEventKeys(ctx context.Context, tenantId string) ([]string, error)

func (*OLAPRepositoryImpl) ListEvents

func (*OLAPRepositoryImpl) ListTaskRunEvents

func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)

func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId

func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)

func (*OLAPRepositoryImpl) ListTasks

func (*OLAPRepositoryImpl) ListTasksByDAGId

func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID, includePayloads bool) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)

func (*OLAPRepositoryImpl) ListTasksByExternalIds

func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)

func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt

func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)

func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames

func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)

func (*OLAPRepositoryImpl) ListWorkflowRuns

func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)

func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow

func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*OLAPRepositoryImpl) ReadDAG

func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)

func (*OLAPRepositoryImpl) ReadTaskRun

func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)

func (*OLAPRepositoryImpl) ReadTaskRunData

func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)

func (*OLAPRepositoryImpl) ReadTaskRunMetrics

func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)

func (*OLAPRepositoryImpl) ReadWorkflowRun

func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)

func (*OLAPRepositoryImpl) SetReadReplicaPool

func (r *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)

func (*OLAPRepositoryImpl) StoreCELEvaluationFailures added in v0.70.0

func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error

func (OLAPRepositoryImpl) ToV1StepRunData

func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*OLAPRepositoryImpl) UpdateDAGStatuses

func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)

func (*OLAPRepositoryImpl) UpdateTablePartitions

func (r *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error

func (*OLAPRepositoryImpl) UpdateTaskStatuses

func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)

type OffloadToExternalStoreOpts added in v0.73.0

type OffloadToExternalStoreOpts struct {
	*StorePayloadOpts
	OffloadAt time.Time
}

type PayloadLocation added in v0.73.0

type PayloadLocation string

type PayloadStoreRepository added in v0.73.0

type PayloadStoreRepository interface {
	Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error
	Retrieve(ctx context.Context, opts RetrievePayloadOpts) ([]byte, error)
	BulkRetrieve(ctx context.Context, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error)
	ProcessPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error)
	OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration)
	DualWritesEnabled() bool
}

func NewPayloadStoreRepository added in v0.73.0

func NewPayloadStoreRepository(
	pool *pgxpool.Pool,
	l *zerolog.Logger,
	queries *sqlcv1.Queries,
	enablePayloadDualWrites bool,
) PayloadStoreRepository

type QueueFactoryRepository

type QueueFactoryRepository interface {
	NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}

type QueueRepository

type QueueRepository interface {
	ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
	MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)

	GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
	RequeueRateLimitedItems(ctx context.Context, tenantId pgtype.UUID, queueName string) ([]*sqlcv1.RequeueRateLimitedQueueItemsRow, error)
	GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
	Cleanup()
}

type RateLimitRepository

type RateLimitRepository interface {
	UpdateRateLimits(ctx context.Context, tenantId pgtype.UUID, updates map[string]int) ([]*sqlcv1.ListRateLimitsForTenantWithMutateRow, *time.Time, error)
}

type RateLimitResult

type RateLimitResult struct {
	*sqlcv1.V1QueueItem

	ExceededKey    string
	ExceededUnits  int32
	ExceededVal    int32
	NextRefillAt   *time.Time
	TaskId         int64
	TaskInsertedAt pgtype.Timestamptz
	RetryCount     int32
}

type ReadTaskRunMetricsOpts

type ReadTaskRunMetricsOpts struct {
	CreatedAfter time.Time

	CreatedBefore *time.Time

	WorkflowIds []uuid.UUID

	ParentTaskExternalID *pgtype.UUID

	TriggeringEventExternalId *pgtype.UUID

	AdditionalMetadata map[string]interface{}
}

type ReadableTaskStatus

type ReadableTaskStatus string
const (
	READABLE_TASK_STATUS_QUEUED    ReadableTaskStatus = "QUEUED"
	READABLE_TASK_STATUS_RUNNING   ReadableTaskStatus = "RUNNING"
	READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED"
	READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED"
	READABLE_TASK_STATUS_FAILED    ReadableTaskStatus = "FAILED"
)

func StringToReadableStatus

func StringToReadableStatus(status string) ReadableTaskStatus

func (ReadableTaskStatus) EnumValue

func (s ReadableTaskStatus) EnumValue() int

type RefreshTimeoutBy

type RefreshTimeoutBy struct {
	TaskExternalId string `validate:"required,uuid"`

	IncrementTimeoutBy string `validate:"required,duration"`
}

type ReplayTaskOpts

type ReplayTaskOpts struct {
	// (required) the task id
	TaskId int64

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the external id
	ExternalId string

	// (required) the step id
	StepId string

	// (optional) the input bytes to the task, uses the existing input if not set
	Input *TaskInput

	// (required) the initial state for the task
	InitialState sqlcv1.V1TaskInitialState

	// (optional) the additional metadata for the task
	AdditionalMetadata []byte
}

type ReplayTasksResult

type ReplayTasksResult struct {
	ReplayedTasks []TaskIdInsertedAtRetryCount

	UpsertedTasks []*V1TaskWithPayload

	InternalEventResults *EventMatchResults
}

type Repository

type Repository interface {
	Triggers() TriggerRepository
	Tasks() TaskRepository
	Scheduler() SchedulerRepository
	Matches() MatchRepository
	OLAP() OLAPRepository
	OverwriteOLAPRepository(o OLAPRepository)
	Logs() LogLineRepository
	OverwriteLogsRepository(l LogLineRepository)
	Payloads() PayloadStoreRepository
	OverwriteExternalPayloadStore(o ExternalStore, nativeStoreTTL time.Duration)
	Workers() WorkerRepository
	Workflows() WorkflowRepository
	Ticker() TickerRepository
	Filters() FilterRepository
	Webhooks() WebhookRepository
	Idempotency() IdempotencyRepository
}

func NewRepository

func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, olapRetentionPeriod time.Duration, maxInternalRetryCount int32, entitlements repository.EntitlementsRepository, taskLimits TaskOperationLimits, enablePayloadDualWrites bool) (Repository, func() error)

type RetriedTask

type RetriedTask struct {
	*TaskIdInsertedAtRetryCount

	IsAppError bool

	AppRetryCount int32

	RetryBackoffFactor pgtype.Float8

	RetryMaxBackoff pgtype.Int4
}

type RetrievePayloadOpts added in v0.73.0

type RetrievePayloadOpts struct {
	Id         int64
	InsertedAt pgtype.Timestamptz
	Type       sqlcv1.V1PayloadType
	TenantId   pgtype.UUID
}

type Run

type Run struct {
	Id         int64
	InsertedAt time.Time
	FilterId   *string
}

type RunConcurrencyResult

type RunConcurrencyResult struct {
	// The tasks which were enqueued
	Queued []TaskWithQueue

	// If the strategy involves cancelling a task, these are the tasks to cancel
	Cancelled []TaskWithCancelledReason

	// If the step has multiple concurrency strategies, these are the next ones to notify
	NextConcurrencyStrategies []int64
}

type SchedulerRepository

type SchedulerRepository interface {
	Concurrency() ConcurrencyRepository
	Lease() LeaseRepository
	QueueFactory() QueueFactoryRepository
	RateLimit() RateLimitRepository
	Assignment() AssignmentRepository
}

type Sticky

type Sticky string
const (
	STICKY_HARD Sticky = "HARD"
	STICKY_SOFT Sticky = "SOFT"
	STICKY_NONE Sticky = "NONE"
)

type StorePayloadOpts added in v0.73.0

type StorePayloadOpts struct {
	Id         int64
	InsertedAt pgtype.Timestamptz
	Type       sqlcv1.V1PayloadType
	Payload    []byte
	TenantId   string
}

type TaskIdEventKeyTuple

type TaskIdEventKeyTuple struct {
	Id int64 `validate:"required"`

	EventKey string `validate:"required"`
}

type TaskIdInsertedAtRetryCount

type TaskIdInsertedAtRetryCount struct {
	// (required) the external id
	Id int64 `validate:"required"`

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the retry count
	RetryCount int32
}

type TaskIdInsertedAtSignalKey

type TaskIdInsertedAtSignalKey struct {
	// (required) the external id
	Id int64 `validate:"required"`

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the signal key for the event
	SignalKey string
}

type TaskInput

type TaskInput struct {
	Input map[string]interface{} `json:"input"`

	TriggerData *MatchData `json:"trigger_datas"`

	FilterPayload map[string]interface{} `json:"filter_payload"`
}

func (*TaskInput) Bytes

func (t *TaskInput) Bytes() []byte

type TaskMetadata

type TaskMetadata struct {
	TaskID         int64     `json:"task_id"`
	TaskInsertedAt time.Time `json:"task_inserted_at"`
}

func ParseTaskMetadata

func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)

type TaskOperationLimits added in v0.70.2

type TaskOperationLimits struct {
	TimeoutLimit      int
	ReassignLimit     int
	RetryQueueLimit   int
	DurableSleepLimit int
}

type TaskOutputEvent

type TaskOutputEvent struct {
	IsFailure bool `json:"is_failure"`

	EventType sqlcv1.V1TaskEventType `json:"event_type"`

	TaskExternalId string `json:"task_external_id"`

	TaskId int64 `json:"task_id"`

	RetryCount int32 `json:"retry_count"`

	WorkerId *string `json:"worker_id"`

	Output []byte `json:"output"`

	ErrorMessage string `json:"error_message"`

	StepReadableID string `json:"step_readable_id"`
}

func NewCancelledTaskOutputEvent

func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent

func NewCancelledTaskOutputEventFromTask

func NewCancelledTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent

func NewCompletedTaskOutputEvent

func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent

func NewFailedTaskOutputEvent

func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent

func NewFailedTaskOutputEventFromTask

func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent

func NewSkippedTaskOutputEventFromTask

func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent

func (*TaskOutputEvent) Bytes

func (e *TaskOutputEvent) Bytes() []byte

func (*TaskOutputEvent) IsCancelled

func (e *TaskOutputEvent) IsCancelled() bool

func (*TaskOutputEvent) IsCompleted

func (e *TaskOutputEvent) IsCompleted() bool

func (*TaskOutputEvent) IsFailed

func (e *TaskOutputEvent) IsFailed() bool

type TaskRepository

type TaskRepository interface {
	EnsureTablePartitionsExist(ctx context.Context) (bool, error)
	UpdateTablePartitions(ctx context.Context) error

	// GetTaskByExternalId is a heavily cached method to return task metadata by its external id
	GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)

	// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
	// This is non-cacheable because tasks can be added to a workflow run as it executes.
	FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)

	CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)

	FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)

	CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)

	ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)

	ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)

	ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)

	// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
	// with the v1 engine, and shouldn't be called from new v1 endpoints.
	ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)

	ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)

	ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)

	ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)

	ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)

	GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)

	ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)

	RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)

	ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)

	ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)

	// AnalyzeTaskTables runs ANALYZE on the task tables
	AnalyzeTaskTables(ctx context.Context) error
}

type TaskRepositoryImpl

type TaskRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (*TaskRepositoryImpl) AnalyzeTaskTables added in v0.71.13

func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error

func (*TaskRepositoryImpl) CancelTasks

func (*TaskRepositoryImpl) CompleteTasks

func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)

func (TaskRepositoryImpl) DesiredWorkerId

func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (*TaskRepositoryImpl) EnsureTablePartitionsExist

func (r *TaskRepositoryImpl) EnsureTablePartitionsExist(ctx context.Context) (bool, error)

func (*TaskRepositoryImpl) FailTasks

func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)

func (*TaskRepositoryImpl) FlattenExternalIds

func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)

func (*TaskRepositoryImpl) GetQueueCounts

func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)

func (*TaskRepositoryImpl) GetTaskByExternalId

func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)

func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns

func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)

func (*TaskRepositoryImpl) ListSignalCompletedEvents

func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)

func (*TaskRepositoryImpl) ListTaskMetas

func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)

func (*TaskRepositoryImpl) ListTaskParentOutputs

func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)

func (*TaskRepositoryImpl) ListTasks

func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)

func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow

func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*TaskRepositoryImpl) ProcessDurableSleeps

func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)

func (*TaskRepositoryImpl) ProcessTaskReassignments

func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)

func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems

func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)

func (*TaskRepositoryImpl) ProcessTaskTimeouts

func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)

func (*TaskRepositoryImpl) RefreshTimeoutBy

func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)

func (*TaskRepositoryImpl) ReleaseSlot

func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)

func (*TaskRepositoryImpl) ReplayTasks

func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)

func (TaskRepositoryImpl) ToV1StepRunData

func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*TaskRepositoryImpl) UpdateTablePartitions

func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error

type TaskRunMetric

type TaskRunMetric struct {
	Status string `json:"status"`
	Count  uint64 `json:"count"`
}

type TaskWithCancelledReason

type TaskWithCancelledReason struct {
	*TaskIdInsertedAtRetryCount

	CancelledReason string

	TaskExternalId string

	WorkflowRunId string
}

type TaskWithQueue

type TaskWithQueue struct {
	*TaskIdInsertedAtRetryCount

	Queue string
}

type TickerRepository

type TickerRepository interface {
	IsTenantAlertActive(ctx context.Context, tenantId string) (bool, time.Time, error)
}

type TimeoutTasksResponse

type TimeoutTasksResponse struct {
	*FailTasksResponse

	TimeoutTasks []*sqlcv1.ListTasksToTimeoutRow
}

type TriggerDecision

type TriggerDecision struct {
	ShouldTrigger bool
	FilterPayload []byte
	FilterId      *string
}

type TriggerFromEventsResult

type TriggerFromEventsResult struct {
	Tasks                 []*V1TaskWithPayload
	Dags                  []*DAGWithData
	EventExternalIdToRuns map[string][]*Run
	CELEvaluationFailures []CELEvaluationFailure
}

type TriggerRepository

type TriggerRepository interface {
	TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)

	TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)

	PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

	PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}

type TriggerRepositoryImpl

type TriggerRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (TriggerRepositoryImpl) DesiredWorkerId

func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow

func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts

func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

func (TriggerRepositoryImpl) ToV1StepRunData

func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*TriggerRepositoryImpl) TriggerFromEvents

func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)

func (*TriggerRepositoryImpl) TriggerFromWorkflowNames

func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)

type TriggerTaskData

type TriggerTaskData struct {
	// (required) the workflow name
	WorkflowName string `json:"workflow_name" validate:"required"`

	// (optional) the workflow run data
	Data []byte `json:"data"`

	// (optional) the workflow run metadata
	AdditionalMetadata []byte `json:"additional_metadata"`

	// (optional) the desired worker id
	DesiredWorkerId *string `json:"desired_worker_id"`

	// (optional) the parent external id
	ParentExternalId *string `json:"parent_external_id"`

	// (optional) the parent task id
	ParentTaskId *int64 `json:"parent_task_id"`

	// (optional) the parent inserted_at
	ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`

	// (optional) the child index
	ChildIndex *int64 `json:"child_index"`

	// (optional) the child key
	ChildKey *string `json:"child_key"`

	// (optional) the priority of the task
	Priority *int32 `json:"priority"`
}

type TriggeredBy

type TriggeredBy interface {
	ToMetadata([]byte) []byte
}

type TriggeredByEvent

type TriggeredByEvent struct {
	// contains filtered or unexported fields
}

func (*TriggeredByEvent) ToMetadata

func (t *TriggeredByEvent) ToMetadata(additionalMetadata []byte) []byte

type UpdateDAGStatusRow

type UpdateDAGStatusRow struct {
	TenantId       pgtype.UUID
	DagId          int64
	DagInsertedAt  pgtype.Timestamptz
	ReadableStatus sqlcv1.V1ReadableStatusOlap
	ExternalId     pgtype.UUID
	WorkflowId     pgtype.UUID
}

type UpdateFilterOpts

type UpdateFilterOpts struct {
	Scope      *string `json:"scope"`
	Expression *string `json:"expression"`
	Payload    []byte  `json:"payload"`
}

type UpdateTaskStatusRow

type UpdateTaskStatusRow struct {
	TenantId       pgtype.UUID
	TaskId         int64
	TaskInsertedAt pgtype.Timestamptz
	ReadableStatus sqlcv1.V1ReadableStatusOlap
	ExternalId     pgtype.UUID
	LatestWorkerId pgtype.UUID
	WorkflowId     pgtype.UUID
	IsDAGTask      bool
}

type V1StepRunData

type V1StepRunData struct {
	Input       map[string]interface{}            `json:"input"`
	TriggeredBy string                            `json:"triggered_by"`
	Parents     map[string]map[string]interface{} `json:"parents"`

	Triggers map[string]map[string]interface{} `json:"triggers"`

	// custom-set user data for the step
	UserData map[string]interface{} `json:"user_data"`

	// overrides set from the playground
	Overrides map[string]interface{} `json:"overrides"`

	// errors in upstream steps (only used in on-failure step)
	StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}

func (*V1StepRunData) Bytes

func (v1 *V1StepRunData) Bytes() []byte

type V1TaskWithPayload added in v0.73.0

type V1TaskWithPayload struct {
	*sqlcv1.V1Task
	Payload []byte `json:"payload"`
}

type V1WorkflowRunPopulator

type V1WorkflowRunPopulator struct {
	WorkflowRun  *WorkflowRunData
	TaskMetadata []TaskMetadata
}

type WasSuccessfullyClaimed added in v0.73.0

type WasSuccessfullyClaimed bool

type WebhookRepository added in v0.70.0

type WebhookRepository interface {
	CreateWebhook(ctx context.Context, tenantId string, params CreateWebhookOpts) (*sqlcv1.V1IncomingWebhook, error)
	ListWebhooks(ctx context.Context, tenantId string, params ListWebhooksOpts) ([]*sqlcv1.V1IncomingWebhook, error)
	DeleteWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
	GetWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
	CanCreate(ctx context.Context, tenantId string, webhookLimit int32) (bool, error)
	UpdateWebhook(ctx context.Context, tenantId string, webhookId, newExpression string) (*sqlcv1.V1IncomingWebhook, error)
}

type WorkerRepository

type WorkerRepository interface {
	ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
	GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
	ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)
}

type WorkflowAndScope

type WorkflowAndScope struct {
	WorkflowId pgtype.UUID
	Scope      string
}

type WorkflowNameTriggerOpts

type WorkflowNameTriggerOpts struct {
	*TriggerTaskData

	ExternalId string

	// (optional) The idempotency key to use for debouncing this task
	IdempotencyKey *IdempotencyKey

	// Whether to skip the creation of the child workflow
	ShouldSkip bool
}

type WorkflowRepository

type WorkflowRepository interface {
	ListWorkflowNamesByIds(ctx context.Context, tenantId string, workflowIds []pgtype.UUID) (map[pgtype.UUID]string, error)
	PutWorkflowVersion(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts) (*sqlcv1.GetWorkflowVersionForEngineRow, error)
}

type WorkflowRunData

type WorkflowRunData struct {
	AdditionalMetadata   []byte                      `json:"additional_metadata"`
	CreatedAt            pgtype.Timestamptz          `json:"created_at"`
	DisplayName          string                      `json:"display_name"`
	ErrorMessage         string                      `json:"error_message"`
	ExternalID           pgtype.UUID                 `json:"external_id"`
	FinishedAt           pgtype.Timestamptz          `json:"finished_at"`
	Input                []byte                      `json:"input"`
	InsertedAt           pgtype.Timestamptz          `json:"inserted_at"`
	Kind                 sqlcv1.V1RunKind            `json:"kind"`
	Output               *[]byte                     `json:"output,omitempty"`
	ParentTaskExternalId *pgtype.UUID                `json:"parent_task_external_id,omitempty"`
	ReadableStatus       sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
	StepId               *pgtype.UUID                `json:"step_id,omitempty"`
	StartedAt            pgtype.Timestamptz          `json:"started_at"`
	TaskExternalId       *pgtype.UUID                `json:"task_external_id,omitempty"`
	TaskId               *int64                      `json:"task_id,omitempty"`
	TaskInsertedAt       *pgtype.Timestamptz         `json:"task_inserted_at,omitempty"`
	TenantID             pgtype.UUID                 `json:"tenant_id"`
	WorkflowID           pgtype.UUID                 `json:"workflow_id"`
	WorkflowVersionId    pgtype.UUID                 `json:"workflow_version_id"`
	RetryCount           *int                        `json:"retry_count,omitempty"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL