Documentation
¶
Index ¶
- Variables
- func BoolPtr(b bool) *bool
- func HashPassword(pw string) (*string, error)
- func IsFinalJobRunStatus(status dbsqlc.JobRunStatus) bool
- func IsFinalStepRunStatus(status dbsqlc.StepRunStatus) bool
- func IsFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool
- func JobRunStatusPtr(status dbsqlc.JobRunStatus) *dbsqlc.JobRunStatus
- func RunPostCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
- func RunPreCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
- func StepRunEventReasonPtr(reason dbsqlc.StepRunEventReason) *dbsqlc.StepRunEventReason
- func StepRunEventSeverityPtr(severity dbsqlc.StepRunEventSeverity) *dbsqlc.StepRunEventSeverity
- func StepRunStatusPtr(status dbsqlc.StepRunStatus) *dbsqlc.StepRunStatus
- func StringPtr(s string) *string
- func VerifyPassword(hashedPW, candidate string) (bool, error)
- type APIRepository
- type APITokenGenerator
- type APITokenRepository
- type ApiUpdateWorkerOpts
- type AssignResults
- type AssignedItem
- type AssignmentRepository
- type BulkCreateEventOpts
- type BulkCreateEventResult
- type CallbackOptFunc
- type ChildWorkflowRun
- type CreateAPITokenOpts
- type CreateCronWorkflowTriggerOpts
- type CreateDispatcherOpts
- type CreateEventOpts
- type CreateExpressionEvalOpt
- type CreateGroupKeyRunOpts
- type CreateLogLineOpts
- type CreateSNSIntegrationOpts
- type CreateScheduledWorkflowRunForWorkflowOpts
- type CreateSessionOpts
- type CreateStepRunEventOpts
- type CreateStreamEventOpts
- type CreateTenantAlertGroupOpts
- type CreateTenantInviteOpts
- type CreateTenantMemberOpts
- type CreateTenantOpts
- type CreateTickerOpts
- type CreateUserOpts
- type CreateWebhookWorkerOpts
- type CreateWorkerOpts
- type CreateWorkflowConcurrencyOpts
- type CreateWorkflowJobOpts
- type CreateWorkflowRunOpt
- type CreateWorkflowRunOpts
- func GetCreateWorkflowRunOptsFromCron(cron, cronParentId string, cronName *string, ...) (*CreateWorkflowRunOpts, error)
- func GetCreateWorkflowRunOptsFromEvent(eventId string, workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, ...) (*CreateWorkflowRunOpts, error)
- func GetCreateWorkflowRunOptsFromManual(workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, ...) (*CreateWorkflowRunOpts, error)
- func GetCreateWorkflowRunOptsFromParent(workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, ...) (*CreateWorkflowRunOpts, error)
- func GetCreateWorkflowRunOptsFromSchedule(scheduledWorkflowId string, ...) (*CreateWorkflowRunOpts, error)
- type CreateWorkflowRunPullRequestOpts
- type CreateWorkflowSchedulesOpts
- type CreateWorkflowStepOpts
- type CreateWorkflowStepRateLimitOpts
- type CreateWorkflowTagOpts
- type CreateWorkflowVersionOpts
- type DesiredWorkerLabelOpts
- type DispatcherEngineRepository
- type EngineRepository
- type EntitlementsRepository
- type ErrDedupeValueExists
- type EventAPIRepository
- type EventEngineRepository
- type GetGroupKeyRunEngineRepository
- type GetQueueMetricsOpts
- type GetQueueMetricsResponse
- type GetStepRunFull
- type GetTenantAlertingSettingsResponse
- type GetWorkflowMetricsOpts
- type HealthRepository
- type JobRunAPIRepository
- type JobRunEngineRepository
- type JobRunHasCycleError
- type LeaseRepository
- type Limit
- type ListActiveWorkersResult
- type ListAllJobRunsOpts
- type ListCronWorkflowsOpts
- type ListEventOpts
- type ListEventResult
- type ListGetGroupKeyRunsOpts
- type ListLogsOpts
- type ListLogsResult
- type ListPullRequestsForWorkflowRunOpts
- type ListRateLimitOpts
- type ListRateLimitsResult
- type ListScheduledWorkflowsOpts
- type ListStepRunArchivesOpts
- type ListStepRunArchivesResult
- type ListStepRunEventOpts
- type ListStepRunEventResult
- type ListStepRunsOpts
- type ListTenantInvitesOpts
- type ListTickerOpts
- type ListWorkersOpts
- type ListWorkflowRunRoundRobinsOpts
- type ListWorkflowRunsOpts
- type ListWorkflowRunsResult
- type ListWorkflowsOpts
- type ListWorkflowsResult
- type LogsAPIRepository
- type LogsEngineRepository
- type MessageQueueRepository
- type OAuthOpts
- type PlanLimitMap
- type ProcessStepRunUpdatesResultV2
- type PubMessage
- type QueueFactoryRepository
- type QueueMetric
- type QueueRepository
- type QueueStepRunOpts
- type QueuedStepRun
- type RateLimitEngineRepository
- type RateLimitRepository
- type RateLimitResult
- type RefreshTimeoutBy
- type RuntimeInfo
- type SNSRepository
- type SchedulerRepository
- type SecurityCheckRepository
- type SlackRepository
- type StepRepository
- type StepRunAPIRepository
- type StepRunEngineRepository
- type StepRunForJobRun
- type StepRunUpdateInfo
- type StreamEventsEngineRepository
- type TenantAPIRepository
- type TenantAlertEmailGroupForSend
- type TenantAlertingRepository
- type TenantCallbackOpts
- type TenantEngineRepository
- type TenantInviteRepository
- type TenantLimitConfig
- type TenantLimitRepository
- type TenantScopedCallback
- type TickerEngineRepository
- type UnscopedCallback
- type UpdateDispatcherOpts
- type UpdateGetGroupKeyRunOpts
- type UpdateJobRunLookupDataOpts
- type UpdateSessionOpts
- type UpdateStepRunOverridesDataOpts
- type UpdateTenantAlertGroupOpts
- type UpdateTenantAlertingSettingsOpts
- type UpdateTenantInviteOpts
- type UpdateTenantMemberOpts
- type UpdateTenantOpts
- type UpdateTickerOpts
- type UpdateUserOpts
- type UpdateWebhookWorkerTokenOpts
- type UpdateWorkerOpts
- type UpdateWorkflowOpts
- type UpdateWorkflowRunFromGroupKeyEvalOpts
- type UpsertRateLimitOpts
- type UpsertSlackWebhookOpts
- type UpsertTenantAlertingSettingsOpts
- type UpsertWorkerLabelOpts
- type UpsertWorkflowDeploymentConfigOpts
- type UserRepository
- type UserSessionRepository
- type WebhookWorkerEngineRepository
- type WebhookWorkerRepository
- type WorkerAPIRepository
- type WorkerEngineRepository
- type WorkflowAPIRepository
- type WorkflowEngineRepository
- type WorkflowMetrics
- type WorkflowRunAPIRepository
- type WorkflowRunEngineRepository
- type WorkflowRunMetricsCountOpts
- type WorkflowRunsMetricsOpts
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrAlreadyQueued = fmt.Errorf("step run is already queued")
View Source
var ErrAlreadyRunning = fmt.Errorf("step run is already running")
View Source
var ErrDagParentNotFound = errors.New("dag parent not found")
View Source
var ErrDuplicateKey = fmt.Errorf("duplicate key error")
View Source
var ErrNoWorkerAvailable = fmt.Errorf("no worker available")
View Source
var ErrPreflightReplayChildStepRunNotInFinalState = fmt.Errorf("child step run is not in a final state")
View Source
var ErrPreflightReplayStepRunNotInFinalState = fmt.Errorf("step run is not in a final state")
View Source
var ErrRateLimitExceeded = fmt.Errorf("rate limit exceeded")
View Source
var ErrStepRunIsNotAssigned = fmt.Errorf("step run is not assigned")
View Source
var (
ErrWorkflowRunNotFound = fmt.Errorf("workflow run not found")
)
Functions ¶
func HashPassword ¶
func IsFinalJobRunStatus ¶
func IsFinalJobRunStatus(status dbsqlc.JobRunStatus) bool
func IsFinalStepRunStatus ¶
func IsFinalStepRunStatus(status dbsqlc.StepRunStatus) bool
func IsFinalWorkflowRunStatus ¶
func IsFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool
func JobRunStatusPtr ¶
func JobRunStatusPtr(status dbsqlc.JobRunStatus) *dbsqlc.JobRunStatus
func RunPostCommit ¶
func RunPostCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
func RunPreCommit ¶
func RunPreCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])
func StepRunEventReasonPtr ¶
func StepRunEventReasonPtr(reason dbsqlc.StepRunEventReason) *dbsqlc.StepRunEventReason
func StepRunEventSeverityPtr ¶
func StepRunEventSeverityPtr(severity dbsqlc.StepRunEventSeverity) *dbsqlc.StepRunEventSeverity
func StepRunStatusPtr ¶
func StepRunStatusPtr(status dbsqlc.StepRunStatus) *dbsqlc.StepRunStatus
func VerifyPassword ¶
Types ¶
type APIRepository ¶
// APIRepository bundles the sub-repositories listed below behind a single
// interface. Every method is a zero-argument accessor that returns one of
// those sub-repositories.
type APIRepository interface {
Health() HealthRepository
APIToken() APITokenRepository
Event() EventAPIRepository
Log() LogsAPIRepository
Tenant() TenantAPIRepository
TenantAlertingSettings() TenantAlertingRepository
TenantInvite() TenantInviteRepository
Workflow() WorkflowAPIRepository
WorkflowRun() WorkflowRunAPIRepository
JobRun() JobRunAPIRepository
StepRun() StepRunAPIRepository
Slack() SlackRepository
SNS() SNSRepository
Step() StepRepository
Worker() WorkerAPIRepository
UserSession() UserSessionRepository
User() UserRepository
SecurityCheck() SecurityCheckRepository
WebhookWorker() WebhookWorkerRepository
}
type APITokenGenerator ¶
type APITokenRepository ¶
// APITokenRepository declares the create, lookup, list, revoke and delete
// operations for API tokens.
type APITokenRepository interface {
// CreateAPIToken takes a CreateAPITokenOpts and returns a dbsqlc.APIToken.
CreateAPIToken(ctx context.Context, opts *CreateAPITokenOpts) (*dbsqlc.APIToken, error)
// GetAPITokenById returns the dbsqlc.APIToken for the given id.
GetAPITokenById(ctx context.Context, id string) (*dbsqlc.APIToken, error)
// ListAPITokensByTenant returns the API tokens for the given tenant id.
ListAPITokensByTenant(ctx context.Context, tenantId string) ([]*dbsqlc.APIToken, error)
// RevokeAPIToken is keyed by the token id only.
RevokeAPIToken(ctx context.Context, id string) error
// DeleteAPIToken is keyed by both the tenant id and the token id, unlike
// RevokeAPIToken.
DeleteAPIToken(ctx context.Context, tenantId, id string) error
}
type ApiUpdateWorkerOpts ¶
// ApiUpdateWorkerOpts holds the fields that can be supplied for a worker
// update; the only field declared here is IsPaused.
type ApiUpdateWorkerOpts struct {
// IsPaused is a pointer, so nil is distinguishable from false.
// NOTE(review): presumably nil means "leave unchanged" — confirm against the
// implementation.
IsPaused *bool
}
type AssignResults ¶
// AssignResults groups items into four lists, named for their outcome:
// assigned, unassigned, scheduling-timed-out and rate-limited.
type AssignResults struct {
// Assigned holds AssignedItem values.
Assigned []*AssignedItem
// Unassigned holds queue items.
Unassigned []*dbsqlc.QueueItem
// SchedulingTimedOut holds queue items.
SchedulingTimedOut []*dbsqlc.QueueItem
// RateLimited holds RateLimitResult values rather than raw queue items.
RateLimited []*RateLimitResult
}
type AssignmentRepository ¶
// AssignmentRepository declares two tenant-scoped list queries. Both take the
// tenant id as a pgtype.UUID.
type AssignmentRepository interface {
// ListActionsForWorkers returns ListActionsForWorkersRow values for the given
// worker ids.
ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*dbsqlc.ListActionsForWorkersRow, error)
// ListAvailableSlotsForWorkers returns ListAvailableSlotsForWorkersRow values
// for the given query params.
ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params dbsqlc.ListAvailableSlotsForWorkersParams) ([]*dbsqlc.ListAvailableSlotsForWorkersRow, error)
}
type BulkCreateEventOpts ¶
// BulkCreateEventOpts is the input for creating several events in one call.
type BulkCreateEventOpts struct {
// (required) the tenant id, validated as a UUID
TenantId string `validate:"required,uuid"`
// the events to create; each entry also carries its own TenantId field
Events []*CreateEventOpts
}
type BulkCreateEventResult ¶
type CallbackOptFunc ¶
// CallbackOptFunc is a functional option: a function that receives a
// *TenantCallbackOpts[T] to modify.
type CallbackOptFunc[T any] func(*TenantCallbackOpts[T])
func WithPostCommitCallback ¶
func WithPostCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]
func WithPreCommitCallback ¶
func WithPreCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]
type ChildWorkflowRun ¶
type CreateAPITokenOpts ¶
// CreateAPITokenOpts is the input for APITokenRepository.CreateAPIToken.
type CreateAPITokenOpts struct {
// (required) The id of the token, validated as a UUID
ID string `validate:"required,uuid"`
// When the token expires
ExpiresAt time.Time
// (optional) A tenant ID for this API token
TenantId *string `validate:"omitempty,uuid"`
// (optional) A name for this API token, at most 255 characters
Name *string `validate:"omitempty,max=255"`
// NOTE(review): Internal is undocumented in the original; presumably it marks
// tokens issued for internal use — confirm.
Internal bool
}
type CreateDispatcherOpts ¶
// CreateDispatcherOpts is the input for
// DispatcherEngineRepository.CreateNewDispatcher.
type CreateDispatcherOpts struct {
// (required) the dispatcher id, validated as a UUID
ID string `validate:"required,uuid"`
}
type CreateEventOpts ¶
// CreateEventOpts describes a single event to create. It is used directly by
// EventEngineRepository.CreateEvent and as the element type of
// BulkCreateEventOpts.Events.
type CreateEventOpts struct {
// (required) the tenant id
TenantId string `validate:"required,uuid"`
// (required) the event key
Key string `validate:"required"`
// (optional) the event data
Data []byte
// (optional) the id of the event that this event is replaying
ReplayedEvent *string `validate:"omitempty,uuid"`
// (optional) the event metadata
AdditionalMetadata []byte
}
type CreateExpressionEvalOpt ¶
// CreateExpressionEvalOpt pairs a key with a string and/or integer value and
// a dbsqlc.StepExpressionKind.
type CreateExpressionEvalOpt struct {
Key string
// ValueStr and ValueInt are both pointers.
// NOTE(review): presumably exactly one of them is set per entry — confirm.
ValueStr *string
ValueInt *int
Kind dbsqlc.StepExpressionKind
}
type CreateGroupKeyRunOpts ¶
// CreateGroupKeyRunOpts is the input for creating a group key run. It is
// referenced by CreateWorkflowRunOpts.GetGroupKeyRun.
type CreateGroupKeyRunOpts struct {
// (optional) the input data
Input []byte
}
type CreateLogLineOpts ¶
// CreateLogLineOpts describes one log line to attach to a step run.
type CreateLogLineOpts struct {
// (required) The step run id, validated as a UUID
StepRunId string `validate:"required,uuid"`
// (optional) The time when the log line was created.
CreatedAt *time.Time
// (required) The message of the log line; length must be between 1 and 10000.
Message string `validate:"required,min=1,max=10000"`
// (optional) The level of the log line: one of INFO, ERROR, WARN or DEBUG.
Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`
// (optional) The metadata of the log line.
Metadata []byte
}
type CreateSNSIntegrationOpts ¶
// CreateSNSIntegrationOpts is the input for creating an SNS integration.
type CreateSNSIntegrationOpts struct {
// (required) the SNS topic ARN; length must be between 1 and 255
TopicArn string `validate:"required,min=1,max=255"`
}
type CreateSessionOpts ¶
type CreateStepRunEventOpts ¶
// CreateStepRunEventOpts describes an event to record against a step run.
// Only StepRunId carries a validation tag; all other fields are pointers or a
// map and may be left nil.
type CreateStepRunEventOpts struct {
// (required) the step run id, validated as a UUID
StepRunId string `validate:"required,uuid"`
EventMessage *string
EventReason *dbsqlc.StepRunEventReason
EventSeverity *dbsqlc.StepRunEventSeverity
Timestamp *time.Time
EventData map[string]interface{}
}
type CreateStreamEventOpts ¶
// CreateStreamEventOpts describes one stream event to attach to a step run.
type CreateStreamEventOpts struct {
// (required) The step run id, validated as a UUID
StepRunId string `validate:"required,uuid"`
// (optional) The time when the StreamEvent was created.
CreatedAt *time.Time
// (required) The message of the Stream Event.
// NOTE(review): documented as required, but the field has no validate tag, so
// the struct validator does not enforce it.
Message []byte
// (optional) The metadata of the Stream Event.
Metadata []byte
}
type CreateTenantAlertGroupOpts ¶
// CreateTenantAlertGroupOpts is the input for creating a tenant alert group.
type CreateTenantAlertGroupOpts struct {
// (required) the group's email addresses; each entry is validated as an email
// of at most 255 characters
Emails []string `validate:"required,dive,email,max=255"`
}
type CreateTenantInviteOpts ¶
// CreateTenantInviteOpts is the input for creating a tenant invite.
type CreateTenantInviteOpts struct {
// (required) the invitee email
InviteeEmail string `validate:"required,email"`
// (required) the inviter email
InviterEmail string `validate:"required,email"`
// (required) when the invite expires; checked by the custom "future" validator
ExpiresAt time.Time `validate:"required,future"`
// the role of the invitee: one of OWNER, ADMIN or MEMBER
// NOTE(review): originally documented as required, but the tag is
// "omitempty", so an empty Role passes validation — confirm which is intended.
Role string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
// NOTE(review): MaxPending is undocumented; presumably a cap on pending
// invites — confirm.
MaxPending int `validate:"omitempty"`
}
type CreateTenantMemberOpts ¶
type CreateTenantOpts ¶
// CreateTenantOpts is the input for creating a tenant.
type CreateTenantOpts struct {
// (required) the tenant name
Name string `validate:"required"`
// (required) the tenant slug; checked by the custom "hatchetName" validator
Slug string `validate:"required,hatchetName"`
// (optional) the tenant ID, validated as a UUID when set
ID *string `validate:"omitempty,uuid"`
// (optional) the tenant data retention period; checked by the custom
// "duration" validator
DataRetentionPeriod *string `validate:"omitempty,duration"`
}
type CreateTickerOpts ¶
// CreateTickerOpts is the input for creating a ticker.
type CreateTickerOpts struct {
// (required) the ticker id, validated as a UUID
ID string `validate:"required,uuid"`
}
type CreateUserOpts ¶
type CreateWebhookWorkerOpts ¶
type CreateWorkerOpts ¶
// CreateWorkerOpts is the input for creating a worker.
type CreateWorkerOpts struct {
// (required) The id of the dispatcher
DispatcherId string `validate:"required,uuid"`
// (optional) The maximum number of runs this worker can run at a time; at
// least 1 when set
MaxRuns *int `validate:"omitempty,gte=1"`
// (required) The name of the worker
Name string `validate:"required,hatchetName"`
// The names of the services; each entry is checked by the "hatchetName"
// validator
Services []string `validate:"dive,hatchetName"`
// A list of actions this worker can run; each entry is checked by the
// "actionId" validator
Actions []string `validate:"dive,actionId"`
// (optional) Webhook Id associated with the worker (if any)
WebhookId *string `validate:"omitempty,uuid"`
// (optional) Runtime info for the worker
RuntimeInfo *RuntimeInfo `validate:"omitempty"`
}
type CreateWorkflowConcurrencyOpts ¶
// CreateWorkflowConcurrencyOpts configures the concurrency group of a workflow
// version. It is referenced by CreateWorkflowVersionOpts.Concurrency.
type CreateWorkflowConcurrencyOpts struct {
// (optional) the action id for getting the concurrency group
Action *string `validate:"omitempty,actionId"`
// (optional) the maximum number of concurrent workflow runs, default 1
MaxRuns *int32
// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS.
// One of CANCEL_IN_PROGRESS, GROUP_ROUND_ROBIN or CANCEL_NEWEST.
LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`
// (optional) a concurrency expression for evaluating the concurrency key;
// checked by the custom "celworkflowrunstr" validator
Expression *string `validate:"omitempty,celworkflowrunstr"`
}
type CreateWorkflowJobOpts ¶
// CreateWorkflowJobOpts describes one job of a workflow version. It is used
// for both CreateWorkflowVersionOpts.Jobs and
// CreateWorkflowVersionOpts.OnFailureJob.
type CreateWorkflowJobOpts struct {
// (required) the job name
Name string `validate:"required,hatchetName"`
// (optional) the job description
Description *string
// (required) the job steps; at least one, each validated individually
Steps []CreateWorkflowStepOpts `validate:"required,min=1,dive"`
// (required) the job kind: DEFAULT or ON_FAILURE
Kind string `validate:"required,oneof=DEFAULT ON_FAILURE"`
}
type CreateWorkflowRunOpt ¶
// CreateWorkflowRunOpt is a functional option that receives a
// *CreateWorkflowRunOpts to modify. GetCreateWorkflowRunOptsFromSchedule
// accepts these as variadic arguments.
type CreateWorkflowRunOpt func(*CreateWorkflowRunOpts)
func WithParent ¶
type CreateWorkflowRunOpts ¶
// CreateWorkflowRunOpts is the input for creating a workflow run. The
// GetCreateWorkflowRunOptsFrom* constructors each return one of these.
//
// NOTE(review): several trigger fields below carry long validate tags that
// this rendering elides ("string literal not displayed"). Check the source
// for their exact rules.
type CreateWorkflowRunOpts struct {
// (optional) the workflow run display name
DisplayName *string
// (required) the tenant id
TenantId string `validate:"required,uuid"`
// (required) the workflow version id
WorkflowVersionId string `validate:"required,uuid"`
// the manual trigger input (validation tag elided in this rendering)
ManualTriggerInput *string `` /* 197-byte string literal not displayed */
// (optional) the event id that triggered the workflow run
TriggeringEventId *string `` /* 204-byte string literal not displayed */
// (optional) the cron schedule that triggered the workflow run
Cron *string `` /* 230-byte string literal not displayed */
// the cron parent id (validation tag elided in this rendering)
CronParentId *string `` /* 230-byte string literal not displayed */
// the cron name; validation is skipped when nil
CronName *string `validate:"omitnil"`
// (optional) the scheduled trigger
ScheduledWorkflowId *string `` /* 200-byte string literal not displayed */
// the input data as raw bytes
InputData []byte
// NOTE(review): TriggeredBy is undocumented; presumably identifies the
// trigger source — confirm.
TriggeredBy string
// (optional) options for the group key run
GetGroupKeyRun *CreateGroupKeyRunOpts `validate:"omitempty"`
// (optional) the parent workflow run which this workflow run was triggered from
ParentId *string `validate:"omitempty,uuid"`
// (optional) the parent step run id which this workflow run was triggered from
ParentStepRunId *string `validate:"omitempty,uuid"`
// (optional) the child key of the workflow run, if this is a child run of a different workflow
ChildKey *string
// (optional) the child index of the workflow run, if this is a child run of a different workflow
// python sdk uses -1 as default value
ChildIndex *int `validate:"omitempty,min=-1"`
// (optional) additional metadata for the workflow run
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
// (optional) the desired worker id for sticky state
DesiredWorkerId *string `validate:"omitempty,uuid"`
// (optional) the deduplication value for the workflow run
DedupeValue *string `validate:"omitempty"`
// (optional) the priority of the workflow run, between 1 and 3
Priority *int32 `validate:"omitempty,min=1,max=3"`
}
func GetCreateWorkflowRunOptsFromCron ¶
func GetCreateWorkflowRunOptsFromCron( cron, cronParentId string, cronName *string, workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, additionalMetadata map[string]interface{}, ) (*CreateWorkflowRunOpts, error)
func GetCreateWorkflowRunOptsFromEvent ¶
func GetCreateWorkflowRunOptsFromEvent( eventId string, workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, additionalMetadata map[string]interface{}, ) (*CreateWorkflowRunOpts, error)
func GetCreateWorkflowRunOptsFromManual ¶
func GetCreateWorkflowRunOptsFromManual( workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, additionalMetadata map[string]interface{}, ) (*CreateWorkflowRunOpts, error)
func GetCreateWorkflowRunOptsFromParent ¶
func GetCreateWorkflowRunOptsFromParent( workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, parentId, parentStepRunId string, childIndex int, childKey *string, additionalMetadata map[string]interface{}, parentAdditionalMetadata map[string]interface{}, ) (*CreateWorkflowRunOpts, error)
func GetCreateWorkflowRunOptsFromSchedule ¶
func GetCreateWorkflowRunOptsFromSchedule( scheduledWorkflowId string, workflowVersion *dbsqlc.GetWorkflowVersionForEngineRow, input []byte, additionalMetadata map[string]interface{}, fs ...CreateWorkflowRunOpt, ) (*CreateWorkflowRunOpts, error)
type CreateWorkflowStepOpts ¶
// CreateWorkflowStepOpts describes one step of a workflow job. It is the
// element type of CreateWorkflowJobOpts.Steps.
type CreateWorkflowStepOpts struct {
// (required) the step name
// NOTE(review): the tag has no "required" rule; whether an empty value is
// rejected depends on the custom "hatchetName" validator — confirm.
ReadableId string `validate:"hatchetName"`
// (required) the step action id
Action string `validate:"required,actionId"`
// (optional) the step timeout
Timeout *string `validate:"omitnil,duration"`
// (optional) the parents that this step depends on
Parents []string `validate:"dive,hatchetName"`
// (optional) the custom user data for the step, serialized as a json string
UserData *string `validate:"omitnil,json"`
// (optional) the step retry max; must not be negative
Retries *int `validate:"omitempty,min=0"`
// (optional) rate limits for this step
RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`
// (optional) desired worker affinity state for this step
DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`
// (optional) the step retry backoff factor, between 1 and 1000
RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`
// (optional) the step retry backoff max seconds (can't be greater than 86400)
RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`
}
type CreateWorkflowStepRateLimitOpts ¶
// CreateWorkflowStepRateLimitOpts describes one rate limit applied to a step.
// Units and UnitsExpr are linked by required_without tags, so at least one of
// the two must be provided.
type CreateWorkflowStepRateLimitOpts struct {
// (required) the rate limit key
Key string `validate:"required"`
// (optional) a CEL expression for the rate limit key
// NOTE(review): Key is itself tagged "required", so required_without=Key
// here looks redundant — confirm the intended Key/KeyExpr relationship.
KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`
// (optional) the rate limit units to consume
Units *int `validate:"omitnil,required_without=UnitsExpr"`
// (optional) a CEL expression for the rate limit units
UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`
// (optional) a CEL expression for a dynamic limit value for the rate limit
LimitExpr *string `validate:"omitnil,celsteprunstr"`
// (optional) the rate limit duration, defaults to MINUTE.
// One of SECOND, MINUTE, HOUR, DAY, WEEK, MONTH or YEAR.
Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}
type CreateWorkflowTagOpts ¶
type CreateWorkflowVersionOpts ¶
// CreateWorkflowVersionOpts is the input for creating a workflow version. Its
// Checksum method returns a string checksum of the options.
type CreateWorkflowVersionOpts struct {
// (required) the workflow name
Name string `validate:"required,hatchetName"`
// (optional) tags for the workflow; each entry is validated individually
Tags []CreateWorkflowTagOpts `validate:"dive"`
// (optional) the workflow description
Description *string `json:"description,omitempty"`
// (optional) the workflow version
Version *string `json:"version,omitempty"`
// (optional) event triggers for the workflow
EventTriggers []string
// (optional) cron triggers for the workflow
CronTriggers []string `validate:"dive,cron"`
// (optional) the input bytes for the cron triggers
CronInput []byte
// (optional) scheduled triggers for the workflow
ScheduledTriggers []time.Time
// (required) the workflow jobs; at least one, each validated individually
Jobs []CreateWorkflowJobOpts `validate:"required,min=1,dive"`
// (optional) the job described by the OnFailureJob options
OnFailureJob *CreateWorkflowJobOpts `json:"onFailureJob,omitempty" validate:"omitempty"`
// (optional) the workflow concurrency groups
// The tag key was previously misspelled "validator", which the struct
// validator never reads; every other field in this struct uses "validate".
Concurrency *CreateWorkflowConcurrencyOpts `json:"concurrency,omitempty" validate:"omitnil"`
// (optional) the amount of time for step runs to wait to be scheduled before timing out
ScheduleTimeout *string `validate:"omitempty,duration"`
// (optional) sticky strategy: SOFT or HARD
Sticky *string `validate:"omitempty,oneof=SOFT HARD"`
// (optional) the workflow kind: FUNCTION, DURABLE or DAG
Kind *string `validate:"omitempty,oneof=FUNCTION DURABLE DAG"`
// (optional) the default priority for steps in the workflow (1-3)
DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`
}
func (*CreateWorkflowVersionOpts) Checksum ¶ added in v0.33.0
func (o *CreateWorkflowVersionOpts) Checksum() (string, error)
type DesiredWorkerLabelOpts ¶
// DesiredWorkerLabelOpts describes one desired worker label for a step. It is
// the value type of CreateWorkflowStepOpts.DesiredWorkerLabels. IntValue and
// StrValue are linked by required_without tags, so at least one must be set.
type DesiredWorkerLabelOpts struct {
// (required) the label key
Key string `validate:"required"`
// (required if StrValue is nil) the label integer value
IntValue *int32 `validate:"omitnil,required_without=StrValue"`
// (required if IntValue is nil) the label string value
StrValue *string `validate:"omitnil,required_without=IntValue"`
// (optional) if the label is required
Required *bool `validate:"omitempty"`
// (optional) the weight of the label for scheduling (default: 100)
Weight *int32 `validate:"omitempty"`
// (optional) the label comparator for scheduling (default: EQUAL)
Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}
type DispatcherEngineRepository ¶
// DispatcherEngineRepository declares the create, update, delete and
// stale-handling operations for dispatchers.
type DispatcherEngineRepository interface {
// CreateNewDispatcher creates a new dispatcher for a given tenant.
CreateNewDispatcher(ctx context.Context, opts *CreateDispatcherOpts) (*dbsqlc.Dispatcher, error)
// UpdateDispatcher updates a dispatcher for a given tenant.
UpdateDispatcher(ctx context.Context, dispatcherId string, opts *UpdateDispatcherOpts) (*dbsqlc.Dispatcher, error)
// Delete takes the id of the dispatcher to delete.
Delete(ctx context.Context, dispatcherId string) error
// UpdateStaleDispatchers accepts an onStale callback. The callback receives a
// dispatcher id and a getValidDispatcherId function, and may return an error.
// NOTE(review): presumably onStale is invoked once per stale dispatcher —
// confirm against the implementation.
UpdateStaleDispatchers(ctx context.Context, onStale func(dispatcherId string, getValidDispatcherId func() string) error) error
}
type EngineRepository ¶
// EngineRepository bundles the sub-repositories listed below behind a single
// interface. Every method is a zero-argument accessor that returns one of
// those sub-repositories.
type EngineRepository interface {
Health() HealthRepository
APIToken() APITokenRepository
Dispatcher() DispatcherEngineRepository
Event() EventEngineRepository
GetGroupKeyRun() GetGroupKeyRunEngineRepository
JobRun() JobRunEngineRepository
StepRun() StepRunEngineRepository
Step() StepRepository
Tenant() TenantEngineRepository
TenantAlertingSettings() TenantAlertingRepository
Ticker() TickerEngineRepository
Worker() WorkerEngineRepository
Workflow() WorkflowEngineRepository
WorkflowRun() WorkflowRunEngineRepository
StreamEvent() StreamEventsEngineRepository
Log() LogsEngineRepository
RateLimit() RateLimitEngineRepository
WebhookWorker() WebhookWorkerEngineRepository
Scheduler() SchedulerRepository
MessageQueue() MessageQueueRepository
}
type EntitlementsRepository ¶
// EntitlementsRepository exposes a single accessor, TenantLimit, which returns
// the TenantLimitRepository.
type EntitlementsRepository interface {
TenantLimit() TenantLimitRepository
}
type ErrDedupeValueExists ¶
// ErrDedupeValueExists is an error type (it has a value-receiver Error method)
// that carries the dedupe value in question.
type ErrDedupeValueExists struct {
// DedupeValue is the value carried by the error.
DedupeValue string
}
func (ErrDedupeValueExists) Error ¶
func (e ErrDedupeValueExists) Error() string
type EventAPIRepository ¶
// EventAPIRepository declares the read-only event queries in this interface.
type EventAPIRepository interface {
// ListEvents returns all events for a given tenant.
ListEvents(ctx context.Context, tenantId string, opts *ListEventOpts) (*ListEventResult, error)
// ListEventKeys returns all unique event keys for a given tenant.
// NOTE(review): unlike the other methods here, it takes no context.Context.
ListEventKeys(tenantId string) ([]string, error)
// GetEventById returns an event by id.
GetEventById(ctx context.Context, id string) (*dbsqlc.Event, error)
// ListEventsById returns a list of events by id.
ListEventsById(ctx context.Context, tenantId string, ids []string) ([]*dbsqlc.Event, error)
}
type EventEngineRepository ¶
// EventEngineRepository declares the event operations in this interface:
// creation (single and bulk), lookup, and cleanup of old events.
type EventEngineRepository interface {
// RegisterCreateCallback registers a tenant-scoped callback that receives a
// *dbsqlc.Event.
// NOTE(review): presumably invoked after an event is created — confirm.
RegisterCreateCallback(callback TenantScopedCallback[*dbsqlc.Event])
// CreateEvent creates a new event for a given tenant.
CreateEvent(ctx context.Context, opts *CreateEventOpts) (*dbsqlc.Event, error)
// BulkCreateEvent creates new events for a given tenant.
BulkCreateEvent(ctx context.Context, opts *BulkCreateEventOpts) (*BulkCreateEventResult, error)
// BulkCreateEventSharedTenant takes a slice of CreateEventOpts directly, with
// no separate tenant id argument, and returns the resulting events.
BulkCreateEventSharedTenant(ctx context.Context, opts []*CreateEventOpts) ([]*dbsqlc.Event, error)
// GetEventForEngine returns an event for the engine by id.
GetEventForEngine(ctx context.Context, tenantId, id string) (*dbsqlc.Event, error)
// ListEventsByIds returns the events with the given ids for a tenant.
ListEventsByIds(ctx context.Context, tenantId string, ids []string) ([]*dbsqlc.Event, error)
// SoftDeleteExpiredEvents deletes events that were created before the given time.
// NOTE(review): the earlier comment described two returned counts, but the
// method returns a single bool. Presumably the bool reports whether more
// matching events remain — confirm against the implementation.
SoftDeleteExpiredEvents(ctx context.Context, tenantId string, before time.Time) (bool, error)
// ClearEventPayloadData removes the potentially large payload data of events.
// NOTE(review): the earlier comment mentioned a cutoff time and two returned
// counts, but the method takes no time argument and returns a single bool.
// Confirm the cutoff and the bool's meaning against the implementation.
ClearEventPayloadData(ctx context.Context, tenantId string) (bool, error)
}
type GetGroupKeyRunEngineRepository ¶
// GetGroupKeyRunEngineRepository declares the tenant-scoped list, assign,
// update and lookup operations for get-group-key runs.
type GetGroupKeyRunEngineRepository interface {
// ListGetGroupKeyRunsToRequeue returns a list of get group key runs which are in a requeueable state.
ListGetGroupKeyRunsToRequeue(ctx context.Context, tenantId string) ([]*dbsqlc.GetGroupKeyRun, error)
// ListGetGroupKeyRunsToReassign returns a list of get group key runs for the
// tenant.
// NOTE(review): the selection criteria are presumably "needs reassignment" —
// confirm.
ListGetGroupKeyRunsToReassign(ctx context.Context, tenantId string) ([]*dbsqlc.GetGroupKeyRun, error)
// AssignGetGroupKeyRunToWorker returns the worker id and dispatcher id.
AssignGetGroupKeyRunToWorker(ctx context.Context, tenantId, getGroupKeyRunId string) (workerId string, dispatcherId string, err error)
// AssignGetGroupKeyRunToTicker returns the ticker id.
AssignGetGroupKeyRunToTicker(ctx context.Context, tenantId, getGroupKeyRunId string) (tickerId string, err error)
// UpdateGetGroupKeyRun applies opts to the given run and returns its engine row.
UpdateGetGroupKeyRun(ctx context.Context, tenantId, getGroupKeyRunId string, opts *UpdateGetGroupKeyRunOpts) (*dbsqlc.GetGroupKeyRunForEngineRow, error)
// GetGroupKeyRunForEngine returns the engine row for the given run.
GetGroupKeyRunForEngine(ctx context.Context, tenantId, getGroupKeyRunId string) (*dbsqlc.GetGroupKeyRunForEngineRow, error)
}
type GetQueueMetricsOpts ¶
type GetQueueMetricsResponse ¶
// GetQueueMetricsResponse carries queue metrics as a total and as a per-key
// map.
type GetQueueMetricsResponse struct {
// Total is serialized under the JSON key "total".
Total QueueMetric `json:"total"`
// ByWorkflowId is serialized under the JSON key "by_workflow".
ByWorkflowId map[string]QueueMetric `json:"by_workflow"`
}
type GetStepRunFull ¶
type GetTenantAlertingSettingsResponse ¶
// GetTenantAlertingSettingsResponse bundles a tenant's alerting settings with
// its Slack webhooks, email groups and the tenant record itself.
type GetTenantAlertingSettingsResponse struct {
Settings *dbsqlc.TenantAlertingSettings
SlackWebhooks []*dbsqlc.SlackAppWebhook
EmailGroups []*TenantAlertEmailGroupForSend
Tenant *dbsqlc.Tenant
}
type GetWorkflowMetricsOpts ¶
type HealthRepository ¶
type JobRunAPIRepository ¶
// JobRunAPIRepository declares the job run operations in this interface.
type JobRunAPIRepository interface {
// RegisterWorkflowRunRunningCallback registers a tenant-scoped callback that
// receives a pgtype.UUID.
RegisterWorkflowRunRunningCallback(callback TenantScopedCallback[pgtype.UUID])
// SetJobRunStatusRunning resets the status of a job run to a RUNNING status. This is useful if a step
// run is being manually replayed, but shouldn't be used by most callers.
// NOTE(review): this variant takes no context.Context, unlike
// JobRunEngineRepository.SetJobRunStatusRunning.
SetJobRunStatusRunning(tenantId, jobRunId string) error
// ListJobRunByWorkflowRunId returns the full job run rows for a workflow run.
ListJobRunByWorkflowRunId(ctx context.Context, tenantId, WorkflowRunId string) ([]*dbsqlc.ListJobRunsForWorkflowRunFullRow, error)
}
type JobRunEngineRepository ¶
// JobRunEngineRepository declares the job run operations in this interface.
type JobRunEngineRepository interface {
// RegisterWorkflowRunRunningCallback registers a tenant-scoped callback that
// receives a pgtype.UUID.
RegisterWorkflowRunRunningCallback(callback TenantScopedCallback[pgtype.UUID])
// SetJobRunStatusRunning resets the status of a job run to a RUNNING status. This is useful if a step
// run is being manually replayed, but shouldn't be used by most callers.
SetJobRunStatusRunning(ctx context.Context, tenantId, jobRunId string) error
// ListJobRunsForWorkflowRun returns the job run rows for a workflow run.
ListJobRunsForWorkflowRun(ctx context.Context, tenantId, workflowRunId string) ([]*dbsqlc.ListJobRunsForWorkflowRunRow, error)
// GetJobRunByWorkflowRunIdAndJobId returns a single job run row identified by
// workflow run id and job id.
GetJobRunByWorkflowRunIdAndJobId(ctx context.Context, tenantId, workflowRunId, jobId string) (*dbsqlc.GetJobRunByWorkflowRunIdAndJobIdRow, error)
// GetJobRunsByWorkflowRunId returns the job run rows for a workflow run.
GetJobRunsByWorkflowRunId(ctx context.Context, tenantId, workflowRunId string) ([]*dbsqlc.GetJobRunsByWorkflowRunIdRow, error)
// ClearJobRunPayloadData is tenant-scoped and returns a bool.
// NOTE(review): the bool's meaning is undocumented; presumably signals whether
// more rows remain to be cleared — confirm.
ClearJobRunPayloadData(ctx context.Context, tenantId string) (bool, error)
// StartJobRun returns step run engine rows.
// NOTE(review): presumably the step runs that became startable — confirm.
StartJobRun(ctx context.Context, tenantId, jobId string) ([]*dbsqlc.GetStepRunForEngineRow, error)
}
type JobRunHasCycleError ¶
// JobRunHasCycleError is an error type (it has a pointer-receiver Error
// method) that carries the name of the job concerned.
type JobRunHasCycleError struct {
// JobName is the job name carried by the error.
JobName string
}
func (*JobRunHasCycleError) Error ¶
func (e *JobRunHasCycleError) Error() string
type LeaseRepository ¶
// LeaseRepository declares tenant-scoped queries for queues and active
// workers, plus lease acquisition and release. All methods take the tenant id
// as a pgtype.UUID.
type LeaseRepository interface {
// ListQueues returns the tenant's queues.
ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*dbsqlc.Queue, error)
// ListActiveWorkers returns one ListActiveWorkersResult per worker.
ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
// AcquireOrExtendLeases takes a lease kind, the resource ids to lease and the
// caller's existing leases, and returns the resulting leases.
AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind dbsqlc.LeaseKind, resourceIds []string, existingLeases []*dbsqlc.Lease) ([]*dbsqlc.Lease, error)
// ReleaseLeases releases the given leases.
ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*dbsqlc.Lease) error
}
type ListActiveWorkersResult ¶
// ListActiveWorkersResult is the element type returned by
// LeaseRepository.ListActiveWorkers: a worker's id, max runs and labels.
type ListActiveWorkersResult struct {
ID string
MaxRuns int
Labels []*dbsqlc.ListManyWorkerLabelsRow
}
type ListAllJobRunsOpts ¶
// ListAllJobRunsOpts holds filters for listing job runs. Every field is a
// pointer, so each filter can be left nil.
type ListAllJobRunsOpts struct {
// TickerId is a ticker id to filter on.
TickerId *string
// NOTE(review): NoTickerId presumably selects job runs with no ticker
// assigned — confirm.
NoTickerId *bool
// Status is a job run status to filter on.
Status *dbsqlc.JobRunStatus
}
type ListCronWorkflowsOpts ¶
// ListCronWorkflowsOpts holds paging, ordering and filter options for listing
// cron workflows.
type ListCronWorkflowsOpts struct {
// (optional) number of results to skip
Offset *int
// (optional) number of results to return
Limit *int
// (optional) the order by field: createdAt or name
OrderBy *string `validate:"omitempty,oneof=createdAt name"`
// (optional) the order direction: ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
// (optional) the workflow id
WorkflowId *string `validate:"omitempty,uuid"`
// (optional) additional metadata for the workflow run
AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}
TODO: move ListCronWorkflowsOpts to workflow.go.
type ListEventOpts ¶
// ListEventOpts holds paging, ordering and filter options for
// EventAPIRepository.ListEvents.
type ListEventOpts struct {
// (optional) a list of event keys to filter by
Keys []string
// (optional) a list of workflow IDs to filter by
Workflows []string
// (optional) a list of workflow run statuses to filter by
WorkflowRunStatus []dbsqlc.WorkflowRunStatus
// (optional) number of events to skip
Offset *int
// (optional) number of events to return
Limit *int
// (optional) a search query
Search *string
// (optional) the event that this event is replaying
ReplayedEvent *string `validate:"omitempty,uuid"`
// (optional) the order by field; only createdAt is accepted
OrderBy *string `validate:"omitempty,oneof=createdAt"`
// (optional) the order direction: ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
// (optional) the event metadata
AdditionalMetadata []byte
// (optional) event ids to filter by
Ids []string
}
type ListEventResult ¶
// ListEventResult is the result of EventAPIRepository.ListEvents.
type ListEventResult struct {
// Rows holds the returned event rows.
Rows []*dbsqlc.ListEventsRow
// NOTE(review): Count is presumably the total number of matching events, not
// just len(Rows) — confirm.
Count int
}
type ListGetGroupKeyRunsOpts ¶
// ListGetGroupKeyRunsOpts holds the filter for listing get-group-key runs.
type ListGetGroupKeyRunsOpts struct {
// Status is a status to filter on; note the type is dbsqlc.StepRunStatus.
Status *dbsqlc.StepRunStatus
}
type ListLogsOpts ¶
// ListLogsOpts holds paging, ordering and filter options for listing log
// lines.
type ListLogsOpts struct {
// (optional) number of logs to skip
Offset *int
// (optional) number of logs to return, between 1 and 1000
Limit *int `validate:"omitnil,min=1,max=1000"`
// (optional) a list of log levels to filter by; each one of INFO, ERROR, WARN or DEBUG
Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`
// (optional) a step run id to filter by
StepRunId *string `validate:"omitempty,uuid"`
// (optional) a search query
Search *string
// (optional) the order by field; only createdAt is accepted
OrderBy *string `validate:"omitempty,oneof=createdAt"`
// (optional) the order direction: ASC or DESC
OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
}
type ListLogsResult ¶
type ListPullRequestsForWorkflowRunOpts ¶
// ListPullRequestsForWorkflowRunOpts holds the optional State value used when
// listing pull requests for a workflow run.
type ListPullRequestsForWorkflowRunOpts struct {
	State *string
}
type ListRateLimitOpts ¶
// ListRateLimitOpts holds the optional search, pagination and ordering values
// used when listing rate limits.
type ListRateLimitOpts struct {
	// (optional) a search query for the key
	Search *string

	// (optional) how many results to skip
	Offset *int

	// (optional) how many results to return
	Limit *int

	// (optional) the field to order by; one of key, value or limitValue
	OrderBy *string `validate:"omitempty,oneof=key value limitValue"`

	// (optional) the order direction; one of ASC or DESC
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
}
type ListRateLimitsResult ¶
// ListRateLimitsResult pairs a slice of rate limit rows with a Count value.
type ListRateLimitsResult struct {
	Rows  []*dbsqlc.ListRateLimitsForTenantNoMutateRow
	Count int
}
type ListScheduledWorkflowsOpts ¶
// ListScheduledWorkflowsOpts holds the optional pagination, ordering and
// filter values used when listing scheduled workflows.
type ListScheduledWorkflowsOpts struct {
	// (optional) number of results to skip
	Offset *int

	// (optional) number of results to return
	Limit *int

	// (optional) the order by field; one of createdAt or triggerAt
	OrderBy *string `validate:"omitempty,oneof=createdAt triggerAt"`

	// (optional) the order direction; one of ASC or DESC
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) the workflow id, validated as a uuid
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the parent workflow run id, validated as a uuid
	ParentWorkflowRunId *string `validate:"omitempty,uuid"`

	// (optional) the parent step run id, validated as a uuid
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// (optional) statuses to filter by
	Statuses *[]dbsqlc.WorkflowRunStatus

	// (optional) include scheduled runs that are in the future
	IncludeFuture *bool

	// (optional) additional metadata for the workflow run
	AdditionalMetadata map[string]any `validate:"omitempty"`
}
type ListStepRunArchivesOpts ¶
type ListStepRunArchivesResult ¶
// ListStepRunArchivesResult pairs a slice of step run result archives with a
// Count value.
type ListStepRunArchivesResult struct {
	Rows  []*dbsqlc.StepRunResultArchive
	Count int
}
type ListStepRunEventOpts ¶
type ListStepRunEventResult ¶
// ListStepRunEventResult pairs a slice of step run events with a Count value.
type ListStepRunEventResult struct {
	Rows  []*dbsqlc.StepRunEvent
	Count int
}
type ListStepRunsOpts ¶
// ListStepRunsOpts holds the optional JobRunId, WorkflowRunIds and Status
// values used when listing step runs. JobRunId and every element of
// WorkflowRunIds are validated as uuids.
type ListStepRunsOpts struct {
	JobRunId       *string  `validate:"omitempty,uuid"`
	WorkflowRunIds []string `validate:"dive,uuid"`
	Status         *dbsqlc.StepRunStatus
}
type ListTenantInvitesOpts ¶
type ListTickerOpts ¶
type ListWorkersOpts ¶
type ListWorkflowRunRoundRobinsOpts ¶
// ListWorkflowRunRoundRobinsOpts holds the optional filter and pagination
// values used when listing round-robin workflow runs.
type ListWorkflowRunRoundRobinsOpts struct {
	// (optional) the workflow id, validated as a uuid
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the workflow version id, validated as a uuid
	WorkflowVersionId *string `validate:"omitempty,uuid"`

	// (optional) the status of the workflow run
	Status *dbsqlc.WorkflowRunStatus

	// (optional) how many results to skip
	Offset *int

	// (optional) how many results to return
	Limit *int
}
type ListWorkflowRunsOpts ¶
// ListWorkflowRunsOpts holds the optional filter, pagination, ordering and
// time-window values used when listing workflow runs.
type ListWorkflowRunsOpts struct {
	// (optional) the workflow id, validated as a uuid
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the workflow version id, validated as a uuid
	WorkflowVersionId *string `validate:"omitempty,uuid"`

	// (optional) a list of workflow run ids to filter by, each validated as a uuid
	Ids []string `validate:"omitempty,dive,uuid"`

	// (optional) the parent workflow run id, validated as a uuid
	ParentId *string `validate:"omitempty,uuid"`

	// (optional) the parent step run id, validated as a uuid
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// (optional) the event id that triggered the workflow run, validated as a uuid
	EventId *string `validate:"omitempty,uuid"`

	// (optional) the group key for the workflow run
	GroupKey *string

	// (optional) a list of workflow run statuses to filter by
	Statuses *[]dbsqlc.WorkflowRunStatus

	// (optional) a list of kinds to filter by
	Kinds *[]dbsqlc.WorkflowKind

	// (optional) number of results to skip
	Offset *int

	// (optional) number of results to return
	Limit *int

	// (optional) the order by field; one of createdAt, finishedAt, startedAt or duration
	OrderBy *string `validate:"omitempty,oneof=createdAt finishedAt startedAt duration"`

	// (optional) the order direction; one of ASC or DESC
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) a time after which the run was created
	CreatedAfter *time.Time

	// (optional) a time before which the run was created
	CreatedBefore *time.Time

	// (optional) a time after which the run was finished
	FinishedAfter *time.Time

	// (optional) a time before which the run was finished
	FinishedBefore *time.Time

	// (optional) exact metadata to filter by
	AdditionalMetadata map[string]any `validate:"omitempty"`
}
type ListWorkflowRunsResult ¶
// ListWorkflowRunsResult pairs a slice of workflow run rows with a Count value.
type ListWorkflowRunsResult struct {
	Rows  []*dbsqlc.ListWorkflowRunsRow
	Count int
}
type ListWorkflowsOpts ¶
type ListWorkflowsResult ¶
type LogsAPIRepository ¶
// LogsAPIRepository is the API-facing repository for log lines.
type LogsAPIRepository interface {
	// ListLogLines returns a list of log lines for a given step run.
	ListLogLines(tenantId string, opts *ListLogsOpts) (*ListLogsResult, error)

	// WithAdditionalConfig takes a validator and a logger and returns a
	// LogsAPIRepository.
	WithAdditionalConfig(validator.Validator, *zerolog.Logger) LogsAPIRepository
}
type LogsEngineRepository ¶
type MessageQueueRepository ¶
// MessageQueueRepository groups the pub/sub, queue and message operations of
// the message queue.
type MessageQueueRepository interface {
	// --- PubSub ---

	Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubMessage) error) error
	Notify(ctx context.Context, name string, payload string) error

	// --- Queues ---

	BindQueue(ctx context.Context, queue string, durable, autoDeleted, exclusive bool, exclusiveConsumer *string) error
	UpdateQueueLastActive(ctx context.Context, queue string) error
	CleanupQueues(ctx context.Context) error

	// --- Messages ---

	AddMessage(ctx context.Context, queue string, payload []byte) error
	ReadMessages(ctx context.Context, queue string, qos int) ([]*dbsqlc.ReadMessagesRow, error)
	AckMessage(ctx context.Context, id int64) error
	CleanupMessageQueueItems(ctx context.Context) error
}
type PlanLimitMap ¶
type ProcessStepRunUpdatesResultV2 ¶
// ProcessStepRunUpdatesResultV2 is the value returned by
// ProcessStepRunUpdatesV2: the succeeded step runs, the completed workflow
// runs and a Continue flag.
type ProcessStepRunUpdatesResultV2 struct {
	SucceededStepRuns     []*dbsqlc.GetStepRunForEngineRow
	CompletedWorkflowRuns []*dbsqlc.ResolveWorkflowRunStatusRow
	Continue              bool
}
type PubMessage ¶ added in v0.53.0
type QueueFactoryRepository ¶
// QueueFactoryRepository builds QueueRepository values.
type QueueFactoryRepository interface {
	// NewQueue returns a QueueRepository for the given tenant and queue name.
	NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}
