Documentation
¶
Index ¶
- Constants
- Variables
- func BoolPtr(b bool) *bool
- func HashPassword(pw string) (*string, error)
- func OLAPPayloadOffloadMessage(tenantId string, payloads []OLAPPayloadToOffload) (*msgqueue.Message, error)
- func RunPostCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
- func RunPreCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
- func StringPtr(s string) *string
- func ValidateJSONB(jsonb []byte, fieldName string) error
- func VerifyPassword(hashedPW, candidate string) (bool, error)
- type APIKeyAuthCredentials
- type APITokenGenerator
- type APITokenRepository
- type AssignResults
- type AssignedItem
- type AssignmentRepository
- type AuthConfig
- type BasicAuthCredentials
- type BulkCutOverOLAPPayload
- type BulkCutOverPayload
- type CELEvaluationFailure
- type CallbackOptFunc
- type CandidateEventMatch
- type ChildWorkflowSignalCreatedData
- type CompleteTaskOpts
- type ConcurrencyRepository
- type ConcurrencyRepositoryImpl
- func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (res *RunConcurrencyResult, err error)
- func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
- type ConcurrencyStat
- type CreateAPITokenOpts
- type CreateConcurrencyOpts
- type CreateCronWorkflowTriggerOpts
- type CreateDispatcherOpts
- type CreateExternalSignalConditionKind
- type CreateExternalSignalConditionOpt
- type CreateFilterOpts
- type CreateIncomingWebhookFailureLogOpts
- type CreateLogLineOpts
- type CreateMatchOpts
- type CreateSNSIntegrationOpts
- type CreateScheduledWorkflowRunForWorkflowOpts
- type CreateSessionOpts
- type CreateStepMatchConditionOpt
- type CreateStepOpts
- type CreateTaskOpts
- type CreateTenantAlertGroupOpts
- type CreateTenantInviteOpts
- type CreateTenantMemberOpts
- type CreateTenantOpts
- type CreateTickerOpts
- type CreateUserOpts
- type CreateWebhookOpts
- type CreateWorkerOpts
- type CreateWorkflowStepRateLimitOpts
- type CreateWorkflowVersionOpts
- type CutoverBatchOutcome
- type CutoverJobRunMetadata
- type DAGWithData
- type DesiredWorkerLabelOpts
- type DispatcherRepository
- type ErrNamesNotFound
- type EventExternalIdFilterId
- type EventMatchResults
- type EventTriggerOpts
- type EventTriggersFromExternalId
- type EventType
- type EventWithPayload
- type ExternalCreateSignalMatchOpts
- type ExternalPayloadLocationKey
- type ExternalStore
- type FailTaskOpts
- type FailTasksResponse
- type FilterRepository
- type FinalizedTaskResponse
- type GetQueueMetricsOpts
- type GetQueueMetricsResponse
- type GetTenantAlertingSettingsResponse
- type GroupMatchCondition
- type HMACAuthCredentials
- type HealthRepository
- type IdInsertedAt
- type IdempotencyKey
- type IdempotencyRepository
- type InternalTaskEvent
- type IntervalSettingsRepository
- type JobRunHasCycleError
- type KeyClaimantPair
- type LeaseRepository
- type Limit
- type ListActiveWorkersResult
- type ListCronWorkflowsOpts
- type ListEventsRow
- type ListFiltersOpts
- type ListFinalizedWorkflowRunsResponse
- type ListLogsOpts
- type ListRateLimitOpts
- type ListRateLimitsResult
- type ListScheduledWorkflowsOpts
- type ListTaskRunOpts
- type ListTenantInvitesOpts
- type ListTickerOpts
- type ListWebhooksOpts
- type ListWorkersOpts
- type ListWorkflowRunOpts
- type ListWorkflowsOpts
- type ListWorkflowsResult
- type LogLineRepository
- type MatchData
- type MatchRepository
- type MatchRepositoryImpl
- func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
- func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
- func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, ...) error
- func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- type MessageQueueRepository
- type NoOpExternalStore
- type NoOpIntervalSettingsRepository
- func (r *NoOpIntervalSettingsRepository) ReadAllIntervals(ctx context.Context, operationId string) (map[string]time.Duration, error)
- func (r *NoOpIntervalSettingsRepository) ReadInterval(ctx context.Context, operationId string, tenantId string) (time.Duration, error)
- func (r *NoOpIntervalSettingsRepository) SetInterval(ctx context.Context, operationId string, tenantId string, d time.Duration) (time.Duration, error)
- type OAuthOpts
- type OLAPCutoverBatchOutcome
- type OLAPCutoverJobRunMetadata
- type OLAPPaginationParams
- type OLAPPayloadToOffload
- type OLAPPayloadsToOffload
- type OLAPRepository
- type OLAPRepositoryImpl
- func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error
- func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, ...) error
- func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)
- func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)
- func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
- func (r *OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, ...) error
- func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
- func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *OLAPRepositoryImpl) GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, ...) (map[string]*sqlcv1.GetDagDurationsRow, error)
- func (r *OLAPRepositoryImpl) GetEvent(ctx context.Context, externalId string) (*sqlcv1.V1EventsOlap, error)
- func (r *OLAPRepositoryImpl) GetEventWithPayload(ctx context.Context, externalId, tenantId string) (*EventWithPayload, error)
- func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, ...) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)
- func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, ...) ([]*sqlcv1.GetTaskPointMetricsRow, error)
- func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
- func (r *OLAPRepositoryImpl) ListEventKeys(ctx context.Context, tenantId string) ([]string, error)
- func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*EventWithPayload, *int64, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, ...) ([]*sqlcv1.ListTaskEventsRow, error)
- func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*TaskEventWithPayloads, error)
- func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*TaskWithPayloads, int, error)
- func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID, ...) ([]*TaskWithPayloads, map[int64]uuid.UUID, error)
- func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
- func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata, ...) ([]*TaskWithPayloads, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)
- func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
- func (r *OLAPRepositoryImpl) ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error)
- func (r *OLAPRepositoryImpl) OffloadPayloads(ctx context.Context, tenantId string, payloads []OffloadPayloadOpts) error
- func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, ...) (*int32, error)
- func (r *OLAPRepositoryImpl) PayloadStore() PayloadStoreRepository
- func (r *OLAPRepositoryImpl) PopulateEventData(ctx context.Context, tenantId pgtype.UUID, eventExternalIds []pgtype.UUID) (map[pgtype.UUID]sqlcv1.PopulateEventDataRow, error)
- func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, ...) error
- func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId TenantID, ...) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
- func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
- func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId string, externalId pgtype.UUID) ([]byte, error)
- func (r *OLAPRepositoryImpl) ReadPayloads(ctx context.Context, tenantId string, externalIds ...pgtype.UUID) (map[pgtype.UUID][]byte, error)
- func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, ...) (*TaskWithPayloads, pgtype.UUID, error)
- func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
- func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
- func (r *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)
- func (r *OLAPRepositoryImpl) StatusUpdateBatchSizeLimits() StatusUpdateBatchSizeLimits
- func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
- func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
- func (r *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
- type OffloadPayloadOpts
- type OffloadToExternalStoreOpts
- type PGHealthError
- type PGHealthRepository
- type PaginationParams
- type PartitionDate
- type PayloadExternalId
- type PayloadLocation
- type PayloadMetadata
- type PayloadStoreRepository
- type PayloadStoreRepositoryOpts
- type PayloadUniqueKey
- type PlanLimitMap
- type PubSubMessage
- type QueueFactoryRepository
- type QueueMetric
- type QueueRepository
- type RateLimitRepository
- type RateLimitResult
- type ReadTaskRunMetricsOpts
- type ReadableTaskStatus
- type RefreshTimeoutBy
- type ReplayTaskOpts
- type ReplayTasksResult
- type Repository
- type RetriedTask
- type RetrievePayloadOpts
- type Run
- type RunConcurrencyResult
- type RuntimeInfo
- type SDK
- type SNSRepository
- type ScheduledWorkflowMeta
- type ScheduledWorkflowUpdate
- type SchedulerRepository
- type SecurityCheckRepository
- type SlackRepository
- type StatusUpdateBatchSizeLimits
- type Sticky
- type StoreOLAPPayloadOpts
- type StorePayloadOpts
- type TaskEventWithPayloads
- type TaskIdEventKeyTuple
- type TaskIdInsertedAtRetryCount
- type TaskIdInsertedAtSignalKey
- type TaskInput
- type TaskMetadata
- type TaskOperationLimits
- type TaskOutputEvent
- func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
- func NewCancelledTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
- func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
- func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
- func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
- func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
- type TaskRepository
- type TaskRepositoryImpl
- func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error
- func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
- func (r *TaskRepositoryImpl) Cleanup(ctx context.Context) (bool, error)
- func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
- func (r *TaskRepositoryImpl) DefaultTaskActivityGauge(ctx context.Context, tenantId string) (int, error)
- func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (r *TaskRepositoryImpl) EnsureTablePartitionsExist(ctx context.Context) (bool, error)
- func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
- func (r *TaskRepositoryImpl) FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error)
- func (r *TaskRepositoryImpl) FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error)
- func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)
- func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
- func (r *TaskRepositoryImpl) GetTaskStats(ctx context.Context, tenantId string) (map[string]TaskStat, error)
- func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
- func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*V1TaskEventWithPayload, error)
- func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
- func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
- func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
- func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
- func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
- func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
- func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
- func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
- type TaskRunMetric
- type TaskStat
- type TaskStatusStat
- type TaskWithCancelledReason
- type TaskWithPayloads
- type TaskWithQueue
- type TenantAlertEmailGroupForSend
- type TenantAlertingRepository
- type TenantCallbackOpts
- type TenantID
- type TenantIdSDKTuple
- type TenantInviteRepository
- type TenantLimitConfig
- type TenantLimitRepository
- type TenantRepository
- type TenantScopedCallback
- type TickerRepository
- type TimeoutTasksResponse
- type TriggerDecision
- type TriggerFromEventsResult
- type TriggerRepository
- type TriggerRepositoryImpl
- func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string
- func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
- func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
- func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
- func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)
- type TriggerTaskData
- type TriggeredBy
- type TriggeredByEvent
- type UnscopedCallback
- type UpdateCronOpts
- type UpdateDAGStatusRow
- type UpdateDispatcherOpts
- type UpdateFilterOpts
- type UpdateSessionOpts
- type UpdateTaskStatusRow
- type UpdateTenantAlertGroupOpts
- type UpdateTenantAlertingSettingsOpts
- type UpdateTenantInviteOpts
- type UpdateTenantMemberOpts
- type UpdateTenantOpts
- type UpdateTickerOpts
- type UpdateUserOpts
- type UpdateWorkerOpts
- type UpsertRateLimitOpts
- type UpsertSlackWebhookOpts
- type UpsertTenantAlertingSettingsOpts
- type UpsertWorkerLabelOpts
- type UserRepository
- type UserSessionRepository
- type V1StepRunData
- type V1TaskEventWithPayload
- type V1TaskWithPayload
- type V1WorkflowRunPopulator
- type WasSuccessfullyClaimed
- type WebhookRepository
- type WorkerRepository
- type WorkflowAndScope
- type WorkflowMetrics
- type WorkflowNameTriggerOpts
- type WorkflowRepository
- type WorkflowRunData
- type WorkflowScheduleRepository
Constants ¶
const MAX_BATCH_SIZE_BYTES = 1.5 * 1024 * 1024 * 1024 // 1.5 GB
const MAX_PARTITIONS_TO_OFFLOAD = 14 // two weeks
const MAX_TENANT_RATE_LIMITS = 10000
const NUM_PARTITIONS = 4
TODO: make this dynamic for the instance
const PARENT_STRATEGY_LOCK_OFFSET = 1000000000000 // 1 trillion
Variables ¶
var ErrDagParentNotFound = errors.New("dag parent not found")
var ErrResourceExhausted = fmt.Errorf("resource exhausted")
Functions ¶
func HashPassword ¶
func OLAPPayloadOffloadMessage ¶ added in v0.74.3
func OLAPPayloadOffloadMessage(tenantId string, payloads []OLAPPayloadToOffload) (*msgqueue.Message, error)
func RunPostCommit ¶
func RunPostCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
func RunPreCommit ¶
func RunPreCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
func ValidateJSONB ¶
func VerifyPassword ¶
Types ¶
type APIKeyAuthCredentials ¶ added in v0.74.3
type APITokenGenerator ¶
type APITokenRepository ¶
type APITokenRepository interface {
CreateAPIToken(ctx context.Context, opts *CreateAPITokenOpts) (*sqlcv1.APIToken, error)
GetAPITokenById(ctx context.Context, id string) (*sqlcv1.APIToken, error)
ListAPITokensByTenant(ctx context.Context, tenantId string) ([]*sqlcv1.APIToken, error)
RevokeAPIToken(ctx context.Context, id string) error
DeleteAPIToken(ctx context.Context, tenantId, id string) error
}
type AssignResults ¶
type AssignResults struct {
Assigned []*AssignedItem
Unassigned []*sqlcv1.V1QueueItem
SchedulingTimedOut []*sqlcv1.V1QueueItem
RateLimited []*RateLimitResult
RateLimitedToMove []*RateLimitResult
}
type AssignedItem ¶
type AssignedItem struct {
WorkerId pgtype.UUID
QueueItem *sqlcv1.V1QueueItem
}
type AssignmentRepository ¶
type AssignmentRepository interface {
ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}
type AuthConfig ¶ added in v0.74.3
type AuthConfig struct {
Type sqlcv1.V1IncomingWebhookAuthType `json:"type" validate:"required"`
BasicAuth *BasicAuthCredentials `json:"basic_auth,omitempty"`
APIKeyAuth *APIKeyAuthCredentials `json:"api_key_auth,omitempty"`
HMACAuth *HMACAuthCredentials `json:"hmac_auth,omitempty"`
}
func (*AuthConfig) Validate ¶ added in v0.74.3
func (ac *AuthConfig) Validate() error
type BasicAuthCredentials ¶ added in v0.74.3
type BulkCutOverOLAPPayload ¶ added in v0.74.3
type BulkCutOverOLAPPayload struct {
TenantID pgtype.UUID
InsertedAt pgtype.Timestamptz
ExternalId pgtype.UUID
ExternalLocationKey ExternalPayloadLocationKey
}
type BulkCutOverPayload ¶ added in v0.74.3
type BulkCutOverPayload struct {
TenantID pgtype.UUID
Id int64
InsertedAt pgtype.Timestamptz
ExternalId pgtype.UUID
Type sqlcv1.V1PayloadType
ExternalLocationKey ExternalPayloadLocationKey
}
type CELEvaluationFailure ¶ added in v0.74.3
type CELEvaluationFailure struct {
Source sqlcv1.V1CelEvaluationFailureSource `json:"source"`
ErrorMessage string `json:"error_message"`
}
type CallbackOptFunc ¶
type CallbackOptFunc[T any] func(*TenantCallbackOpts[T])
func WithPostCommitCallback ¶
func WithPostCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]
func WithPreCommitCallback ¶
func WithPreCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]
type CandidateEventMatch ¶ added in v0.74.3
type ChildWorkflowSignalCreatedData ¶ added in v0.74.3
type ChildWorkflowSignalCreatedData struct {
// The external id of the target child task
ChildExternalId string `json:"external_id"`
// The external id of the parent task
ParentExternalId string `json:"parent_external_id"`
// The index of the child task
ChildIndex int64 `json:"child_index"`
// The key of the child task
ChildKey *string `json:"child_key"`
}
func (*ChildWorkflowSignalCreatedData) Bytes ¶ added in v0.74.3
func (c *ChildWorkflowSignalCreatedData) Bytes() []byte
type CompleteTaskOpts ¶ added in v0.74.3
type CompleteTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) the output bytes for the task
Output []byte
}
type ConcurrencyRepository ¶ added in v0.74.3
type ConcurrencyRepository interface {
// Checks whether the concurrency strategy is active, and if not, sets is_active=False
UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error
RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}
type ConcurrencyRepositoryImpl ¶ added in v0.74.3
type ConcurrencyRepositoryImpl struct {
// contains filtered or unexported fields
}
func (ConcurrencyRepositoryImpl) DesiredWorkerId ¶ added in v0.74.3
func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow ¶ added in v0.74.3
func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy ¶ added in v0.74.3
func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) (res *RunConcurrencyResult, err error)
func (ConcurrencyRepositoryImpl) ToV1StepRunData ¶ added in v0.74.3
func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive ¶ added in v0.74.3
func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive( ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency, ) error
type ConcurrencyStat ¶ added in v0.74.3
type ConcurrencyStat struct {
Expression string `json:"expression"`
Type string `json:"type"`
Keys map[string]int64 `json:"keys"`
}
ConcurrencyStat represents concurrency information for a task
type CreateAPITokenOpts ¶
type CreateAPITokenOpts struct {
// The id of the token
ID string `validate:"required,uuid"`
// When the token expires
ExpiresAt time.Time
// (optional) A tenant ID for this API token
TenantId *string `validate:"omitempty,uuid"`
// (optional) A name for this API token
Name *string `validate:"omitempty,max=255"`
Internal bool
}
type CreateConcurrencyOpts ¶ added in v0.74.3
type CreateConcurrencyOpts struct {
// (optional) the maximum number of concurrent workflow runs, default 1
MaxRuns *int32
// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`
// (required) a concurrency expression for evaluating the concurrency key
Expression string `validate:"celworkflowrunstr"`
}
type CreateCronWorkflowTriggerOpts ¶
type CreateCronWorkflowTriggerOpts struct {
// (required) the workflow id
WorkflowId string `validate:"required,uuid"`
// (required) the workflow name
Name string `validate:"required"`
Cron string `validate:"required,cron"`
Input map[string]interface{}
AdditionalMetadata map[string]interface{}
Priority *int32 `validate:"omitempty,min=1,max=3"`
}
type CreateDispatcherOpts ¶
type CreateDispatcherOpts struct {
ID string `validate:"required,uuid"`
}
type CreateExternalSignalConditionKind ¶ added in v0.74.3
type CreateExternalSignalConditionKind string
const ( CreateExternalSignalConditionKindSLEEP CreateExternalSignalConditionKind = "SLEEP" CreateExternalSignalConditionKindUSEREVENT CreateExternalSignalConditionKind = "USER_EVENT" )
type CreateExternalSignalConditionOpt ¶ added in v0.74.3
type CreateExternalSignalConditionOpt struct {
Kind CreateExternalSignalConditionKind `validate:"required, oneof=SLEEP USER_EVENT"`
ReadableDataKey string `validate:"required"`
OrGroupId string `validate:"required,uuid"`
UserEventKey *string
SleepFor *string `validate:"omitempty,duration"`
Expression string
}
type CreateFilterOpts ¶ added in v0.74.3
type CreateIncomingWebhookFailureLogOpts ¶ added in v0.74.3
type CreateLogLineOpts ¶
type CreateLogLineOpts struct {
TaskId int64
TaskInsertedAt pgtype.Timestamptz
// (optional) The time when the log line was created.
CreatedAt *time.Time
// (required) The message of the log line.
Message string `validate:"required,min=1,max=10000"`
// (optional) The level of the log line.
Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`
// (optional) The metadata of the log line.
Metadata []byte
// The retry count of the log line.
RetryCount int
}
type CreateMatchOpts ¶ added in v0.74.3
type CreateMatchOpts struct {
Kind sqlcv1.V1MatchKind
ExistingMatchData []byte
Conditions []GroupMatchCondition
TriggerDAGId *int64
TriggerDAGInsertedAt pgtype.Timestamptz
TriggerExternalId *string
TriggerWorkflowRunId *string
TriggerStepId *string
TriggerStepIndex pgtype.Int8
TriggerExistingTaskId *int64
TriggerExistingTaskInsertedAt pgtype.Timestamptz
TriggerParentTaskExternalId pgtype.UUID
TriggerParentTaskId pgtype.Int8
TriggerParentTaskInsertedAt pgtype.Timestamptz
TriggerChildIndex pgtype.Int8
TriggerChildKey pgtype.Text
TriggerPriority pgtype.Int4
SignalTaskId *int64
SignalTaskInsertedAt pgtype.Timestamptz
SignalExternalId *string
SignalKey *string
}
type CreateSNSIntegrationOpts ¶
type CreateSNSIntegrationOpts struct {
TopicArn string `validate:"required,min=1,max=255"`
}
type CreateSessionOpts ¶
type CreateStepMatchConditionOpt ¶ added in v0.74.3
type CreateStepMatchConditionOpt struct {
// (required) the type of match condition for triggering the step
MatchConditionKind string `validate:"required,oneof=PARENT_OVERRIDE USER_EVENT SLEEP"`
// (required) the key for the event data when the workflow is triggered
ReadableDataKey string `validate:"required"`
// (required) the initial state for the task when the match condition is satisfied
Action string `validate:"required,oneof=QUEUE CANCEL SKIP"`
// (required) the or group id for the match condition
// we ignore the JSON field here because it's randomly generated by the client, and we don't want to
// publish a new workflow version just because this UUID is different. We use `OrGroupIdIndex` instead.
OrGroupId string `json:"-" validate:"required,uuid"`
// NOTE: should not be set by the caller. This is populated by this package before creating the checksum.
OrGroupIdIndex int32
// (optional) the expression for the match condition
Expression string `validate:"omitempty"`
// (optional) the sleep duration for the match condition, only set if this is a SLEEP
SleepDuration *string `validate:"omitempty,duration"`
// (optional) the event key for the match condition, only set if this is a USER_EVENT
EventKey *string `validate:"omitempty"`
// (optional) if this is a PARENT_OVERRIDE condition, this will be set to the parent readable_id for
// the parent whose trigger behavior we're overriding
ParentReadableId *string `validate:"omitempty"`
}
type CreateStepOpts ¶ added in v0.74.3
type CreateStepOpts struct {
// (required) the task name
ReadableId string `validate:"hatchetName"`
// (required) the task action id
Action string `validate:"required,actionId"`
// (optional) the task timeout
Timeout *string `validate:"omitnil,duration"`
// (optional) the task scheduling timeout
ScheduleTimeout *string `validate:"omitnil,duration"`
// (optional) the parents that this step depends on
Parents []string `validate:"dive,hatchetName"`
// (optional) the step retry max
Retries *int `validate:"omitempty,min=0"`
// (optional) rate limits for this step
RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`
// (optional) desired worker affinity state for this step
DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`
// (optional) the step retry backoff factor
RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`
// (optional) the step retry backoff max seconds (can't be greater than 86400)
RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`
// (optional) a list of additional trigger conditions
TriggerConditions []CreateStepMatchConditionOpt `validate:"omitempty,dive"`
// (optional) the step concurrency options
Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`
}
type CreateTaskOpts ¶ added in v0.74.3
// CreateTaskOpts holds the parameters used to create a task. Only ExternalId,
// WorkflowRunId and StepId carry validation tags; the remaining fields are
// described as required/optional by their comments only.
type CreateTaskOpts struct {
// (required) the external id
ExternalId string `validate:"required,uuid"`
// (required) the workflow run id. note this may be the same as the external id if this is a
// single-task workflow, otherwise it represents the external id of the DAG.
WorkflowRunId string `validate:"required,uuid"`
// (required) the step id
StepId string `validate:"required,uuid"`
// (required) the input bytes to the task
Input *TaskInput
// NOTE(review): undocumented in the original; presumably the payload that filters are
// evaluated against — confirm with the caller.
FilterPayload []byte
// (required) the step index for the task
StepIndex int
// (optional) the additional metadata for the task
AdditionalMetadata []byte
// (optional) the desired worker id
DesiredWorkerId *string
// (optional) the DAG id for the task
DagId *int64
// (optional) the DAG inserted at for the task
DagInsertedAt pgtype.Timestamptz
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the parent task external id
ParentTaskExternalId *string
// (optional) the parent task id
ParentTaskId *int64
// (optional) the parent task inserted at
ParentTaskInsertedAt *time.Time
// (optional) The priority of a task, between 1 and 3
Priority *int32
// (optional) the child index for the task
ChildIndex *int64
// (optional) the child key for the task
ChildKey *string
}
type CreateTenantAlertGroupOpts ¶
// CreateTenantAlertGroupOpts holds the email list for a new tenant alert group.
type CreateTenantAlertGroupOpts struct {
// (required) the group's emails; each entry must be a valid email of at most 255 characters
Emails []string `validate:"required,dive,email,max=255"`
}
type CreateTenantInviteOpts ¶
// CreateTenantInviteOpts holds the parameters for creating a tenant invite.
type CreateTenantInviteOpts struct {
// (required) the invitee email
InviteeEmail string `validate:"required,email"`
// (required) the inviter email
InviterEmail string `validate:"required,email"`
// (required) when the invite expires
ExpiresAt time.Time `validate:"required,future"`
// (required) the role of the invitee
// NOTE(review): the tag uses omitempty rather than required, so an empty Role is not
// rejected by validation; a non-empty value must be one of OWNER, ADMIN or MEMBER.
Role string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
// NOTE(review): undocumented in the original; presumably a cap on the number of
// pending invites — confirm against the implementation.
MaxPending int `validate:"omitempty"`
}
type CreateTenantMemberOpts ¶
type CreateTenantOpts ¶
// CreateTenantOpts holds the parameters for creating a tenant.
type CreateTenantOpts struct {
// (required) the tenant name
Name string `validate:"required"`
// (required) the tenant slug
Slug string `validate:"required,hatchetName"`
// (optional) the tenant ID; must be a UUID when set
ID *string `validate:"omitempty,uuid"`
// (optional) the tenant data retention period
DataRetentionPeriod *string `validate:"omitempty,duration"`
// (optional) the tenant engine version
EngineVersion *sqlcv1.TenantMajorEngineVersion `validate:"omitempty"`
// (optional) the tenant environment type; one of local, development or production
Environment *string `validate:"omitempty,oneof=local development production"`
// (optional) additional onboarding data
OnboardingData map[string]interface{} `validate:"omitempty"`
}
type CreateTickerOpts ¶
// CreateTickerOpts holds the parameters for creating a ticker.
type CreateTickerOpts struct {
// (required) the ticker id; must be a UUID
ID string `validate:"required,uuid"`
}
type CreateUserOpts ¶
type CreateWebhookOpts ¶ added in v0.74.3
// CreateWebhookOpts holds the parameters for creating an incoming webhook.
// Only Name carries a validation tag.
type CreateWebhookOpts struct {
Tenantid pgtype.UUID `json:"tenantid"`
Sourcename sqlcv1.V1IncomingWebhookSourceName `json:"sourcename"`
// (required) the webhook name
Name string `json:"name" validate:"required"`
// NOTE(review): presumably an expression used to derive the event key — confirm
Eventkeyexpression string `json:"eventkeyexpression"`
AuthConfig AuthConfig `json:"auth_config,omitempty"`
}
type CreateWorkerOpts ¶
// CreateWorkerOpts holds the parameters for creating a worker.
type CreateWorkerOpts struct {
// The id of the dispatcher
DispatcherId string `validate:"required,uuid"`
// The maximum number of runs this worker can run at a time (at least 1 when set)
MaxRuns *int `validate:"omitempty,gte=1"`
// The name of the worker
Name string `validate:"required,hatchetName"`
// The names of the services
Services []string `validate:"dive,hatchetName"`
// A list of actions this worker can run
Actions []string `validate:"dive,actionId"`
// (optional) Runtime info for the worker
RuntimeInfo *RuntimeInfo `validate:"omitempty"`
}
type CreateWorkflowStepRateLimitOpts ¶
// CreateWorkflowStepRateLimitOpts describes one rate limit attached to a workflow step.
type CreateWorkflowStepRateLimitOpts struct {
// (required) the rate limit key
// NOTE(review): Key is tagged `required` while KeyExpr is tagged `required_without=Key`;
// confirm which of the two is meant to be optional.
Key string `validate:"required"`
// (optional) a CEL expression for the rate limit key
KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`
// (optional) the rate limit units to consume
Units *int `validate:"omitnil,required_without=UnitsExpr"`
// (optional) a CEL expression for the rate limit units
UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`
// (optional) a CEL expression for a dynamic limit value for the rate limit
LimitExpr *string `validate:"omitnil,celsteprunstr"`
// (optional) the rate limit duration, defaults to MINUTE
Duration *string `validate:"omitnil"`
}
type CreateWorkflowVersionOpts ¶
// CreateWorkflowVersionOpts holds the parameters for creating a workflow version
// and the validation rules applied to each of its fields.
type CreateWorkflowVersionOpts struct {
// (required) the workflow name
Name string `validate:"required,hatchetName"`
// (optional) the workflow description
Description *string `json:"description,omitempty"`
// (optional) event triggers for the workflow
EventTriggers []string
// (optional) cron triggers for the workflow
CronTriggers []string `validate:"dive,cron"`
// (optional) the input bytes for the cron triggers
CronInput []byte
// (required) the tasks in the workflow
Tasks []CreateStepOpts `validate:"required,min=1,dive"`
// (optional) a step serialized as "onFailureJob"
// NOTE(review): presumably the step run when the workflow fails — confirm
OnFailure *CreateStepOpts `json:"onFailureJob,omitempty" validate:"omitempty"`
// (optional) the workflow concurrency groups
// The tag key is `validate`, matching every other field in this struct; with the
// previous `validator` key the omitempty,dive rules were never applied.
Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validate:"omitempty,dive"`
// (optional) sticky strategy
Sticky *string `validate:"omitempty,oneof=SOFT HARD"`
// (optional) the default priority, between 1 and 3
DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`
// (optional) the default filters
DefaultFilters []types.DefaultFilter `json:"defaultFilters,omitempty" validate:"omitempty,dive"`
}
type CutoverBatchOutcome ¶ added in v0.74.3
// CutoverBatchOutcome carries a continue flag and the pagination parameters
// for the next batch.
type CutoverBatchOutcome struct {
ShouldContinue bool
NextPagination PaginationParams
}
type CutoverJobRunMetadata ¶ added in v0.74.3
// CutoverJobRunMetadata carries a run flag together with the pagination,
// partition date and lease process id for a cutover job run.
type CutoverJobRunMetadata struct {
ShouldRun bool
Pagination PaginationParams
PartitionDate PartitionDate
LeaseProcessId pgtype.UUID
}
type DAGWithData ¶ added in v0.74.3
type DesiredWorkerLabelOpts ¶
// DesiredWorkerLabelOpts describes one desired worker label used for scheduling.
type DesiredWorkerLabelOpts struct {
// (required) the label key
Key string `validate:"required"`
// (required if StrValue is nil) the label integer value
IntValue *int32 `validate:"omitnil,required_without=StrValue"`
// (required if IntValue is nil) the label string value
StrValue *string `validate:"omitnil,required_without=IntValue"`
// (optional) if the label is required
Required *bool `validate:"omitempty"`
// (optional) the weight of the label for scheduling (default: 100)
Weight *int32 `validate:"omitempty"`
// (optional) the label comparator for scheduling (default: EQUAL)
Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}
type DispatcherRepository ¶ added in v0.74.3
// DispatcherRepository declares the persistence operations for dispatchers.
type DispatcherRepository interface {
// CreateNewDispatcher creates a new dispatcher for a given tenant.
CreateNewDispatcher(ctx context.Context, opts *CreateDispatcherOpts) (*sqlcv1.Dispatcher, error)
// UpdateDispatcher updates a dispatcher for a given tenant.
UpdateDispatcher(ctx context.Context, dispatcherId string, opts *UpdateDispatcherOpts) (*sqlcv1.Dispatcher, error)
// Delete takes a dispatcher id; presumably it removes that dispatcher — confirm.
Delete(ctx context.Context, dispatcherId string) error
// UpdateStaleDispatchers takes an onStale callback that receives a dispatcher id and a
// getValidDispatcherId function.
// NOTE(review): presumably onStale is invoked once per stale dispatcher — confirm.
UpdateStaleDispatchers(ctx context.Context, onStale func(dispatcherId string, getValidDispatcherId func() string) error) error
}
type ErrNamesNotFound ¶ added in v0.74.3
// ErrNamesNotFound is an error type (it has an Error method) carrying a list of names.
type ErrNamesNotFound struct {
// presumably the names that could not be found — confirm against the Error method
Names []string
}
func (*ErrNamesNotFound) Error ¶ added in v0.74.3
func (e *ErrNamesNotFound) Error() string
type EventExternalIdFilterId ¶ added in v0.74.3
type EventMatchResults ¶ added in v0.74.3
// EventMatchResults groups the tasks produced by processing event matches.
type EventMatchResults struct {
// The list of tasks which were created from the matches
CreatedTasks []*V1TaskWithPayload
// The list of tasks which were replayed from the matches
ReplayedTasks []*V1TaskWithPayload
}
type EventTriggerOpts ¶ added in v0.74.3
type EventTriggersFromExternalId ¶ added in v0.74.3
// EventTriggersFromExternalId links a run (id and inserted-at) to an event
// (external id and seen-at) and a filter id.
type EventTriggersFromExternalId struct {
RunID int64 `json:"run_id"`
RunInsertedAt pgtype.Timestamptz `json:"run_inserted_at"`
EventExternalId pgtype.UUID `json:"event_external_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
FilterId pgtype.UUID `json:"filter_id"`
}
type EventType ¶ added in v0.74.3
// EventType is a string-backed enumeration of event kinds.
type EventType string

// Known EventType values. Each constant's value is its own name without the
// EVENT_TYPE_ prefix. One declaration per line: the group cannot be written on a
// single line without separators.
const (
EVENT_TYPE_REQUEUED_NO_WORKER EventType = "REQUEUED_NO_WORKER"
EVENT_TYPE_REQUEUED_RATE_LIMIT EventType = "REQUEUED_RATE_LIMIT"
EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT"
EVENT_TYPE_ASSIGNED EventType = "ASSIGNED"
EVENT_TYPE_STARTED EventType = "STARTED"
EVENT_TYPE_FINISHED EventType = "FINISHED"
EVENT_TYPE_FAILED EventType = "FAILED"
EVENT_TYPE_RETRYING EventType = "RETRYING"
EVENT_TYPE_CANCELLED EventType = "CANCELLED"
EVENT_TYPE_TIMED_OUT EventType = "TIMED_OUT"
EVENT_TYPE_REASSIGNED EventType = "REASSIGNED"
EVENT_TYPE_SLOT_RELEASED EventType = "SLOT_RELEASED"
EVENT_TYPE_TIMEOUT_REFRESHED EventType = "TIMEOUT_REFRESHED"
EVENT_TYPE_RETRIED_BY_USER EventType = "RETRIED_BY_USER"
EVENT_TYPE_SENT_TO_WORKER EventType = "SENT_TO_WORKER"
EVENT_TYPE_RATE_LIMIT_ERROR EventType = "RATE_LIMIT_ERROR"
EVENT_TYPE_ACKNOWLEDGED EventType = "ACKNOWLEDGED"
EVENT_TYPE_CREATED EventType = "CREATED"
EVENT_TYPE_QUEUED EventType = "QUEUED"
)
type EventWithPayload ¶ added in v0.74.3
// EventWithPayload embeds *ListEventsRow and adds the payload bytes,
// serialized under the "payload" JSON key.
type EventWithPayload struct {
*ListEventsRow
Payload []byte `json:"payload"`
}
type ExternalCreateSignalMatchOpts ¶ added in v0.74.3
// ExternalCreateSignalMatchOpts holds the parameters for registering signal match conditions.
type ExternalCreateSignalMatchOpts struct {
// (required) at least one condition; each condition is validated
Conditions []CreateExternalSignalConditionOpt `validate:"required,min=1,dive"`
// (required) the signal task id; must be greater than 0
SignalTaskId int64 `validate:"required,gt=0"`
SignalTaskInsertedAt pgtype.Timestamptz
// (required) the signal external id; must be a UUID
SignalExternalId string `validate:"required,uuid"`
// (required) the signal key
SignalKey string `validate:"required"`
}
type ExternalPayloadLocationKey ¶ added in v0.74.3
// ExternalPayloadLocationKey is the string key that ExternalStore.Store returns per
// payload and that ExternalStore.Retrieve accepts to fetch the payload bytes.
type ExternalPayloadLocationKey string
type ExternalStore ¶ added in v0.74.3
// ExternalStore declares the operations for storing payloads and retrieving them by key.
type ExternalStore interface {
// Store takes payloads and returns a map from each payload's external id to its location key.
Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
// Retrieve takes location keys and returns a map from each key to the payload bytes.
Retrieve(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error)
}
type FailTaskOpts ¶ added in v0.74.3
// FailTaskOpts holds the parameters for failing a task; the task is identified by the
// embedded *TaskIdInsertedAtRetryCount.
type FailTaskOpts struct {
*TaskIdInsertedAtRetryCount
// (required) whether this is an application-level error or an internal error on the Hatchet side
IsAppError bool
// (optional) the error message for the task
ErrorMessage string
// (optional) A boolean flag to indicate whether the error is non-retryable, meaning it should _not_ be retried. Defaults to false.
IsNonRetryable bool
}
type FailTasksResponse ¶ added in v0.74.3
// FailTasksResponse embeds *FinalizedTaskResponse and adds the list of retried tasks.
type FailTasksResponse struct {
*FinalizedTaskResponse
RetriedTasks []RetriedTask
}
type FilterRepository ¶ added in v0.74.3
// FilterRepository declares the create, list, delete, get and update operations
// for tenant-scoped filters (sqlcv1.V1Filter).
type FilterRepository interface {
CreateFilter(ctx context.Context, tenantId string, params CreateFilterOpts) (*sqlcv1.V1Filter, error)
// ListFilters returns the filters plus an int64.
// NOTE(review): the int64 is presumably the total count — confirm.
ListFilters(ctx context.Context, tenantId string, params ListFiltersOpts) ([]*sqlcv1.V1Filter, int64, error)
DeleteFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
GetFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
UpdateFilter(ctx context.Context, tenantId string, filterId string, opts UpdateFilterOpts) (*sqlcv1.V1Filter, error)
}
type FinalizedTaskResponse ¶ added in v0.74.3
// FinalizedTaskResponse carries the released task rows and the internal task events.
type FinalizedTaskResponse struct {
ReleasedTasks []*sqlcv1.ReleaseTasksRow
InternalEvents []InternalTaskEvent
}
type GetQueueMetricsOpts ¶
type GetQueueMetricsResponse ¶
// GetQueueMetricsResponse carries a total queue metric and a per-workflow breakdown.
type GetQueueMetricsResponse struct {
Total QueueMetric `json:"total"`
// keyed by workflow id (per the field name); serialized as "by_workflow"
ByWorkflowId map[string]QueueMetric `json:"by_workflow"`
}
type GetTenantAlertingSettingsResponse ¶
// GetTenantAlertingSettingsResponse bundles a tenant with its alerting settings,
// Slack webhooks and email groups.
type GetTenantAlertingSettingsResponse struct {
Settings *sqlcv1.TenantAlertingSettings
SlackWebhooks []*sqlcv1.SlackAppWebhook
EmailGroups []*TenantAlertEmailGroupForSend
Tenant *sqlcv1.Tenant
}
type GroupMatchCondition ¶ added in v0.74.3
// GroupMatchCondition describes one match condition belonging to a condition group.
type GroupMatchCondition struct {
// (required) the id of the group this condition belongs to; must be a UUID
GroupId string `validate:"required,uuid"`
EventType sqlcv1.V1EventType
EventKey string
// (optional) a hint for querying the event data
EventResourceHint *string
// the data key which gets inserted into the returned data from a satisfied match condition
ReadableDataKey string
Expression string
Action sqlcv1.V1MatchConditionAction
// (optional) the data which was used to satisfy the condition (relevant for replays)
Data []byte
}
type HMACAuthCredentials ¶ added in v0.74.3
// HMACAuthCredentials holds the HMAC settings for an incoming webhook; all four fields are required.
type HMACAuthCredentials struct {
Algorithm sqlcv1.V1IncomingWebhookHmacAlgorithm `json:"algorithm" validate:"required"`
Encoding sqlcv1.V1IncomingWebhookHmacEncoding `json:"encoding" validate:"required"`
SignatureHeaderName string `json:"signature_header_name" validate:"required"`
// serialized as "webhook_signing_secret"; the field name indicates the bytes are encrypted
EncryptedWebhookSigningSecret []byte `json:"webhook_signing_secret" validate:"required"`
}
type HealthRepository ¶
type IdInsertedAt ¶ added in v0.74.3
// IdInsertedAt pairs an int64 id with an inserted-at timestamp.
type IdInsertedAt struct {
ID int64 `json:"id"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}
type IdempotencyKey ¶ added in v0.74.3
// IdempotencyKey is a string-backed type for idempotency keys.
type IdempotencyKey string
type IdempotencyRepository ¶ added in v0.74.3
type InternalTaskEvent ¶ added in v0.74.3
// InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field
// as COPY FROM is used to write the events to the database.
type InternalTaskEvent struct {
TenantID string `json:"tenant_id"`
TaskID int64 `json:"task_id"`
TaskExternalID string `json:"task_external_id"`
RetryCount int32 `json:"retry_count"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
EventKey string `json:"event_key"`
Data []byte `json:"data"`
}
InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.
type IntervalSettingsRepository ¶ added in v0.74.3
// IntervalSettingsRepository declares read and write access to time.Duration
// intervals addressed by operation id and tenant id.
type IntervalSettingsRepository interface {
// ReadAllIntervals returns a map of intervals for the operation.
// NOTE(review): the map is presumably keyed by tenant id — confirm.
ReadAllIntervals(ctx context.Context, operationId string) (map[string]time.Duration, error)
// ReadInterval returns the interval for one operation and tenant.
ReadInterval(ctx context.Context, operationId string, tenantId string) (time.Duration, error)
// SetInterval takes a duration for one operation and tenant and returns a duration.
SetInterval(ctx context.Context, operationId string, tenantId string, d time.Duration) (time.Duration, error)
}
func NewNoOpIntervalSettingsRepository ¶ added in v0.74.3
func NewNoOpIntervalSettingsRepository() IntervalSettingsRepository
type JobRunHasCycleError ¶
// JobRunHasCycleError is an error type (it has an Error method) carrying a job name.
type JobRunHasCycleError struct {
JobName string
}
func (*JobRunHasCycleError) Error ¶
func (e *JobRunHasCycleError) Error() string
type KeyClaimantPair ¶ added in v0.74.3
// KeyClaimantPair pairs an idempotency key with the external id that claimed it.
type KeyClaimantPair struct {
IdempotencyKey IdempotencyKey
ClaimedByExternalId pgtype.UUID
}
type LeaseRepository ¶
// LeaseRepository declares tenant-scoped listing of queues, active workers and
// concurrency strategies, plus acquiring/extending and releasing leases.
type LeaseRepository interface {
ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)
// AcquireOrExtendLeases takes a lease kind, resource ids and the existing leases,
// and returns a list of leases.
AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}
type ListActiveWorkersResult ¶
// ListActiveWorkersResult is the element type returned by LeaseRepository.ListActiveWorkers:
// a worker's id, max runs, name and labels.
type ListActiveWorkersResult struct {
ID string
MaxRuns int
Name string
Labels []*sqlcv1.ListManyWorkerLabelsRow
}
type ListCronWorkflowsOpts ¶
// ListCronWorkflowsOpts holds the pagination, ordering and filter options for
// listing cron workflows.
type ListCronWorkflowsOpts struct {
// (optional) number of results to skip
Offset *int
// (optional) number of results to return
Limit *int
// (optional) the order by field; one of createdAt or name
OrderBy *string `validate:"omitempty,oneof=createdAt name"`
// (optional) the order direction; ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) additional metadata for the workflow run
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
// (optional) the name of the cron to filter by
CronName *string `validate:"omitempty"`
// (optional) the name of the workflow to filter by
WorkflowName *string `validate:"omitempty"`
}
type ListEventsRow ¶ added in v0.74.3
// ListEventsRow is one event row: event identity and metadata, per-status run counts
// (queued, running, completed, cancelled, failed) and the triggered runs as raw bytes.
type ListEventsRow struct {
TenantID pgtype.UUID `json:"tenant_id"`
EventID int64 `json:"event_id"`
EventExternalID pgtype.UUID `json:"event_external_id"`
EventSeenAt pgtype.Timestamptz `json:"event_seen_at"`
EventKey string `json:"event_key"`
EventPayload []byte `json:"event_payload"`
EventAdditionalMetadata []byte `json:"event_additional_metadata"`
EventScope string `json:"event_scope"`
QueuedCount int64 `json:"queued_count"`
RunningCount int64 `json:"running_count"`
CompletedCount int64 `json:"completed_count"`
CancelledCount int64 `json:"cancelled_count"`
FailedCount int64 `json:"failed_count"`
TriggeredRuns []byte `json:"triggered_runs"`
// omitted from JSON when nil
TriggeringWebhookName *string `json:"triggering_webhook_name,omitempty"`
}
type ListFiltersOpts ¶ added in v0.74.3
type ListFinalizedWorkflowRunsResponse ¶ added in v0.74.3
// ListFinalizedWorkflowRunsResponse pairs a workflow run id with its task output events.
type ListFinalizedWorkflowRunsResponse struct {
WorkflowRunId string
OutputEvents []*TaskOutputEvent
}
type ListLogsOpts ¶
// ListLogsOpts holds the pagination and filter options for listing log lines.
type ListLogsOpts struct {
// (optional) number of logs to skip
Offset *int
// (optional) number of logs to return; between 1 and 10000 when set
Limit *int `validate:"omitnil,min=1,max=10000"`
// (optional) a list of log levels to filter by; each one of INFO, ERROR, WARN or DEBUG
Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`
// (optional) a search query
Search *string
// (optional) the start time to get logs for
Since *time.Time
// (optional) the end time to get logs for
Until *time.Time
}
type ListRateLimitOpts ¶
// ListRateLimitOpts holds the search, pagination and ordering options for listing rate limits.
type ListRateLimitOpts struct {
// (optional) a search query for the key
Search *string
// (optional) number of results to skip
Offset *int
// (optional) number of results to return
Limit *int
// (optional) the order by field; one of key, value or limitValue
OrderBy *string `validate:"omitempty,oneof=key value limitValue"`
// (optional) the order direction; ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
}
type ListRateLimitsResult ¶
// ListRateLimitsResult carries the rate limit rows and a count.
type ListRateLimitsResult struct {
Rows []*sqlcv1.ListRateLimitsForTenantNoMutateRow
// NOTE(review): presumably the total number of matching rows rather than len(Rows) — confirm
Count int
}
type ListScheduledWorkflowsOpts ¶
// ListScheduledWorkflowsOpts holds the pagination, ordering and filter options for
// listing scheduled workflows.
type ListScheduledWorkflowsOpts struct {
// (optional) number of results to skip
Offset *int
// (optional) number of results to return
Limit *int
// (optional) the order by field; one of createdAt or triggerAt
OrderBy *string `validate:"omitempty,oneof=createdAt triggerAt"`
// (optional) the order direction; ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) the parent workflow run id
ParentWorkflowRunId *string `validate:"omitempty,uuid"`
// (optional) the parent step run id
ParentStepRunId *string `validate:"omitempty,uuid"`
// (optional) statuses to filter by
Statuses *[]sqlcv1.WorkflowRunStatus
// (optional) include scheduled runs that are in the future
IncludeFuture *bool
// (optional) additional metadata for the workflow run
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}
type ListTaskRunOpts ¶ added in v0.74.3
// ListTaskRunOpts holds the filter and pagination options passed to OLAPRepository.ListTasks.
// No field carries a validation tag.
type ListTaskRunOpts struct {
CreatedAfter time.Time
Statuses []sqlcv1.V1ReadableStatusOlap
WorkflowIds []uuid.UUID
WorkerId *uuid.UUID
StartedAfter time.Time
FinishedBefore *time.Time
AdditionalMetadata map[string]interface{}
TriggeringEventExternalId *uuid.UUID
Limit int64
Offset int64
// presumably controls whether payloads are loaded with each task — confirm
IncludePayloads bool
}
type ListTenantInvitesOpts ¶
type ListTickerOpts ¶
type ListWebhooksOpts ¶ added in v0.74.3
// ListWebhooksOpts holds the filter and pagination options for listing webhooks.
type ListWebhooksOpts struct {
WebhookNames []string `json:"webhook_names"`
WebhookSourceNames []sqlcv1.V1IncomingWebhookSourceName `json:"webhook_source_names"`
// (optional) must be at least 1 when set
Limit *int64 `json:"limit" validate:"omitnil,min=1"`
// (optional) must be at least 0 when set
Offset *int64 `json:"offset" validate:"omitnil,min=0"`
}
type ListWorkersOpts ¶
type ListWorkflowRunOpts ¶ added in v0.74.3
// ListWorkflowRunOpts holds the filter and pagination options passed to
// OLAPRepository.ListWorkflowRuns and OLAPRepository.ListWorkflowRunExternalIds.
// No field carries a validation tag.
type ListWorkflowRunOpts struct {
CreatedAfter time.Time
Statuses []sqlcv1.V1ReadableStatusOlap
WorkflowIds []uuid.UUID
StartedAfter time.Time
FinishedBefore *time.Time
AdditionalMetadata map[string]interface{}
Limit int64
Offset int64
ParentTaskExternalId *pgtype.UUID
TriggeringEventExternalId *pgtype.UUID
// presumably controls whether payloads are loaded with each run — confirm
IncludePayloads bool
}
type ListWorkflowsOpts ¶
type ListWorkflowsResult ¶
type LogLineRepository ¶ added in v0.74.3
// LogLineRepository declares listing and writing of task log lines for a tenant.
type LogLineRepository interface {
// ListLogLines returns log lines for the task identified by taskId and taskInsertedAt,
// with opts supplying the listing options.
ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)
// PutLog takes the options describing a log line to create.
PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}
type MatchData ¶ added in v0.74.3
// MatchData is an opaque type: all of its fields are unexported and it is read
// through its methods (Action, DataValueAsTaskOutputEvent, TriggerDataKeys, TriggerDataValue).
type MatchData struct {
// contains filtered or unexported fields
}
MatchData parses aggregated match data.
func NewMatchData ¶ added in v0.74.3
func (*MatchData) Action ¶ added in v0.74.3
func (m *MatchData) Action() sqlcv1.V1MatchConditionAction
func (*MatchData) DataValueAsTaskOutputEvent ¶ added in v0.74.3
func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent
Helper function for internal events
func (*MatchData) TriggerDataKeys ¶ added in v0.74.3
func (*MatchData) TriggerDataValue ¶ added in v0.74.3
type MatchRepository ¶ added in v0.74.3
// MatchRepository declares registration of signal match conditions and processing
// of candidate event matches for a tenant.
type MatchRepository interface {
RegisterSignalMatchConditions(ctx context.Context, tenantId string, eventMatches []ExternalCreateSignalMatchOpts) error
// ProcessUserEventMatches processes a list of user events
ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
// ProcessInternalEventMatches processes a list of internal events
ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
}
type MatchRepositoryImpl ¶ added in v0.74.3
// MatchRepositoryImpl is a concrete type whose pointer-receiver methods
// (ProcessInternalEventMatches, ProcessUserEventMatches, RegisterSignalMatchConditions)
// match the MatchRepository interface. All of its fields are unexported.
type MatchRepositoryImpl struct {
// contains filtered or unexported fields
}
func (MatchRepositoryImpl) DesiredWorkerId ¶ added in v0.74.3
func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow ¶ added in v0.74.3
func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*MatchRepositoryImpl) ProcessInternalEventMatches ¶ added in v0.74.3
func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessInternalEventMatches processes a list of internal events
func (*MatchRepositoryImpl) ProcessUserEventMatches ¶ added in v0.74.3
func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
ProcessUserEventMatches processes a list of user events
func (*MatchRepositoryImpl) RegisterSignalMatchConditions ¶ added in v0.74.3
func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, signalMatches []ExternalCreateSignalMatchOpts) error
func (MatchRepositoryImpl) ToV1StepRunData ¶ added in v0.74.3
func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
type MessageQueueRepository ¶
// MessageQueueRepository declares pub/sub, queue management and message
// operations for a message queue.
type MessageQueueRepository interface {
// PubSub
// Listen takes a callback f that receives a *PubSubMessage for the given name.
Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubSubMessage) error) error
Notify(ctx context.Context, name string, payload string) error
// Queues
BindQueue(ctx context.Context, queue string, durable, autoDeleted, exclusive bool, exclusiveConsumer *string) error
UpdateQueueLastActive(ctx context.Context, queue string) error
CleanupQueues(ctx context.Context) error
// Messages
AddMessage(ctx context.Context, queue string, payload []byte) error
// ReadMessages takes a queue name and a qos value.
// NOTE(review): qos presumably bounds how many messages are read — confirm.
ReadMessages(ctx context.Context, queue string, qos int) ([]*sqlcv1.ReadMessagesRow, error)
AckMessage(ctx context.Context, id int64) error
CleanupMessageQueueItems(ctx context.Context) error
}
type NoOpExternalStore ¶ added in v0.74.3
// NoOpExternalStore is an empty struct whose Store and Retrieve methods match the
// ExternalStore interface. NOTE(review): by its name it presumably does nothing — confirm.
type NoOpExternalStore struct{}
func (*NoOpExternalStore) Retrieve ¶ added in v0.74.3
func (n *NoOpExternalStore) Retrieve(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error)
func (*NoOpExternalStore) Store ¶ added in v0.74.3
func (n *NoOpExternalStore) Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
type NoOpIntervalSettingsRepository ¶ added in v0.74.3
// NoOpIntervalSettingsRepository is an empty struct; NewNoOpIntervalSettingsRepository
// has return type IntervalSettingsRepository.
// NOTE(review): by its name it presumably does nothing — confirm.
type NoOpIntervalSettingsRepository struct{}
func (*NoOpIntervalSettingsRepository) ReadAllIntervals ¶ added in v0.74.3
func (*NoOpIntervalSettingsRepository) ReadInterval ¶ added in v0.74.3
type OLAPCutoverBatchOutcome ¶ added in v0.74.3
// OLAPCutoverBatchOutcome carries a continue flag and the OLAP pagination
// parameters for the next batch.
type OLAPCutoverBatchOutcome struct {
ShouldContinue bool
NextPagination OLAPPaginationParams
}
type OLAPCutoverJobRunMetadata ¶ added in v0.74.3
// OLAPCutoverJobRunMetadata carries a run flag together with the OLAP pagination,
// partition date and lease process id for an OLAP cutover job run.
type OLAPCutoverJobRunMetadata struct {
ShouldRun bool
Pagination OLAPPaginationParams
PartitionDate PartitionDate
LeaseProcessId pgtype.UUID
}
type OLAPPaginationParams ¶ added in v0.74.3
type OLAPPayloadToOffload ¶ added in v0.74.3
type OLAPPayloadsToOffload ¶ added in v0.74.3
// OLAPPayloadsToOffload wraps a list of OLAPPayloadToOffload values.
type OLAPPayloadsToOffload struct {
Payloads []OLAPPayloadToOffload
}
type OLAPRepository ¶ added in v0.74.3
// OLAPRepository declares the read and write operations against the OLAP tables:
// tasks, DAGs, workflow runs, events, payloads, metrics and maintenance jobs.
type OLAPRepository interface {
UpdateTablePartitions(ctx context.Context) error
SetReadReplicaPool(pool *pgxpool.Pool)
ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*TaskWithPayloads, pgtype.UUID, error)
// ListTasks returns tasks plus an int.
// NOTE(review): the int is presumably the total count — confirm.
ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*TaskWithPayloads, int, error)
// ListWorkflowRuns returns workflow runs plus an int.
// NOTE(review): the int is presumably the total count — confirm.
ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*TaskEventWithPayloads, error)
ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
// UpdateTaskStatuses returns a bool, the updated rows and an error.
// NOTE(review): the meaning of the bool is not visible here — confirm.
UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
// UpdateDAGStatuses returns a bool, the updated rows and an error.
// NOTE(review): the meaning of the bool is not visible here — confirm.
UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID, includePayloads bool) ([]*TaskWithPayloads, map[int64]uuid.UUID, error)
ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata, includePayloads bool) ([]*TaskWithPayloads, error)
// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*EventWithPayload, *int64, error)
GetEvent(ctx context.Context, externalId string) (*sqlcv1.V1EventsOlap, error)
GetEventWithPayload(ctx context.Context, externalId, tenantId string) (*EventWithPayload, error)
ListEventKeys(ctx context.Context, tenantId string) ([]string, error)
GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)
GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)
CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error
StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
// PutPayloads takes a DBTX and payload options and returns a map from payload
// external id to location key.
PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId TenantID, putPayloadOpts ...StoreOLAPPayloadOpts) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
ReadPayload(ctx context.Context, tenantId string, externalId pgtype.UUID) ([]byte, error)
ReadPayloads(ctx context.Context, tenantId string, externalIds ...pgtype.UUID) (map[pgtype.UUID][]byte, error)
AnalyzeOLAPTables(ctx context.Context) error
OffloadPayloads(ctx context.Context, tenantId string, payloads []OffloadPayloadOpts) error
PayloadStore() PayloadStoreRepository
StatusUpdateBatchSizeLimits() StatusUpdateBatchSizeLimits
ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)
ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error
CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)
CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)
ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error)
}
func NewOLAPRepositoryFromPool ¶ added in v0.74.3
func NewOLAPRepositoryFromPool( pool *pgxpool.Pool, l *zerolog.Logger, olapRetentionPeriod time.Duration, tenantLimitConfig limits.LimitConfigFile, enforceLimits bool, shouldPartitionEventsTables bool, payloadStoreOpts PayloadStoreRepositoryOpts, statusUpdateBatchSizeLimits StatusUpdateBatchSizeLimits, cacheDuration time.Duration, ) (OLAPRepository, func() error)
type OLAPRepositoryImpl ¶ added in v0.74.3
// OLAPRepositoryImpl is a concrete type whose pointer-receiver methods mirror the
// OLAPRepository interface. All of its fields are unexported.
type OLAPRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*OLAPRepositoryImpl) AnalyzeOLAPTables ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error
func (*OLAPRepositoryImpl) BulkCreateEventsAndTriggers ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
func (*OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)
func (*OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)
func (*OLAPRepositoryImpl) CreateDAGs ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
func (*OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error
func (*OLAPRepositoryImpl) CreateTaskEvents ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
func (*OLAPRepositoryImpl) CreateTasks ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
func (OLAPRepositoryImpl) DesiredWorkerId ¶ added in v0.74.3
func (*OLAPRepositoryImpl) GetDAGDurations ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)
func (*OLAPRepositoryImpl) GetEvent ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) GetEvent(ctx context.Context, externalId string) (*sqlcv1.V1EventsOlap, error)
func (*OLAPRepositoryImpl) GetEventWithPayload ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) GetEventWithPayload(ctx context.Context, externalId, tenantId string) (*EventWithPayload, error)
func (*OLAPRepositoryImpl) GetTaskDurationsByTaskIds ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)
func (*OLAPRepositoryImpl) GetTaskPointMetrics ¶ added in v0.74.3
func (*OLAPRepositoryImpl) GetTaskTimings ¶ added in v0.74.3
func (*OLAPRepositoryImpl) ListEventKeys ¶ added in v0.74.3
func (*OLAPRepositoryImpl) ListEvents ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*EventWithPayload, *int64, error)
func (*OLAPRepositoryImpl) ListTaskRunEvents ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*TaskEventWithPayloads, error)
func (*OLAPRepositoryImpl) ListTasks ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*TaskWithPayloads, int, error)
func (*OLAPRepositoryImpl) ListTasksByDAGId ¶ added in v0.74.3
func (*OLAPRepositoryImpl) ListTasksByExternalIds ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)
func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata, includePayloads bool) ([]*TaskWithPayloads, error)
func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
func (*OLAPRepositoryImpl) ListWorkflowRunExternalIds ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)
func (*OLAPRepositoryImpl) ListWorkflowRuns ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
func (*OLAPRepositoryImpl) ListYesterdayRunCountsByStatus ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error)
func (*OLAPRepositoryImpl) OffloadPayloads ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) OffloadPayloads(ctx context.Context, tenantId string, payloads []OffloadPayloadOpts) error
func (*OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize ¶ added in v0.74.3
func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error)
func (*OLAPRepositoryImpl) PayloadStore ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) PayloadStore() PayloadStoreRepository
func (*OLAPRepositoryImpl) PopulateEventData ¶ added in v0.74.3
func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow ¶ added in v0.74.3
func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*OLAPRepositoryImpl) ProcessOLAPPayloadCutovers ¶ added in v0.74.3
func (*OLAPRepositoryImpl) PutPayloads ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId TenantID, putPayloadOpts ...StoreOLAPPayloadOpts) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
func (*OLAPRepositoryImpl) ReadDAG ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
func (*OLAPRepositoryImpl) ReadPayload ¶ added in v0.74.3
func (*OLAPRepositoryImpl) ReadPayloads ¶ added in v0.74.3
func (*OLAPRepositoryImpl) ReadTaskRun ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
func (*OLAPRepositoryImpl) ReadTaskRunData ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*TaskWithPayloads, pgtype.UUID, error)
func (*OLAPRepositoryImpl) ReadTaskRunMetrics ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
func (*OLAPRepositoryImpl) ReadWorkflowRun ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
func (*OLAPRepositoryImpl) SetReadReplicaPool ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)
func (*OLAPRepositoryImpl) StatusUpdateBatchSizeLimits ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) StatusUpdateBatchSizeLimits() StatusUpdateBatchSizeLimits
func (*OLAPRepositoryImpl) StoreCELEvaluationFailures ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
func (OLAPRepositoryImpl) ToV1StepRunData ¶ added in v0.74.3
func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*OLAPRepositoryImpl) UpdateDAGStatuses ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
func (*OLAPRepositoryImpl) UpdateTablePartitions ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
func (*OLAPRepositoryImpl) UpdateTaskStatuses ¶ added in v0.74.3
func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
type OffloadPayloadOpts ¶ added in v0.74.3
type OffloadToExternalStoreOpts ¶ added in v0.74.3
type OffloadToExternalStoreOpts struct {
TenantId TenantID
ExternalID PayloadExternalId
InsertedAt pgtype.Timestamptz
Payload []byte
}
type PGHealthError ¶ added in v0.74.3
type PGHealthError string
const ( PGHealthAlert PGHealthError = "alert" PGHealthWarn PGHealthError = "warn" PGHealthOK PGHealthError = "ok" )
type PGHealthRepository ¶ added in v0.74.3
type PGHealthRepository interface {
PGStatStatementsEnabled(ctx context.Context) (bool, error)
TrackCountsEnabled(ctx context.Context) (bool, error)
CheckBloat(ctx context.Context) (PGHealthError, int, error)
GetBloatDetails(ctx context.Context) ([]*sqlcv1.CheckBloatRow, error)
CheckLongRunningQueries(ctx context.Context) (PGHealthError, int, error)
CheckQueryCache(ctx context.Context) (PGHealthError, int, error)
CheckQueryCaches(ctx context.Context) ([]*sqlcv1.CheckQueryCachesRow, error)
CheckLongRunningVacuum(ctx context.Context) (PGHealthError, int, error)
CheckLastAutovacuumForPartitionedTables(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesRow, error)
CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesCoreDBRow, error)
}
type PaginationParams ¶ added in v0.74.3
type PaginationParams struct {
LastTenantID pgtype.UUID
LastInsertedAt pgtype.Timestamptz
LastID int64
LastType sqlcv1.V1PayloadType
}
type PartitionDate ¶ added in v0.74.3
func (PartitionDate) String ¶ added in v0.74.3
func (d PartitionDate) String() string
type PayloadExternalId ¶ added in v0.74.3
type PayloadExternalId string
type PayloadLocation ¶ added in v0.74.3
type PayloadLocation string
type PayloadMetadata ¶ added in v0.74.3
type PayloadMetadata struct {
TenantID pgtype.UUID
ID int64
InsertedAt pgtype.Timestamptz
Type sqlcv1.V1PayloadType
}
type PayloadStoreRepository ¶ added in v0.74.3
type PayloadStoreRepository interface {
Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error
Retrieve(ctx context.Context, tx sqlcv1.DBTX, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error)
RetrieveFromExternal(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error)
OverwriteExternalStore(store ExternalStore)
DualWritesEnabled() bool
TaskEventDualWritesEnabled() bool
DagDataDualWritesEnabled() bool
OLAPDualWritesEnabled() bool
ExternalCutoverProcessInterval() time.Duration
InlineStoreTTL() *time.Duration
ExternalCutoverBatchSize() int32
ExternalCutoverNumConcurrentOffloads() int32
ExternalStoreEnabled() bool
ExternalStore() ExternalStore
ImmediateOffloadsEnabled() bool
ProcessPayloadCutovers(ctx context.Context) error
}
func NewPayloadStoreRepository ¶ added in v0.74.3
func NewPayloadStoreRepository( pool *pgxpool.Pool, l *zerolog.Logger, queries *sqlcv1.Queries, opts PayloadStoreRepositoryOpts, ) PayloadStoreRepository
type PayloadStoreRepositoryOpts ¶ added in v0.74.3
type PayloadStoreRepositoryOpts struct {
EnablePayloadDualWrites bool
EnableTaskEventPayloadDualWrites bool
EnableDagDataPayloadDualWrites bool
EnableOLAPPayloadDualWrites bool
ExternalCutoverProcessInterval time.Duration
ExternalCutoverBatchSize int32
ExternalCutoverNumConcurrentOffloads int32
InlineStoreTTL *time.Duration
EnableImmediateOffloads bool
}
type PayloadUniqueKey ¶ added in v0.74.3
type PayloadUniqueKey struct {
ID int64
InsertedAt pgtype.Timestamptz
TenantId pgtype.UUID
Type sqlcv1.V1PayloadType
}
type PlanLimitMap ¶
type PubSubMessage ¶
type QueueFactoryRepository ¶
type QueueFactoryRepository interface {
NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}
type QueueMetric ¶
type QueueMetric struct {
// the total number of PENDING_ASSIGNMENT step runs in the queue
PendingAssignment int `json:"pending_assignment"`
// the total number of PENDING step runs in the queue
Pending int `json:"pending"`
// the total number of RUNNING step runs in the queue
Running int `json:"running"`
}
type QueueRepository ¶
type QueueRepository interface {
ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)
GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
RequeueRateLimitedItems(ctx context.Context, tenantId pgtype.UUID, queueName string) ([]*sqlcv1.RequeueRateLimitedQueueItemsRow, error)
GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
Cleanup()
}
type RateLimitRepository ¶
type RateLimitRepository interface {
UpdateRateLimits(ctx context.Context, tenantId pgtype.UUID, updates map[string]int) ([]*sqlcv1.ListRateLimitsForTenantWithMutateRow, *time.Time, error)
UpsertRateLimit(ctx context.Context, tenantId string, key string, opts *UpsertRateLimitOpts) (*sqlcv1.RateLimit, error)
ListRateLimits(ctx context.Context, tenantId string, opts *ListRateLimitOpts) (*ListRateLimitsResult, error)
}
type RateLimitResult ¶
type ReadTaskRunMetricsOpts ¶ added in v0.74.3
type ReadableTaskStatus ¶ added in v0.74.3
type ReadableTaskStatus string
const ( READABLE_TASK_STATUS_QUEUED ReadableTaskStatus = "QUEUED" READABLE_TASK_STATUS_RUNNING ReadableTaskStatus = "RUNNING" READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED" READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED" READABLE_TASK_STATUS_FAILED ReadableTaskStatus = "FAILED" )
func StringToReadableStatus ¶ added in v0.74.3
func StringToReadableStatus(status string) ReadableTaskStatus
func (ReadableTaskStatus) EnumValue ¶ added in v0.74.3
func (s ReadableTaskStatus) EnumValue() int
type RefreshTimeoutBy ¶
type ReplayTaskOpts ¶ added in v0.74.3
type ReplayTaskOpts struct {
// (required) the task id
TaskId int64
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the external id
ExternalId string
// (required) the step id
StepId string
// (optional) the input to the task; the existing input is used if not set
Input *TaskInput
// (required) the initial state for the task
InitialState sqlcv1.V1TaskInitialState
// (optional) the additional metadata for the task
AdditionalMetadata []byte
}
type ReplayTasksResult ¶ added in v0.74.3
type ReplayTasksResult struct {
ReplayedTasks []TaskIdInsertedAtRetryCount
UpsertedTasks []*V1TaskWithPayload
InternalEventResults *EventMatchResults
}
type Repository ¶ added in v0.74.3
type Repository interface {
APIToken() APITokenRepository
Dispatcher() DispatcherRepository
Health() HealthRepository
MessageQueue() MessageQueueRepository
RateLimit() RateLimitRepository
Triggers() TriggerRepository
Tasks() TaskRepository
Scheduler() SchedulerRepository
Matches() MatchRepository
OLAP() OLAPRepository
OverwriteOLAPRepository(o OLAPRepository)
Logs() LogLineRepository
OverwriteLogsRepository(l LogLineRepository)
Payloads() PayloadStoreRepository
OverwriteExternalPayloadStore(o ExternalStore)
Workers() WorkerRepository
Workflows() WorkflowRepository
Ticker() TickerRepository
Filters() FilterRepository
Webhooks() WebhookRepository
Idempotency() IdempotencyRepository
IntervalSettings() IntervalSettingsRepository
PGHealth() PGHealthRepository
SecurityCheck() SecurityCheckRepository
Slack() SlackRepository
SNS() SNSRepository
TenantInvite() TenantInviteRepository
TenantLimit() TenantLimitRepository
TenantAlertingSettings() TenantAlertingRepository
Tenant() TenantRepository
User() UserRepository
UserSession() UserSessionRepository
WorkflowSchedules() WorkflowScheduleRepository
}
func NewRepository ¶ added in v0.74.3
func NewRepository( pool *pgxpool.Pool, l *zerolog.Logger, cacheDuration time.Duration, taskRetentionPeriod, olapRetentionPeriod time.Duration, maxInternalRetryCount int32, taskLimits TaskOperationLimits, payloadStoreOpts PayloadStoreRepositoryOpts, statusUpdateBatchSizeLimits StatusUpdateBatchSizeLimits, tenantLimitConfig limits.LimitConfigFile, enforceLimits bool, ) (Repository, func() error)
type RetriedTask ¶ added in v0.74.3
type RetrievePayloadOpts ¶ added in v0.74.3
type RetrievePayloadOpts struct {
Id int64
InsertedAt pgtype.Timestamptz
Type sqlcv1.V1PayloadType
TenantId pgtype.UUID
}
type RunConcurrencyResult ¶ added in v0.74.3
type RunConcurrencyResult struct {
// The tasks which were enqueued
Queued []TaskWithQueue
// If the strategy involves cancelling a task, these are the tasks to cancel
Cancelled []TaskWithCancelledReason
// If the step has multiple concurrency strategies, these are the next ones to notify
NextConcurrencyStrategies []int64
}
type RuntimeInfo ¶
type SNSRepository ¶
type SNSRepository interface {
GetSNSIntegration(ctx context.Context, tenantId, topicArn string) (*sqlcv1.SNSIntegration, error)
GetSNSIntegrationById(ctx context.Context, id string) (*sqlcv1.SNSIntegration, error)
CreateSNSIntegration(ctx context.Context, tenantId string, opts *CreateSNSIntegrationOpts) (*sqlcv1.SNSIntegration, error)
ListSNSIntegrations(ctx context.Context, tenantId string) ([]*sqlcv1.SNSIntegration, error)
DeleteSNSIntegration(ctx context.Context, tenantId, id string) error
}
type ScheduledWorkflowMeta ¶ added in v0.73.104
type ScheduledWorkflowMeta struct {
Id string
Method sqlcv1.WorkflowTriggerScheduledRefMethods
HasTriggeredRun bool
}
type ScheduledWorkflowUpdate ¶ added in v0.73.104
type SchedulerRepository ¶
type SchedulerRepository interface {
Concurrency() ConcurrencyRepository
Lease() LeaseRepository
QueueFactory() QueueFactoryRepository
RateLimit() RateLimitRepository
Assignment() AssignmentRepository
}
type SecurityCheckRepository ¶
type SlackRepository ¶
type SlackRepository interface {
UpsertSlackWebhook(ctx context.Context, tenantId string, opts *UpsertSlackWebhookOpts) (*sqlcv1.SlackAppWebhook, error)
ListSlackWebhooks(ctx context.Context, tenantId string) ([]*sqlcv1.SlackAppWebhook, error)
GetSlackWebhookById(ctx context.Context, id string) (*sqlcv1.SlackAppWebhook, error)
DeleteSlackWebhook(ctx context.Context, tenantId string, id string) error
}
type StatusUpdateBatchSizeLimits ¶ added in v0.74.3
type StoreOLAPPayloadOpts ¶ added in v0.74.3
type StoreOLAPPayloadOpts struct {
ExternalId pgtype.UUID
InsertedAt pgtype.Timestamptz
Payload []byte
}
type StorePayloadOpts ¶ added in v0.74.3
type StorePayloadOpts struct {
Id int64
InsertedAt pgtype.Timestamptz
ExternalId pgtype.UUID
Type sqlcv1.V1PayloadType
Payload []byte
TenantId string
}
type TaskEventWithPayloads ¶ added in v0.74.3
type TaskEventWithPayloads struct {
*sqlcv1.ListTaskEventsForWorkflowRunRow
OutputPayload []byte
}
type TaskIdEventKeyTuple ¶ added in v0.74.3
type TaskIdInsertedAtRetryCount ¶ added in v0.74.3
type TaskIdInsertedAtRetryCount struct {
// (required) the task id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the retry count
RetryCount int32
}
type TaskIdInsertedAtSignalKey ¶ added in v0.74.3
type TaskIdInsertedAtSignalKey struct {
// (required) the task id
Id int64 `validate:"required"`
// (required) the inserted at time
InsertedAt pgtype.Timestamptz
// (required) the signal key for the event
SignalKey string
}
type TaskInput ¶ added in v0.74.3
type TaskMetadata ¶ added in v0.74.3
type TaskMetadata struct {
TaskID int64 `json:"task_id"`
TaskInsertedAt time.Time `json:"task_inserted_at"`
}
func ParseTaskMetadata ¶ added in v0.74.3
func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)
type TaskOperationLimits ¶ added in v0.74.3
type TaskOutputEvent ¶ added in v0.74.3
type TaskOutputEvent struct {
IsFailure bool `json:"is_failure"`
EventType sqlcv1.V1TaskEventType `json:"event_type"`
TaskExternalId string `json:"task_external_id"`
TaskId int64 `json:"task_id"`
RetryCount int32 `json:"retry_count"`
WorkerId *string `json:"worker_id"`
Output []byte `json:"output"`
ErrorMessage string `json:"error_message"`
StepReadableID string `json:"step_readable_id"`
}
func NewCancelledTaskOutputEvent ¶ added in v0.74.3
func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent
func NewCancelledTaskOutputEventFromTask ¶ added in v0.74.3
func NewCancelledTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
func NewCompletedTaskOutputEvent ¶ added in v0.74.3
func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent
func NewFailedTaskOutputEvent ¶ added in v0.74.3
func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent
func NewFailedTaskOutputEventFromTask ¶ added in v0.74.3
func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
func NewSkippedTaskOutputEventFromTask ¶ added in v0.74.3
func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent
func (*TaskOutputEvent) Bytes ¶ added in v0.74.3
func (e *TaskOutputEvent) Bytes() []byte
func (*TaskOutputEvent) IsCancelled ¶ added in v0.74.3
func (e *TaskOutputEvent) IsCancelled() bool
func (*TaskOutputEvent) IsCompleted ¶ added in v0.74.3
func (e *TaskOutputEvent) IsCompleted() bool
func (*TaskOutputEvent) IsFailed ¶ added in v0.74.3
func (e *TaskOutputEvent) IsFailed() bool
type TaskRepository ¶ added in v0.74.3
type TaskRepository interface {
EnsureTablePartitionsExist(ctx context.Context) (bool, error)
UpdateTablePartitions(ctx context.Context) error
// GetTaskByExternalId is a heavily cached method to return task metadata by its external id
GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
// This is non-cacheable because tasks can be added to a workflow run as it executes.
FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)
CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)
ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
// with the v1 engine, and shouldn't be called from new v1 endpoints.
ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
DefaultTaskActivityGauge(ctx context.Context, tenantId string) (int, error)
ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)
ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)
ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*V1TaskEventWithPayload, error)
// AnalyzeTaskTables runs ANALYZE on the task tables
AnalyzeTaskTables(ctx context.Context) error
// Cleanup makes sure to get rid of invalid old entries
// Returns (shouldContinue, error) where shouldContinue indicates if there's more work
Cleanup(ctx context.Context) (bool, error)
GetTaskStats(ctx context.Context, tenantId string) (map[string]TaskStat, error)
FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error)
FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error)
}
type TaskRepositoryImpl ¶ added in v0.74.3
type TaskRepositoryImpl struct {
// contains filtered or unexported fields
}
func (*TaskRepositoryImpl) AnalyzeTaskTables ¶ added in v0.74.3
func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error
func (*TaskRepositoryImpl) CancelTasks ¶ added in v0.74.3
func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)
func (*TaskRepositoryImpl) Cleanup ¶ added in v0.74.3
func (r *TaskRepositoryImpl) Cleanup(ctx context.Context) (bool, error)
func (*TaskRepositoryImpl) CompleteTasks ¶ added in v0.74.3
func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)
func (*TaskRepositoryImpl) DefaultTaskActivityGauge ¶ added in v0.74.3
func (r *TaskRepositoryImpl) DefaultTaskActivityGauge(ctx context.Context, tenantId string) (int, error)
DefaultTaskActivityGauge is a heavily cached method that returns the number of queues that have had activity within the task retention period.
func (TaskRepositoryImpl) DesiredWorkerId ¶ added in v0.74.3
func (*TaskRepositoryImpl) EnsureTablePartitionsExist ¶ added in v0.74.3
func (r *TaskRepositoryImpl) EnsureTablePartitionsExist(ctx context.Context) (bool, error)
func (*TaskRepositoryImpl) FailTasks ¶ added in v0.74.3
func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)
func (*TaskRepositoryImpl) FindOldestRunningTaskInsertedAt ¶ added in v0.74.3
func (*TaskRepositoryImpl) FindOldestTaskInsertedAt ¶ added in v0.74.3
func (*TaskRepositoryImpl) FlattenExternalIds ¶ added in v0.74.3
func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) GetQueueCounts ¶ added in v0.74.3
func (*TaskRepositoryImpl) GetTaskByExternalId ¶ added in v0.74.3
func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)
func (*TaskRepositoryImpl) GetTaskStats ¶ added in v0.74.3
func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)
func (*TaskRepositoryImpl) ListSignalCompletedEvents ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*V1TaskEventWithPayload, error)
func (*TaskRepositoryImpl) ListTaskMetas ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)
func (*TaskRepositoryImpl) ListTaskParentOutputs ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)
func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow ¶ added in v0.74.3
func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*TaskRepositoryImpl) ProcessDurableSleeps ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)
func (*TaskRepositoryImpl) ProcessTaskReassignments ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)
func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)
func (*TaskRepositoryImpl) ProcessTaskTimeouts ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)
func (*TaskRepositoryImpl) RefreshTimeoutBy ¶ added in v0.74.3
func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReleaseSlot ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)
func (*TaskRepositoryImpl) ReplayTasks ¶ added in v0.74.3
func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)
func (TaskRepositoryImpl) ToV1StepRunData ¶ added in v0.74.3
func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TaskRepositoryImpl) UpdateTablePartitions ¶ added in v0.74.3
func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error
type TaskRunMetric ¶ added in v0.74.3
type TaskStat ¶ added in v0.74.3
type TaskStat struct {
Queued *TaskStatusStat `json:"queued,omitempty"`
Running *TaskStatusStat `json:"running,omitempty"`
}
TaskStat represents the statistics for a single task step
type TaskStatusStat ¶ added in v0.74.3
type TaskStatusStat struct {
Total int64 `json:"total"`
Oldest *time.Time `json:"oldest,omitempty"`
Queues map[string]int64 `json:"queues,omitempty"`
Concurrency []ConcurrencyStat `json:"concurrency,omitempty"`
}
TaskStatusStat represents statistics for a specific task status (queued or running)
type TaskWithCancelledReason ¶ added in v0.74.3
type TaskWithCancelledReason struct {
*TaskIdInsertedAtRetryCount
CancelledReason string
TaskExternalId string
WorkflowRunId string
}
type TaskWithPayloads ¶ added in v0.74.3
type TaskWithPayloads struct {
*sqlcv1.PopulateTaskRunDataRow
InputPayload []byte
OutputPayload []byte
NumSpawnedChildren int64
}
type TaskWithQueue ¶ added in v0.74.3
type TaskWithQueue struct {
*TaskIdInsertedAtRetryCount
Queue string
}
type TenantAlertingRepository ¶
type TenantAlertingRepository interface {
UpsertTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpsertTenantAlertingSettingsOpts) (*sqlcv1.TenantAlertingSettings, error)
GetTenantAlertingSettings(ctx context.Context, tenantId string) (*GetTenantAlertingSettingsResponse, error)
GetTenantResourceLimitState(ctx context.Context, tenantId string, resource string) (*sqlcv1.GetTenantResourceLimitRow, error)
UpdateTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpdateTenantAlertingSettingsOpts) error
CreateTenantAlertGroup(ctx context.Context, tenantId string, opts *CreateTenantAlertGroupOpts) (*sqlcv1.TenantAlertEmailGroup, error)
UpdateTenantAlertGroup(ctx context.Context, id string, opts *UpdateTenantAlertGroupOpts) (*sqlcv1.TenantAlertEmailGroup, error)
ListTenantAlertGroups(ctx context.Context, tenantId string) ([]*sqlcv1.TenantAlertEmailGroup, error)
GetTenantAlertGroupById(ctx context.Context, id string) (*sqlcv1.TenantAlertEmailGroup, error)
DeleteTenantAlertGroup(ctx context.Context, tenantId string, id string) error
}
type TenantCallbackOpts ¶
type TenantCallbackOpts[T any] struct { // contains filtered or unexported fields }
type TenantIdSDKTuple ¶ added in v0.74.3
type TenantInviteRepository ¶
type TenantInviteRepository interface {
// CreateTenantInvite creates a new tenant invite with the given options
CreateTenantInvite(ctx context.Context, tenantId string, opts *CreateTenantInviteOpts) (*sqlcv1.TenantInviteLink, error)
// GetTenantInvite returns the tenant invite with the given id
GetTenantInvite(ctx context.Context, id string) (*sqlcv1.TenantInviteLink, error)
// ListTenantInvitesByEmail returns the list of tenant invites for the given invitee email,
// excluding invites which have expired
ListTenantInvitesByEmail(ctx context.Context, email string) ([]*sqlcv1.ListTenantInvitesByEmailRow, error)
// ListTenantInvitesByTenantId returns the list of tenant invites for the given tenant id
ListTenantInvitesByTenantId(ctx context.Context, tenantId string, opts *ListTenantInvitesOpts) ([]*sqlcv1.TenantInviteLink, error)
// UpdateTenantInvite updates the tenant invite with the given id
UpdateTenantInvite(ctx context.Context, id string, opts *UpdateTenantInviteOpts) (*sqlcv1.TenantInviteLink, error)
// DeleteTenantInvite deletes the tenant invite with the given id
DeleteTenantInvite(ctx context.Context, id string) error
}
type TenantLimitConfig ¶
type TenantLimitConfig struct {
EnforceLimits bool
}
type TenantLimitRepository ¶
type TenantLimitRepository interface {
GetLimits(ctx context.Context, tenantId string) ([]*sqlcv1.TenantResourceLimit, error)
// CanCreate checks if the tenant can create a resource
CanCreate(ctx context.Context, resource sqlcv1.LimitResource, tenantId string, numberOfResources int32) (bool, int, error)
// Create new Tenant Resource Limits for a tenant
SelectOrInsertTenantLimits(ctx context.Context, tenantId string, plan *string) error
// UpsertTenantLimits updates or inserts new tenant limits
UpsertTenantLimits(ctx context.Context, tenantId string, plan *string) error
// Resolve all tenant resource limits
ResolveAllTenantResourceLimits(ctx context.Context) error
// SetPlanLimitMap sets the plan limit map
SetPlanLimitMap(planLimitMap PlanLimitMap) error
DefaultLimits() []Limit
Stop()
Meter(ctx context.Context, resource sqlcv1.LimitResource, tenantId string, numberOfResources int32) (precommit func() error, postcommit func())
}
type TenantRepository ¶ added in v0.74.3
type TenantRepository interface {
// CreateTenant creates a new tenant.
CreateTenant(ctx context.Context, opts *CreateTenantOpts) (*sqlcv1.Tenant, error)
// UpdateTenant updates an existing tenant in the db.
UpdateTenant(ctx context.Context, tenantId string, opts *UpdateTenantOpts) (*sqlcv1.Tenant, error)
// GetTenantByID returns the tenant with the given id
GetTenantByID(ctx context.Context, tenantId string) (*sqlcv1.Tenant, error)
// GetTenantBySlug returns the tenant with the given slug
GetTenantBySlug(ctx context.Context, slug string) (*sqlcv1.Tenant, error)
// CreateTenantMember creates a new member in the tenant
CreateTenantMember(ctx context.Context, tenantId string, opts *CreateTenantMemberOpts) (*sqlcv1.PopulateTenantMembersRow, error)
// GetTenantMemberByID returns the tenant member with the given id
GetTenantMemberByID(ctx context.Context, memberId string) (*sqlcv1.PopulateTenantMembersRow, error)
// GetTenantMemberByUserID returns the tenant member with the given user id
GetTenantMemberByUserID(ctx context.Context, tenantId string, userId string) (*sqlcv1.PopulateTenantMembersRow, error)
// GetTenantMemberByEmail returns the tenant member with the given email
GetTenantMemberByEmail(ctx context.Context, tenantId string, email string) (*sqlcv1.PopulateTenantMembersRow, error)
// ListTenantMembers returns the list of tenant members for the given tenant
ListTenantMembers(ctx context.Context, tenantId string) ([]*sqlcv1.PopulateTenantMembersRow, error)
// UpdateTenantMember updates the tenant member with the given id
UpdateTenantMember(ctx context.Context, memberId string, opts *UpdateTenantMemberOpts) (*sqlcv1.PopulateTenantMembersRow, error)
// DeleteTenantMember deletes the tenant member with the given id
DeleteTenantMember(ctx context.Context, memberId string) error
// GetQueueMetrics returns the queue metrics for the given tenant
GetQueueMetrics(ctx context.Context, tenantId string, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error)
// ListTenants lists all tenants in the instance
ListTenants(ctx context.Context) ([]*sqlcv1.Tenant, error)
// GetInternalTenantForController gets the tenant corresponding to the "internal" tenant if it's assigned to this controller.
// Returns nil if the tenant is not assigned to this controller.
GetInternalTenantForController(ctx context.Context, controllerPartitionId string) (*sqlcv1.Tenant, error)
// ListTenantsByControllerPartition lists all tenants in the given controller partition
ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)
ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)
ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)
// CreateControllerPartition creates a new controller partition for tenants within the engine
CreateControllerPartition(ctx context.Context) (string, error)
// UpdateControllerPartitionHeartbeat updates the heartbeat for the given partition. If the partition no longer exists,
// it creates a new partition and returns the new partition id. Otherwise, it returns the existing partition id.
UpdateControllerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)
DeleteControllerPartition(ctx context.Context, id string) error
RebalanceAllControllerPartitions(ctx context.Context) error
RebalanceInactiveControllerPartitions(ctx context.Context) error
CreateSchedulerPartition(ctx context.Context) (string, error)
UpdateSchedulerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)
DeleteSchedulerPartition(ctx context.Context, id string) error
RebalanceAllSchedulerPartitions(ctx context.Context) error
RebalanceInactiveSchedulerPartitions(ctx context.Context) error
CreateTenantWorkerPartition(ctx context.Context) (string, error)
UpdateWorkerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)
DeleteTenantWorkerPartition(ctx context.Context, id string) error
RebalanceAllTenantWorkerPartitions(ctx context.Context) error
RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error
}
type TenantScopedCallback ¶
type TickerRepository ¶ added in v0.74.3
type TickerRepository interface {
IsTenantAlertActive(ctx context.Context, tenantId string) (bool, time.Time, error)
// CreateNewTicker creates a new ticker.
CreateNewTicker(ctx context.Context, opts *CreateTickerOpts) (*sqlcv1.Ticker, error)
// UpdateTicker updates a ticker.
UpdateTicker(ctx context.Context, tickerId string, opts *UpdateTickerOpts) (*sqlcv1.Ticker, error)
// ListTickers lists tickers.
ListTickers(ctx context.Context, opts *ListTickerOpts) ([]*sqlcv1.Ticker, error)
// DeactivateTicker deletes a ticker.
DeactivateTicker(ctx context.Context, tickerId string) error
// PollCronSchedules returns all cron schedules which should be managed by the ticker
PollCronSchedules(ctx context.Context, tickerId string) ([]*sqlcv1.PollCronSchedulesRow, error)
PollScheduledWorkflows(ctx context.Context, tickerId string) ([]*sqlcv1.PollScheduledWorkflowsRow, error)
PollTenantAlerts(ctx context.Context, tickerId string) ([]*sqlcv1.PollTenantAlertsRow, error)
PollExpiringTokens(ctx context.Context) ([]*sqlcv1.PollExpiringTokensRow, error)
PollTenantResourceLimitAlerts(ctx context.Context) ([]*sqlcv1.TenantResourceLimitAlert, error)
}
type TimeoutTasksResponse ¶ added in v0.74.3
type TimeoutTasksResponse struct {
*FailTasksResponse
TimeoutTasks []*sqlcv1.ListTasksToTimeoutRow
}
type TriggerDecision ¶ added in v0.74.3
type TriggerFromEventsResult ¶ added in v0.74.3
type TriggerFromEventsResult struct {
Tasks []*V1TaskWithPayload
Dags []*DAGWithData
EventExternalIdToRuns map[string][]*Run
CELEvaluationFailures []CELEvaluationFailure
}
type TriggerRepository ¶ added in v0.74.3
type TriggerRepository interface {
TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)
PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}
type TriggerRepositoryImpl ¶ added in v0.74.3
type TriggerRepositoryImpl struct {
// contains filtered or unexported fields
}
func (TriggerRepositoryImpl) DesiredWorkerId ¶ added in v0.74.3
func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow ¶ added in v0.74.3
func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
PopulateExternalIdsForWorkflow generates external ids and additionally looks up child workflows and whether they already exist.
func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts ¶ added in v0.74.3
func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
func (TriggerRepositoryImpl) ToV1StepRunData ¶ added in v0.74.3
func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData
func (*TriggerRepositoryImpl) TriggerFromEvents ¶ added in v0.74.3
func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)
func (*TriggerRepositoryImpl) TriggerFromWorkflowNames ¶ added in v0.74.3
func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)
type TriggerTaskData ¶ added in v0.74.3
type TriggerTaskData struct {
// (required) the workflow name
WorkflowName string `json:"workflow_name" validate:"required"`
// (optional) the workflow run data
Data []byte `json:"data"`
// (optional) the workflow run metadata
AdditionalMetadata []byte `json:"additional_metadata"`
// (optional) the desired worker id
DesiredWorkerId *string `json:"desired_worker_id"`
// (optional) the parent external id
ParentExternalId *string `json:"parent_external_id"`
// (optional) the parent task id
ParentTaskId *int64 `json:"parent_task_id"`
// (optional) the parent inserted_at
ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`
// (optional) the child index
ChildIndex *int64 `json:"child_index"`
// (optional) the child key
ChildKey *string `json:"child_key"`
// (optional) the priority of the task
Priority *int32 `json:"priority"`
}
type TriggeredBy ¶ added in v0.74.3
type TriggeredByEvent ¶ added in v0.74.3
type TriggeredByEvent struct {
// contains filtered or unexported fields
}
func (*TriggeredByEvent) ToMetadata ¶ added in v0.74.3
func (t *TriggeredByEvent) ToMetadata(additionalMetadata []byte) []byte
type UnscopedCallback ¶
func (UnscopedCallback[T]) Do ¶
func (c UnscopedCallback[T]) Do(l *zerolog.Logger, v T)
type UpdateCronOpts ¶ added in v0.73.35
type UpdateCronOpts struct {
// (optional) a flag indicating whether or not the cron is enabled
Enabled *bool
}
type UpdateDAGStatusRow ¶ added in v0.74.3
type UpdateDispatcherOpts ¶
type UpdateFilterOpts ¶ added in v0.74.3
type UpdateSessionOpts ¶
type UpdateTaskStatusRow ¶ added in v0.74.3
type UpdateTenantAlertGroupOpts ¶
type UpdateTenantAlertGroupOpts struct {
Emails []string `validate:"required,dive,email,max=255"`
}
type UpdateTenantInviteOpts ¶
type UpdateTenantMemberOpts ¶
type UpdateTenantMemberOpts struct {
Role *string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}
type UpdateTenantOpts ¶
type UpdateTenantOpts struct {
Name *string
AnalyticsOptOut *bool `validate:"omitempty"`
AlertMemberEmails *bool `validate:"omitempty"`
Version *sqlcv1.NullTenantMajorEngineVersion `validate:"omitempty"`
}
type UpdateTickerOpts ¶
type UpdateUserOpts ¶
type UpdateWorkerOpts ¶
type UpdateWorkerOpts struct {
// The id of the dispatcher
DispatcherId *string `validate:"omitempty,uuid"`
// When the last worker heartbeat was
LastHeartbeatAt *time.Time
// If the worker is active and accepting new runs
IsActive *bool
// A list of actions this worker can run
Actions []string `validate:"dive,actionId"`
// If the worker is paused
IsPaused *bool
}
type UpsertRateLimitOpts ¶
type UpsertSlackWebhookOpts ¶
type UpsertWorkerLabelOpts ¶
type UserRepository ¶
type UserRepository interface {
RegisterCreateCallback(callback UnscopedCallback[*sqlcv1.User])
// GetUserByID returns the user with the given id
GetUserByID(ctx context.Context, id string) (*sqlcv1.User, error)
// GetUserByEmail returns the user with the given email
GetUserByEmail(ctx context.Context, email string) (*sqlcv1.User, error)
// GetUserPassword returns the user password with the given id
GetUserPassword(ctx context.Context, id string) (*sqlcv1.UserPassword, error)
// CreateUser creates a new user with the given options
CreateUser(ctx context.Context, opts *CreateUserOpts) (*sqlcv1.User, error)
// UpdateUser updates the user with the given id
UpdateUser(ctx context.Context, id string, opts *UpdateUserOpts) (*sqlcv1.User, error)
// ListTenantMemberships returns the list of tenant memberships for the given user
ListTenantMemberships(ctx context.Context, userId string) ([]*sqlcv1.PopulateTenantMembersRow, error)
}
type UserSessionRepository ¶
type UserSessionRepository interface {
Create(ctx context.Context, opts *CreateSessionOpts) (*sqlcv1.UserSession, error)
Update(ctx context.Context, sessionId string, opts *UpdateSessionOpts) (*sqlcv1.UserSession, error)
Delete(ctx context.Context, sessionId string) (*sqlcv1.UserSession, error)
GetById(ctx context.Context, sessionId string) (*sqlcv1.UserSession, error)
}
UserSessionRepository represents the set of queries on the UserSession model
type V1StepRunData ¶ added in v0.74.3
type V1StepRunData struct {
Input map[string]interface{} `json:"input"`
TriggeredBy string `json:"triggered_by"`
Parents map[string]map[string]interface{} `json:"parents"`
Triggers map[string]map[string]interface{} `json:"triggers"`
// custom-set user data for the step
UserData map[string]interface{} `json:"user_data"`
// overrides set from the playground
Overrides map[string]interface{} `json:"overrides"`
// errors in upstream steps (only used in on-failure step)
StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}
func (*V1StepRunData) Bytes ¶ added in v0.74.3
func (v1 *V1StepRunData) Bytes() []byte
type V1TaskEventWithPayload ¶ added in v0.74.3
type V1TaskEventWithPayload struct {
*sqlcv1.V1TaskEvent
Payload []byte `json:"payload"`
}
type V1TaskWithPayload ¶ added in v0.74.3
type V1WorkflowRunPopulator ¶ added in v0.74.3
type V1WorkflowRunPopulator struct {
WorkflowRun *WorkflowRunData
TaskMetadata []TaskMetadata
}
type WasSuccessfullyClaimed ¶ added in v0.74.3
type WasSuccessfullyClaimed bool
type WebhookRepository ¶ added in v0.74.3
type WebhookRepository interface {
CreateWebhook(ctx context.Context, tenantId string, params CreateWebhookOpts) (*sqlcv1.V1IncomingWebhook, error)
ListWebhooks(ctx context.Context, tenantId string, params ListWebhooksOpts) ([]*sqlcv1.V1IncomingWebhook, error)
DeleteWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
GetWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
CanCreate(ctx context.Context, tenantId string, webhookLimit int32) (bool, error)
UpdateWebhook(ctx context.Context, tenantId string, webhookId, newExpression string) (*sqlcv1.V1IncomingWebhook, error)
}
type WorkerRepository ¶ added in v0.74.3
type WorkerRepository interface {
ListWorkers(tenantId string, opts *ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, error)
CountActiveSlotsPerTenant() (map[string]int64, error)
CountActiveWorkersPerTenant() (map[string]int64, error)
ListActiveSDKsPerTenant() (map[TenantIdSDKTuple]int64, error)
// GetWorkerActionsByWorkerId returns a list of actions for a worker
GetWorkerActionsByWorkerId(tenantid string, workerId []string) (map[string][]string, error)
// GetWorkerWorkflowsByWorkerId returns a list of workflows for a worker
GetWorkerWorkflowsByWorkerId(tenantid string, workerId string) ([]*sqlcv1.Workflow, error)
// ListWorkerLabels returns the list of label configs for a worker
ListWorkerLabels(tenantId, workerId string) ([]*sqlcv1.ListWorkerLabelsRow, error)
// CreateNewWorker creates a new worker for a given tenant.
CreateNewWorker(ctx context.Context, tenantId string, opts *CreateWorkerOpts) (*sqlcv1.Worker, error)
// UpdateWorker updates a worker for a given tenant.
UpdateWorker(ctx context.Context, tenantId, workerId string, opts *UpdateWorkerOpts) (*sqlcv1.Worker, error)
// UpdateWorkerHeartbeat updates the last heartbeat time of a worker for a given tenant.
// It will only update the worker if there is no lock on the worker, else it will skip.
UpdateWorkerHeartbeat(ctx context.Context, tenantId, workerId string, lastHeartbeatAt time.Time) error
// DeleteWorker removes the worker from the database
DeleteWorker(ctx context.Context, tenantId, workerId string) error
GetWorkerForEngine(ctx context.Context, tenantId, workerId string) (*sqlcv1.GetWorkerForEngineRow, error)
UpdateWorkerActiveStatus(ctx context.Context, tenantId, workerId string, isActive bool, timestamp time.Time) (*sqlcv1.Worker, error)
UpsertWorkerLabels(ctx context.Context, workerId pgtype.UUID, opts []UpsertWorkerLabelOpts) ([]*sqlcv1.WorkerLabel, error)
DeleteOldWorkers(ctx context.Context, tenantId string, lastHeartbeatBefore time.Time) (bool, error)
GetDispatcherIdsForWorkers(ctx context.Context, tenantId string, workerIds []string) (map[string][]string, error)
}
type WorkflowAndScope ¶ added in v0.74.3
type WorkflowMetrics ¶
type WorkflowNameTriggerOpts ¶ added in v0.74.3
type WorkflowNameTriggerOpts struct {
*TriggerTaskData
ExternalId string
// (optional) The idempotency key to use for debouncing this task
IdempotencyKey *IdempotencyKey
// Whether to skip the creation of the child workflow
ShouldSkip bool
}
type WorkflowRepository ¶ added in v0.74.3
type WorkflowRepository interface {
ListWorkflowNamesByIds(ctx context.Context, tenantId string, workflowIds []pgtype.UUID) (map[pgtype.UUID]string, error)
PutWorkflowVersion(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts) (*sqlcv1.GetWorkflowVersionForEngineRow, error)
GetWorkflowShape(ctx context.Context, workflowVersionId uuid.UUID) ([]*sqlcv1.GetWorkflowShapeRow, error)
// ListWorkflows returns all workflows for a given tenant.
ListWorkflows(tenantId string, opts *ListWorkflowsOpts) (*ListWorkflowsResult, error)
// GetWorkflowById returns a workflow by its id. It will return db.ErrNotFound if the workflow does not exist.
GetWorkflowById(ctx context.Context, workflowId string) (*sqlcv1.GetWorkflowByIdRow, error)
// GetWorkflowVersionWithTriggers returns a workflow version by its id, together with its cron, event and
// scheduled trigger refs. It will return db.ErrNotFound if the workflow version does not exist.
GetWorkflowVersionWithTriggers(ctx context.Context, tenantId, workflowVersionId string) (*sqlcv1.GetWorkflowVersionByIdRow,
[]*sqlcv1.WorkflowTriggerCronRef,
[]*sqlcv1.WorkflowTriggerEventRef,
[]*sqlcv1.WorkflowTriggerScheduledRef,
error)
GetWorkflowVersionById(ctx context.Context, tenantId, workflowId string) (*sqlcv1.GetWorkflowVersionForEngineRow, error)
// DeleteWorkflow deletes a workflow for a given tenant.
DeleteWorkflow(ctx context.Context, tenantId, workflowId string) (*sqlcv1.Workflow, error)
GetWorkflowByName(ctx context.Context, tenantId, workflowName string) (*sqlcv1.Workflow, error)
GetLatestWorkflowVersion(ctx context.Context, tenantId, workflowId string) (*sqlcv1.GetWorkflowVersionForEngineRow, error)
}
type WorkflowRunData ¶ added in v0.74.3
type WorkflowRunData struct {
AdditionalMetadata []byte `json:"additional_metadata"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
DisplayName string `json:"display_name"`
ErrorMessage string `json:"error_message"`
ExternalID pgtype.UUID `json:"external_id"`
FinishedAt pgtype.Timestamptz `json:"finished_at"`
Input []byte `json:"input"`
InsertedAt pgtype.Timestamptz `json:"inserted_at"`
Kind sqlcv1.V1RunKind `json:"kind"`
Output []byte `json:"output,omitempty"`
ParentTaskExternalId *pgtype.UUID `json:"parent_task_external_id,omitempty"`
ReadableStatus sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
StepId *pgtype.UUID `json:"step_id,omitempty"`
StartedAt pgtype.Timestamptz `json:"started_at"`
TaskExternalId *pgtype.UUID `json:"task_external_id,omitempty"`
TaskId *int64 `json:"task_id,omitempty"`
TaskInsertedAt *pgtype.Timestamptz `json:"task_inserted_at,omitempty"`
TenantID pgtype.UUID `json:"tenant_id"`
WorkflowID pgtype.UUID `json:"workflow_id"`
WorkflowVersionId pgtype.UUID `json:"workflow_version_id"`
RetryCount *int `json:"retry_count,omitempty"`
}
type WorkflowScheduleRepository ¶ added in v0.74.3
type WorkflowScheduleRepository interface {
// ListScheduledWorkflows lists workflows by scheduled trigger
ListScheduledWorkflows(ctx context.Context, tenantId string, opts *ListScheduledWorkflowsOpts) ([]*sqlcv1.ListScheduledWorkflowsRow, int64, error)
// DeleteScheduledWorkflow deletes a scheduled workflow run
DeleteScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) error
// GetScheduledWorkflow gets a scheduled workflow run
GetScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) (*sqlcv1.ListScheduledWorkflowsRow, error)
// UpdateScheduledWorkflow updates a scheduled workflow run
UpdateScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string, triggerAt time.Time) error
// ScheduledWorkflowMetaByIds returns minimal metadata for scheduled workflows by id.
// Intended for bulk operations to avoid N+1 DB calls.
ScheduledWorkflowMetaByIds(ctx context.Context, tenantId string, scheduledWorkflowIds []string) (map[string]ScheduledWorkflowMeta, error)
// BulkDeleteScheduledWorkflows deletes scheduled workflows in bulk and returns deleted ids.
BulkDeleteScheduledWorkflows(ctx context.Context, tenantId string, scheduledWorkflowIds []string) ([]string, error)
// BulkUpdateScheduledWorkflows updates scheduled workflows in bulk and returns updated ids.
BulkUpdateScheduledWorkflows(ctx context.Context, tenantId string, updates []ScheduledWorkflowUpdate) ([]string, error)
CreateScheduledWorkflow(ctx context.Context, tenantId string, opts *CreateScheduledWorkflowRunForWorkflowOpts) (*sqlcv1.ListScheduledWorkflowsRow, error)
// CreateCronWorkflow creates a cron trigger
CreateCronWorkflow(ctx context.Context, tenantId string, opts *CreateCronWorkflowTriggerOpts) (*sqlcv1.ListCronWorkflowsRow, error)
// ListCronWorkflows lists workflows by cron trigger
ListCronWorkflows(ctx context.Context, tenantId string, opts *ListCronWorkflowsOpts) ([]*sqlcv1.ListCronWorkflowsRow, int64, error)
// GetCronWorkflow gets a cron workflow run
GetCronWorkflow(ctx context.Context, tenantId, cronWorkflowId string) (*sqlcv1.ListCronWorkflowsRow, error)
// DeleteCronWorkflow deletes a cron workflow run
DeleteCronWorkflow(ctx context.Context, tenantId, id string) error
// UpdateCronWorkflow updates a cron workflow
UpdateCronWorkflow(ctx context.Context, tenantId, id string, opts *UpdateCronOpts) error
DeleteInvalidCron(ctx context.Context, id pgtype.UUID) error
}
Source Files
¶
- api_token.go
- callbacks.go
- dispatcher.go
- filters.go
- health.go
- idempotency.go
- ids.go
- input.go
- interval_settings.go
- jsonb.go
- log_line.go
- match.go
- match_data.go
- mq.go
- multiplexer.go
- olap.go
- olappayload.go
- output.go
- payloadstore.go
- pg_health.go
- rate_limit.go
- repository.go
- scheduler.go
- scheduler_assignment.go
- scheduler_concurrency.go
- scheduler_lease.go
- scheduler_queue.go
- security_check.go
- shared.go
- slack.go
- sns.go
- task.go
- tenant.go
- tenant_alerting.go
- tenant_invite.go
- tenant_limit.go
- ticker.go
- trigger.go
- user.go
- user_session.go
- webhooks.go
- worker.go
- workflow.go
- workflow_schedules.go