Documentation
¶
Index ¶
- Constants
- type AssignResults
- type AssignedItem
- type AssignmentRepository
- type CandidateEventMatch
- type ChildWorkflowSignalCreatedData
- type CompleteTaskOpts
- type ConcurrencyRepository
- type ConcurrencyRepositoryImpl
- func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (res *RunConcurrencyResult, err error)
- func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
- type CreateLogLineOpts
- type CreateMatchOpts
- type CreateTaskOpts
- type DAGWithData
- type ErrNamesNotFound
- type EventTriggerOpts
- type EventType
- type FailTaskOpts
- type FailTasksResponse
- type FinalizedTaskResponse
- type GroupMatchCondition
- type InternalEventMatchResults
- type InternalTaskEvent
- type LeaseRepository
- type ListActiveWorkersResult
- type ListFinalizedWorkflowRunsResponse
- type ListLogsOpts
- type ListTaskRunOpts
- type ListWorkflowRunOpts
- type LogLineRepository
- type MatchData
- type MatchRepository
- type MatchRepositoryImpl
- func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
- func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- type OLAPRepository
- type OLAPRepositoryImpl
- func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
- func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error
- func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, ...) ([]*sqlcv1.GetTaskPointMetricsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, ...) ([]*sqlcv1.ListTaskEventsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
- func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
- func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
- func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
- func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
- func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, ...) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
- func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
- func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
- func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
- type QueueFactoryRepository
- type QueueRepository
- type RateLimitRepository
- type RateLimitResult
- type ReadTaskRunMetricsOpts
- type ReadableTaskStatus
- type RefreshTimeoutBy
- type ReplayTaskOpts
- type ReplayTasksResult
- type Repository
- type RetriedTask
- type RunConcurrencyResult
- type SchedulerRepository
- type Sticky
- type TaskIdEventKeyTuple
- type TaskIdInsertedAtRetryCount
- type TaskInput
- type TaskMetadata
- type TaskOutputEvent
- func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
- func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
- func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
- func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
- type TaskRepository
- type TaskRepositoryImpl
- func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
- func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
- func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
- func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)
- func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
- func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
- func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
- func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
- func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
- func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- type TaskRunMetric
- type TaskWithCancelledReason
- type TaskWithQueue
- type TickerRepository
- type TimeoutTasksResponse
- type TriggerRepository
- type TriggerRepositoryImpl
- func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
- func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
- type TriggerTaskData
- type UpdateDAGStatusRow
- type UpdateTaskStatusRow
- type V1StepRunData
- type V1WorkflowRunPopulator
- type WorkerRepository
- type WorkflowNameTriggerOpts
- type WorkflowRepository
- type WorkflowRunData
Constants ¶
const MAX_TENANT_RATE_LIMITS = 10000
const NUM_PARTITIONS = 4
TODO: make NUM_PARTITIONS dynamic for the instance.
const PARENT_STRATEGY_LOCK_OFFSET = 1000000000000 // 1 trillion
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssignResults ¶
type AssignResults struct {
Assigned []*AssignedItem
Unassigned []*sqlcv1.V1QueueItem
SchedulingTimedOut []*sqlcv1.V1QueueItem
RateLimited []*RateLimitResult
}
type AssignedItem ¶
type AssignedItem struct {
WorkerId pgtype.UUID
QueueItem *sqlcv1.V1QueueItem
}
type AssignmentRepository ¶
type AssignmentRepository interface {
ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}
type CandidateEventMatch ¶
type ChildWorkflowSignalCreatedData ¶
type ChildWorkflowSignalCreatedData struct {
// The external id of the target child task
ChildExternalId string `json:"external_id"`
// The external id of the parent task
ParentExternalId string `json:"parent_external_id"`
// The index of the child task
ChildIndex int64 `json:"child_index"`
// The key of the child task
ChildKey *string `json:"child_key"`
}
func (*ChildWorkflowSignalCreatedData) Bytes ¶
func (c *ChildWorkflowSignalCreatedData) Bytes() []byte
type CompleteTaskOpts ¶
type CompleteTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) the output bytes for the task
Output []byte
}
type ConcurrencyRepository ¶
type ConcurrencyRepository interface {
// UpdateConcurrencyStrategyIsActive checks whether the concurrency strategy is active and, if not, sets is_active to false.
UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}
type ConcurrencyRepositoryImpl ¶
type ConcurrencyRepositoryImpl struct {
// contains filtered or unexported fields
}
func (ConcurrencyRepositoryImpl) DesiredWorkerId ¶
func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy ¶
func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error)
func (ConcurrencyRepositoryImpl) ToV1StepRunData ¶
func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive ¶
func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) error
type CreateLogLineOpts ¶
type CreateLogLineOpts struct {
TaskId int64
TaskInsertedAt pgtype.Timestamptz
// (optional) The time when the log line was created.
CreatedAt *time.Time
// (required) The message of the log line.
Message string `validate:"required,min=1,max=10000"`
// (optional) The level of the log line.
Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`
// (optional) The metadata of the log line.
Metadata []byte
}
type CreateMatchOpts ¶
type CreateMatchOpts struct {
Kind sqlcv1.V1MatchKind
Conditions []GroupMatchCondition
TriggerDAGId *int64
TriggerDAGInsertedAt pgtype.Timestamptz
TriggerExternalId *string
TriggerWorkflowRunId *string
TriggerStepId *string
TriggerStepIndex pgtype.Int8
TriggerExistingTaskId *int64
TriggerExistingTaskInsertedAt pgtype.Timestamptz
TriggerParentTaskExternalId pgtype.UUID
TriggerParentTaskId pgtype.Int8
TriggerParentTaskInsertedAt pgtype.Timestamptz
TriggerChildIndex pgtype.Int8
TriggerChildKey pgtype.Text
SignalTaskId *int64
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId *string
SignalKey *string
}
type CreateTaskOpts ¶
type CreateTaskOpts struct {
// (required) the external id
ExternalId string `validate:"required,uuid"`
// (required) the workflow run id. note this may be the same as the external id if this is a
// single-task workflow, otherwise it represents the external id of the DAG.
WorkflowRunId string `validate:"required,uuid"`
// (required) the step id
StepId string `validate:"required,uuid"`
// (required) the input to the task
Input *TaskInput
// (required) the step index for the task
StepIndex int
// (optional) the additional metadata for the task
AdditionalMetadata []byte
// (optional) the desired worker id
DesiredWorkerId *string
// (optional) the DAG id for the task
DagId *int64
// (optional) the inserted-at timestamp of the DAG for the task
DagInsertedAt pgtype.Timestamptz
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the parent task external id
ParentTaskExternalId *string
// (optional) the parent task id
ParentTaskId *int64
// (optional) the inserted-at timestamp of the parent task
ParentTaskInsertedAt *time.Time
// (optional) the child index for the task
ChildIndex *int64
// (optional) the child key for the task
ChildKey *string
}
type DAGWithData ¶
type ErrNamesNotFound ¶
type ErrNamesNotFound struct {
Names []string
}
func (*ErrNamesNotFound) Error ¶
func (e *ErrNamesNotFound) Error() string
type EventTriggerOpts ¶
type EventType ¶
type EventType string
const ( EVENT_TYPE_REQUEUED_NO_WORKER EventType = "REQUEUED_NO_WORKER" EVENT_TYPE_REQUEUED_RATE_LIMIT EventType = "REQUEUED_RATE_LIMIT" EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT" EVENT_TYPE_ASSIGNED EventType = "ASSIGNED" EVENT_TYPE_STARTED EventType = "STARTED" EVENT_TYPE_FINISHED EventType = "FINISHED" EVENT_TYPE_FAILED EventType = "FAILED" EVENT_TYPE_RETRYING EventType = "RETRYING" EVENT_TYPE_CANCELLED EventType = "CANCELLED" EVENT_TYPE_TIMED_OUT EventType = "TIMED_OUT" EVENT_TYPE_REASSIGNED EventType = "REASSIGNED" EVENT_TYPE_SLOT_RELEASED EventType = "SLOT_RELEASED" EVENT_TYPE_TIMEOUT_REFRESHED EventType = "TIMEOUT_REFRESHED" EVENT_TYPE_RETRIED_BY_USER EventType = "RETRIED_BY_USER" EVENT_TYPE_SENT_TO_WORKER EventType = "SENT_TO_WORKER" EVENT_TYPE_RATE_LIMIT_ERROR EventType = "RATE_LIMIT_ERROR" EVENT_TYPE_ACKNOWLEDGED EventType = "ACKNOWLEDGED" EVENT_TYPE_CREATED EventType = "CREATED" EVENT_TYPE_QUEUED EventType = "QUEUED" )
type FailTaskOpts ¶
type FailTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) whether this is an application-level error or an internal error on the Hatchet side
IsAppError bool
// (optional) the error message for the task
ErrorMessage string
}
type FailTasksResponse ¶
type FailTasksResponse struct {
*FinalizedTaskResponse
RetriedTasks []RetriedTask
}
type FinalizedTaskResponse ¶
type FinalizedTaskResponse struct {
ReleasedTasks []*sqlcv1.ReleaseTasksRow
InternalEvents []InternalTaskEvent
}
type GroupMatchCondition ¶
type GroupMatchCondition struct {
GroupId string `validate:"required,uuid"`
EventType sqlcv1.V1EventType
EventKey string
// (optional) a hint for querying the event data
EventResourceHint *string
// the data key which gets inserted into the returned data from a satisfied match condition
ReadableDataKey string
Expression string
Action sqlcv1.V1MatchConditionAction
// (optional) the data which was used to satisfy the condition (relevant for replays)
Data []byte
}
type InternalEventMatchResults ¶ added in v0.55.0
type InternalTaskEvent ¶
type InternalTaskEvent struct {
TenantID string `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskExternalID string `json:"task_external_id"`
RetryCount int32 `json:"retry_count"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
EventKey string `json:"event_key"`
Data []byte `json:"data"`
}
InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.
type LeaseRepository ¶
type LeaseRepository interface {
ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)
AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}
type ListActiveWorkersResult ¶
type ListActiveWorkersResult struct {
ID string
MaxRuns int
Labels []*sqlcv1.ListManyWorkerLabelsRow
}
type ListFinalizedWorkflowRunsResponse ¶
type ListFinalizedWorkflowRunsResponse struct {
WorkflowRunId string
OutputEvents []*TaskOutputEvent
}
type ListLogsOpts ¶
type ListLogsOpts struct {
// (optional) number of logs to skip
Offset *int
// (optional) number of logs to return
Limit *int `validate:"omitnil,min=1,max=1000"`
// (optional) a list of log levels to filter by
Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`
// (optional) a search query
Search *string
}
type ListTaskRunOpts ¶
type ListWorkflowRunOpts ¶
type LogLineRepository ¶
type LogLineRepository interface {
ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)
PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}
type MatchData ¶
type MatchData struct {
// contains filtered or unexported fields
}
MatchData parses aggregated match data.
func NewMatchData ¶
func (*MatchData) Action ¶
func (m *MatchData) Action() sqlcv1.V1MatchConditionAction
func (*MatchData) DataValueAsTaskOutputEvent ¶
func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent
DataValueAsTaskOutputEvent is a helper for internal events: it returns the data value for the given key as a TaskOutputEvent.
type MatchRepository ¶
type MatchRepository interface {
ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
}
type MatchRepositoryImpl ¶
type MatchRepositoryImpl struct {
// contains filtered or unexported fields
}
func (MatchRepositoryImpl) DesiredWorkerId ¶
func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*MatchRepositoryImpl) ProcessInternalEventMatches ¶
func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*InternalEventMatchResults, error)
ProcessInternalEventMatches processes a list of candidate internal events for the given tenant and returns the resulting matches.
func (MatchRepositoryImpl) ToV1StepRunData ¶
func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
type OLAPRepository ¶
type OLAPRepository interface {
UpdateTablePartitions(ctx context.Context) error
ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)
ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
CreateTasks(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) error
CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID) ([]*sqlcv1.PopulateTaskRunDataRow, map[int64]uuid.UUID, error)
ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
}
type OLAPRepositoryImpl ¶
type OLAPRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*OLAPRepositoryImpl) CreateDAGs ¶
func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
func (*OLAPRepositoryImpl) CreateTaskEvents ¶
func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
func (*OLAPRepositoryImpl) CreateTasks ¶
func (OLAPRepositoryImpl) DesiredWorkerId ¶
func (*OLAPRepositoryImpl) GetTaskPointMetrics ¶
func (*OLAPRepositoryImpl) ListTaskRunEvents ¶
func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId ¶
func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*sqlcv1.ListTaskEventsForWorkflowRunRow, error)
func (*OLAPRepositoryImpl) ListTasks ¶
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*sqlcv1.PopulateTaskRunDataRow, int, error)
func (*OLAPRepositoryImpl) ListTasksByDAGId ¶
func (*OLAPRepositoryImpl) ListTasksByExternalIds ¶
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt ¶
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata) ([]*sqlcv1.PopulateTaskRunDataRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames ¶
func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRuns ¶
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*OLAPRepositoryImpl) ReadDAG ¶
func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRun ¶
func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRunData ¶
func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz) (*sqlcv1.PopulateSingleTaskRunDataRow, *pgtype.UUID, error)
func (*OLAPRepositoryImpl) ReadTaskRunMetrics ¶
func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
func (*OLAPRepositoryImpl) ReadWorkflowRun ¶
func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
func (OLAPRepositoryImpl) ToV1StepRunData ¶
func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*OLAPRepositoryImpl) UpdateDAGStatuses ¶
func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantId string) (bool, []UpdateDAGStatusRow, error)
func (*OLAPRepositoryImpl) UpdateTablePartitions ¶
func (o *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
func (*OLAPRepositoryImpl) UpdateTaskStatuses ¶
func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantId string) (bool, []UpdateTaskStatusRow, error)
type QueueFactoryRepository ¶
type QueueFactoryRepository interface {
NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}
type QueueRepository ¶
type QueueRepository interface {
ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)
GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
Cleanup()
}
type RateLimitRepository ¶
type RateLimitResult ¶
type ReadTaskRunMetricsOpts ¶
type ReadableTaskStatus ¶
type ReadableTaskStatus string
const ( READABLE_TASK_STATUS_QUEUED ReadableTaskStatus = "QUEUED" READABLE_TASK_STATUS_RUNNING ReadableTaskStatus = "RUNNING" READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED" READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED" READABLE_TASK_STATUS_FAILED ReadableTaskStatus = "FAILED" )
func StringToReadableStatus ¶
func StringToReadableStatus(status string) ReadableTaskStatus
func (ReadableTaskStatus) EnumValue ¶
func (s ReadableTaskStatus) EnumValue() int
type RefreshTimeoutBy ¶
type ReplayTaskOpts ¶
type ReplayTaskOpts struct {
// (required) the task id
TaskId int64
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the external id
ExternalId string
// (required) the step id
StepId string
// (optional) the input to the task; the existing input is used if this is not set
Input *TaskInput
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the additional metadata for the task
AdditionalMetadata []byte
}
type ReplayTasksResult ¶
type ReplayTasksResult struct {
ReplayedTasks []TaskIdInsertedAtRetryCount
UpsertedTasks []*sqlcv1.V1Task
InternalEventResults *InternalEventMatchResults
}
type Repository ¶
type Repository interface {
Triggers() TriggerRepository
Tasks() TaskRepository
Scheduler() SchedulerRepository
Matches() MatchRepository
OLAP() OLAPRepository
OverwriteOLAPRepository(o OLAPRepository)
Logs() LogLineRepository
Workers() WorkerRepository
Workflows() WorkflowRepository
Ticker() TickerRepository
}
type RetriedTask ¶
type RunConcurrencyResult ¶
type RunConcurrencyResult struct {
// The tasks which were enqueued
Queued []TaskWithQueue
// If the strategy involves cancelling a task, these are the tasks to cancel
Cancelled []TaskWithCancelledReason
// If the step has multiple concurrency strategies, these are the next ones to notify
NextConcurrencyStrategies []int64
}
type SchedulerRepository ¶
type SchedulerRepository interface {
Concurrency() ConcurrencyRepository
Lease() LeaseRepository
QueueFactory() QueueFactoryRepository
RateLimit() RateLimitRepository
Assignment() AssignmentRepository
}
type TaskIdEventKeyTuple ¶
type TaskIdInsertedAtRetryCount ¶
type TaskIdInsertedAtRetryCount struct {
// (required) the task id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the retry count
RetryCount int32
}
type TaskInput ¶
type TaskMetadata ¶
type TaskMetadata struct {
TaskID int64 `json:"task_id"`
TaskInsertedAt time.Time `json:"task_inserted_at"`
}
func ParseTaskMetadata ¶
func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)
type TaskOutputEvent ¶
type TaskOutputEvent struct {
IsFailure bool `json:"is_failure"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
TaskExternalId string `json:"task_external_id"`
TaskId int64 `json:"task_id"`
RetryCount int32 `json:"retry_count"`
WorkerId *string `json:"worker_id"`
Output []byte `json:"output"`
ErrorMessage string `json:"error_message"`
StepReadableID string `json:"step_readable_id"`
}
func NewCancelledTaskOutputEvent ¶
func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
func NewCancelledTaskOutputEventFromTask ¶
func NewCancelledTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func NewCompletedTaskOutputEvent ¶
func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
func NewFailedTaskOutputEvent ¶
func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
func NewFailedTaskOutputEventFromTask ¶
func NewFailedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func NewSkippedTaskOutputEventFromTask ¶
func NewSkippedTaskOutputEventFromTask(task *sqlcv1.V1Task) *TaskOutputEvent
func (*TaskOutputEvent) Bytes ¶
func (e *TaskOutputEvent) Bytes() []byte
func (*TaskOutputEvent) IsCancelled ¶
func (e *TaskOutputEvent) IsCancelled() bool
func (*TaskOutputEvent) IsCompleted ¶
func (e *TaskOutputEvent) IsCompleted() bool
func (*TaskOutputEvent) IsFailed ¶
func (e *TaskOutputEvent) IsFailed() bool
type TaskRepository ¶
// TaskRepository is the interface for task-level operations. Every method takes
// a context.Context first, and every method except UpdateTablePartitions is
// scoped by a tenantId string.
type TaskRepository interface {
// UpdateTablePartitions is the only method that is not tenant-scoped.
UpdateTablePartitions(ctx context.Context) error
// GetTaskByExternalId is a heavily cached method to return task metadata by its external id.
// Pass skipCache to control cache use (presumably true bypasses the cache — confirm).
GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
// This is non-cacheable because tasks can be added to a workflow run as it executes.
FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
// CompleteTasks, FailTasks and CancelTasks each take a batch of tasks for a single tenant.
CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)
CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
// ListTasks and ListTaskMetas look tasks up by their numeric (int64) ids.
ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
// ListFinalizedWorkflowRuns takes root external ids (strings) rather than numeric task ids.
ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
// with the v1 engine, and shouldn't be called from new v1 endpoints.
// The result map is keyed by int64 — presumably the task id; verify against the implementation.
ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
// The three Process* methods return an extra bool alongside their result.
// NOTE(review): the bool's meaning is not visible from the signatures — presumably
// it signals whether more items remain to be processed; confirm.
ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
// GetQueueCounts returns integer counts in a map keyed by string (presumably the queue name — confirm).
GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)
ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
// RefreshTimeoutBy and ReleaseSlot both return the task's *sqlcv1.V1TaskRuntime row.
RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)
}
type TaskRepositoryImpl ¶
// TaskRepositoryImpl is a concrete repository type whose state is entirely
// unexported. Its listed methods use both pointer receivers (e.g. CancelTasks)
// and value receivers (DesiredWorkerId, PopulateExternalIdsForWorkflow,
// ToV1StepRunData).
// NOTE(review): the value-receiver methods also appear on the other *Impl types
// in this package, so they are presumably promoted from a shared embedded type — confirm.
type TaskRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*TaskRepositoryImpl) CancelTasks ¶
func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
func (*TaskRepositoryImpl) CompleteTasks ¶
func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
func (TaskRepositoryImpl) DesiredWorkerId ¶
func (*TaskRepositoryImpl) FailTasks ¶
func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
func (*TaskRepositoryImpl) FlattenExternalIds ¶
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) GetQueueCounts ¶
func (*TaskRepositoryImpl) GetTaskByExternalId ¶
func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns ¶
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
func (*TaskRepositoryImpl) ListTaskMetas ¶
func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
func (*TaskRepositoryImpl) ListTaskParentOutputs ¶
func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*TaskRepositoryImpl) ProcessTaskReassignments ¶
func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems ¶
func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
func (*TaskRepositoryImpl) ProcessTaskTimeouts ¶
func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
func (*TaskRepositoryImpl) RefreshTimeoutBy ¶
func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReleaseSlot ¶
func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReplayTasks ¶
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
func (TaskRepositoryImpl) ToV1StepRunData ¶
func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TaskRepositoryImpl) UpdateTablePartitions ¶
func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
type TaskRunMetric ¶
type TaskWithCancelledReason ¶
// TaskWithCancelledReason extends a task identifier with a cancellation reason
// and the related external / workflow run ids.
type TaskWithCancelledReason struct {
// Embedded by pointer: Id, InsertedAt and RetryCount are promoted, but reading
// them panics if this pointer is nil.
*TaskIdInsertedAtRetryCount
// CancelledReason is the reason for cancellation, as a string.
CancelledReason string
// TaskExternalId is the task's external id, as a string.
TaskExternalId string
// WorkflowRunId is the workflow run id, as a string.
WorkflowRunId string
}
type TaskWithQueue ¶
// TaskWithQueue extends a task identifier with a queue string.
type TaskWithQueue struct {
// Embedded by pointer: Id, InsertedAt and RetryCount are promoted, but reading
// them panics if this pointer is nil.
*TaskIdInsertedAtRetryCount
// Queue is presumably the name of the queue the task belongs to — confirm.
Queue string
}
type TickerRepository ¶
type TimeoutTasksResponse ¶
// TimeoutTasksResponse is the result type of TaskRepository.ProcessTaskTimeouts.
// It extends FailTasksResponse with the rows listed as tasks to time out.
type TimeoutTasksResponse struct {
// Embedded by pointer: FailTasksResponse's fields are promoted, but reading
// them panics if this pointer is nil.
*FailTasksResponse
// TimeoutTasks holds the sqlcv1.ListTasksToTimeoutRow rows for this response.
TimeoutTasks []*sqlcv1.ListTasksToTimeoutRow
}
type TriggerRepository ¶
// TriggerRepository is the interface for triggering work from events or
// workflow names. All methods are tenant-scoped and take a batch of options.
type TriggerRepository interface {
// TriggerFromEvents takes event trigger options by value and returns the
// resulting tasks and DAGs.
TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
// TriggerFromWorkflowNames takes workflow-name trigger options by pointer and
// returns the resulting tasks and DAGs.
TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
// PopulateExternalIdsForWorkflow returns only an error; opts are passed by
// pointer, so it presumably fills in fields such as ExternalId and ShouldSkip
// in place — confirm.
PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
// PreflightVerifyWorkflowNameOpts returns only an error.
// NOTE(review): presumably validates opts before triggering (see ErrNamesNotFound) — confirm.
PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}
type TriggerRepositoryImpl ¶
// TriggerRepositoryImpl is a concrete type whose state is entirely unexported.
// Its listed methods cover every method of TriggerRepository: TriggerFromEvents,
// TriggerFromWorkflowNames and PreflightVerifyWorkflowNameOpts on pointer
// receivers, and PopulateExternalIdsForWorkflow on a value receiver.
type TriggerRepositoryImpl struct {
// contains filtered or unexported fields
}
func (TriggerRepositoryImpl) DesiredWorkerId ¶
func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow ¶
func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts ¶
func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
func (TriggerRepositoryImpl) ToV1StepRunData ¶
func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TriggerRepositoryImpl) TriggerFromEvents ¶
func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
func (*TriggerRepositoryImpl) TriggerFromWorkflowNames ¶
func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*sqlcv1.V1Task, []*DAGWithData, error)
type TriggerTaskData ¶
// TriggerTaskData is the JSON-tagged payload describing a workflow to trigger.
// Only WorkflowName is marked required. The optional pointer fields have no
// omitempty tag, so encoding/json would emit them as null when nil.
type TriggerTaskData struct {
// (required) the workflow name
WorkflowName string `json:"workflow_name" validate:"required"`
// (optional) the workflow run data
Data []byte `json:"data"`
// (optional) the workflow run metadata
AdditionalMetadata []byte `json:"additional_metadata"`
// (optional) the desired worker id
DesiredWorkerId *string `json:"desired_worker_id"`
// (optional) the parent external id
ParentExternalId *string `json:"parent_external_id"`
// (optional) the parent task id
ParentTaskId *int64 `json:"parent_task_id"`
// (optional) the parent inserted_at
ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`
// (optional) the child index
ChildIndex *int64 `json:"child_index"`
// (optional) the child key
ChildKey *string `json:"child_key"`
}
type UpdateDAGStatusRow ¶
// UpdateDAGStatusRow carries a DAG's identity and its OLAP readable status.
// It has the same shape as UpdateTaskStatusRow, with Dag* fields in place of Task*.
type UpdateDAGStatusRow struct {
// DagId is the DAG's numeric id.
DagId int64
// DagInsertedAt is the DAG's inserted-at timestamp.
DagInsertedAt pgtype.Timestamptz
// ReadableStatus is the status, using the sqlcv1 OLAP enum.
ReadableStatus sqlcv1.V1ReadableStatusOlap
// ExternalId is the external id as a UUID.
ExternalId pgtype.UUID
}
type UpdateTaskStatusRow ¶
// UpdateTaskStatusRow carries a task's identity and its OLAP readable status.
// It has the same shape as UpdateDAGStatusRow, with Task* fields in place of Dag*.
type UpdateTaskStatusRow struct {
// TaskId is the task's numeric id.
TaskId int64
// TaskInsertedAt is the task's inserted-at timestamp.
TaskInsertedAt pgtype.Timestamptz
// ReadableStatus is the status, using the sqlcv1 OLAP enum.
ReadableStatus sqlcv1.V1ReadableStatusOlap
// ExternalId is the external id as a UUID.
ExternalId pgtype.UUID
}
type V1StepRunData ¶
// V1StepRunData is the JSON-tagged step run payload returned by the
// ToV1StepRunData methods in this package. Only StepRunErrors uses omitempty;
// with encoding/json the other maps would be emitted as null when nil.
type V1StepRunData struct {
// Input is the step input as a free-form JSON object.
Input map[string]interface{} `json:"input"`
// TriggeredBy is serialized under the JSON key "triggered_by".
TriggeredBy string `json:"triggered_by"`
// Parents is a two-level map; presumably parent step name -> that parent's
// output object — confirm.
Parents map[string]map[string]interface{} `json:"parents"`
// custom-set user data for the step
UserData map[string]interface{} `json:"user_data"`
// overrides set from the playground
Overrides map[string]interface{} `json:"overrides"`
// errors in upstream steps (only used in on-failure step)
StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}
func (*V1StepRunData) Bytes ¶
func (v1 *V1StepRunData) Bytes() []byte
type V1WorkflowRunPopulator ¶
// V1WorkflowRunPopulator pairs a workflow run with a list of task metadata.
type V1WorkflowRunPopulator struct {
// WorkflowRun is held by pointer and may therefore be nil.
WorkflowRun *WorkflowRunData
// TaskMetadata is a list of (task id, inserted-at) pairs; presumably the tasks
// belonging to WorkflowRun — confirm.
TaskMetadata []TaskMetadata
}
type WorkerRepository ¶
// WorkerRepository is the interface for worker lookups.
// NOTE(review): unlike TaskRepository and TriggerRepository, none of these
// methods accept a context.Context, so callers cannot pass cancellation or
// deadlines through this interface.
type WorkerRepository interface {
// ListWorkers takes its options from the non-v1 repository package and returns
// sqlcv1 rows that include slot counts.
ListWorkers(tenantId string, opts *repository.ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
// GetWorkerById takes only a worker id; it is not scoped by tenant.
GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
// ListWorkerState returns two lists: sqlcv1 semaphore-slot rows and dbsqlc
// (non-v1) step run rows.
// NOTE(review): maxRuns presumably caps the number of rows returned — confirm.
ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)
}
type WorkflowNameTriggerOpts ¶
// WorkflowNameTriggerOpts is the per-workflow option type taken by
// TriggerFromWorkflowNames, PopulateExternalIdsForWorkflow and
// PreflightVerifyWorkflowNameOpts.
type WorkflowNameTriggerOpts struct {
// Embedded by pointer: TriggerTaskData's fields (WorkflowName, Data, ...) are
// promoted, but reading them panics if this pointer is nil.
*TriggerTaskData
// ExternalId is the external id for the triggered workflow, as a string.
ExternalId string
// Whether to skip the creation of the child workflow
ShouldSkip bool
}
type WorkflowRepository ¶
type WorkflowRunData ¶
// WorkflowRunData is the JSON-tagged record for a workflow run.
// The pointer fields tagged omitempty (Output, StepId, TaskExternalId, TaskId,
// TaskInsertedAt) are optional and left out of the JSON when nil.
// NOTE(review): field naming mixes "ID" (ExternalID, TenantID, WorkflowID) and
// "Id" (StepId, TaskId, WorkflowVersionId); renaming would break callers, so it
// is only flagged here.
type WorkflowRunData struct {
AdditionalMetadata []byte `json:"additional_metadata"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
DisplayName string `json:"display_name"`
ErrorMessage string `json:"error_message"`
ExternalID pgtype.UUID `json:"external_id"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
Input []byte `json:"input"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
// Kind is the run kind, using the sqlcv1 enum.
Kind sqlcv1.V1RunKind `json:"kind"`
// Output is a pointer to a byte slice, so "no output" (nil pointer, omitted)
// can be told apart from an empty output.
Output *[]byte `json:"output,omitempty"`
ReadableStatus sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
StepId *pgtype.UUID `json:"step_id,omitempty"`
StartedAt pgtype.Timestamptz `json:"started_at"`
// The Task* fields below are optional; presumably set only when the run
// corresponds to a single task — confirm.
TaskExternalId *pgtype.UUID `json:"task_external_id,omitempty"`
TaskId *int64 `json:"task_id,omitempty"`
TaskInsertedAt *pgtype.Timestamptz `json:"task_inserted_at,omitempty"`
TenantID pgtype.UUID `json:"tenant_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionId pgtype.UUID `json:"workflow_version_id"`
}