type QueueMetric ¶
// QueueMetric holds per-status step run totals for a queue.
type QueueMetric struct {
	// total count of PENDING_ASSIGNMENT step runs in the queue
	PendingAssignment int `json:"pending_assignment"`

	// total count of PENDING step runs in the queue
	Pending int `json:"pending"`

	// total count of RUNNING step runs in the queue
	Running int `json:"running"`
}
type QueueRepository ¶
// QueueRepository is the repository returned by QueueFactoryRepository.NewQueue;
// it lists and marks queue items and looks up rate limits and desired labels.
type QueueRepository interface {
	ListQueueItems(ctx context.Context, limit int) ([]*dbsqlc.QueueItem, error)

	MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)

	GetStepRunRateLimits(ctx context.Context, queueItems []*dbsqlc.QueueItem) (map[string]map[string]int32, error)

	GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*dbsqlc.GetDesiredLabelsRow, error)

	Cleanup()
}
type QueueStepRunOpts ¶
// QueueStepRunOpts holds the options passed to QueueStepRun.
type QueueStepRunOpts struct {
	IsRetry bool

	// IsInternalRetry is set when the system itself is retrying the step run, for example because
	// it was sent to an invalid dispatcher. Such a retry does not count towards the retry limit,
	// but it still gets highest priority in the queue.
	IsInternalRetry bool

	Input []byte

	ExpressionEvals []CreateExpressionEvalOpt
}
type QueuedStepRun ¶
type RateLimitEngineRepository ¶
// RateLimitEngineRepository is the engine-facing repository for rate limits.
type RateLimitEngineRepository interface {
	// ListRateLimits lists the rate limits of a tenant according to opts.
	ListRateLimits(ctx context.Context, tenantId string, opts *ListRateLimitOpts) (*ListRateLimitsResult, error)

	// UpsertRateLimit upserts the rate limit record for the given key and
	// returns the resulting record.
	UpsertRateLimit(ctx context.Context, tenantId string, key string, opts *UpsertRateLimitOpts) (*dbsqlc.RateLimit, error)
}
type RateLimitRepository ¶
type RateLimitResult ¶
type RefreshTimeoutBy ¶
// RefreshTimeoutBy is the options value accepted by the RefreshTimeoutBy
// repository method. IncrementTimeoutBy is required and is validated with the
// "duration" rule.
type RefreshTimeoutBy struct {
	IncrementTimeoutBy string `validate:"required,duration"`
}
type RuntimeInfo ¶
type SNSRepository ¶
// SNSRepository is the repository for SNS integrations.
type SNSRepository interface {
	GetSNSIntegration(ctx context.Context, tenantId, topicArn string) (*dbsqlc.SNSIntegration, error)

	GetSNSIntegrationById(ctx context.Context, id string) (*dbsqlc.SNSIntegration, error)

	CreateSNSIntegration(ctx context.Context, tenantId string, opts *CreateSNSIntegrationOpts) (*dbsqlc.SNSIntegration, error)

	ListSNSIntegrations(ctx context.Context, tenantId string) ([]*dbsqlc.SNSIntegration, error)

	DeleteSNSIntegration(ctx context.Context, tenantId, id string) error
}
type SchedulerRepository ¶
// SchedulerRepository gives access to the lease, queue factory, rate limit
// and assignment repositories.
type SchedulerRepository interface {
	Lease() LeaseRepository

	QueueFactory() QueueFactoryRepository

	RateLimit() RateLimitRepository

	Assignment() AssignmentRepository
}
type SecurityCheckRepository ¶
type SlackRepository ¶
// SlackRepository is the repository for Slack app webhooks.
type SlackRepository interface {
	UpsertSlackWebhook(ctx context.Context, tenantId string, opts *UpsertSlackWebhookOpts) (*dbsqlc.SlackAppWebhook, error)

	ListSlackWebhooks(ctx context.Context, tenantId string) ([]*dbsqlc.SlackAppWebhook, error)

	GetSlackWebhookById(ctx context.Context, id string) (*dbsqlc.SlackAppWebhook, error)

	DeleteSlackWebhook(ctx context.Context, tenantId, id string) error
}
type StepRepository ¶
type StepRunAPIRepository ¶
// StepRunAPIRepository is the API-facing repository for step runs, their
// events and their archives.
type StepRunAPIRepository interface {
	GetStepRunById(stepRunId string) (*GetStepRunFull, error)

	ListStepRunEvents(stepRunId string, opts *ListStepRunEventOpts) (*ListStepRunEventResult, error)

	ListStepRunEventsByWorkflowRunId(ctx context.Context, tenantId, workflowRunId string, lastId *int32) (*ListStepRunEventResult, error)

	ListStepRunArchives(tenantId, stepRunId string, opts *ListStepRunArchivesOpts) (*ListStepRunArchivesResult, error)
}
type StepRunEngineRepository ¶
// StepRunEngineRepository is the engine-facing repository for step runs.
type StepRunEngineRepository interface {
	RegisterWorkflowRunCompletedCallback(callback TenantScopedCallback[*dbsqlc.ResolveWorkflowRunStatusRow])

	ListStepRuns(ctx context.Context, tenantId string, opts *ListStepRunsOpts) ([]*dbsqlc.GetStepRunForEngineRow, error)

	ListStepRunsToCancel(ctx context.Context, tenantId, jobRunId string) ([]*dbsqlc.GetStepRunForEngineRow, error)

	// ListStepRunsToReassign lists the step runs which are in a reassignable state.
	ListStepRunsToReassign(ctx context.Context, tenantId string) (reassignedStepRunIds []string, failedStepRuns []*dbsqlc.GetStepRunForEngineRow, err error)

	InternalRetryStepRuns(ctx context.Context, tenantId string, srIdsIn []string) (reassignedStepRunIds []string, failedStepRuns []*dbsqlc.GetStepRunForEngineRow, err error)

	ListStepRunsToTimeout(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error)

	StepRunAcked(ctx context.Context, tenantId, workflowRunId, stepRunId string, ackedAt time.Time) error

	StepRunStarted(ctx context.Context, tenantId, workflowRunId, stepRunId string, startedAt time.Time) error

	StepRunSucceeded(ctx context.Context, tenantId, workflowRunId, stepRunId string, finishedAt time.Time, output []byte) error

	StepRunCancelled(ctx context.Context, tenantId, workflowRunId, stepRunId string, cancelledAt time.Time, cancelledReason string, propagate bool) error

	StepRunFailed(ctx context.Context, tenantId, workflowRunId, stepRunId string, failedAt time.Time, errStr string, retryCount int) error

	StepRunRetryBackoff(ctx context.Context, tenantId, workflowRunId, stepRunId string, retryAfter time.Time, retryCount int) error

	RetryStepRuns(ctx context.Context, tenantId string) (bool, error)

	ReplayStepRun(ctx context.Context, tenantId, stepRunId string, input []byte) (*dbsqlc.GetStepRunForEngineRow, error)

	// PreflightCheckReplayStepRun reports whether a step run can be replayed; it returns nil
	// when the replay is allowed.
	PreflightCheckReplayStepRun(ctx context.Context, tenantId, stepRunId string) error

	ReleaseStepRunSemaphore(ctx context.Context, tenantId, stepRunId string, isUserTriggered bool) error

	// UpdateStepRunOverridesData updates the overrides data field in the input for a step run
	// and returns the input bytes.
	UpdateStepRunOverridesData(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOverridesDataOpts) ([]byte, error)

	UpdateStepRunInputSchema(ctx context.Context, tenantId, stepRunId string, schema []byte) ([]byte, error)

	GetStepRunForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunForEngineRow, error)

	GetStepRunDataForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunDataForEngineRow, error)

	GetStepRunBulkDataForEngine(ctx context.Context, tenantId string, stepRunIds []string) ([]*dbsqlc.GetStepRunBulkDataForEngineRow, error)

	GetStepRunMetaForEngine(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStepRunMetaRow, error)

	// QueueStepRun is like UpdateStepRun, except that the step run is only updated when it is
	// in a pending state.
	QueueStepRun(ctx context.Context, tenantId, stepRunId string, opts *QueueStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error)

	GetQueueCounts(ctx context.Context, tenantId string) (map[string]int, error)

	ProcessStepRunUpdatesV2(ctx context.Context, qlp *zerolog.Logger, tenantId string) (ProcessStepRunUpdatesResultV2, error)

	CleanupQueueItems(ctx context.Context, tenantId string) error

	CleanupInternalQueueItems(ctx context.Context, tenantId string) error

	CleanupRetryQueueItems(ctx context.Context, tenantId string) error

	ListInitialStepRunsForJobRun(ctx context.Context, tenantId, jobRunId string) ([]*dbsqlc.GetStepRunForEngineRow, error)

	// ListStartableStepRuns lists the step runs that are in a startable state, assuming that
	// parentStepRunId has succeeded. The singleParent flag decides whether step runs with many
	// parents are rejected from the listing; this matters for avoiding race conditions in which
	// several parents complete at the same time and each starts the same step run. Consequently,
	// singleParent=false should be called from a serializable process after processing step run
	// status updates.
	ListStartableStepRuns(ctx context.Context, tenantId, parentStepRunId string, singleParent bool) ([]*dbsqlc.GetStepRunForEngineRow, error)

	ArchiveStepRunResult(ctx context.Context, tenantId, stepRunId string, err *string) error

	RefreshTimeoutBy(ctx context.Context, tenantId, stepRunId string, opts RefreshTimeoutBy) (pgtype.Timestamp, error)

	DeferredStepRunEvent(tenantId string, opts CreateStepRunEventOpts)

	ClearStepRunPayloadData(ctx context.Context, tenantId string) (bool, error)
}
type StepRunForJobRun ¶
// StepRunForJobRun embeds a GetStepRunsForJobRunsWithOutputRow and adds a
// ChildWorkflowsCount value.
type StepRunForJobRun struct {
	*dbsqlc.GetStepRunsForJobRunsWithOutputRow

	ChildWorkflowsCount int
}
type StepRunUpdateInfo ¶
type StreamEventsEngineRepository ¶
// StreamEventsEngineRepository is the engine-facing repository for stream events.
type StreamEventsEngineRepository interface {
	// PutStreamEvent creates a new StreamEvent line.
	PutStreamEvent(ctx context.Context, tenantId string, opts *CreateStreamEventOpts) (*dbsqlc.StreamEvent, error)

	// GetStreamEvent looks up a StreamEvent line by its id.
	GetStreamEvent(ctx context.Context, tenantId string, streamEventId int64) (*dbsqlc.StreamEvent, error)

	// CleanupStreamEvents deletes all stale StreamEvents.
	CleanupStreamEvents(ctx context.Context) error

	// GetStreamEventMeta returns a GetStreamEventMetaRow for the given tenant and step run id.
	GetStreamEventMeta(ctx context.Context, tenantId, stepRunId string) (*dbsqlc.GetStreamEventMetaRow, error)
}
type TenantAPIRepository ¶
// TenantAPIRepository is the API-facing repository for tenants, tenant
// members and queue metrics.
type TenantAPIRepository interface {
	// CreateTenant creates a new tenant.
	CreateTenant(ctx context.Context, opts *CreateTenantOpts) (*dbsqlc.Tenant, error)

	// UpdateTenant updates an existing tenant in the db.
	UpdateTenant(ctx context.Context, tenantId string, opts *UpdateTenantOpts) (*dbsqlc.Tenant, error)

	// GetTenantByID looks up the tenant with the given id.
	GetTenantByID(ctx context.Context, tenantId string) (*dbsqlc.Tenant, error)

	// GetTenantBySlug looks up the tenant with the given slug.
	GetTenantBySlug(ctx context.Context, slug string) (*dbsqlc.Tenant, error)

	// CreateTenantMember creates a new member in the tenant.
	CreateTenantMember(ctx context.Context, tenantId string, opts *CreateTenantMemberOpts) (*dbsqlc.PopulateTenantMembersRow, error)

	// GetTenantMemberByID looks up the tenant member with the given id.
	GetTenantMemberByID(ctx context.Context, memberId string) (*dbsqlc.PopulateTenantMembersRow, error)

	// GetTenantMemberByUserID looks up the tenant member with the given user id.
	GetTenantMemberByUserID(ctx context.Context, tenantId, userId string) (*dbsqlc.PopulateTenantMembersRow, error)

	// GetTenantMemberByEmail looks up the tenant member with the given email.
	GetTenantMemberByEmail(ctx context.Context, tenantId, email string) (*dbsqlc.PopulateTenantMembersRow, error)

	// ListTenantMembers lists the tenant members of the given tenant.
	ListTenantMembers(ctx context.Context, tenantId string) ([]*dbsqlc.PopulateTenantMembersRow, error)

	// UpdateTenantMember updates the tenant member with the given id.
	UpdateTenantMember(ctx context.Context, memberId string, opts *UpdateTenantMemberOpts) (*dbsqlc.PopulateTenantMembersRow, error)

	// DeleteTenantMember deletes the tenant member with the given id.
	DeleteTenantMember(ctx context.Context, memberId string) error

	// GetQueueMetrics returns the queue metrics of the given tenant.
	GetQueueMetrics(ctx context.Context, tenantId string, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error)
}
type TenantAlertingRepository ¶
// TenantAlertingRepository is the repository for tenant alerting settings,
// resource limit state and alert email groups.
type TenantAlertingRepository interface {
	UpsertTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpsertTenantAlertingSettingsOpts) (*dbsqlc.TenantAlertingSettings, error)

	GetTenantAlertingSettings(ctx context.Context, tenantId string) (*GetTenantAlertingSettingsResponse, error)

	GetTenantResourceLimitState(ctx context.Context, tenantId, resource string) (*dbsqlc.GetTenantResourceLimitRow, error)

	UpdateTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpdateTenantAlertingSettingsOpts) error

	CreateTenantAlertGroup(ctx context.Context, tenantId string, opts *CreateTenantAlertGroupOpts) (*dbsqlc.TenantAlertEmailGroup, error)

	UpdateTenantAlertGroup(ctx context.Context, id string, opts *UpdateTenantAlertGroupOpts) (*dbsqlc.TenantAlertEmailGroup, error)

	ListTenantAlertGroups(ctx context.Context, tenantId string) ([]*dbsqlc.TenantAlertEmailGroup, error)

	GetTenantAlertGroupById(ctx context.Context, id string) (*dbsqlc.TenantAlertEmailGroup, error)

	DeleteTenantAlertGroup(ctx context.Context, tenantId, id string) error
}
type TenantCallbackOpts ¶
type TenantCallbackOpts[T any] struct { // contains filtered or unexported fields }
type TenantEngineRepository ¶
// TenantEngineRepository is the engine-facing repository for tenants and for
// the controller, scheduler and tenant-worker partitions.
type TenantEngineRepository interface {
	// ListTenants lists all tenants in the instance.
	ListTenants(ctx context.Context) ([]*dbsqlc.Tenant, error)

	// GetInternalTenantForController gets the tenant corresponding to the "internal" tenant if
	// it's assigned to this controller. Returns nil if the tenant is not assigned to this controller.
	GetInternalTenantForController(ctx context.Context, controllerPartitionId string) (*dbsqlc.Tenant, error)

	// ListTenantsByControllerPartition lists all tenants in the given controller partition.
	ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string, majorVersion dbsqlc.TenantMajorEngineVersion) ([]*dbsqlc.Tenant, error)

	// ListTenantsByWorkerPartition lists all tenants in the given worker partition.
	ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string, majorVersion dbsqlc.TenantMajorEngineVersion) ([]*dbsqlc.Tenant, error)

	// ListTenantsBySchedulerPartition lists all tenants in the given scheduler partition.
	ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string, majorVersion dbsqlc.TenantMajorEngineVersion) ([]*dbsqlc.Tenant, error)

	// CreateControllerPartition creates a new controller partition for tenants within the engine.
	CreateControllerPartition(ctx context.Context) (string, error)

	// UpdateControllerPartitionHeartbeat updates the heartbeat for the given partition. If the
	// partition no longer exists, it creates a new partition and returns the new partition id.
	// Otherwise, it returns the existing partition id.
	UpdateControllerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteControllerPartition(ctx context.Context, id string) error

	RebalanceAllControllerPartitions(ctx context.Context) error

	RebalanceInactiveControllerPartitions(ctx context.Context) error

	CreateSchedulerPartition(ctx context.Context) (string, error)

	UpdateSchedulerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteSchedulerPartition(ctx context.Context, id string) error

	RebalanceAllSchedulerPartitions(ctx context.Context) error

	RebalanceInactiveSchedulerPartitions(ctx context.Context) error

	CreateTenantWorkerPartition(ctx context.Context) (string, error)

	UpdateWorkerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteTenantWorkerPartition(ctx context.Context, id string) error

	RebalanceAllTenantWorkerPartitions(ctx context.Context) error

	RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error

	// GetTenantByID returns the tenant with the given id.
	GetTenantByID(ctx context.Context, tenantId string) (*dbsqlc.Tenant, error)
}
type TenantInviteRepository ¶
// TenantInviteRepository is the repository for tenant invite links.
type TenantInviteRepository interface {
	// CreateTenantInvite creates a new tenant invite from the given options.
	CreateTenantInvite(ctx context.Context, tenantId string, opts *CreateTenantInviteOpts) (*dbsqlc.TenantInviteLink, error)

	// GetTenantInvite looks up the tenant invite with the given id.
	GetTenantInvite(ctx context.Context, id string) (*dbsqlc.TenantInviteLink, error)

	// ListTenantInvitesByEmail lists the tenant invites for the given invitee email, limited to
	// invites which are not expired.
	ListTenantInvitesByEmail(ctx context.Context, email string) ([]*dbsqlc.ListTenantInvitesByEmailRow, error)

	// ListTenantInvitesByTenantId lists the tenant invites for the given tenant id.
	ListTenantInvitesByTenantId(ctx context.Context, tenantId string, opts *ListTenantInvitesOpts) ([]*dbsqlc.TenantInviteLink, error)

	// UpdateTenantInvite updates the tenant invite with the given id.
	UpdateTenantInvite(ctx context.Context, id string, opts *UpdateTenantInviteOpts) (*dbsqlc.TenantInviteLink, error)

	// DeleteTenantInvite deletes the tenant invite with the given id.
	DeleteTenantInvite(ctx context.Context, id string) error
}
type TenantLimitConfig ¶
// TenantLimitConfig holds the EnforceLimits flag for tenant limits.
type TenantLimitConfig struct {
	EnforceLimits bool
}
type TenantLimitRepository ¶
// TenantLimitRepository is the repository for tenant resource limits.
type TenantLimitRepository interface {
	// GetLimits returns the resource limits of the given tenant.
	GetLimits(ctx context.Context, tenantId string) ([]*dbsqlc.TenantResourceLimit, error)

	// CanCreate checks if the tenant can create the given resource.
	// NOTE(review): the meaning of the int result is not visible here — confirm
	// against the implementation.
	CanCreate(ctx context.Context, resource dbsqlc.LimitResource, tenantId string, numberOfResources int32) (bool, int, error)

	// Meter increments the tenant's count for the given resource.
	Meter(ctx context.Context, resource dbsqlc.LimitResource, tenantId string, numberOfResources int32) (*dbsqlc.TenantResourceLimit, error)

	// SelectOrInsertTenantLimits creates new tenant resource limits for a tenant.
	SelectOrInsertTenantLimits(ctx context.Context, tenantId string, plan *string) error

	// UpsertTenantLimits updates or inserts new tenant limits.
	UpsertTenantLimits(ctx context.Context, tenantId string, plan *string) error

	// ResolveAllTenantResourceLimits resolves all tenant resource limits.
	ResolveAllTenantResourceLimits(ctx context.Context) error

	// SetPlanLimitMap sets the plan limit map.
	SetPlanLimitMap(planLimitMap PlanLimitMap) error

	// DefaultLimits returns the default limits.
	DefaultLimits() []Limit
}
type TenantScopedCallback ¶
type TickerEngineRepository ¶
// TickerEngineRepository is the engine-facing repository for tickers and the
// polling queries they run.
type TickerEngineRepository interface {
	// CreateNewTicker creates a new ticker.
	CreateNewTicker(ctx context.Context, opts *CreateTickerOpts) (*dbsqlc.Ticker, error)

	// UpdateTicker updates a ticker.
	UpdateTicker(ctx context.Context, tickerId string, opts *UpdateTickerOpts) (*dbsqlc.Ticker, error)

	// ListTickers lists tickers.
	ListTickers(ctx context.Context, opts *ListTickerOpts) ([]*dbsqlc.Ticker, error)

	// DeactivateTicker deactivates the ticker with the given id.
	// NOTE(review): an earlier comment described this as deleting the ticker —
	// confirm against the implementation.
	DeactivateTicker(ctx context.Context, tickerId string) error

	// PollGetGroupKeyRuns looks for get group key runs which are close to or past their
	// timeoutAt value and are in a running state.
	PollGetGroupKeyRuns(ctx context.Context, tickerId string) ([]*dbsqlc.GetGroupKeyRun, error)

	// PollCronSchedules returns all cron schedules which should be managed by the ticker.
	PollCronSchedules(ctx context.Context, tickerId string) ([]*dbsqlc.PollCronSchedulesRow, error)

	PollScheduledWorkflows(ctx context.Context, tickerId string) ([]*dbsqlc.PollScheduledWorkflowsRow, error)

	PollTenantAlerts(ctx context.Context, tickerId string) ([]*dbsqlc.PollTenantAlertsRow, error)

	PollExpiringTokens(ctx context.Context) ([]*dbsqlc.PollExpiringTokensRow, error)

	PollTenantResourceLimitAlerts(ctx context.Context) ([]*dbsqlc.TenantResourceLimitAlert, error)

	PollUnresolvedFailedStepRuns(ctx context.Context) ([]*dbsqlc.PollUnresolvedFailedStepRunsRow, error)
}
type UnscopedCallback ¶
func (UnscopedCallback[T]) Do ¶
func (c UnscopedCallback[T]) Do(l *zerolog.Logger, v T)
type UpdateDispatcherOpts ¶
type UpdateSessionOpts ¶
type UpdateTenantAlertGroupOpts ¶
// UpdateTenantAlertGroupOpts holds the options for updating a tenant alert
// group. Emails is required; each entry is validated as an email of at most
// 255 characters.
type UpdateTenantAlertGroupOpts struct {
	Emails []string `validate:"required,dive,email,max=255"`
}
type UpdateTenantInviteOpts ¶
type UpdateTenantMemberOpts ¶
// UpdateTenantMemberOpts holds the options for updating a tenant member.
// Role, when set, must be one of OWNER, ADMIN or MEMBER.
type UpdateTenantMemberOpts struct {
	Role *string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}
