Documentation
¶
Index ¶
- Constants
- Variables
- type APIKeyAuthCredentials
- type AssignResults
- type AssignedItem
- type AssignmentRepository
- type AuthConfig
- type BasicAuthCredentials
- type BulkRetrievePayloadOpts
- type CELEvaluationFailure
- type CandidateEventMatch
- type ChildWorkflowSignalCreatedData
- type CompleteTaskOpts
- type ConcurrencyRepository
- type ConcurrencyRepositoryImpl
- func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (res *RunConcurrencyResult, err error)
- func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
- type CreateConcurrencyOpts
- type CreateCronWorkflowTriggerOpts
- type CreateExternalSignalConditionKind
- type CreateExternalSignalConditionOpt
- type CreateFilterOpts
- type CreateIncomingWebhookFailureLogOpts
- type CreateLogLineOpts
- type CreateMatchOpts
- type CreateStepMatchConditionOpt
- type CreateStepOpts
- type CreateTaskOpts
- type CreateWebhookOpts
- type CreateWorkflowStepRateLimitOpts
- type CreateWorkflowVersionOpts
- type DAGWithData
- type DesiredWorkerLabelOpts
- type ErrNamesNotFound
- type EventExternalIdFilterId
- type EventMatchResults
- type EventTriggerOpts
- type EventTriggersFromExternalId
- type EventType
- type ExternalCreateSignalMatchOpts
- type ExternalPayloadLocationKey
- type ExternalStore
- type FailTaskOpts
- type FailTasksResponse
- type FilterRepository
- type FinalizedTaskResponse
- type GroupMatchCondition
- type HMACAuthCredentials
- type IdInsertedAt
- type IdempotencyKey
- type IdempotencyRepository
- type InternalTaskEvent
- type JobRunHasCycleError
- type KeyClaimantPair
- type LeaseRepository
- type ListActiveWorkersResult
- type ListEventsRow
- type ListFiltersOpts
- type ListFinalizedWorkflowRunsResponse
- type ListLogsOpts
- type ListTaskRunOpts
- type ListWebhooksOpts
- type ListWorkflowRunOpts
- type LogLineRepository
- type MatchData
- type MatchRepository
- type MatchRepositoryImpl
- func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
- func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
- func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, ...) error
- func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- type NoOpExternalStore
- type OLAPRepository
- type OLAPRepositoryImpl
- func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error
- func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, ...) error
- func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
- func (r *OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
- func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *OLAPRepositoryImpl) GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, ...) (map[string]*sqlcv1.GetDagDurationsRow, error)
- func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, ...) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)
- func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, ...) ([]*sqlcv1.GetTaskPointMetricsRow, error)
- func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
- func (r *OLAPRepositoryImpl) ListEventKeys(ctx context.Context, tenantId string) ([]string, error)
- func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*ListEventsRow, *int64, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, ...) ([]*sqlcv1.ListTaskEventsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
- func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
- func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID, ...) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
- func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
- func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
- func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, ...) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
- func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
- func (r *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)
- func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
- func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
- func (r *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
- type OffloadToExternalStoreOpts
- type PayloadLocation
- type PayloadStoreRepository
- type QueueFactoryRepository
- type QueueRepository
- type RateLimitRepository
- type RateLimitResult
- type ReadTaskRunMetricsOpts
- type ReadableTaskStatus
- type RefreshTimeoutBy
- type ReplayTaskOpts
- type ReplayTasksResult
- type Repository
- type RetriedTask
- type RetrievePayloadOpts
- type Run
- type RunConcurrencyResult
- type SchedulerRepository
- type Sticky
- type StorePayloadOpts
- type TaskIdEventKeyTuple
- type TaskIdInsertedAtRetryCount
- type TaskIdInsertedAtSignalKey
- type TaskInput
- type TaskMetadata
- type TaskOperationLimits
- type TaskOutputEvent
- func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
- func NewCancelledTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
- func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
- func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
- func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
- func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
- type TaskRepository
- type TaskRepositoryImpl
- func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error
- func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
- func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
- func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *TaskRepositoryImpl) EnsureTablePartitionsExist(ctx context.Context) (bool, error)
- func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
- func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)
- func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
- func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)
- func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
- func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
- func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
- func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
- func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- type TaskRunMetric
- type TaskWithCancelledReason
- type TaskWithQueue
- type TickerRepository
- type TimeoutTasksResponse
- type TriggerDecision
- type TriggerFromEventsResult
- type TriggerRepository
- type TriggerRepositoryImpl
- func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
- func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)
- type TriggerTaskData
- type TriggeredBy
- type TriggeredByEvent
- type UpdateDAGStatusRow
- type UpdateFilterOpts
- type UpdateTaskStatusRow
- type V1StepRunData
- type V1TaskWithPayload
- type V1WorkflowRunPopulator
- type WasSuccessfullyClaimed
- type WebhookRepository
- type WorkerRepository
- type WorkflowAndScope
- type WorkflowNameTriggerOpts
- type WorkflowRepository
- type WorkflowRunData
Constants ¶
const MAX_TENANT_RATE_LIMITS = 10000
const NUM_PARTITIONS = 4
TODO: make this dynamic for the instance
const PARENT_STRATEGY_LOCK_OFFSET = 1000000000000 // 1 trillion
Variables ¶
var ErrDagParentNotFound = errors.New("dag parent not found")
Functions ¶
This section is empty.
Types ¶
type APIKeyAuthCredentials ¶ added in v0.70.0
type AssignResults ¶
type AssignResults struct {
Assigned []*AssignedItem
Unassigned []*sqlcv1.V1QueueItem
SchedulingTimedOut []*sqlcv1.V1QueueItem
RateLimited []*RateLimitResult
RateLimitedToMove []*RateLimitResult
}
type AssignedItem ¶
type AssignedItem struct {
WorkerId pgtype.UUID
QueueItem *sqlcv1.V1QueueItem
}
type AssignmentRepository ¶
type AssignmentRepository interface {
ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}
type AuthConfig ¶ added in v0.70.0
type AuthConfig struct {
Type sqlcv1.V1IncomingWebhookAuthType `json:"type" validate:"required"`
BasicAuth *BasicAuthCredentials `json:"basic_auth,omitempty"`
APIKeyAuth *APIKeyAuthCredentials `json:"api_key_auth,omitempty"`
HMACAuth *HMACAuthCredentials `json:"hmac_auth,omitempty"`
}
func (*AuthConfig) Validate ¶ added in v0.70.0
func (ac *AuthConfig) Validate() error
type BasicAuthCredentials ¶ added in v0.70.0
type BulkRetrievePayloadOpts ¶ added in v0.73.0
type BulkRetrievePayloadOpts struct {
Keys []ExternalPayloadLocationKey
TenantId string
}
type CELEvaluationFailure ¶ added in v0.70.0
type CELEvaluationFailure struct {
Source sqlcv1.V1CelEvaluationFailureSource `json:"source"`
ErrorMessage string `json:"error_message"`
}
type CandidateEventMatch ¶
type ChildWorkflowSignalCreatedData ¶
type ChildWorkflowSignalCreatedData struct {
// The external id of the target child task
ChildExternalId string `json:"external_id"`
// The external id of the parent task
ParentExternalId string `json:"parent_external_id"`
// The index of the child task
ChildIndex int64 `json:"child_index"`
// The key of the child task
ChildKey *string `json:"child_key"`
}
func (*ChildWorkflowSignalCreatedData) Bytes ¶
func (c *ChildWorkflowSignalCreatedData) Bytes() []byte
type CompleteTaskOpts ¶
type CompleteTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) the output bytes for the task
Output []byte
}
type ConcurrencyRepository ¶
type ConcurrencyRepository interface {
// Checks whether the concurrency strategy is active, and if not, sets is_active=False
UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}
type ConcurrencyRepositoryImpl ¶
type ConcurrencyRepositoryImpl struct {
// contains filtered or unexported fields
}
func (ConcurrencyRepositoryImpl) DesiredWorkerId ¶
func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy ¶
func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error)
func (ConcurrencyRepositoryImpl) ToV1StepRunData ¶
func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive ¶
func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) error
type CreateConcurrencyOpts ¶
type CreateConcurrencyOpts struct {
// (optional) the maximum number of concurrent workflow runs, default 1
MaxRuns *int32
// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`
// (required) a concurrency expression for evaluating the concurrency key
Expression string `validate:"celworkflowrunstr"`
}
type CreateExternalSignalConditionKind ¶
type CreateExternalSignalConditionKind string
const ( CreateExternalSignalConditionKindSLEEP CreateExternalSignalConditionKind = "SLEEP" CreateExternalSignalConditionKindUSEREVENT CreateExternalSignalConditionKind = "USER_EVENT" )
type CreateExternalSignalConditionOpt ¶
type CreateExternalSignalConditionOpt struct {
Kind CreateExternalSignalConditionKind `validate:"required, oneof=SLEEP USER_EVENT"`
ReadableDataKey string `validate:"required"`
OrGroupId string `validate:"required,uuid"`
UserEventKey *string
SleepFor *string `validate:"omitempty,duration"`
Expression string
}
type CreateFilterOpts ¶
type CreateIncomingWebhookFailureLogOpts ¶ added in v0.70.0
type CreateLogLineOpts ¶
type CreateLogLineOpts struct {
TaskId int64
TaskInsertedAt pgtype.Timestamptz
// (optional) The time when the log line was created.
CreatedAt *time.Time
// (required) The message of the log line.
Message string `validate:"required,min=1,max=10000"`
// (optional) The level of the log line.
Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`
// (optional) The metadata of the log line.
Metadata []byte
// The retry count of the log line.
RetryCount int
}
type CreateMatchOpts ¶
type CreateMatchOpts struct {
Kind sqlcv1.V1MatchKind
ExistingMatchData []byte
Conditions []GroupMatchCondition
TriggerDAGId *int64
TriggerDAGInsertedAt pgtype.Timestamptz
TriggerExternalId *string
TriggerWorkflowRunId *string
TriggerStepId *string
TriggerStepIndex pgtype.Int8
TriggerExistingTaskId *int64
TriggerExistingTaskInsertedAt pgtype.Timestamptz
TriggerParentTaskExternalId pgtype.UUID
TriggerParentTaskId pgtype.Int8
TriggerParentTaskInsertedAt pgtype.Timestamptz
TriggerChildIndex pgtype.Int8
TriggerChildKey pgtype.Text
TriggerPriority pgtype.Int4
SignalTaskId *int64
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId *string
SignalKey *string
}
type CreateStepMatchConditionOpt ¶
type CreateStepMatchConditionOpt struct {
// (required) the type of match condition for triggering the step
MatchConditionKind string `validate:"required,oneof=PARENT_OVERRIDE USER_EVENT SLEEP"`
// (required) the key for the event data when the workflow is triggered
ReadableDataKey string `validate:"required"`
// (required) the initial state for the task when the match condition is satisfied
Action string `validate:"required,oneof=QUEUE CANCEL SKIP"`
// (required) the or group id for the match condition
// we ignore the JSON field here because it's randomly generated by the client, and we don't want to
// publish a new workflow version just because this UUID is different. We use the `OrGroupIdIndex` instead.
OrGroupId string `json:"-" validate:"required,uuid"`
// NOTE: should not be set by the caller. This is populated by this package before creating the checksum.
OrGroupIdIndex int32
// (optional) the expression for the match condition
Expression string `validate:"omitempty"`
// (optional) the sleep duration for the match condition, only set if this is a SLEEP
SleepDuration *string `validate:"omitempty,duration"`
// (optional) the event key for the match condition, only set if this is a USER_EVENT
EventKey *string `validate:"omitempty"`
// (optional) if this is a PARENT_OVERRIDE condition, this will be set to the parent readable_id for
// the parent whose trigger behavior we're overriding
ParentReadableId *string `validate:"omitempty"`
}
type CreateStepOpts ¶
type CreateStepOpts struct {
// (required) the task name
ReadableId string `validate:"hatchetName"`
// (required) the task action id
Action string `validate:"required,actionId"`
// (optional) the task timeout
Timeout *string `validate:"omitnil,duration"`
// (optional) the task scheduling timeout
ScheduleTimeout *string `validate:"omitnil,duration"`
// (optional) the parents that this step depends on
Parents []string `validate:"dive,hatchetName"`
// (optional) the step retry max
Retries *int `validate:"omitempty,min=0"`
// (optional) rate limits for this step
RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`
// (optional) desired worker affinity state for this step
DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`
// (optional) the step retry backoff factor
RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`
// (optional) the step retry backoff max seconds (can't be greater than 86400)
RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`
// (optional) a list of additional trigger conditions
TriggerConditions []CreateStepMatchConditionOpt `validate:"omitempty,dive"`
// (optional) the step concurrency options
Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`
}
type CreateTaskOpts ¶
type CreateTaskOpts struct {
// (required) the external id
ExternalId string `validate:"required,uuid"`
// (required) the workflow run id. note this may be the same as the external id if this is a
// single-task workflow, otherwise it represents the external id of the DAG.
WorkflowRunId string `validate:"required,uuid"`
// (required) the step id
StepId string `validate:"required,uuid"`
// (required) the input bytes to the task
Input *TaskInput
FilterPayload []byte
// (required) the step index for the task
StepIndex int
// (optional) the additional metadata for the task
AdditionalMetadata []byte
// (optional) the desired worker id
DesiredWorkerId *string
// (optional) the DAG id for the task
DagId *int64
// (optional) the DAG inserted at for the task
DagInsertedAt pgtype.Timestamptz
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the parent task external id
ParentTaskExternalId *string
// (optional) the parent task id
ParentTaskId *int64
// (optional) the parent task inserted at
ParentTaskInsertedAt *time.Time
// (optional) The priority of a task, between 1 and 3
Priority *int32
// (optional) the child index for the task
ChildIndex *int64
// (optional) the child key for the task
ChildKey *string
}
type CreateWebhookOpts ¶ added in v0.70.0
type CreateWebhookOpts struct {
Tenantid pgtype.UUID `json:"tenantid"`
Sourcename sqlcv1.V1IncomingWebhookSourceName `json:"sourcename"`
Name string `json:"name" validate:"required"`
Eventkeyexpression string `json:"eventkeyexpression"`
AuthConfig AuthConfig `json:"auth_config,omitempty"`
}
type CreateWorkflowStepRateLimitOpts ¶
type CreateWorkflowStepRateLimitOpts struct {
// (required) the rate limit key
Key string `validate:"required"`
// (optional) a CEL expression for the rate limit key
KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`
// (optional) the rate limit units to consume
Units *int `validate:"omitnil,required_without=UnitsExpr"`
// (optional) a CEL expression for the rate limit units
UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`
// (optional) a CEL expression for a dynamic limit value for the rate limit
LimitExpr *string `validate:"omitnil,celsteprunstr"`
// (optional) the rate limit duration, defaults to MINUTE
Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}
type CreateWorkflowVersionOpts ¶
type CreateWorkflowVersionOpts struct {
// (required) the workflow name
Name string `validate:"required,hatchetName"`
// (optional) the workflow description
Description *string `json:"description,omitempty"`
// (optional) event triggers for the workflow
EventTriggers []string
// (optional) cron triggers for the workflow
CronTriggers []string `validate:"dive,cron"`
// (optional) the input bytes for the cron triggers
CronInput []byte
// (required) the tasks in the workflow
Tasks []CreateStepOpts `validate:"required,min=1,dive"`
OnFailure *CreateStepOpts `json:"onFailureJob,omitempty" validate:"omitempty"`
// (optional) the workflow concurrency groups
Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitempty,dive"`
// (optional) sticky strategy
Sticky *string `validate:"omitempty,oneof=SOFT HARD"`
DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`
DefaultFilters []types.DefaultFilter `json:"defaultFilters,omitempty" validate:"omitempty,dive"`
}
type DAGWithData ¶
type DesiredWorkerLabelOpts ¶
type DesiredWorkerLabelOpts struct {
// (required) the label key
Key string `validate:"required"`
// (required if StrValue is nil) the label integer value
IntValue *int32 `validate:"omitnil,required_without=StrValue"`
// (required if IntValue is nil) the label string value
StrValue *string `validate:"omitnil,required_without=IntValue"`
// (optional) if the label is required
Required *bool `validate:"omitempty"`
// (optional) the weight of the label for scheduling (default: 100)
Weight *int32 `validate:"omitempty"`
// (optional) the label comparator for scheduling (default: EQUAL)
Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}
type ErrNamesNotFound ¶
type ErrNamesNotFound struct {
Names []string
}
func (*ErrNamesNotFound) Error ¶
func (e *ErrNamesNotFound) Error() string
type EventExternalIdFilterId ¶
type EventMatchResults ¶
type EventMatchResults struct {
// The list of tasks which were created from the matches
CreatedTasks []*V1TaskWithPayload
// The list of tasks which were replayed from the matches
ReplayedTasks []*V1TaskWithPayload
}
type EventTriggerOpts ¶
type EventTriggersFromExternalId ¶
type EventTriggersFromExternalId struct {
RunID int64 `json:"run_id"`
RunInsertedAt pgtype.Timestamptz `json:"run_inserted_at"`
EventExternalId pgtype.UUID `json:"event_external_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
FilterId pgtype.UUID `json:"filter_id"`
}
type EventType ¶
type EventType string
const ( EVENT_TYPE_REQUEUED_NO_WORKER EventType = "REQUEUED_NO_WORKER" EVENT_TYPE_REQUEUED_RATE_LIMIT EventType = "REQUEUED_RATE_LIMIT" EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT" EVENT_TYPE_ASSIGNED EventType = "ASSIGNED" EVENT_TYPE_STARTED EventType = "STARTED" EVENT_TYPE_FINISHED EventType = "FINISHED" EVENT_TYPE_FAILED EventType = "FAILED" EVENT_TYPE_RETRYING EventType = "RETRYING" EVENT_TYPE_CANCELLED EventType = "CANCELLED" EVENT_TYPE_TIMED_OUT EventType = "TIMED_OUT" EVENT_TYPE_REASSIGNED EventType = "REASSIGNED" EVENT_TYPE_SLOT_RELEASED EventType = "SLOT_RELEASED" EVENT_TYPE_TIMEOUT_REFRESHED EventType = "TIMEOUT_REFRESHED" EVENT_TYPE_RETRIED_BY_USER EventType = "RETRIED_BY_USER" EVENT_TYPE_SENT_TO_WORKER EventType = "SENT_TO_WORKER" EVENT_TYPE_RATE_LIMIT_ERROR EventType = "RATE_LIMIT_ERROR" EVENT_TYPE_ACKNOWLEDGED EventType = "ACKNOWLEDGED" EVENT_TYPE_CREATED EventType = "CREATED" EVENT_TYPE_QUEUED EventType = "QUEUED" )
type ExternalCreateSignalMatchOpts ¶
type ExternalCreateSignalMatchOpts struct {
Conditions []CreateExternalSignalConditionOpt `validate:"required,min=1,dive"`
SignalTaskId int64 `validate:"required,gt=0"`
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId string `validate:"required,uuid"`
SignalKey string `validate:"required"`
}
type ExternalPayloadLocationKey ¶ added in v0.73.0
type ExternalPayloadLocationKey string
type ExternalStore ¶ added in v0.73.0
type ExternalStore interface {
Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error)
BulkRetrieve(ctx context.Context, opts ...BulkRetrievePayloadOpts) (map[ExternalPayloadLocationKey][]byte, error)
}
type FailTaskOpts ¶
type FailTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) whether this is an application-level error or an internal error on the Hatchet side
IsAppError bool
// (optional) the error message for the task
ErrorMessage string
// (optional) A boolean flag to indicate whether the error is non-retryable, meaning it should _not_ be retried. Defaults to false.
IsNonRetryable bool
}
type FailTasksResponse ¶
type FailTasksResponse struct {
*FinalizedTaskResponse
RetriedTasks []RetriedTask
}
type FilterRepository ¶
type FilterRepository interface {
CreateFilter(ctx context.Context, tenantId string, params CreateFilterOpts) (*sqlcv1.V1Filter, error)
ListFilters(ctx context.Context, tenantId string, params ListFiltersOpts) ([]*sqlcv1.V1Filter, int64, error)
DeleteFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
GetFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
UpdateFilter(ctx context.Context, tenantId string, filterId string, opts UpdateFilterOpts) (*sqlcv1.V1Filter, error)
}
type FinalizedTaskResponse ¶
type FinalizedTaskResponse struct {
ReleasedTasks []*sqlcv1.ReleaseTasksRow
InternalEvents []InternalTaskEvent
}
type GroupMatchCondition ¶
type GroupMatchCondition struct {
GroupId string `validate:"required,uuid"`
EventType sqlcv1.V1EventType
EventKey string
// (optional) a hint for querying the event data
EventResourceHint *string
// the data key which gets inserted into the returned data from a satisfied match condition
ReadableDataKey string
Expression string
Action sqlcv1.V1MatchConditionAction
// (optional) the data which was used to satisfy the condition (relevant for replays)
Data []byte
}
type HMACAuthCredentials ¶ added in v0.70.0
type HMACAuthCredentials struct {
Algorithm sqlcv1.V1IncomingWebhookHmacAlgorithm `json:"algorithm" validate:"required"`
Encoding sqlcv1.V1IncomingWebhookHmacEncoding `json:"encoding" validate:"required"`
SignatureHeaderName string `json:"signature_header_name" validate:"required"`
EncryptedWebhookSigningSecret []byte `json:"webhook_signing_secret" validate:"required"`
}
type IdInsertedAt ¶ added in v0.71.2
type IdInsertedAt struct {
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}
type IdempotencyKey ¶ added in v0.73.0
type IdempotencyKey string
type IdempotencyRepository ¶ added in v0.73.0
type InternalTaskEvent ¶
type InternalTaskEvent struct {
TenantID string `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskExternalID string `json:"task_external_id"`
RetryCount int32 `json:"retry_count"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
EventKey string `json:"event_key"`
Data []byte `json:"data"`
}
InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.
type JobRunHasCycleError ¶
type JobRunHasCycleError struct {
JobName string
}
func (*JobRunHasCycleError) Error ¶
func (e *JobRunHasCycleError) Error() string
type KeyClaimantPair ¶ added in v0.73.0
type KeyClaimantPair struct {
IdempotencyKey IdempotencyKey
ClaimedByExternalId pgtype.UUID
}
type LeaseRepository ¶
type LeaseRepository interface {
ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)
AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}
type ListActiveWorkersResult ¶
type ListActiveWorkersResult struct {
ID string
MaxRuns int
Name string
Labels []*sqlcv1.ListManyWorkerLabelsRow
}
type ListEventsRow ¶
type ListEventsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
EventID int64 `json:"event_id"`
EventExternalID pgtype.UUID `json:"event_external_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
EventKey string `json:"event_key"`
EventPayload []byte `json:"event_payload"`
EventAdditionalMetadata []byte `json:"event_additional_metadata"`
EventScope string `json:"event_scope"`
QueuedCount int64 `json:"queued_count"`
RunningCount int64 `json:"running_count"`
CompletedCount int64 `json:"completed_count"`
CancelledCount int64 `json:"cancelled_count"`
FailedCount int64 `json:"failed_count"`
TriggeredRuns []byte `json:"triggered_runs"`
TriggeringWebhookName *string `json:"triggering_webhook_name,omitempty"`
}
type ListFiltersOpts ¶
type ListFinalizedWorkflowRunsResponse ¶
type ListFinalizedWorkflowRunsResponse struct {
WorkflowRunId string
OutputEvents []*TaskOutputEvent
}
type ListLogsOpts ¶
type ListLogsOpts struct {
// (optional) number of logs to skip
Offset *int
// (optional) number of logs to return
Limit *int `validate:"omitnil,min=1,max=10000"`
// (optional) a list of log levels to filter by
Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`
// (optional) a search query
Search *string
}
type ListTaskRunOpts ¶
type ListTaskRunOpts struct {
CreatedAfter time.Time
Statuses []sqlcv1.V1ReadableStatusOlap
WorkflowIds []uuid.UUID
WorkerId *uuid.UUID
StartedAfter time.Time
FinishedBefore *time.Time
AdditionalMetadata map[string]interface{}
TriggeringEventExternalId *uuid.UUID
Limit int64
Offset int64
IncludePayloads bool
}
type ListWebhooksOpts ¶ added in v0.70.0
type ListWebhooksOpts struct {
WebhookNames []string `json:"webhook_names"`
WebhookSourceNames []sqlcv1.V1IncomingWebhookSourceName `json:"webhook_source_names"`
Limit *int64 `json:"limit" validate:"omitnil,min=1"`
Offset *int64 `json:"offset" validate:"omitnil,min=0"`
}
type ListWorkflowRunOpts ¶
type ListWorkflowRunOpts struct {
CreatedAfter time.Time
Statuses []sqlcv1.V1ReadableStatusOlap
WorkflowIds []uuid.UUID
StartedAfter time.Time
FinishedBefore *time.Time
AdditionalMetadata map[string]interface{}
Limit int64
Offset int64
ParentTaskExternalId *pgtype.UUID
TriggeringEventExternalId *pgtype.UUID
IncludePayloads bool
}
type LogLineRepository ¶
type LogLineRepository interface {
ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)
PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}
type MatchData ¶
type MatchData struct {
// contains filtered or unexported fields
}
MatchData parses aggregated match data.
func NewMatchData ¶
func (*MatchData) Action ¶
func (m *MatchData) Action() sqlcv1.V1MatchConditionAction
func (*MatchData) DataValueAsTaskOutputEvent ¶
func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent
DataValueAsTaskOutputEvent is a helper function for internal events.
func (*MatchData) TriggerDataKeys ¶
func (*MatchData) TriggerDataValue ¶
type MatchRepository ¶
type MatchRepository interface {
RegisterSignalMatchConditions(ctx context.Context, tenantId string, eventMatches []ExternalCreateSignalMatchOpts) error
ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
}
type MatchRepositoryImpl ¶
type MatchRepositoryImpl struct {
// contains filtered or unexported fields
}
func (MatchRepositoryImpl) DesiredWorkerId ¶
func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*MatchRepositoryImpl) ProcessInternalEventMatches ¶
func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessInternalEventMatches processes a list of internal events
func (*MatchRepositoryImpl) ProcessUserEventMatches ¶
func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessUserEventMatches processes a list of user events
func (*MatchRepositoryImpl) RegisterSignalMatchConditions ¶
func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, signalMatches []ExternalCreateSignalMatchOpts) error
func (MatchRepositoryImpl) ToV1StepRunData ¶
func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
type NoOpExternalStore ¶ added in v0.73.0
type NoOpExternalStore struct{}
func (*NoOpExternalStore) BulkRetrieve ¶ added in v0.73.0
func (n *NoOpExternalStore) BulkRetrieve(ctx context.Context, opts ...BulkRetrievePayloadOpts) (map[ExternalPayloadLocationKey][]byte, error)
func (*NoOpExternalStore) Store ¶ added in v0.73.0
func (n *NoOpExternalStore) Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[RetrievePayloadOpts]ExternalPayloadLocationKey, error)
type OLAPRepository ¶
type OLAPRepository interface {
UpdateTablePartitions(ctx context.Context) error
SetReadReplicaPool(pool *pgxpool.Pool)
ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)
ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID, includePayloads bool) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*ListEventsRow, *int64, error)
ListEventKeys(ctx context.Context, tenantId string) ([]string, error)
GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)
GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)
CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error
StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
AnalyzeOLAPTables(ctx context.Context) error
}
func NewOLAPRepositoryFromPool ¶
func NewOLAPRepositoryFromPool(pool *pgxpool.Pool, l *zerolog.Logger, olapRetentionPeriod time.Duration, entitlements repository.EntitlementsRepository, shouldPartitionEventsTables, enablePayloadDualWrites bool) (OLAPRepository, func() error)
type OLAPRepositoryImpl ¶
type OLAPRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*OLAPRepositoryImpl) AnalyzeOLAPTables ¶ added in v0.71.4
func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error
func (*OLAPRepositoryImpl) BulkCreateEventsAndTriggers ¶
func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
func (*OLAPRepositoryImpl) CreateDAGs ¶
func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
func (*OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs ¶ added in v0.70.0
func (r *OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error
func (*OLAPRepositoryImpl) CreateTaskEvents ¶
func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
func (*OLAPRepositoryImpl) CreateTasks ¶
func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
func (OLAPRepositoryImpl) DesiredWorkerId ¶
func (*OLAPRepositoryImpl) GetDAGDurations ¶ added in v0.70.6
func (r *OLAPRepositoryImpl) GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)
func (*OLAPRepositoryImpl) GetTaskDurationsByTaskIds ¶
func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)
func (*OLAPRepositoryImpl) GetTaskPointMetrics ¶
func (*OLAPRepositoryImpl) GetTaskTimings ¶
func (*OLAPRepositoryImpl) ListEventKeys ¶
func (*OLAPRepositoryImpl) ListEvents ¶
func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*ListEventsRow, *int64, error)
func (*OLAPRepositoryImpl) ListTaskRunEvents ¶
func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId ¶
func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
func (*OLAPRepositoryImpl) ListTasks ¶
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
func (*OLAPRepositoryImpl) ListTasksByDAGId ¶
func (*OLAPRepositoryImpl) ListTasksByExternalIds ¶
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt ¶
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames ¶
func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRuns ¶
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*OLAPRepositoryImpl) ReadDAG ¶
func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRun ¶
func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRunData ¶
func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*sqlcv1.PopulateSingleTaskRunDataRow, pgtype.UUID, error)
func (*OLAPRepositoryImpl) ReadTaskRunMetrics ¶
func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
func (*OLAPRepositoryImpl) ReadWorkflowRun ¶
func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
func (*OLAPRepositoryImpl) SetReadReplicaPool ¶
func (r *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)
func (*OLAPRepositoryImpl) StoreCELEvaluationFailures ¶ added in v0.70.0
func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
func (OLAPRepositoryImpl) ToV1StepRunData ¶
func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*OLAPRepositoryImpl) UpdateDAGStatuses ¶
func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
func (*OLAPRepositoryImpl) UpdateTablePartitions ¶
func (r *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
func (*OLAPRepositoryImpl) UpdateTaskStatuses ¶
func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
type OffloadToExternalStoreOpts ¶ added in v0.73.0
type OffloadToExternalStoreOpts struct {
*StorePayloadOpts
OffloadAt time.Time
}
type PayloadLocation ¶ added in v0.73.0
type PayloadLocation string
type PayloadStoreRepository ¶ added in v0.73.0
type PayloadStoreRepository interface {
Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error
Retrieve(ctx context.Context, opts RetrievePayloadOpts) ([]byte, error)
BulkRetrieve(ctx context.Context, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error)
ProcessPayloadWAL(ctx context.Context, partitionNumber int64) (bool, error)
OverwriteExternalStore(store ExternalStore, inlineStoreTTL time.Duration)
DualWritesEnabled() bool
}
func NewPayloadStoreRepository ¶ added in v0.73.0
type QueueFactoryRepository ¶
type QueueFactoryRepository interface {
NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}
type QueueRepository ¶
type QueueRepository interface {
ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)
GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
RequeueRateLimitedItems(ctx context.Context, tenantId pgtype.UUID, queueName string) ([]*sqlcv1.RequeueRateLimitedQueueItemsRow, error)
GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
Cleanup()
}
type RateLimitRepository ¶
type RateLimitResult ¶
type ReadTaskRunMetricsOpts ¶
type ReadableTaskStatus ¶
type ReadableTaskStatus string
const (
	READABLE_TASK_STATUS_QUEUED    ReadableTaskStatus = "QUEUED"
	READABLE_TASK_STATUS_RUNNING   ReadableTaskStatus = "RUNNING"
	READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED"
	READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED"
	READABLE_TASK_STATUS_FAILED    ReadableTaskStatus = "FAILED"
)
func StringToReadableStatus ¶
func StringToReadableStatus(status string) ReadableTaskStatus
func (ReadableTaskStatus) EnumValue ¶
func (s ReadableTaskStatus) EnumValue() int
type RefreshTimeoutBy ¶
type ReplayTaskOpts ¶
type ReplayTaskOpts struct {
// (required) the task id
TaskId int64
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the external id
ExternalId string
// (required) the step id
StepId string
// (optional) the input bytes to the task, uses the existing input if not set
Input *TaskInput
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the additional metadata for the task
AdditionalMetadata []byte
}
type ReplayTasksResult ¶
type ReplayTasksResult struct {
ReplayedTasks []TaskIdInsertedAtRetryCount
UpsertedTasks []*V1TaskWithPayload
InternalEventResults *EventMatchResults
}
type Repository ¶
type Repository interface {
Triggers() TriggerRepository
Tasks() TaskRepository
Scheduler() SchedulerRepository
Matches() MatchRepository
OLAP() OLAPRepository
OverwriteOLAPRepository(o OLAPRepository)
Logs() LogLineRepository
OverwriteLogsRepository(l LogLineRepository)
Payloads() PayloadStoreRepository
OverwriteExternalPayloadStore(o ExternalStore, nativeStoreTTL time.Duration)
Workers() WorkerRepository
Workflows() WorkflowRepository
Ticker() TickerRepository
Filters() FilterRepository
Webhooks() WebhookRepository
Idempotency() IdempotencyRepository
}
func NewRepository ¶
func NewRepository(pool *pgxpool.Pool, l *zerolog.Logger, taskRetentionPeriod, olapRetentionPeriod time.Duration, maxInternalRetryCount int32, entitlements repository.EntitlementsRepository, taskLimits TaskOperationLimits, enablePayloadDualWrites bool) (Repository, func() error)
type RetriedTask ¶
type RetrievePayloadOpts ¶ added in v0.73.0
type RetrievePayloadOpts struct {
Id int64
InsertedAt pgtype.Timestamptz
Type sqlcv1.V1PayloadType
TenantId pgtype.UUID
}
type RunConcurrencyResult ¶
type RunConcurrencyResult struct {
// The tasks which were enqueued
Queued []TaskWithQueue
// If the strategy involves cancelling a task, these are the tasks to cancel
Cancelled []TaskWithCancelledReason
// If the step has multiple concurrency strategies, these are the next ones to notify
NextConcurrencyStrategies []int64
}
type SchedulerRepository ¶
type SchedulerRepository interface {
Concurrency() ConcurrencyRepository
Lease() LeaseRepository
QueueFactory() QueueFactoryRepository
RateLimit() RateLimitRepository
Assignment() AssignmentRepository
}
type StorePayloadOpts ¶ added in v0.73.0
type StorePayloadOpts struct {
Id int64
InsertedAt pgtype.Timestamptz
Type sqlcv1.V1PayloadType
Payload []byte
TenantId string
}
type TaskIdEventKeyTuple ¶
type TaskIdInsertedAtRetryCount ¶
type TaskIdInsertedAtRetryCount struct {
// (required) the task id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the retry count
RetryCount int32
}
type TaskIdInsertedAtSignalKey ¶
type TaskIdInsertedAtSignalKey struct {
// (required) the task id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the signal key for the event
SignalKey string
}
type TaskInput ¶
type TaskMetadata ¶
type TaskMetadata struct {
TaskID int64 `json:"task_id"`
TaskInsertedAt time.Time `json:"task_inserted_at"`
}
func ParseTaskMetadata ¶
func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)
type TaskOperationLimits ¶ added in v0.70.2
type TaskOutputEvent ¶
type TaskOutputEvent struct {
IsFailure bool `json:"is_failure"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
TaskExternalId string `json:"task_external_id"`
TaskId int64 `json:"task_id"`
RetryCount int32 `json:"retry_count"`
WorkerId *string `json:"worker_id"`
Output []byte `json:"output"`
ErrorMessage string `json:"error_message"`
StepReadableID string `json:"step_readable_id"`
}
func NewCancelledTaskOutputEvent ¶
func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
func NewCancelledTaskOutputEventFromTask ¶
func NewCancelledTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
func NewCompletedTaskOutputEvent ¶
func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
func NewFailedTaskOutputEvent ¶
func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
func NewFailedTaskOutputEventFromTask ¶
func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
func NewSkippedTaskOutputEventFromTask ¶
func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
func (*TaskOutputEvent) Bytes ¶
func (e *TaskOutputEvent) Bytes() []byte
func (*TaskOutputEvent) IsCancelled ¶
func (e *TaskOutputEvent) IsCancelled() bool
func (*TaskOutputEvent) IsCompleted ¶
func (e *TaskOutputEvent) IsCompleted() bool
func (*TaskOutputEvent) IsFailed ¶
func (e *TaskOutputEvent) IsFailed() bool
type TaskRepository ¶
type TaskRepository interface {
EnsureTablePartitionsExist(ctx context.Context) (bool, error)
UpdateTablePartitions(ctx context.Context) error
// GetTaskByExternalId is a heavily cached method to return task metadata by its external id
GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
// This is non-cacheable because tasks can be added to a workflow run as it executes.
FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)
CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
// with the v1 engine, and shouldn't be called from new v1 endpoints.
ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)
ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)
ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)
// AnalyzeTaskTables runs ANALYZE on the task tables
AnalyzeTaskTables(ctx context.Context) error
}
type TaskRepositoryImpl ¶
type TaskRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*TaskRepositoryImpl) AnalyzeTaskTables ¶ added in v0.71.13
func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error
func (*TaskRepositoryImpl) CancelTasks ¶
func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
func (*TaskRepositoryImpl) CompleteTasks ¶
func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
func (TaskRepositoryImpl) DesiredWorkerId ¶
func (*TaskRepositoryImpl) EnsureTablePartitionsExist ¶
func (r *TaskRepositoryImpl) EnsureTablePartitionsExist(ctx context.Context) (bool, error)
func (*TaskRepositoryImpl) FailTasks ¶
func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
func (*TaskRepositoryImpl) FlattenExternalIds ¶
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) GetQueueCounts ¶
func (*TaskRepositoryImpl) GetTaskByExternalId ¶
func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns ¶
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
func (*TaskRepositoryImpl) ListSignalCompletedEvents ¶
func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*sqlcv1.V1TaskEvent, error)
func (*TaskRepositoryImpl) ListTaskMetas ¶
func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
func (*TaskRepositoryImpl) ListTaskParentOutputs ¶
func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*TaskRepositoryImpl) ProcessDurableSleeps ¶
func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
func (*TaskRepositoryImpl) ProcessTaskReassignments ¶
func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems ¶
func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
func (*TaskRepositoryImpl) ProcessTaskTimeouts ¶
func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
func (*TaskRepositoryImpl) RefreshTimeoutBy ¶
func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReleaseSlot ¶
func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReplayTasks ¶
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
func (TaskRepositoryImpl) ToV1StepRunData ¶
func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TaskRepositoryImpl) UpdateTablePartitions ¶
func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
type TaskRunMetric ¶
type TaskWithCancelledReason ¶
type TaskWithCancelledReason struct {
*TaskIdInsertedAtRetryCount
CancelledReason string
TaskExternalId string
WorkflowRunId string
}
type TaskWithQueue ¶
type TaskWithQueue struct {
*TaskIdInsertedAtRetryCount
Queue string
}
type TickerRepository ¶
type TimeoutTasksResponse ¶
type TimeoutTasksResponse struct {
*FailTasksResponse
TimeoutTasks []*sqlcv1.ListTasksToTimeoutRow
}
type TriggerDecision ¶
type TriggerFromEventsResult ¶
type TriggerFromEventsResult struct {
Tasks []*V1TaskWithPayload
Dags []*DAGWithData
EventExternalIdToRuns map[string][]*Run
CELEvaluationFailures []CELEvaluationFailure
}
type TriggerRepository ¶
type TriggerRepository interface {
TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)
PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}
type TriggerRepositoryImpl ¶
type TriggerRepositoryImpl struct {
// contains filtered or unexported fields
}
func (TriggerRepositoryImpl) DesiredWorkerId ¶
func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts ¶
func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
func (TriggerRepositoryImpl) ToV1StepRunData ¶
func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TriggerRepositoryImpl) TriggerFromEvents ¶
func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
func (*TriggerRepositoryImpl) TriggerFromWorkflowNames ¶
func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)
type TriggerTaskData ¶
type TriggerTaskData struct {
// (required) the workflow name
WorkflowName string `json:"workflow_name" validate:"required"`
// (optional) the workflow run data
Data []byte `json:"data"`
// (optional) the workflow run metadata
AdditionalMetadata []byte `json:"additional_metadata"`
// (optional) the desired worker id
DesiredWorkerId *string `json:"desired_worker_id"`
// (optional) the parent external id
ParentExternalId *string `json:"parent_external_id"`
// (optional) the parent task id
ParentTaskId *int64 `json:"parent_task_id"`
// (optional) the parent task's inserted_at timestamp
ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`
// (optional) the child index
ChildIndex *int64 `json:"child_index"`
// (optional) the child key
ChildKey *string `json:"child_key"`
// (optional) the priority of the task
Priority *int32 `json:"priority"`
}
type TriggeredBy ¶
type TriggeredByEvent ¶
type TriggeredByEvent struct {
// contains filtered or unexported fields
}
func (*TriggeredByEvent) ToMetadata ¶
func (t *TriggeredByEvent) ToMetadata(additionalMetadata []byte) []byte
type UpdateDAGStatusRow ¶
type UpdateFilterOpts ¶
type UpdateTaskStatusRow ¶
type V1StepRunData ¶
type V1StepRunData struct {
Input map[string]interface{} `json:"input"`
TriggeredBy string `json:"triggered_by"`
Parents map[string]map[string]interface{} `json:"parents"`
Triggers map[string]map[string]interface{} `json:"triggers"`
// custom-set user data for the step
UserData map[string]interface{} `json:"user_data"`
// overrides set from the playground
Overrides map[string]interface{} `json:"overrides"`
// errors in upstream steps (only used in on-failure step)
StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}
func (*V1StepRunData) Bytes ¶
func (v1 *V1StepRunData) Bytes() []byte
type V1TaskWithPayload ¶ added in v0.73.0
type V1WorkflowRunPopulator ¶
type V1WorkflowRunPopulator struct {
WorkflowRun *WorkflowRunData
TaskMetadata []TaskMetadata
}
type WasSuccessfullyClaimed ¶ added in v0.73.0
type WasSuccessfullyClaimed bool
type WebhookRepository ¶ added in v0.70.0
type WebhookRepository interface {
CreateWebhook(ctx context.Context, tenantId string, params CreateWebhookOpts) (*sqlcv1.V1IncomingWebhook, error)
ListWebhooks(ctx context.Context, tenantId string, params ListWebhooksOpts) ([]*sqlcv1.V1IncomingWebhook, error)
DeleteWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
GetWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
CanCreate(ctx context.Context, tenantId string, webhookLimit int32) (bool, error)
UpdateWebhook(ctx context.Context, tenantId string, webhookId, newExpression string) (*sqlcv1.V1IncomingWebhook, error)
}
type WorkerRepository ¶
type WorkerRepository interface {
ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)
}
type WorkflowAndScope ¶
type WorkflowNameTriggerOpts ¶
type WorkflowNameTriggerOpts struct {
*TriggerTaskData
ExternalId string
// (optional) The idempotency key to use for debouncing this task
IdempotencyKey *IdempotencyKey
// Whether to skip the creation of the child workflow
ShouldSkip bool
}
type WorkflowRepository ¶
type WorkflowRunData ¶
type WorkflowRunData struct {
AdditionalMetadata []byte `json:"additional_metadata"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
DisplayName string `json:"display_name"`
ErrorMessage string `json:"error_message"`
ExternalID pgtype.UUID `json:"external_id"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
Input []byte `json:"input"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
Kind sqlcv1.V1RunKind `json:"kind"`
Output *[]byte `json:"output,omitempty"`
ParentTaskExternalId *pgtype.UUID `json:"parent_task_external_id,omitempty"`
ReadableStatus sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
StepId *pgtype.UUID `json:"step_id,omitempty"`
StartedAt pgtype.Timestamptz `json:"started_at"`
TaskExternalId *pgtype.UUID `json:"task_external_id,omitempty"`
TaskId *int64 `json:"task_id,omitempty"`
TaskInsertedAt *pgtype.Timestamptz `json:"task_inserted_at,omitempty"`
TenantID pgtype.UUID `json:"tenant_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionId pgtype.UUID `json:"workflow_version_id"`
RetryCount *int `json:"retry_count,omitempty"`
}