Documentation
¶
Index ¶
- Constants
- Variables
- type AssignResults
- type AssignedItem
- type AssignmentRepository
- type CandidateEventMatch
- type ChildWorkflowSignalCreatedData
- type CompleteTaskOpts
- type ConcurrencyRepository
- type ConcurrencyRepositoryImpl
- func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (res *RunConcurrencyResult, err error)
- func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
- type CreateConcurrencyOpts
- type CreateCronWorkflowTriggerOpts
- type CreateExternalSignalConditionKind
- type CreateExternalSignalConditionOpt
- type CreateFilterOpts
- type CreateLogLineOpts
- type CreateMatchOpts
- type CreateStepMatchConditionOpt
- type CreateStepOpts
- type CreateTaskOpts
- type CreateWorkflowStepRateLimitOpts
- type CreateWorkflowVersionOpts
- type DAGWithData
- type DesiredWorkerLabelOpts
- type ErrNamesNotFound
- type EventMatchResults
- type EventTriggerOpts
- type EventTriggersFromExternalId
- type EventType
- type ExternalCreateSignalMatchOpts
- type FailTaskOpts
- type FailTasksResponse
- type FilterRepository
- type FinalizedTaskResponse
- type GroupMatchCondition
- type InternalTaskEvent
- type JobRunHasCycleError
- type LeaseRepository
- type ListActiveWorkersResult
- type ListFiltersOpts
- type ListFinalizedWorkflowRunsResponse
- type ListLogsOpts
- type ListTaskRunOpts
- type ListWorkflowRunOpts
- type LogLineRepository
- type MatchData
- type MatchRepository
- type MatchRepositoryImpl
- func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
- func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
- func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, ...) error
- func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- type OLAPRepository
- type OLAPRepositoryImpl
- func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, ...) error
- func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
- func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error
- func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, ...) ([]*sqlcv1.GetTaskPointMetricsRow, error)
- func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
- func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*sqlcv1.ListEventsRow, *int64, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, ...) ([]*sqlcv1.ListTaskEventsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
- func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
- func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID, ...) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
- func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
- func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
- func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, ...) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
- func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
- func (o *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)
- func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
- func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
- type QueueFactoryRepository
- type QueueRepository
- type RateLimitRepository
- type RateLimitResult
- type ReadTaskRunMetricsOpts
- type ReadableTaskStatus
- type RefreshTimeoutBy
- type ReplayTaskOpts
- type ReplayTasksResult
- type Repository
- type RetriedTask
- type Run
- type RunConcurrencyResult
- type SchedulerRepository
- type Sticky
- type TaskIdEventKeyTuple
- type TaskIdInsertedAtRetryCount
- type TaskIdInsertedAtSignalKey
- type TaskInput
- type TaskMetadata
- type TaskOutputEvent
- func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
- func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
- func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
- func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- type TaskRepository
- type TaskRepositoryImpl
- func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
- func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
- func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
- func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)
- func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
- func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)
- func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
- func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
- func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
- func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
- func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- type TaskRunMetric
- type TaskWithCancelledReason
- type TaskWithQueue
- type TickerRepository
- type TimeoutTasksResponse
- type TriggerDecision
- type TriggerFromEventsResult
- type TriggerRepository
- type TriggerRepositoryImpl
- func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
- func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
- type TriggerTaskData
- type TriggeredBy
- type TriggeredByEvent
- type UpdateDAGStatusRow
- type UpdateFilterOpts
- type UpdateTaskStatusRow
- type V1StepRunData
- type V1WorkflowRunPopulator
- type WorkerRepository
- type WorkflowNameTriggerOpts
- type WorkflowRepository
- type WorkflowRunData
Constants ¶
const MAX_TENANT_RATE_LIMITS = 10000
const NUM_PARTITIONS = 4
TODO: make this dynamic for the instance
const PARENT_STRATEGY_LOCK_OFFSET = 1000000000000 // 1 trillion
Variables ¶
var ErrDagParentNotFound = errors.New("dag parent not found")
Functions ¶
This section is empty.
Types ¶
type AssignResults ¶
type AssignResults struct {
Assigned []*AssignedItem
Unassigned []*sqlcv1.V1QueueItem
SchedulingTimedOut []*sqlcv1.V1QueueItem
RateLimited []*RateLimitResult
}
type AssignedItem ¶
type AssignedItem struct {
WorkerId pgtype.UUID
QueueItem *sqlcv1.V1QueueItem
}
type AssignmentRepository ¶
type AssignmentRepository interface {
ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}
type CandidateEventMatch ¶
type ChildWorkflowSignalCreatedData ¶
type ChildWorkflowSignalCreatedData struct {
// The external id of the target child task
ChildExternalId string `json:"external_id"`
// The external id of the parent task
ParentExternalId string `json:"parent_external_id"`
// The index of the child task
ChildIndex int64 `json:"child_index"`
// The key of the child task
ChildKey *string `json:"child_key"`
}
func (*ChildWorkflowSignalCreatedData) Bytes ¶
func (c *ChildWorkflowSignalCreatedData) Bytes() []byte
type CompleteTaskOpts ¶
type CompleteTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) the output bytes for the task
Output []byte
}
type ConcurrencyRepository ¶
type ConcurrencyRepository interface {
// Checks whether the concurrency strategy is active, and if not, sets is_active=False
UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}
type ConcurrencyRepositoryImpl ¶
type ConcurrencyRepositoryImpl struct {
// contains filtered or unexported fields
}
func (ConcurrencyRepositoryImpl) DesiredWorkerId ¶
func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.
func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy ¶
func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error)
func (ConcurrencyRepositoryImpl) ToV1StepRunData ¶
func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive ¶
func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) error
type CreateConcurrencyOpts ¶
type CreateConcurrencyOpts struct {
// (optional) the maximum number of concurrent workflow runs, default 1
MaxRuns *int32
// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`
// (required) a concurrency expression for evaluating the concurrency key
Expression string `validate:"celworkflowrunstr"`
}
type CreateExternalSignalConditionKind ¶
type CreateExternalSignalConditionKind string
const ( CreateExternalSignalConditionKindSLEEP CreateExternalSignalConditionKind = "SLEEP" CreateExternalSignalConditionKindUSEREVENT CreateExternalSignalConditionKind = "USER_EVENT" )
type CreateExternalSignalConditionOpt ¶
type CreateExternalSignalConditionOpt struct {
Kind CreateExternalSignalConditionKind `validate:"required, oneof=SLEEP USER_EVENT"`
ReadableDataKey string `validate:"required"`
OrGroupId string `validate:"required,uuid"`
UserEventKey *string
SleepFor *string `validate:"omitempty,duration"`
Expression string
}
type CreateFilterOpts ¶
type CreateLogLineOpts ¶
type CreateLogLineOpts struct {
TaskId int64
TaskInsertedAt pgtype.Timestamptz
// (optional) The time when the log line was created.
CreatedAt *time.Time
// (required) The message of the log line.
Message string `validate:"required,min=1,max=10000"`
// (optional) The level of the log line.
Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`
// (optional) The metadata of the log line.
Metadata []byte
// The retry count of the log line.
RetryCount int
}
type CreateMatchOpts ¶
type CreateMatchOpts struct {
Kind sqlcv1.V1MatchKind
ExistingMatchData []byte
Conditions []GroupMatchCondition
TriggerDAGId *int64
TriggerDAGInsertedAt pgtype.Timestamptz
TriggerExternalId *string
TriggerWorkflowRunId *string
TriggerStepId *string
TriggerStepIndex pgtype.Int8
TriggerExistingTaskId *int64
TriggerExistingTaskInsertedAt pgtype.Timestamptz
TriggerParentTaskExternalId pgtype.UUID
TriggerParentTaskId pgtype.Int8
TriggerParentTaskInsertedAt pgtype.Timestamptz
TriggerChildIndex pgtype.Int8
TriggerChildKey pgtype.Text
SignalTaskId *int64
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId *string
SignalKey *string
}
type CreateStepMatchConditionOpt ¶
type CreateStepMatchConditionOpt struct {
// (required) the type of match condition for triggering the step
MatchConditionKind string `validate:"required,oneof=PARENT_OVERRIDE USER_EVENT SLEEP"`
// (required) the key for the event data when the workflow is triggered
ReadableDataKey string `validate:"required"`
// (required) the initial state for the task when the match condition is satisfied
Action string `validate:"required,oneof=QUEUE CANCEL SKIP"`
// (required) the or group id for the match condition
// we ignore the JSON field here because it's randomly generated by the client, and we don't want to
// publish a new workflow version just because this UUID is different. we use the `OrGroupHashIndex` instead.
OrGroupId string `json:"-" validate:"required,uuid"`
// NOTE: should not be set by the caller. This is populated by this package before creating the checksum.
OrGroupIdIndex int32
// (optional) the expression for the match condition
Expression string `validate:"omitempty"`
// (optional) the sleep duration for the match condition, only set if this is a SLEEP
SleepDuration *string `validate:"omitempty,duration"`
// (optional) the event key for the match condition, only set if this is a USER_EVENT
EventKey *string `validate:"omitempty"`
// (optional) if this is a PARENT_OVERRIDE condition, this will be set to the parent readable_id for
// the parent whose trigger behavior we're overriding
ParentReadableId *string `validate:"omitempty"`
}
type CreateStepOpts ¶
type CreateStepOpts struct {
// (required) the task name
ReadableId string `validate:"hatchetName"`
// (required) the task action id
Action string `validate:"required,actionId"`
// (optional) the task timeout
Timeout *string `validate:"omitnil,duration"`
// (optional) the task scheduling timeout
ScheduleTimeout *string `validate:"omitnil,duration"`
// (optional) the parents that this step depends on
Parents []string `validate:"dive,hatchetName"`
// (optional) the step retry max
Retries *int `validate:"omitempty,min=0"`
// (optional) rate limits for this step
RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`
// (optional) desired worker affinity state for this step
DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`
// (optional) the step retry backoff factor
RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`
// (optional) the step retry backoff max seconds (can't be greater than 86400)
RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`
// (optional) a list of additional trigger conditions
TriggerConditions []CreateStepMatchConditionOpt `validate:"omitempty,dive"`
// (optional) the step concurrency options
Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`
}
type CreateTaskOpts ¶
type CreateTaskOpts struct {
// (required) the external id
ExternalId string `validate:"required,uuid"`
// (required) the workflow run id. note this may be the same as the external id if this is a
// single-task workflow, otherwise it represents the external id of the DAG.
WorkflowRunId string `validate:"required,uuid"`
// (required) the step id
StepId string `validate:"required,uuid"`
// (required) the input bytes to the task
Input *TaskInput
FilterPayload []byte
// (required) the step index for the task
StepIndex int
// (optional) the additional metadata for the task
AdditionalMetadata []byte
// (optional) the desired worker id
DesiredWorkerId *string
// (optional) the DAG id for the task
DagId *int64
// (optional) the DAG inserted at for the task
DagInsertedAt pgtype.Timestamptz
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the parent task external id
ParentTaskExternalId *string
// (optional) the parent task id
ParentTaskId *int64
// (optional) the parent task inserted at
ParentTaskInsertedAt *time.Time
// (optional) The priority of a task, between 1 and 3
Priority *int32
// (optional) the child index for the task
ChildIndex *int64
// (optional) the child key for the task
ChildKey *string
}
type CreateWorkflowStepRateLimitOpts ¶
type CreateWorkflowStepRateLimitOpts struct {
// (required) the rate limit key
Key string `validate:"required"`
// (optional) a CEL expression for the rate limit key
KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`
// (optional) the rate limit units to consume
Units *int `validate:"omitnil,required_without=UnitsExpr"`
// (optional) a CEL expression for the rate limit units
UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`
// (optional) a CEL expression for a dynamic limit value for the rate limit
LimitExpr *string `validate:"omitnil,celsteprunstr"`
// (optional) the rate limit duration, defaults to MINUTE
Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}
type CreateWorkflowVersionOpts ¶
type CreateWorkflowVersionOpts struct {
// (required) the workflow name
Name string `validate:"required,hatchetName"`
// (optional) the workflow description
Description *string `json:"description,omitempty"`
// (optional) event triggers for the workflow
EventTriggers []string
// (optional) cron triggers for the workflow
CronTriggers []string `validate:"dive,cron"`
// (optional) the input bytes for the cron triggers
CronInput []byte
// (required) the tasks in the workflow
Tasks []CreateStepOpts `validate:"required,min=1,dive"`
OnFailure *CreateStepOpts `json:"onFailureJob,omitempty" validate:"omitempty"`
// (optional) the workflow concurrency groups
Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitempty,dive"`
// (optional) sticky strategy
Sticky *string `validate:"omitempty,oneof=SOFT HARD"`
DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`
DefaultFilters []types.DefaultFilter `json:"defaultFilters,omitempty" validate:"omitempty,dive"`
}
type DAGWithData ¶
type DesiredWorkerLabelOpts ¶
type DesiredWorkerLabelOpts struct {
// (required) the label key
Key string `validate:"required"`
// (required if StringValue is nil) the label integer value
IntValue *int32 `validate:"omitnil,required_without=StrValue"`
// (required if StrValue is nil) the label string value
StrValue *string `validate:"omitnil,required_without=IntValue"`
// (optional) if the label is required
Required *bool `validate:"omitempty"`
// (optional) the weight of the label for scheduling (default: 100)
Weight *int32 `validate:"omitempty"`
// (optional) the label comparator for scheduling (default: EQUAL)
Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}
type ErrNamesNotFound ¶
type ErrNamesNotFound struct {
Names []string
}
func (*ErrNamesNotFound) Error ¶
func (e *ErrNamesNotFound) Error() string
type EventMatchResults ¶
type EventTriggerOpts ¶
type EventTriggersFromExternalId ¶
type EventTriggersFromExternalId struct {
RunID int64 `json:"run_id"`
RunInsertedAt pgtype.Timestamptz `json:"run_inserted_at"`
EventExternalId pgtype.UUID `json:"event_external_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
}
type EventType ¶
type EventType string
const ( EVENT_TYPE_REQUEUED_NO_WORKER EventType = "REQUEUED_NO_WORKER" EVENT_TYPE_REQUEUED_RATE_LIMIT EventType = "REQUEUED_RATE_LIMIT" EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT" EVENT_TYPE_ASSIGNED EventType = "ASSIGNED" EVENT_TYPE_STARTED EventType = "STARTED" EVENT_TYPE_FINISHED EventType = "FINISHED" EVENT_TYPE_FAILED EventType = "FAILED" EVENT_TYPE_RETRYING EventType = "RETRYING" EVENT_TYPE_CANCELLED EventType = "CANCELLED" EVENT_TYPE_TIMED_OUT EventType = "TIMED_OUT" EVENT_TYPE_REASSIGNED EventType = "REASSIGNED" EVENT_TYPE_SLOT_RELEASED EventType = "SLOT_RELEASED" EVENT_TYPE_TIMEOUT_REFRESHED EventType = "TIMEOUT_REFRESHED" EVENT_TYPE_RETRIED_BY_USER EventType = "RETRIED_BY_USER" EVENT_TYPE_SENT_TO_WORKER EventType = "SENT_TO_WORKER" EVENT_TYPE_RATE_LIMIT_ERROR EventType = "RATE_LIMIT_ERROR" EVENT_TYPE_ACKNOWLEDGED EventType = "ACKNOWLEDGED" EVENT_TYPE_CREATED EventType = "CREATED" EVENT_TYPE_QUEUED EventType = "QUEUED" )
type ExternalCreateSignalMatchOpts ¶
type ExternalCreateSignalMatchOpts struct {
Conditions []CreateExternalSignalConditionOpt `validate:"required,min=1,dive"`
SignalTaskId int64 `validate:"required,gt=0"`
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId string `validate:"required,uuid"`
SignalKey string `validate:"required"`
}
type FailTaskOpts ¶
type FailTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) whether this is an application-level error or an internal error on the Hatchet side
IsAppError bool
// (optional) the error message for the task
ErrorMessage string
// (optional) A boolean flag to indicate whether the error is non-retryable, meaning it should _not_ be retried. Defaults to false.
IsNonRetryable bool
}
type FailTasksResponse ¶
type FailTasksResponse struct {
*FinalizedTaskResponse
RetriedTasks []RetriedTask
}
type FilterRepository ¶
type FilterRepository interface {
CreateFilter(ctx context.Context, tenantId string, params CreateFilterOpts) (*sqlcv1.V1Filter, error)
ListFilters(ctx context.Context, tenantId string, params ListFiltersOpts) ([]*sqlcv1.V1Filter, error)
DeleteFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
GetFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
UpdateFilter(ctx context.Context, tenantId string, filterId string, opts UpdateFilterOpts) (*sqlcv1.V1Filter, error)
}
type FinalizedTaskResponse ¶
type FinalizedTaskResponse struct {
ReleasedTasks []*sqlcv1.ReleaseTasksRow
InternalEvents []InternalTaskEvent
}
type GroupMatchCondition ¶
type GroupMatchCondition struct {
GroupId string `validate:"required,uuid"`
EventType sqlcv1.V1EventType
EventKey string
// (optional) a hint for querying the event data
EventResourceHint *string
// the data key which gets inserted into the returned data from a satisfied match condition
ReadableDataKey string
Expression string
Action sqlcv1.V1MatchConditionAction
// (optional) the data which was used to satisfy the condition (relevant for replays)
Data []byte
}
type InternalTaskEvent ¶
type InternalTaskEvent struct {
TenantID string `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskExternalID string `json:"task_external_id"`
RetryCount int32 `json:"retry_count"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
EventKey string `json:"event_key"`
Data []byte `json:"data"`
}
InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.
type JobRunHasCycleError ¶
type JobRunHasCycleError struct {
JobName string
}
func (*JobRunHasCycleError) Error ¶
func (e *JobRunHasCycleError) Error() string
type LeaseRepository ¶
type LeaseRepository interface {
ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)
AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}
type ListActiveWorkersResult ¶
type ListActiveWorkersResult struct {
ID string
MaxRuns int
Labels []*sqlcv1.ListManyWorkerLabelsRow
}
type ListFiltersOpts ¶
type ListFinalizedWorkflowRunsResponse ¶
type ListFinalizedWorkflowRunsResponse struct {
WorkflowRunId string
OutputEvents []*TaskOutputEvent
}
type ListLogsOpts ¶
type ListLogsOpts struct {
// (optional) number of logs to skip
Offset *int
// (optional) number of logs to return
Limit *int `validate:"omitnil,min=1,max=10000"`
// (optional) a list of log levels to filter by
Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`
// (optional) a search query
Search *string
}
type ListTaskRunOpts ¶
type ListTaskRunOpts struct {
CreatedAfter time.Time
Statuses []sqlcv1.V1ReadableStatusOlap
WorkflowIds []uuid.UUID
WorkerId *uuid.UUID
StartedAfter time.Time
FinishedBefore *time.Time
AdditionalMetadata map[string]interface{}
TriggeringEventExternalId *uuid.UUID
Limit int64
Offset int64
IncludePayloads bool
}
type ListWorkflowRunOpts ¶
type ListWorkflowRunOpts struct {
CreatedAfter time.Time
Statuses []sqlcv1.V1ReadableStatusOlap
WorkflowIds []uuid.UUID
StartedAfter time.Time
FinishedBefore *time.Time
AdditionalMetadata map[string]interface{}
Limit int64
Offset int64
ParentTaskExternalId *pgtype.UUID
TriggeringEventExternalId *pgtype.UUID
IncludePayloads bool
}
type LogLineRepository ¶
type LogLineRepository interface {
ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)
PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}
type MatchData ¶
type MatchData struct {
// contains filtered or unexported fields
}
parses match aggregated data
func NewMatchData ¶
func (*MatchData) Action ¶
func (m *MatchData) Action() sqlcv1.V1MatchConditionAction
func (*MatchData) DataValueAsTaskOutputEvent ¶
func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent
Helper function for internal events
func (*MatchData) TriggerDataKeys ¶
func (*MatchData) TriggerDataValue ¶
type MatchRepository ¶
type MatchRepository interface {
RegisterSignalMatchConditions(ctx context.Context, tenantId string, eventMatches []ExternalCreateSignalMatchOpts) error
ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
}
type MatchRepositoryImpl ¶
type MatchRepositoryImpl struct {
// contains filtered or unexported fields
}
func (MatchRepositoryImpl) DesiredWorkerId ¶
func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.
func (*MatchRepositoryImpl) ProcessInternalEventMatches ¶
func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessInternalEventMatches processes a list of internal events
func (*MatchRepositoryImpl) ProcessUserEventMatches ¶
func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessUserEventMatches processes a list of user events
func (*MatchRepositoryImpl) RegisterSignalMatchConditions ¶
func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, signalMatches []ExternalCreateSignalMatchOpts) error
func (MatchRepositoryImpl) ToV1StepRunData ¶
func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
type OLAPRepository ¶
type OLAPRepository interface {
UpdateTablePartitions(ctx context.Context) error
SetReadReplicaPool(pool *pgxpool.Pool)
ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)
ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error
CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID, includePayloads bool) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*sqlcv1.ListEventsRow, *int64, error)
}
func NewOLAPRepositoryFromPool ¶
func NewOLAPRepositoryFromPool(pool *pgxpool.Pool, l *zerolog.Logger, olapRetentionPeriod time.Duration, entitlements repository.EntitlementsRepository, shouldPartitionEventsTables bool) (OLAPRepository, func() error)
type OLAPRepositoryImpl ¶
type OLAPRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*OLAPRepositoryImpl) BulkCreateEventsAndTriggers ¶
func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
func (*OLAPRepositoryImpl) CreateDAGs ¶
func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
func (*OLAPRepositoryImpl) CreateTaskEvents ¶
func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
func (*OLAPRepositoryImpl) CreateTasks ¶
func (OLAPRepositoryImpl) DesiredWorkerId ¶
func (*OLAPRepositoryImpl) GetTaskPointMetrics ¶
func (*OLAPRepositoryImpl) GetTaskTimings ¶
func (*OLAPRepositoryImpl) ListEvents ¶
func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*sqlcv1.ListEventsRow, *int64, error)
func (*OLAPRepositoryImpl) ListTaskRunEvents ¶
func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId ¶
func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
func (*OLAPRepositoryImpl) ListTasks ¶
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
func (*OLAPRepositoryImpl) ListTasksByDAGId ¶
func (*OLAPRepositoryImpl) ListTasksByExternalIds ¶
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt ¶
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames ¶
func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRuns ¶
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.
func (*OLAPRepositoryImpl) ReadDAG ¶
func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRun ¶
func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRunData ¶
func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)
func (*OLAPRepositoryImpl) ReadTaskRunMetrics ¶
func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
func (*OLAPRepositoryImpl) ReadWorkflowRun ¶
func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
func (*OLAPRepositoryImpl) SetReadReplicaPool ¶
func (o *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)
func (OLAPRepositoryImpl) ToV1StepRunData ¶
func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*OLAPRepositoryImpl) UpdateDAGStatuses ¶
func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
func (*OLAPRepositoryImpl) UpdateTablePartitions ¶
func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
func (*OLAPRepositoryImpl) UpdateTaskStatuses ¶
func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
type QueueFactoryRepository ¶
type QueueFactoryRepository interface {
NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}
type QueueRepository ¶
type QueueRepository interface {
ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)
GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
Cleanup()
}
type RateLimitRepository ¶
type RateLimitResult ¶
type ReadTaskRunMetricsOpts ¶
type ReadableTaskStatus ¶
type ReadableTaskStatus string
const ( READABLE_TASK_STATUS_QUEUED ReadableTaskStatus = "QUEUED" READABLE_TASK_STATUS_RUNNING ReadableTaskStatus = "RUNNING" READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED" READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED" READABLE_TASK_STATUS_FAILED ReadableTaskStatus = "FAILED" )
func StringToReadableStatus ¶
func StringToReadableStatus(status string) ReadableTaskStatus
func (ReadableTaskStatus) EnumValue ¶
func (s ReadableTaskStatus) EnumValue() int
type RefreshTimeoutBy ¶
type ReplayTaskOpts ¶
type ReplayTaskOpts struct {
// (required) the task id
TaskId int64
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the external id
ExternalId string
// (required) the step id
StepId string
// (optional) the input bytes to the task, uses the existing input if not set
Input *TaskInput
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the additional metadata for the task
AdditionalMetadata []byte
}
type ReplayTasksResult ¶
type ReplayTasksResult struct {
ReplayedTasks []TaskIdInsertedAtRetryCount
UpsertedTasks []*sqlcv1.V1Task
InternalEventResults *EventMatchResults
}
type Repository ¶
type Repository interface {
Triggers() TriggerRepository
Tasks() TaskRepository
Scheduler() SchedulerRepository
Matches() MatchRepository
OLAP() OLAPRepository
OverwriteOLAPRepository(o OLAPRepository)
Logs() LogLineRepository
OverwriteLogsRepository(l LogLineRepository)
Workers() WorkerRepository
Workflows() WorkflowRepository
Ticker() TickerRepository
Filters() FilterRepository
}
func NewRepository ¶
func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, olapRetentionPeriod time.Duration, maxInternalRetryCount int32, entitlements repository.EntitlementsRepository) (Repository, func() error)
type RetriedTask ¶
type RunConcurrencyResult ¶
type RunConcurrencyResult struct {
// The tasks which were enqueued
Queued []TaskWithQueue
// If the strategy involves cancelling a task, these are the tasks to cancel
Cancelled []TaskWithCancelledReason
// If the step has multiple concurrency strategies, these are the next ones to notify
NextConcurrencyStrategies []int64
}
type SchedulerRepository ¶
type SchedulerRepository interface {
Concurrency() ConcurrencyRepository
Lease() LeaseRepository
QueueFactory() QueueFactoryRepository
RateLimit() RateLimitRepository
Assignment() AssignmentRepository
}
type TaskIdEventKeyTuple ¶
type TaskIdInsertedAtRetryCount ¶
type TaskIdInsertedAtRetryCount struct {
// (required) the external id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the retry count
RetryCount int32
}
type TaskIdInsertedAtSignalKey ¶
type TaskIdInsertedAtSignalKey struct {
// (required) the external id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the signal key for the event
SignalKey string
}
type TaskInput ¶
type TaskMetadata ¶
type TaskMetadata struct {
TaskID int64 `json:"task_id"`
TaskInsertedAt time.Time `json:"task_inserted_at"`
}
func ParseTaskMetadata ¶
func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)
type TaskOutputEvent ¶
type TaskOutputEvent struct {
IsFailure bool `json:"is_failure"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
TaskExternalId string `json:"task_external_id"`
TaskId int64 `json:"task_id"`
RetryCount int32 `json:"retry_count"`
WorkerId *string `json:"worker_id"`
Output []byte `json:"output"`
ErrorMessage string `json:"error_message"`
StepReadableID string `json:"step_readable_id"`
}
func NewCancelledTaskOutputEvent ¶
func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
func NewCancelledTaskOutputEventFromTask ¶
func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func NewCompletedTaskOutputEvent ¶
func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
func NewFailedTaskOutputEvent ¶
func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
func NewFailedTaskOutputEventFromTask ¶
func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func NewSkippedTaskOutputEventFromTask ¶
func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func (*TaskOutputEvent) Bytes ¶
func (e *TaskOutputEvent) Bytes() []byte
func (*TaskOutputEvent) IsCancelled ¶
func (e *TaskOutputEvent) IsCancelled() bool
func (*TaskOutputEvent) IsCompleted ¶
func (e *TaskOutputEvent) IsCompleted() bool
func (*TaskOutputEvent) IsFailed ¶
func (e *TaskOutputEvent) IsFailed() bool
type TaskRepository ¶
type TaskRepository interface {
UpdateTablePartitions(ctx context.Context) error
// GetTaskByExternalId is a heavily cached method to return task metadata by its external id
GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
// This is non-cacheable because tasks can be added to a workflow run as it executes.
FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)
CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
// with the v1 engine, and shouldn't be called from new v1 endpoints.
ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)
ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)
ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)
}
type TaskRepositoryImpl ¶
type TaskRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*TaskRepositoryImpl) CancelTasks ¶
func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
func (*TaskRepositoryImpl) CompleteTasks ¶
func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
func (TaskRepositoryImpl) DesiredWorkerId ¶
func (*TaskRepositoryImpl) FailTasks ¶
func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
func (*TaskRepositoryImpl) FlattenExternalIds ¶
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) GetQueueCounts ¶
func (*TaskRepositoryImpl) GetTaskByExternalId ¶
func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns ¶
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
func (*TaskRepositoryImpl) ListSignalCompletedEvents ¶
func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)
func (*TaskRepositoryImpl) ListTaskMetas ¶
func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
func (*TaskRepositoryImpl) ListTaskParentOutputs ¶
func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.
func (*TaskRepositoryImpl) ProcessDurableSleeps ¶
func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
func (*TaskRepositoryImpl) ProcessTaskReassignments ¶
func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems ¶
func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
func (*TaskRepositoryImpl) ProcessTaskTimeouts ¶
func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
func (*TaskRepositoryImpl) RefreshTimeoutBy ¶
func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReleaseSlot ¶
func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReplayTasks ¶
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
func (TaskRepositoryImpl) ToV1StepRunData ¶
func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TaskRepositoryImpl) UpdateTablePartitions ¶
func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
type TaskRunMetric ¶
type TaskWithCancelledReason ¶
type TaskWithCancelledReason struct {
*TaskIdInsertedAtRetryCount
CancelledReason string
TaskExternalId string
WorkflowRunId string
}
type TaskWithQueue ¶
type TaskWithQueue struct {
*TaskIdInsertedAtRetryCount
Queue string
}
type TickerRepository ¶
type TimeoutTasksResponse ¶
type TimeoutTasksResponse struct {
*FailTasksResponse
TimeoutTasks []*sqlcv1.ListTasksToTimeoutRow
}
type TriggerDecision ¶
type TriggerFromEventsResult ¶
type TriggerFromEventsResult struct {
Tasks []*sqlcv1.V1Task
Dags []*DAGWithData
EventExternalIdToRuns map[string][]*Run
}
type TriggerRepository ¶
type TriggerRepository interface {
TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}
type TriggerRepositoryImpl ¶
type TriggerRepositoryImpl struct {
// contains filtered or unexported fields
}
func (TriggerRepositoryImpl) DesiredWorkerId ¶
func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.
func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts ¶
func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
func (TriggerRepositoryImpl) ToV1StepRunData ¶
func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TriggerRepositoryImpl) TriggerFromEvents ¶
func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
func (*TriggerRepositoryImpl) TriggerFromWorkflowNames ¶
func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
type TriggerTaskData ¶
type TriggerTaskData struct {
// (required) the workflow name
WorkflowName string `json:"workflow_name" validate:"required"`
// (optional) the workflow run data
Data []byte `json:"data"`
// (optional) the workflow run metadata
AdditionalMetadata []byte `json:"additional_metadata"`
// (optional) the desired worker id
DesiredWorkerId *string `json:"desired_worker_id"`
// (optional) the parent external id
ParentExternalId *string `json:"parent_external_id"`
// (optional) the parent task id
ParentTaskId *int64 `json:"parent_task_id"`
// (optional) the parent inserted_at
ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`
// (optional) the child index
ChildIndex *int64 `json:"child_index"`
// (optional) the child key
ChildKey *string `json:"child_key"`
Priority *int32 `json:"priority"`
}
type TriggeredBy ¶
type TriggeredByEvent ¶
type TriggeredByEvent struct {
// contains filtered or unexported fields
}
func (*TriggeredByEvent) ToMetadata ¶
func (t *TriggeredByEvent) ToMetadata(additionalMetadata []byte) []byte
type UpdateDAGStatusRow ¶
type UpdateDAGStatusRow struct {
DagId int64
DagInsertedAt pgtype.Timestamptz
ReadableStatus sqlcv1.V1ReadableStatusOlap
ExternalId pgtype.UUID
}
type UpdateFilterOpts ¶
type UpdateTaskStatusRow ¶
type UpdateTaskStatusRow struct {
TaskId int64
TaskInsertedAt pgtype.Timestamptz
ReadableStatus sqlcv1.V1ReadableStatusOlap
ExternalId pgtype.UUID
}
type V1StepRunData ¶
type V1StepRunData struct {
Input map[string]interface{} `json:"input"`
TriggeredBy string `json:"triggered_by"`
Parents map[string]map[string]interface{} `json:"parents"`
Triggers map[string]map[string]interface{} `json:"triggers"`
// custom-set user data for the step
UserData map[string]interface{} `json:"user_data"`
// overrides set from the playground
Overrides map[string]interface{} `json:"overrides"`
// errors in upstream steps (only used in on-failure step)
StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}
func (*V1StepRunData) Bytes ¶
func (v1 *V1StepRunData) Bytes() []byte
type V1WorkflowRunPopulator ¶
type V1WorkflowRunPopulator struct {
WorkflowRun *WorkflowRunData
TaskMetadata []TaskMetadata
}
type WorkerRepository ¶
type WorkerRepository interface {
ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)
}
type WorkflowNameTriggerOpts ¶
type WorkflowNameTriggerOpts struct {
*TriggerTaskData
ExternalId string
// Whether to skip the creation of the child workflow
ShouldSkip bool
}
type WorkflowRepository ¶
type WorkflowRunData ¶
type WorkflowRunData struct {
AdditionalMetadata []byte `json:"additional_metadata"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
DisplayName string `json:"display_name"`
ErrorMessage string `json:"error_message"`
ExternalID pgtype.UUID `json:"external_id"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
Input []byte `json:"input"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
Kind sqlcv1.V1RunKind `json:"kind"`
Output *[]byte `json:"output,omitempty"`
ParentTaskExternalId *pgtype.UUID `json:"parent_task_external_id,omitempty"`
ReadableStatus sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
StepId *pgtype.UUID `json:"step_id,omitempty"`
StartedAt pgtype.Timestamptz `json:"started_at"`
TaskExternalId *pgtype.UUID `json:"task_external_id,omitempty"`
TaskId *int64 `json:"task_id,omitempty"`
TaskInsertedAt *pgtype.Timestamptz `json:"task_inserted_at,omitempty"`
TenantID pgtype.UUID `json:"tenant_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionId pgtype.UUID `json:"workflow_version_id"`
RetryCount *int `json:"retry_count,omitempty"`
}