type UpdateTenantOpts ¶
// UpdateTenantOpts holds the optional Name, AnalyticsOptOut,
// AlertMemberEmails and Version values for updating a tenant.
type UpdateTenantOpts struct {
	Name *string

	AnalyticsOptOut *bool `validate:"omitempty"`

	AlertMemberEmails *bool `validate:"omitempty"`

	Version *dbsqlc.NullTenantMajorEngineVersion `validate:"omitempty"`
}
type UpdateTickerOpts ¶
type UpdateUserOpts ¶
type UpdateWorkerOpts ¶
// UpdateWorkerOpts holds the options for updating a worker.
type UpdateWorkerOpts struct {
	// id of the dispatcher, validated as a uuid when set
	DispatcherId *string `validate:"omitempty,uuid"`

	// time of the last worker heartbeat
	LastHeartbeatAt *time.Time

	// whether the worker is active and accepting new runs
	IsActive *bool

	// the actions this worker can run; each is validated with the actionId rule
	Actions []string `validate:"dive,actionId"`
}
type UpdateWorkflowOpts ¶
// UpdateWorkflowOpts holds the options for updating a workflow.
type UpdateWorkflowOpts struct {
	// (optional) paused flag -- when true, the workflow will not be scheduled
	IsPaused *bool
}
type UpsertRateLimitOpts ¶
type UpsertSlackWebhookOpts ¶
type UpsertWorkerLabelOpts ¶
type UpsertWorkflowDeploymentConfigOpts ¶
// UpsertWorkflowDeploymentConfigOpts holds the options for upserting a
// workflow deployment config; every field is required.
type UpsertWorkflowDeploymentConfigOpts struct {
	// (required) github app installation id, validated as a uuid
	GithubAppInstallationId string `validate:"required,uuid"`

	// (required) name of the github repository
	GitRepoName string `validate:"required"`

	// (required) owner of the github repository
	GitRepoOwner string `validate:"required"`

	// (required) branch of the github repository
	GitRepoBranch string `validate:"required"`
}
type UserRepository ¶
// UserRepository is the repository for users, their passwords and their
// tenant memberships.
type UserRepository interface {
	// RegisterCreateCallback registers a callback taking a *dbsqlc.User.
	// NOTE(review): presumably invoked when a user is created — confirm
	// against the implementation.
	RegisterCreateCallback(callback UnscopedCallback[*dbsqlc.User])

	// GetUserByID returns the user with the given id.
	GetUserByID(ctx context.Context, id string) (*dbsqlc.User, error)

	// GetUserByEmail returns the user with the given email.
	GetUserByEmail(ctx context.Context, email string) (*dbsqlc.User, error)

	// GetUserPassword returns the user password with the given id.
	GetUserPassword(ctx context.Context, id string) (*dbsqlc.UserPassword, error)

	// CreateUser creates a new user with the given options.
	CreateUser(ctx context.Context, opts *CreateUserOpts) (*dbsqlc.User, error)

	// UpdateUser updates the user with the given id.
	UpdateUser(ctx context.Context, id string, opts *UpdateUserOpts) (*dbsqlc.User, error)

	// ListTenantMemberships returns the list of tenant memberships for the given user.
	ListTenantMemberships(ctx context.Context, userId string) ([]*dbsqlc.PopulateTenantMembersRow, error)
}
type UserSessionRepository ¶
// UserSessionRepository is the set of queries available on the UserSession model.
type UserSessionRepository interface {
	Create(ctx context.Context, opts *CreateSessionOpts) (*dbsqlc.UserSession, error)

	Update(ctx context.Context, sessionId string, opts *UpdateSessionOpts) (*dbsqlc.UserSession, error)

	Delete(ctx context.Context, sessionId string) (*dbsqlc.UserSession, error)

	GetById(ctx context.Context, sessionId string) (*dbsqlc.UserSession, error)
}
UserSessionRepository represents the set of queries on the UserSession model
type WebhookWorkerEngineRepository ¶
// WebhookWorkerEngineRepository is the engine-facing repository for webhook
// workers and their requests.
type WebhookWorkerEngineRepository interface {
	// ListWebhookWorkersByPartitionId lists the webhook workers for a worker partition.
	ListWebhookWorkersByPartitionId(ctx context.Context, partitionId string) ([]*dbsqlc.WebhookWorker, error)

	// ListActiveWebhookWorkers lists the active webhook workers for the given tenant.
	ListActiveWebhookWorkers(ctx context.Context, tenantId string) ([]*dbsqlc.WebhookWorker, error)

	// ListWebhookWorkerRequests lists the webhook worker requests for the given webhook worker id.
	ListWebhookWorkerRequests(ctx context.Context, webhookWorkerId string) ([]*dbsqlc.WebhookWorkerRequest, error)

	// InsertWebhookWorkerRequest inserts a new webhook worker request with the given options.
	InsertWebhookWorkerRequest(ctx context.Context, webhookWorkerId, method string, statusCode int32) error

	// CreateWebhookWorker creates a new webhook worker with the given options.
	CreateWebhookWorker(ctx context.Context, opts *CreateWebhookWorkerOpts) (*dbsqlc.WebhookWorker, error)

	// UpdateWebhookWorkerToken updates the webhook worker with the given id and tenant id.
	UpdateWebhookWorkerToken(ctx context.Context, id, tenantId string, opts *UpdateWebhookWorkerTokenOpts) (*dbsqlc.WebhookWorker, error)

	// SoftDeleteWebhookWorker flags the webhook worker with the given id and tenant id for delete.
	SoftDeleteWebhookWorker(ctx context.Context, id, tenantId string) error

	// HardDeleteWebhookWorker deletes the webhook worker with the given id and tenant id.
	HardDeleteWebhookWorker(ctx context.Context, id, tenantId string) error
}
type WebhookWorkerRepository ¶
type WorkerAPIRepository ¶
// WorkerAPIRepository is the API-facing repository for workers.
type WorkerAPIRepository interface {
	// ListWorkers lists workers for the tenant.
	ListWorkers(tenantId string, opts *ListWorkersOpts) ([]*dbsqlc.ListWorkersWithSlotCountRow, error)

	// ListWorkerState lists the semaphore slots (with state) and the recent step runs for a
	// given worker.
	ListWorkerState(tenantId, workerId string, maxRuns int) ([]*dbsqlc.ListSemaphoreSlotsWithStateForWorkerRow, []*dbsqlc.GetStepRunForEngineRow, error)

	// GetWorkerActionsByWorkerId returns a list of actions for a worker.
	GetWorkerActionsByWorkerId(tenantId, workerId string) ([]pgtype.Text, error)

	// GetWorkerById returns a worker by its id.
	GetWorkerById(workerId string) (*dbsqlc.GetWorkerByIdRow, error)

	// ListWorkerLabels returns a list of labels config for a worker.
	ListWorkerLabels(tenantId, workerId string) ([]*dbsqlc.ListWorkerLabelsRow, error)

	// UpdateWorker updates a worker for a given tenant.
	UpdateWorker(tenantId string, workerId string, opts ApiUpdateWorkerOpts) (*dbsqlc.Worker, error)
}
type WorkerEngineRepository ¶
// WorkerEngineRepository is the engine-facing repository for workers.
type WorkerEngineRepository interface {
	// CreateNewWorker creates a new worker for a given tenant.
	CreateNewWorker(ctx context.Context, tenantId string, opts *CreateWorkerOpts) (*dbsqlc.Worker, error)

	// UpdateWorker updates a worker for a given tenant.
	UpdateWorker(ctx context.Context, tenantId, workerId string, opts *UpdateWorkerOpts) (*dbsqlc.Worker, error)

	// UpdateWorkerHeartbeat updates a worker's heartbeat in the repository.
	// It will only update the worker if there is no lock on the worker, else it will skip.
	UpdateWorkerHeartbeat(ctx context.Context, tenantId, workerId string, lastHeartbeatAt time.Time) error

	// DeleteWorker removes the worker from the database.
	DeleteWorker(ctx context.Context, tenantId, workerId string) error

	// UpdateWorkersByWebhookId updates workers using the given UpdateWorkersByWebhookIdParams.
	// NOTE(review): the previous comment ("removes the worker from the database") duplicated
	// DeleteWorker's — confirm the exact semantics against the implementation.
	UpdateWorkersByWebhookId(ctx context.Context, opts dbsqlc.UpdateWorkersByWebhookIdParams) error

	GetWorkerForEngine(ctx context.Context, tenantId, workerId string) (*dbsqlc.GetWorkerForEngineRow, error)

	UpdateWorkerActiveStatus(ctx context.Context, tenantId, workerId string, isActive bool, timestamp time.Time) (*dbsqlc.Worker, error)

	UpsertWorkerLabels(ctx context.Context, workerId pgtype.UUID, opts []UpsertWorkerLabelOpts) ([]*dbsqlc.WorkerLabel, error)

	DeleteOldWorkers(ctx context.Context, tenantId string, lastHeartbeatBefore time.Time) (bool, error)

	DeleteOldWorkerEvents(ctx context.Context, tenantId string, lastHeartbeatAfter time.Time) error

	GetDispatcherIdsForWorkers(ctx context.Context, tenantId string, workerIds []string) (map[string][]string, error)
}
type WorkflowAPIRepository ¶
// WorkflowAPIRepository declares the workflow operations listed below: listing,
// fetching, updating and deleting workflows, plus creating and managing cron and
// scheduled triggers.
// NOTE(review): presumably this is the API-server counterpart of
// WorkflowEngineRepository — confirm against the package layout.
type WorkflowAPIRepository interface {
	// ListWorkflows returns all workflows for a given tenant.
	ListWorkflows(tenantId string, opts *ListWorkflowsOpts) (*ListWorkflowsResult, error)

	// GetWorkflowById returns a workflow by its id. It will return db.ErrNotFound if the workflow does not exist.
	GetWorkflowById(ctx context.Context, workflowId string) (*dbsqlc.GetWorkflowByIdRow, error)

	// GetWorkflowVersionById returns a workflow version by its id, together with its cron,
	// event and scheduled trigger references. It will return db.ErrNotFound if the workflow
	// version does not exist.
	GetWorkflowVersionById(tenantId, workflowVersionId string) (*dbsqlc.GetWorkflowVersionByIdRow,
		[]*dbsqlc.WorkflowTriggerCronRef,
		[]*dbsqlc.WorkflowTriggerEventRef,
		[]*dbsqlc.WorkflowTriggerScheduledRef,
		error)

	// DeleteWorkflow deletes a workflow for a given tenant.
	DeleteWorkflow(ctx context.Context, tenantId, workflowId string) (*dbsqlc.Workflow, error)

	// GetWorkflowMetrics returns the metrics for a given workflow.
	GetWorkflowMetrics(tenantId, workflowId string, opts *GetWorkflowMetricsOpts) (*WorkflowMetrics, error)

	// UpdateWorkflow updates a workflow for a given tenant.
	UpdateWorkflow(ctx context.Context, tenantId, workflowId string, opts *UpdateWorkflowOpts) (*dbsqlc.Workflow, error)

	// GetWorkflowWorkerCount returns the number of workers for a given workflow.
	// NOTE(review): two counts are returned; which is which is not documented here — confirm
	// against the implementation.
	GetWorkflowWorkerCount(tenantId, workflowId string) (int, int, error)

	// CreateCronWorkflow creates a cron trigger.
	CreateCronWorkflow(ctx context.Context, tenantId string, opts *CreateCronWorkflowTriggerOpts) (*dbsqlc.ListCronWorkflowsRow, error)

	// ListCronWorkflows lists workflows by cron trigger for a tenant.
	// NOTE(review): the int64 is presumably the total row count for pagination — confirm.
	ListCronWorkflows(ctx context.Context, tenantId string, opts *ListCronWorkflowsOpts) ([]*dbsqlc.ListCronWorkflowsRow, int64, error)

	// GetCronWorkflow gets a cron workflow by its id.
	GetCronWorkflow(ctx context.Context, tenantId, cronWorkflowId string) (*dbsqlc.ListCronWorkflowsRow, error)

	// DeleteCronWorkflow deletes a cron workflow by its id.
	DeleteCronWorkflow(ctx context.Context, tenantId, id string) error

	// CreateScheduledWorkflow creates a scheduled workflow run.
	CreateScheduledWorkflow(ctx context.Context, tenantId string, opts *CreateScheduledWorkflowRunForWorkflowOpts) (*dbsqlc.ListScheduledWorkflowsRow, error)
}
type WorkflowEngineRepository ¶
// WorkflowEngineRepository declares the workflow operations listed below: creating
// workflows, workflow versions and schedules, and looking workflows up by id, name
// or triggering event.
type WorkflowEngineRepository interface {
// CreateNewWorkflow creates a new workflow for a given tenant. It will create the parent
// workflow based on the version's name.
CreateNewWorkflow(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts) (*dbsqlc.GetWorkflowVersionForEngineRow, error)
// CreateWorkflowVersion creates a new workflow version for a given tenant. This will fail if there is
// not a parent workflow with the same name already in the database.
// NOTE(review): how oldWorkflowVersion is used is not visible here — confirm.
CreateWorkflowVersion(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts, oldWorkflowVersion *dbsqlc.GetWorkflowVersionForEngineRow) (*dbsqlc.GetWorkflowVersionForEngineRow, error)
// CreateSchedules creates schedules for a given workflow version.
CreateSchedules(ctx context.Context, tenantId, workflowVersionId string, opts *CreateWorkflowSchedulesOpts) ([]*dbsqlc.WorkflowTriggerScheduledRef, error)
// GetLatestWorkflowVersion returns the latest version of the workflow with the given id.
GetLatestWorkflowVersion(ctx context.Context, tenantId, workflowId string) (*dbsqlc.GetWorkflowVersionForEngineRow, error)
// GetLatestWorkflowVersions returns the latest versions of the workflows with the given ids.
GetLatestWorkflowVersions(ctx context.Context, tenantId string, workflowIds []string) ([]*dbsqlc.GetWorkflowVersionForEngineRow, error)
// GetWorkflowByName returns a workflow by its name. It will return db.ErrNotFound if the workflow does not exist.
GetWorkflowByName(ctx context.Context, tenantId, workflowName string) (*dbsqlc.Workflow, error)
// GetWorkflowsByNames returns all workflows matching the given names. It will return db.ErrNotFound if the workflow does not exist.
// NOTE(review): whether a single missing name yields db.ErrNotFound or is silently omitted
// from the result is not visible here — confirm.
GetWorkflowsByNames(ctx context.Context, tenantId string, workflowNames []string) ([]*dbsqlc.Workflow, error)
// ListWorkflowsForEvent returns the latest workflow versions for a given tenant that are triggered by the
// given event.
ListWorkflowsForEvent(ctx context.Context, tenantId, eventKey string) ([]*dbsqlc.GetWorkflowVersionForEngineRow, error)
// GetWorkflowVersionById returns a workflow version by its id. It will return db.ErrNotFound if the workflow
// version does not exist.
GetWorkflowVersionById(ctx context.Context, tenantId, workflowVersionId string) (*dbsqlc.GetWorkflowVersionForEngineRow, error)
}
type WorkflowMetrics ¶
type WorkflowRunAPIRepository ¶
// WorkflowRunAPIRepository declares the workflow-run operations listed below: listing
// and counting runs, managing scheduled workflows, creating runs, and fetching runs
// together with their steps and step runs.
type WorkflowRunAPIRepository interface {
// RegisterCreateCallback registers a tenant-scoped callback that receives a *dbsqlc.WorkflowRun.
// NOTE(review): presumably invoked after a workflow run is created — confirm when it fires.
RegisterCreateCallback(callback TenantScopedCallback[*dbsqlc.WorkflowRun])
// ListWorkflowRuns returns workflow runs for a given tenant, filtered by opts
// (for example by workflow version id).
ListWorkflowRuns(ctx context.Context, tenantId string, opts *ListWorkflowRunsOpts) (*ListWorkflowRunsResult, error)
// WorkflowRunMetricsCount returns workflow run counts by status for a tenant, filtered by opts.
WorkflowRunMetricsCount(ctx context.Context, tenantId string, opts *WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error)
// ListScheduledWorkflows lists workflows by scheduled trigger.
// NOTE(review): the int64 is presumably the total row count for pagination — confirm.
ListScheduledWorkflows(ctx context.Context, tenantId string, opts *ListScheduledWorkflowsOpts) ([]*dbsqlc.ListScheduledWorkflowsRow, int64, error)
// DeleteScheduledWorkflow deletes a scheduled workflow run.
DeleteScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) error
// GetScheduledWorkflow gets a scheduled workflow run.
GetScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) (*dbsqlc.ListScheduledWorkflowsRow, error)
// UpdateScheduledWorkflow updates a scheduled workflow run, given a new triggerAt time.
UpdateScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string, triggerAt time.Time) error
// CreateNewWorkflowRun creates a new workflow run for a workflow version.
CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *CreateWorkflowRunOpts) (*dbsqlc.WorkflowRun, error)
// GetWorkflowRunById returns a workflow run by id.
GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunByIdRow, error)
// GetWorkflowRunByIds returns the workflow runs with the given ids.
GetWorkflowRunByIds(ctx context.Context, tenantId string, runIds []string) ([]*dbsqlc.GetWorkflowRunByIdsRow, error)
// GetStepsForJobs returns the steps belonging to the given job ids.
GetStepsForJobs(ctx context.Context, tenantId string, jobIds []string) ([]*dbsqlc.GetStepsForJobsRow, error)
// GetStepRunsForJobRuns returns the step runs belonging to the given job run ids.
GetStepRunsForJobRuns(ctx context.Context, tenantId string, jobRunIds []string) ([]*StepRunForJobRun, error)
// GetWorkflowRunShape returns the dbsqlc.GetWorkflowRunShapeRow rows for a workflow version id.
// NOTE(review): unlike its siblings this takes no tenantId — confirm tenant scoping is enforced elsewhere.
GetWorkflowRunShape(ctx context.Context, workflowVersionId uuid.UUID) ([]*dbsqlc.GetWorkflowRunShapeRow, error)
}
type WorkflowRunEngineRepository ¶
// WorkflowRunEngineRepository declares the workflow-run operations listed below:
// creating runs (singly and in bulk), popping queued runs under the different
// concurrency strategies, queueing and replaying runs, and soft-deleting expired runs.
type WorkflowRunEngineRepository interface {
	// RegisterCreateCallback registers a tenant-scoped callback that receives a *dbsqlc.WorkflowRun.
	// NOTE(review): presumably invoked after a workflow run is created — confirm when it fires.
	RegisterCreateCallback(callback TenantScopedCallback[*dbsqlc.WorkflowRun])

	// RegisterQueuedCallback registers a tenant-scoped callback that receives a pgtype.UUID.
	// NOTE(review): presumably the id of a workflow run that was queued — confirm.
	RegisterQueuedCallback(callback TenantScopedCallback[pgtype.UUID])

	// ListWorkflowRuns returns workflow runs for a given tenant, filtered by opts
	// (for example by workflow version id).
	ListWorkflowRuns(ctx context.Context, tenantId string, opts *ListWorkflowRunsOpts) (*ListWorkflowRunsResult, error)

	// GetChildWorkflowRun returns the child workflow run identified by its parent run, parent
	// step run, child index and optional child key.
	GetChildWorkflowRun(ctx context.Context, parentId, parentStepRunId string, childIndex int, childkey *string) (*dbsqlc.WorkflowRun, error)

	// GetChildWorkflowRuns returns the workflow runs for the given child workflow run descriptors.
	GetChildWorkflowRuns(ctx context.Context, childWorkflowRuns []ChildWorkflowRun) ([]*dbsqlc.WorkflowRun, error)

	// GetScheduledChildWorkflowRun returns the scheduled trigger reference for the child identified
	// by its parent run, parent step run, child index and optional child key.
	GetScheduledChildWorkflowRun(ctx context.Context, parentId, parentStepRunId string, childIndex int, childkey *string) (*dbsqlc.WorkflowTriggerScheduledRef, error)

	// PopWorkflowRunsCancelInProgress returns the runs to cancel and the runs to start for a
	// workflow version, given a limit of maxRuns.
	// NOTE(review): the exact selection rule of the cancel-in-progress strategy is not visible here.
	PopWorkflowRunsCancelInProgress(ctx context.Context, tenantId, workflowVersionId string, maxRuns int) (toCancel []*dbsqlc.WorkflowRun, toStart []*dbsqlc.WorkflowRun, err error)

	// PopWorkflowRunsCancelNewest returns the runs to cancel and the runs to start for a
	// workflow version, given a limit of maxRuns.
	// NOTE(review): the exact selection rule of the cancel-newest strategy is not visible here.
	PopWorkflowRunsCancelNewest(ctx context.Context, tenantId, workflowVersionId string, maxRuns int) (toCancel []*dbsqlc.WorkflowRun, toStart []*dbsqlc.WorkflowRun, err error)

	// PopWorkflowRunsRoundRobin returns workflow runs and step runs for a workflow version,
	// given a limit of maxRuns.
	// NOTE(review): the relationship between the two returned slices is not documented — confirm.
	PopWorkflowRunsRoundRobin(ctx context.Context, tenantId, workflowVersionId string, maxRuns int) ([]*dbsqlc.WorkflowRun, []*dbsqlc.GetStepRunForEngineRow, error)

	// CreateNewWorkflowRun creates a new workflow run for a workflow version.
	CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *CreateWorkflowRunOpts) (*dbsqlc.WorkflowRun, error)

	// CreateNewWorkflowRuns creates new workflow runs in bulk.
	CreateNewWorkflowRuns(ctx context.Context, tenantId string, opts []*CreateWorkflowRunOpts) ([]*dbsqlc.WorkflowRun, error)

	// CreateDeDupeKey records dedupeValue for the given workflow run and workflow version.
	// NOTE(review): presumably fails when the same value already exists for the workflow
	// version — confirm the error returned in that case.
	CreateDeDupeKey(ctx context.Context, tenantId, workflowRunId, workflowVersionId, dedupeValue string) error

	// GetWorkflowRunInputData returns the input data of a workflow run as a generic map.
	GetWorkflowRunInputData(tenantId, workflowRunId string) (map[string]interface{}, error)

	// ProcessWorkflowRunUpdates processes pending workflow run updates for a tenant.
	// NOTE(review): the meaning of the returned bool is not documented — presumably it signals
	// whether more work remains; confirm.
	ProcessWorkflowRunUpdates(ctx context.Context, tenantId string) (bool, error)

	// UpdateWorkflowRunFromGroupKeyEval updates a workflow run from the result of a group key
	// evaluation, as described by opts.
	UpdateWorkflowRunFromGroupKeyEval(ctx context.Context, tenantId, workflowRunId string, opts *UpdateWorkflowRunFromGroupKeyEvalOpts) error

	// GetWorkflowRunById returns a workflow run by id.
	GetWorkflowRunById(ctx context.Context, tenantId, runId string) (*dbsqlc.GetWorkflowRunRow, error)

	// DeleteScheduledWorkflow deletes a scheduled workflow run.
	DeleteScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) error

	// GetWorkflowRunByIds returns the workflow runs with the given ids.
	// TODO maybe we don't need this?
	GetWorkflowRunByIds(ctx context.Context, tenantId string, runIds []string) ([]*dbsqlc.GetWorkflowRunRow, error)

	// QueuePausedWorkflowRun queues a workflow run of a paused workflow.
	// NOTE(review): inferred from the method name — confirm.
	QueuePausedWorkflowRun(ctx context.Context, tenantId, workflowId, workflowRunId string) error

	// QueueWorkflowRunJobs queues the jobs of a workflow run and returns step runs.
	// NOTE(review): tenant and workflowRun are presumably ids — confirm.
	QueueWorkflowRunJobs(ctx context.Context, tenant string, workflowRun string) ([]*dbsqlc.GetStepRunForEngineRow, error)

	// ProcessUnpausedWorkflowRuns processes workflow runs of unpaused workflows for a tenant and
	// returns the affected runs.
	// NOTE(review): the meaning of the returned bool is not documented — confirm.
	ProcessUnpausedWorkflowRuns(ctx context.Context, tenantId string) ([]*dbsqlc.GetWorkflowRunRow, bool, error)

	// GetWorkflowRunAdditionalMeta returns the additional metadata row of a workflow run.
	GetWorkflowRunAdditionalMeta(ctx context.Context, tenantId, workflowRunId string) (*dbsqlc.GetWorkflowRunAdditionalMetaRow, error)

	// ReplayWorkflowRun replays a workflow run and returns it.
	ReplayWorkflowRun(ctx context.Context, tenantId, workflowRunId string) (*dbsqlc.GetWorkflowRunRow, error)

	// ListActiveQueuedWorkflowVersions lists the active queued workflow versions for a tenant.
	ListActiveQueuedWorkflowVersions(ctx context.Context, tenantId string) ([]*dbsqlc.ListActiveQueuedWorkflowVersionsRow, error)

	// SoftDeleteExpiredWorkflowRuns soft-deletes workflow runs in one of the given statuses that
	// were created before the given time.
	// NOTE(review): an earlier comment described two counts (deleted and remaining), but the
	// signature returns a single bool — presumably whether more matching runs remain; confirm.
	SoftDeleteExpiredWorkflowRuns(ctx context.Context, tenantId string, statuses []dbsqlc.WorkflowRunStatus, before time.Time) (bool, error)

	// GetUpstreamErrorsForOnFailureStep returns the upstream errors for an on-failure step run.
	GetUpstreamErrorsForOnFailureStep(ctx context.Context, onFailureStepRunId string) ([]*dbsqlc.GetUpstreamErrorsForOnFailureStepRow, error)
}
type WorkflowRunsMetricsOpts ¶
// WorkflowRunsMetricsOpts holds the optional filters for workflow run metrics.
// Every field is optional; the id fields must be valid UUIDs when set, as
// enforced by their validate tags.
type WorkflowRunsMetricsOpts struct {
	// WorkflowId is the (optional) workflow id.
	WorkflowId *string `validate:"omitempty,uuid"`

	// WorkflowVersionId is the (optional) workflow version id.
	WorkflowVersionId *string `validate:"omitempty,uuid"`

	// ParentId is the (optional) parent workflow run id.
	ParentId *string `validate:"omitempty,uuid"`

	// ParentStepRunId is the (optional) parent step run id.
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// EventId is the (optional) id of the event that triggered the workflow run.
	EventId *string `validate:"omitempty,uuid"`

	// AdditionalMetadata is the (optional) exact metadata to filter by.
	AdditionalMetadata map[string]any `validate:"omitempty"`

	// CreatedBefore is the (optional) time the workflow run was created before.
	CreatedBefore *time.Time `validate:"omitempty"`

	// CreatedAfter is the (optional) time the workflow run was created after.
	CreatedAfter *time.Time `validate:"omitempty"`
}
Source Files
¶
- api_token.go
- dispatcher.go
- event.go
- get_group_key_run.go
- health.go
- job_run.go
- logs.go
- mq.go
- rate_limit.go
- repository.go
- scheduler.go
- slack.go
- sns.go
- step.go
- step_run.go
- stream_event.go
- tenant.go
- tenant_alerting.go
- tenant_invite.go
- tenant_limit.go
- ticker.go
- user.go
- user_session.go
- webhook_worker.go
- webhook_worker_api.go
- worker.go
- workflow.go
- workflow_run.go
Click to show internal directories.
Click to hide internal directories.