repository

package
v0.74.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2026 License: MIT Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const MAX_BATCH_SIZE_BYTES = 1.5 * 1024 * 1024 * 1024 // 1.5 GB
View Source
const MAX_PARTITIONS_TO_OFFLOAD = 14 // two weeks
View Source
const MAX_TENANT_RATE_LIMITS = 10000
View Source
const NUM_PARTITIONS = 4

TODO: make this dynamic for the instance

View Source
const PARENT_STRATEGY_LOCK_OFFSET = 1000000000000 // 1 trillion

Variables

View Source
var ErrDagParentNotFound = errors.New("dag parent not found")
View Source
var ErrResourceExhausted = fmt.Errorf("resource exhausted")

Functions

func BoolPtr

func BoolPtr(b bool) *bool

func HashPassword

func HashPassword(pw string) (*string, error)

func OLAPPayloadOffloadMessage added in v0.74.3

func OLAPPayloadOffloadMessage(tenantId string, payloads []OLAPPayloadToOffload) (*msgqueue.Message, error)

func RunPostCommit

func RunPostCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])

func RunPreCommit

func RunPreCommit[T any](l *zerolog.Logger, tenantId string, v T, opts []CallbackOptFunc[T])

func StringPtr

func StringPtr(s string) *string

func ValidateJSONB

func ValidateJSONB(jsonb []byte, fieldName string) error

func VerifyPassword

func VerifyPassword(hashedPW, candidate string) (bool, error)

Types

type APIKeyAuthCredentials added in v0.74.3

type APIKeyAuthCredentials struct {
	HeaderName   string `json:"header_name" validate:"required"`
	EncryptedKey []byte `json:"key" validate:"required"`
}

type APITokenGenerator

type APITokenGenerator func(ctx context.Context, tenantId, name string, internal bool, expires *time.Time) (string, error)

type APITokenRepository

type APITokenRepository interface {
	CreateAPIToken(ctx context.Context, opts *CreateAPITokenOpts) (*sqlcv1.APIToken, error)
	GetAPITokenById(ctx context.Context, id string) (*sqlcv1.APIToken, error)
	ListAPITokensByTenant(ctx context.Context, tenantId string) ([]*sqlcv1.APIToken, error)
	RevokeAPIToken(ctx context.Context, id string) error
	DeleteAPIToken(ctx context.Context, tenantId, id string) error
}

type AssignResults

type AssignResults struct {
	Assigned           []*AssignedItem
	Unassigned         []*sqlcv1.V1QueueItem
	SchedulingTimedOut []*sqlcv1.V1QueueItem
	RateLimited        []*RateLimitResult
	RateLimitedToMove  []*RateLimitResult
}

type AssignedItem

type AssignedItem struct {
	WorkerId pgtype.UUID

	QueueItem *sqlcv1.V1QueueItem
}

type AssignmentRepository

type AssignmentRepository interface {
	ListActionsForWorkers(ctx context.Context, tenantId pgtype.UUID, workerIds []pgtype.UUID) ([]*sqlcv1.ListActionsForWorkersRow, error)
	ListAvailableSlotsForWorkers(ctx context.Context, tenantId pgtype.UUID, params sqlcv1.ListAvailableSlotsForWorkersParams) ([]*sqlcv1.ListAvailableSlotsForWorkersRow, error)
}

type AuthConfig added in v0.74.3

type AuthConfig struct {
	Type       sqlcv1.V1IncomingWebhookAuthType `json:"type" validate:"required"`
	BasicAuth  *BasicAuthCredentials            `json:"basic_auth,omitempty"`
	APIKeyAuth *APIKeyAuthCredentials           `json:"api_key_auth,omitempty"`
	HMACAuth   *HMACAuthCredentials             `json:"hmac_auth,omitempty"`
}

func (*AuthConfig) Validate added in v0.74.3

func (ac *AuthConfig) Validate() error

type BasicAuthCredentials added in v0.74.3

type BasicAuthCredentials struct {
	Username          string `json:"username" validate:"required"`
	EncryptedPassword []byte `json:"password" validate:"required"`
}

type BulkCutOverOLAPPayload added in v0.74.3

type BulkCutOverOLAPPayload struct {
	TenantID            pgtype.UUID
	InsertedAt          pgtype.Timestamptz
	ExternalId          pgtype.UUID
	ExternalLocationKey ExternalPayloadLocationKey
}

type BulkCutOverPayload added in v0.74.3

type BulkCutOverPayload struct {
	TenantID            pgtype.UUID
	Id                  int64
	InsertedAt          pgtype.Timestamptz
	ExternalId          pgtype.UUID
	Type                sqlcv1.V1PayloadType
	ExternalLocationKey ExternalPayloadLocationKey
}

type CELEvaluationFailure added in v0.74.3

type CELEvaluationFailure struct {
	Source       sqlcv1.V1CelEvaluationFailureSource `json:"source"`
	ErrorMessage string                              `json:"error_message"`
}

type CallbackOptFunc

type CallbackOptFunc[T any] func(*TenantCallbackOpts[T])

func WithPostCommitCallback

func WithPostCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]

func WithPreCommitCallback

func WithPreCommitCallback[T any](cb TenantScopedCallback[T]) CallbackOptFunc[T]

type CandidateEventMatch added in v0.74.3

type CandidateEventMatch struct {
	// A UUID for the event
	ID string

	// A timestamp for the event
	EventTimestamp time.Time

	// Key for the event
	Key string

	// Resource hint for the event
	ResourceHint *string

	// Data for the event
	Data []byte
}

type ChildWorkflowSignalCreatedData added in v0.74.3

type ChildWorkflowSignalCreatedData struct {
	// The external id of the target child task
	ChildExternalId string `json:"external_id"`

	// The external id of the parent task
	ParentExternalId string `json:"parent_external_id"`

	// The index of the child task
	ChildIndex int64 `json:"child_index"`

	// The key of the child task
	ChildKey *string `json:"child_key"`
}

func (*ChildWorkflowSignalCreatedData) Bytes added in v0.74.3

func (c *ChildWorkflowSignalCreatedData) Bytes() []byte

type CompleteTaskOpts added in v0.74.3

type CompleteTaskOpts struct {
	*TaskIdInsertedAtRetryCount

	// (required) the output bytes for the task
	Output []byte
}

type ConcurrencyRepository added in v0.74.3

type ConcurrencyRepository interface {
	// Checks whether the concurrency strategy is active, and if not, sets is_active=False
	UpdateConcurrencyStrategyIsActive(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) error

	RunConcurrencyStrategy(ctx context.Context, tenantId pgtype.UUID, strategy *sqlcv1.V1StepConcurrency) (*RunConcurrencyResult, error)
}

type ConcurrencyRepositoryImpl added in v0.74.3

type ConcurrencyRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (ConcurrencyRepositoryImpl) DesiredWorkerId added in v0.74.3

func (s ConcurrencyRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow added in v0.74.3

func (s ConcurrencyRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*ConcurrencyRepositoryImpl) RunConcurrencyStrategy added in v0.74.3

func (c *ConcurrencyRepositoryImpl) RunConcurrencyStrategy(
	ctx context.Context,
	tenantId pgtype.UUID,
	strategy *sqlcv1.V1StepConcurrency,
) (res *RunConcurrencyResult, err error)

func (ConcurrencyRepositoryImpl) ToV1StepRunData added in v0.74.3

func (s ConcurrencyRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive added in v0.74.3

func (c *ConcurrencyRepositoryImpl) UpdateConcurrencyStrategyIsActive(
	ctx context.Context,
	tenantId pgtype.UUID,
	strategy *sqlcv1.V1StepConcurrency,
) error

type ConcurrencyStat added in v0.74.3

type ConcurrencyStat struct {
	Expression string           `json:"expression"`
	Type       string           `json:"type"`
	Keys       map[string]int64 `json:"keys"`
}

ConcurrencyStat represents concurrency information for a task

type CreateAPITokenOpts

type CreateAPITokenOpts struct {
	// The id of the token
	ID string `validate:"required,uuid"`

	// When the token expires
	ExpiresAt time.Time

	// (optional) A tenant ID for this API token
	TenantId *string `validate:"omitempty,uuid"`

	// (optional) A name for this API token
	Name *string `validate:"omitempty,max=255"`

	Internal bool
}

type CreateConcurrencyOpts added in v0.74.3

type CreateConcurrencyOpts struct {
	// (optional) the maximum number of concurrent workflow runs, default 1
	MaxRuns *int32

	// (optional) the strategy to use when the concurrency limit is reached, default CANCEL_IN_PROGRESS
	LimitStrategy *string `validate:"omitnil,oneof=CANCEL_IN_PROGRESS GROUP_ROUND_ROBIN CANCEL_NEWEST"`

	// (required) a concurrency expression for evaluating the concurrency key
	Expression string `validate:"celworkflowrunstr"`
}

type CreateCronWorkflowTriggerOpts

type CreateCronWorkflowTriggerOpts struct {
	// (required) the workflow id
	WorkflowId string `validate:"required,uuid"`

	// (required) the workflow name
	Name string `validate:"required"`

	Cron string `validate:"required,cron"`

	Input              map[string]interface{}
	AdditionalMetadata map[string]interface{}

	Priority *int32 `validate:"omitempty,min=1,max=3"`
}

type CreateDispatcherOpts

type CreateDispatcherOpts struct {
	ID string `validate:"required,uuid"`
}

type CreateExternalSignalConditionKind added in v0.74.3

type CreateExternalSignalConditionKind string
const (
	CreateExternalSignalConditionKindSLEEP     CreateExternalSignalConditionKind = "SLEEP"
	CreateExternalSignalConditionKindUSEREVENT CreateExternalSignalConditionKind = "USER_EVENT"
)

type CreateExternalSignalConditionOpt added in v0.74.3

type CreateExternalSignalConditionOpt struct {
	Kind CreateExternalSignalConditionKind `validate:"required, oneof=SLEEP USER_EVENT"`

	ReadableDataKey string `validate:"required"`

	OrGroupId string `validate:"required,uuid"`

	UserEventKey *string

	SleepFor *string `validate:"omitempty,duration"`

	Expression string
}

type CreateFilterOpts added in v0.74.3

type CreateFilterOpts struct {
	Workflowid    pgtype.UUID `json:"workflowid" validate:"required,uuid"`
	Scope         string      `json:"scope" validate:"required"`
	Expression    string      `json:"expression" validate:"required"`
	Payload       []byte      `json:"payload"`
	IsDeclarative bool        `json:"is_declarative"`
}

type CreateIncomingWebhookFailureLogOpts added in v0.74.3

type CreateIncomingWebhookFailureLogOpts struct {
	WebhookName string
	ErrorText   string
}

type CreateLogLineOpts

type CreateLogLineOpts struct {
	TaskId int64

	TaskInsertedAt pgtype.Timestamptz

	// (optional) The time when the log line was created.
	CreatedAt *time.Time

	// (required) The message of the log line.
	Message string `validate:"required,min=1,max=10000"`

	// (optional) The level of the log line.
	Level *string `validate:"omitnil,oneof=INFO ERROR WARN DEBUG"`

	// (optional) The metadata of the log line.
	Metadata []byte

	// The retry count of the log line.
	RetryCount int
}

type CreateMatchOpts added in v0.74.3

type CreateMatchOpts struct {
	Kind sqlcv1.V1MatchKind

	ExistingMatchData []byte

	Conditions []GroupMatchCondition

	TriggerDAGId *int64

	TriggerDAGInsertedAt pgtype.Timestamptz

	TriggerExternalId *string

	TriggerWorkflowRunId *string

	TriggerStepId *string

	TriggerStepIndex pgtype.Int8

	TriggerExistingTaskId *int64

	TriggerExistingTaskInsertedAt pgtype.Timestamptz

	TriggerParentTaskExternalId pgtype.UUID

	TriggerParentTaskId pgtype.Int8

	TriggerParentTaskInsertedAt pgtype.Timestamptz

	TriggerChildIndex pgtype.Int8

	TriggerChildKey pgtype.Text

	TriggerPriority pgtype.Int4

	SignalTaskId *int64

	SignalTaskInsertedAt pgtype.Timestamptz

	SignalExternalId *string

	SignalKey *string
}

type CreateSNSIntegrationOpts

type CreateSNSIntegrationOpts struct {
	TopicArn string `validate:"required,min=1,max=255"`
}

type CreateScheduledWorkflowRunForWorkflowOpts

type CreateScheduledWorkflowRunForWorkflowOpts struct {
	WorkflowId string `validate:"required,uuid"`

	ScheduledTrigger time.Time

	Input              []byte
	AdditionalMetadata []byte

	Priority *int32 `validate:"omitempty,min=1,max=3"`
}

type CreateSessionOpts

type CreateSessionOpts struct {
	ID string `validate:"required,uuid"`

	ExpiresAt time.Time `validate:"required"`

	// (optional) the user id, can be nil if session is unauthenticated
	UserId *string `validate:"omitempty,uuid"`

	Data []byte
}

type CreateStepMatchConditionOpt added in v0.74.3

type CreateStepMatchConditionOpt struct {
	// (required) the type of match condition for triggering the step
	MatchConditionKind string `validate:"required,oneof=PARENT_OVERRIDE USER_EVENT SLEEP"`

	// (required) the key for the event data when the workflow is triggered
	ReadableDataKey string `validate:"required"`

	// (required) the initial state for the task when the match condition is satisfied
	Action string `validate:"required,oneof=QUEUE CANCEL SKIP"`

	// (required) the or group id for the match condition
	// we ignore the JSON field here because it's randomly generated by the client, and we don't want to
	// publish a new workflow version just because this UUID is different. we use the `OrGroupHashIndex` instead.
	OrGroupId string `json:"-" validate:"required,uuid"`

	// NOTE: should not be set by the caller. This is populated by this package before creating the checksum.
	OrGroupIdIndex int32

	// (optional) the expression for the match condition
	Expression string `validate:"omitempty"`

	// (optional) the sleep duration for the match condition, only set if this is a SLEEP
	SleepDuration *string `validate:"omitempty,duration"`

	// (optional) the event key for the match condition, only set if this is a USER_EVENT
	EventKey *string `validate:"omitempty"`

	// (optional) if this is a PARENT_OVERRIDE condition, this will be set to the parent readable_id for
	// the parent whose trigger behavior we're overriding
	ParentReadableId *string `validate:"omitempty"`
}

type CreateStepOpts added in v0.74.3

type CreateStepOpts struct {
	// (required) the task name
	ReadableId string `validate:"hatchetName"`

	// (required) the task action id
	Action string `validate:"required,actionId"`

	// (optional) the task timeout
	Timeout *string `validate:"omitnil,duration"`

	// (optional) the task scheduling timeout
	ScheduleTimeout *string `validate:"omitnil,duration"`

	// (optional) the parents that this step depends on
	Parents []string `validate:"dive,hatchetName"`

	// (optional) the step retry max
	Retries *int `validate:"omitempty,min=0"`

	// (optional) rate limits for this step
	RateLimits []CreateWorkflowStepRateLimitOpts `validate:"dive"`

	// (optional) desired worker affinity state for this step
	DesiredWorkerLabels map[string]DesiredWorkerLabelOpts `validate:"omitempty"`

	// (optional) the step retry backoff factor
	RetryBackoffFactor *float64 `validate:"omitnil,min=1,max=1000"`

	// (optional) the step retry backoff max seconds (can't be greater than 86400)
	RetryBackoffMaxSeconds *int `validate:"omitnil,min=1,max=86400"`

	// (optional) a list of additional trigger conditions
	TriggerConditions []CreateStepMatchConditionOpt `validate:"omitempty,dive"`

	// (optional) the step concurrency options
	Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitnil"`
}

type CreateTaskOpts added in v0.74.3

type CreateTaskOpts struct {
	// (required) the external id
	ExternalId string `validate:"required,uuid"`

	// (required) the workflow run id. note this may be the same as the external id if this is a
	// single-task workflow, otherwise it represents the external id of the DAG.
	WorkflowRunId string `validate:"required,uuid"`

	// (required) the step id
	StepId string `validate:"required,uuid"`

	// (required) the input bytes to the task
	Input *TaskInput

	FilterPayload []byte

	// (required) the step index for the task
	StepIndex int

	// (optional) the additional metadata for the task
	AdditionalMetadata []byte

	// (optional) the desired worker id
	DesiredWorkerId *string

	// (optional) the DAG id for the task
	DagId *int64

	// (optional) the DAG inserted at for the task
	DagInsertedAt pgtype.Timestamptz

	// (required) the initial state for the task
	InitialState sqlcv1.V1TaskInitialState

	// (optional) the parent task external id
	ParentTaskExternalId *string

	// (optional) the parent task id
	ParentTaskId *int64

	// (optional) the parent task inserted at
	ParentTaskInsertedAt *time.Time

	// (optional) The priority of a task, between 1 and 3
	Priority *int32

	// (optional) the child index for the task
	ChildIndex *int64

	// (optional) the child key for the task
	ChildKey *string
}

type CreateTenantAlertGroupOpts

type CreateTenantAlertGroupOpts struct {
	Emails []string `validate:"required,dive,email,max=255"`
}

type CreateTenantInviteOpts

type CreateTenantInviteOpts struct {
	// (required) the invitee email
	InviteeEmail string `validate:"required,email"`

	// (required) the inviter email
	InviterEmail string `validate:"required,email"`

	// (required) when the invite expires
	ExpiresAt time.Time `validate:"required,future"`

	// (required) the role of the invitee
	Role string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`

	MaxPending int `validate:"omitempty"`
}

type CreateTenantMemberOpts

type CreateTenantMemberOpts struct {
	Role   string `validate:"required,oneof=OWNER ADMIN MEMBER"`
	UserId string `validate:"required,uuid"`
}

type CreateTenantOpts

type CreateTenantOpts struct {
	// (required) the tenant name
	Name string `validate:"required"`

	// (required) the tenant slug
	Slug string `validate:"required,hatchetName"`

	// (optional) the tenant ID
	ID *string `validate:"omitempty,uuid"`

	// (optional) the tenant data retention period
	DataRetentionPeriod *string `validate:"omitempty,duration"`

	// (optional) the tenant engine version
	EngineVersion *sqlcv1.TenantMajorEngineVersion `validate:"omitempty"`

	// (optional) the tenant environment type
	Environment *string `validate:"omitempty,oneof=local development production"`

	// (optional) additional onboarding data
	OnboardingData map[string]interface{} `validate:"omitempty"`
}

type CreateTickerOpts

type CreateTickerOpts struct {
	ID string `validate:"required,uuid"`
}

type CreateUserOpts

type CreateUserOpts struct {
	Email         string `validate:"required,email"`
	EmailVerified *bool
	Name          *string

	// auth options
	Password *string    `validate:"omitempty,excluded_with=OAuth"`
	OAuth    *OAuthOpts `validate:"omitempty,excluded_with=Password"`
}

type CreateWebhookOpts added in v0.74.3

type CreateWebhookOpts struct {
	Tenantid           pgtype.UUID                        `json:"tenantid"`
	Sourcename         sqlcv1.V1IncomingWebhookSourceName `json:"sourcename"`
	Name               string                             `json:"name" validate:"required"`
	Eventkeyexpression string                             `json:"eventkeyexpression"`
	AuthConfig         AuthConfig                         `json:"auth_config,omitempty"`
}

type CreateWorkerOpts

type CreateWorkerOpts struct {
	// The id of the dispatcher
	DispatcherId string `validate:"required,uuid"`

	// The maximum number of runs this worker can run at a time
	MaxRuns *int `validate:"omitempty,gte=1"`

	// The name of the worker
	Name string `validate:"required,hatchetName"`

	// The name of the service
	Services []string `validate:"dive,hatchetName"`

	// A list of actions this worker can run
	Actions []string `validate:"dive,actionId"`

	// (optional) Runtime info for the worker
	RuntimeInfo *RuntimeInfo `validate:"omitempty"`
}

type CreateWorkflowStepRateLimitOpts

type CreateWorkflowStepRateLimitOpts struct {
	// (required) the rate limit key
	Key string `validate:"required"`

	// (optional) a CEL expression for the rate limit key
	KeyExpr *string `validate:"omitnil,celsteprunstr,required_without=Key"`

	// (optional) the rate limit units to consume
	Units *int `validate:"omitnil,required_without=UnitsExpr"`

	// (optional) a CEL expression for the rate limit units
	UnitsExpr *string `validate:"omitnil,celsteprunstr,required_without=Units"`

	// (optional) a CEL expression for a dynamic limit value for the rate limit
	LimitExpr *string `validate:"omitnil,celsteprunstr"`

	// (optional) the rate limit duration, defaults to MINUTE
	Duration *string `validate:"omitnil"`
}

type CreateWorkflowVersionOpts

type CreateWorkflowVersionOpts struct {
	// (required) the workflow name
	Name string `validate:"required,hatchetName"`

	// (optional) the workflow description
	Description *string `json:"description,omitempty"`

	// (optional) event triggers for the workflow
	EventTriggers []string

	// (optional) cron triggers for the workflow
	CronTriggers []string `validate:"dive,cron"`

	// (optional) the input bytes for the cron triggers
	CronInput []byte

	// (required) the tasks in the workflow
	Tasks []CreateStepOpts `validate:"required,min=1,dive"`

	OnFailure *CreateStepOpts `json:"onFailureJob,omitempty" validate:"omitempty"`

	// (optional) the workflow concurrency groups
	Concurrency []CreateConcurrencyOpts `json:"concurrency,omitempty" validator:"omitempty,dive"`

	// (optional) sticky strategy
	Sticky *string `validate:"omitempty,oneof=SOFT HARD"`

	DefaultPriority *int32 `validate:"omitempty,min=1,max=3"`

	DefaultFilters []types.DefaultFilter `json:"defaultFilters,omitempty" validate:"omitempty,dive"`
}

type CutoverBatchOutcome added in v0.74.3

type CutoverBatchOutcome struct {
	ShouldContinue bool
	NextPagination PaginationParams
}

type CutoverJobRunMetadata added in v0.74.3

type CutoverJobRunMetadata struct {
	ShouldRun      bool
	Pagination     PaginationParams
	PartitionDate  PartitionDate
	LeaseProcessId pgtype.UUID
}

type DAGWithData added in v0.74.3

type DAGWithData struct {
	*sqlcv1.V1Dag

	Input []byte

	AdditionalMetadata []byte

	ParentTaskExternalID *pgtype.UUID

	TotalTasks int
}

type DesiredWorkerLabelOpts

type DesiredWorkerLabelOpts struct {
	// (required) the label key
	Key string `validate:"required"`

	// (required if StringValue is nil) the label integer value
	IntValue *int32 `validate:"omitnil,required_without=StrValue"`

	// (required if StrValue is nil) the label string value
	StrValue *string `validate:"omitnil,required_without=IntValue"`

	// (optional) if the label is required
	Required *bool `validate:"omitempty"`

	// (optional) the weight of the label for scheduling (default: 100)
	Weight *int32 `validate:"omitempty"`

	// (optional) the label comparator for scheduling (default: EQUAL)
	Comparator *string `validate:"omitempty,oneof=EQUAL NOT_EQUAL GREATER_THAN LESS_THAN GREATER_THAN_OR_EQUAL LESS_THAN_OR_EQUAL"`
}

type DispatcherRepository added in v0.74.3

type DispatcherRepository interface {
	// CreateNewDispatcher creates a new dispatcher for a given tenant.
	CreateNewDispatcher(ctx context.Context, opts *CreateDispatcherOpts) (*sqlcv1.Dispatcher, error)

	// UpdateDispatcher updates a dispatcher for a given tenant.
	UpdateDispatcher(ctx context.Context, dispatcherId string, opts *UpdateDispatcherOpts) (*sqlcv1.Dispatcher, error)

	Delete(ctx context.Context, dispatcherId string) error

	UpdateStaleDispatchers(ctx context.Context, onStale func(dispatcherId string, getValidDispatcherId func() string) error) error
}

type ErrNamesNotFound added in v0.74.3

type ErrNamesNotFound struct {
	Names []string
}

func (*ErrNamesNotFound) Error added in v0.74.3

func (e *ErrNamesNotFound) Error() string

type EventExternalIdFilterId added in v0.74.3

type EventExternalIdFilterId struct {
	ExternalId string
	FilterId   *string
}

type EventMatchResults added in v0.74.3

type EventMatchResults struct {
	// The list of tasks which were created from the matches
	CreatedTasks []*V1TaskWithPayload

	// The list of tasks which were replayed from the matches
	ReplayedTasks []*V1TaskWithPayload
}

type EventTriggerOpts added in v0.74.3

type EventTriggerOpts struct {
	ExternalId string

	Key string

	Data []byte

	AdditionalMetadata []byte

	Priority *int32

	Scope *string

	TriggeringWebhookName *string
}

type EventTriggersFromExternalId added in v0.74.3

type EventTriggersFromExternalId struct {
	RunID           int64              `json:"run_id"`
	RunInsertedAt   pgtype.Timestamptz `json:"run_inserted_at"`
	EventExternalId pgtype.UUID        `json:"event_external_id"`
	EventSeenAt     pgtype.Timestamptz `json:"event_seen_at"`
	FilterId        pgtype.UUID        `json:"filter_id"`
}

type EventType added in v0.74.3

type EventType string
const (
	EVENT_TYPE_REQUEUED_NO_WORKER   EventType = "REQUEUED_NO_WORKER"
	EVENT_TYPE_REQUEUED_RATE_LIMIT  EventType = "REQUEUED_RATE_LIMIT"
	EVENT_TYPE_SCHEDULING_TIMED_OUT EventType = "SCHEDULING_TIMED_OUT"
	EVENT_TYPE_ASSIGNED             EventType = "ASSIGNED"
	EVENT_TYPE_STARTED              EventType = "STARTED"
	EVENT_TYPE_FINISHED             EventType = "FINISHED"
	EVENT_TYPE_FAILED               EventType = "FAILED"
	EVENT_TYPE_RETRYING             EventType = "RETRYING"
	EVENT_TYPE_CANCELLED            EventType = "CANCELLED"
	EVENT_TYPE_TIMED_OUT            EventType = "TIMED_OUT"
	EVENT_TYPE_REASSIGNED           EventType = "REASSIGNED"
	EVENT_TYPE_SLOT_RELEASED        EventType = "SLOT_RELEASED"
	EVENT_TYPE_TIMEOUT_REFRESHED    EventType = "TIMEOUT_REFRESHED"
	EVENT_TYPE_RETRIED_BY_USER      EventType = "RETRIED_BY_USER"
	EVENT_TYPE_SENT_TO_WORKER       EventType = "SENT_TO_WORKER"
	EVENT_TYPE_RATE_LIMIT_ERROR     EventType = "RATE_LIMIT_ERROR"
	EVENT_TYPE_ACKNOWLEDGED         EventType = "ACKNOWLEDGED"
	EVENT_TYPE_CREATED              EventType = "CREATED"
	EVENT_TYPE_QUEUED               EventType = "QUEUED"
)

type EventWithPayload added in v0.74.3

type EventWithPayload struct {
	*ListEventsRow
	Payload []byte `json:"payload"`
}

type ExternalCreateSignalMatchOpts added in v0.74.3

type ExternalCreateSignalMatchOpts struct {
	Conditions []CreateExternalSignalConditionOpt `validate:"required,min=1,dive"`

	SignalTaskId int64 `validate:"required,gt=0"`

	SignalTaskInsertedAt pgtype.Timestamptz

	SignalExternalId string `validate:"required,uuid"`

	SignalKey string `validate:"required"`
}

type ExternalPayloadLocationKey added in v0.74.3

type ExternalPayloadLocationKey string

type ExternalStore added in v0.74.3

type ExternalStore interface {
	Store(ctx context.Context, payloads ...OffloadToExternalStoreOpts) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
	Retrieve(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error)
}

type FailTaskOpts added in v0.74.3

type FailTaskOpts struct {
	*TaskIdInsertedAtRetryCount

	// (required) whether this is an application-level error or an internal error on the Hatchet side
	IsAppError bool

	// (optional) the error message for the task
	ErrorMessage string

	// (optional) A boolean flag to indicate whether the error is non-retryable, meaning it should _not_ be retried. Defaults to false.
	IsNonRetryable bool
}

type FailTasksResponse added in v0.74.3

type FailTasksResponse struct {
	*FinalizedTaskResponse

	RetriedTasks []RetriedTask
}

type FilterRepository added in v0.74.3

type FilterRepository interface {
	CreateFilter(ctx context.Context, tenantId string, params CreateFilterOpts) (*sqlcv1.V1Filter, error)
	ListFilters(ctx context.Context, tenantId string, params ListFiltersOpts) ([]*sqlcv1.V1Filter, int64, error)
	DeleteFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
	GetFilter(ctx context.Context, tenantId, filterId string) (*sqlcv1.V1Filter, error)
	UpdateFilter(ctx context.Context, tenantId string, filterId string, opts UpdateFilterOpts) (*sqlcv1.V1Filter, error)
}

type FinalizedTaskResponse added in v0.74.3

type FinalizedTaskResponse struct {
	ReleasedTasks []*sqlcv1.ReleaseTasksRow

	InternalEvents []InternalTaskEvent
}

type GetQueueMetricsOpts

type GetQueueMetricsOpts struct {
	// (optional) a list of workflow ids to filter by
	WorkflowIds []string `validate:"omitempty,dive,uuid"`

	// (optional) exact metadata to filter by
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}

type GetQueueMetricsResponse

type GetQueueMetricsResponse struct {
	Total QueueMetric `json:"total"`

	ByWorkflowId map[string]QueueMetric `json:"by_workflow"`
}

type GetTenantAlertingSettingsResponse

type GetTenantAlertingSettingsResponse struct {
	Settings *sqlcv1.TenantAlertingSettings

	SlackWebhooks []*sqlcv1.SlackAppWebhook

	EmailGroups []*TenantAlertEmailGroupForSend

	Tenant *sqlcv1.Tenant
}

type GroupMatchCondition added in v0.74.3

type GroupMatchCondition struct {
	GroupId string `validate:"required,uuid"`

	EventType sqlcv1.V1EventType

	EventKey string

	// (optional) a hint for querying the event data
	EventResourceHint *string

	// the data key which gets inserted into the returned data from a satisfied match condition
	ReadableDataKey string

	Expression string

	Action sqlcv1.V1MatchConditionAction

	// (optional) the data which was used to satisfy the condition (relevant for replays)
	Data []byte
}

type HMACAuthCredentials added in v0.74.3

type HMACAuthCredentials struct {
	Algorithm                     sqlcv1.V1IncomingWebhookHmacAlgorithm `json:"algorithm" validate:"required"`
	Encoding                      sqlcv1.V1IncomingWebhookHmacEncoding  `json:"encoding" validate:"required"`
	SignatureHeaderName           string                                `json:"signature_header_name" validate:"required"`
	EncryptedWebhookSigningSecret []byte                                `json:"webhook_signing_secret" validate:"required"`
}

type HealthRepository

type HealthRepository interface {
	IsHealthy(ctx context.Context) bool
	PgStat() *pgxpool.Stat
}

type IdInsertedAt added in v0.74.3

type IdInsertedAt struct {
	ID         int64              `json:"id"`
	InsertedAt pgtype.Timestamptz `json:"inserted_at"`
}

type IdempotencyKey added in v0.74.3

type IdempotencyKey string

type IdempotencyRepository added in v0.74.3

type IdempotencyRepository interface {
	CreateIdempotencyKey(context context.Context, tenantId, key string, expiresAt pgtype.Timestamptz) error
	EvictExpiredIdempotencyKeys(context context.Context, tenantId pgtype.UUID) error
}

type InternalTaskEvent added in v0.74.3

type InternalTaskEvent struct {
	TenantID       string                 `json:"tenant_id"`
	TaskID         int64                  `json:"task_id"`
	TaskExternalID string                 `json:"task_external_id"`
	RetryCount     int32                  `json:"retry_count"`
	EventType      sqlcv1.V1TaskEventType `json:"event_type"`
	EventKey       string                 `json:"event_key"`
	Data           []byte                 `json:"data"`
}

InternalTaskEvent resembles sqlcv1.V1TaskEvent, but doesn't include the id field as we use COPY FROM to write the events to the database.

type IntervalSettingsRepository added in v0.74.3

type IntervalSettingsRepository interface {
	ReadAllIntervals(ctx context.Context, operationId string) (map[string]time.Duration, error)
	ReadInterval(ctx context.Context, operationId string, tenantId string) (time.Duration, error)
	SetInterval(ctx context.Context, operationId string, tenantId string, d time.Duration) (time.Duration, error)
}

func NewNoOpIntervalSettingsRepository added in v0.74.3

func NewNoOpIntervalSettingsRepository() IntervalSettingsRepository

type JobRunHasCycleError

type JobRunHasCycleError struct {
	JobName string
}

func (*JobRunHasCycleError) Error

func (e *JobRunHasCycleError) Error() string

type KeyClaimantPair added in v0.74.3

type KeyClaimantPair struct {
	IdempotencyKey      IdempotencyKey
	ClaimedByExternalId pgtype.UUID
}

type LeaseRepository

type LeaseRepository interface {
	ListQueues(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1Queue, error)
	ListActiveWorkers(ctx context.Context, tenantId pgtype.UUID) ([]*ListActiveWorkersResult, error)
	ListConcurrencyStrategies(ctx context.Context, tenantId pgtype.UUID) ([]*sqlcv1.V1StepConcurrency, error)

	AcquireOrExtendLeases(ctx context.Context, tenantId pgtype.UUID, kind sqlcv1.LeaseKind, resourceIds []string, existingLeases []*sqlcv1.Lease) ([]*sqlcv1.Lease, error)
	ReleaseLeases(ctx context.Context, tenantId pgtype.UUID, leases []*sqlcv1.Lease) error
}

type Limit

type Limit struct {
	Resource         sqlcv1.LimitResource
	Limit            int32
	Alarm            int32
	Window           *time.Duration
	CustomValueMeter bool
}

type ListActiveWorkersResult

type ListActiveWorkersResult struct {
	ID      string
	MaxRuns int
	Name    string
	Labels  []*sqlcv1.ListManyWorkerLabelsRow
}

type ListCronWorkflowsOpts

type ListCronWorkflowsOpts struct {
	// (optional) number of events to skip
	Offset *int

	// (optional) number of events to return
	Limit *int

	// (optional) the order by field
	OrderBy *string `validate:"omitempty,oneof=createdAt name"`

	// (optional) the order direction
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) the workflow id
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) additional metadata for the workflow run
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`

	// (optional) the name of the cron to filter by
	CronName *string `validate:"omitempty"`

	// (optional) the name of the workflow to filter by
	WorkflowName *string `validate:"omitempty"`
}

type ListEventsRow added in v0.74.3

type ListEventsRow struct {
	TenantID                pgtype.UUID        `json:"tenant_id"`
	EventID                 int64              `json:"event_id"`
	EventExternalID         pgtype.UUID        `json:"event_external_id"`
	EventSeenAt             pgtype.Timestamptz `json:"event_seen_at"`
	EventKey                string             `json:"event_key"`
	EventPayload            []byte             `json:"event_payload"`
	EventAdditionalMetadata []byte             `json:"event_additional_metadata"`
	EventScope              string             `json:"event_scope"`
	QueuedCount             int64              `json:"queued_count"`
	RunningCount            int64              `json:"running_count"`
	CompletedCount          int64              `json:"completed_count"`
	CancelledCount          int64              `json:"cancelled_count"`
	FailedCount             int64              `json:"failed_count"`
	TriggeredRuns           []byte             `json:"triggered_runs"`
	TriggeringWebhookName   *string            `json:"triggering_webhook_name,omitempty"`
}

type ListFiltersOpts added in v0.74.3

type ListFiltersOpts struct {
	WorkflowIds []pgtype.UUID `json:"workflow_ids"`
	Scopes      []string      `json:"scopes"`
	Limit       int64         `json:"limit" validate:"omitnil,min=1"`
	Offset      int64         `json:"offset" validate:"omitnil,min=0"`
}

type ListFinalizedWorkflowRunsResponse added in v0.74.3

type ListFinalizedWorkflowRunsResponse struct {
	WorkflowRunId string

	OutputEvents []*TaskOutputEvent
}

type ListLogsOpts

type ListLogsOpts struct {
	// (optional) number of logs to skip
	Offset *int

	// (optional) number of logs to return
	Limit *int `validate:"omitnil,min=1,max=10000"`

	// (optional) a list of log levels to filter by
	Levels []string `validate:"omitnil,dive,oneof=INFO ERROR WARN DEBUG"`

	// (optional) a search query
	Search *string

	// (optional) the start time to get logs for
	Since *time.Time

	// (optional) the end time to get logs for
	Until *time.Time
}

type ListRateLimitOpts

type ListRateLimitOpts struct {
	// (optional) a search query for the key
	Search *string

	// (optional) number of events to skip
	Offset *int

	// (optional) number of events to return
	Limit *int

	// (optional) the order by field
	OrderBy *string `validate:"omitempty,oneof=key value limitValue"`

	// (optional) the order direction
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`
}

type ListRateLimitsResult

type ListRateLimitsResult struct {
	Rows  []*sqlcv1.ListRateLimitsForTenantNoMutateRow
	Count int
}

type ListScheduledWorkflowsOpts

type ListScheduledWorkflowsOpts struct {
	// (optional) number of events to skip
	Offset *int

	// (optional) number of events to return
	Limit *int

	// (optional) the order by field
	OrderBy *string `validate:"omitempty,oneof=createdAt triggerAt"`

	// (optional) the order direction
	OrderDirection *string `validate:"omitempty,oneof=ASC DESC"`

	// (optional) the workflow id
	WorkflowId *string `validate:"omitempty,uuid"`

	// (optional) the parent workflow run id
	ParentWorkflowRunId *string `validate:"omitempty,uuid"`

	// (optional) the parent step run id
	ParentStepRunId *string `validate:"omitempty,uuid"`

	// (optional) statuses to filter by
	Statuses *[]sqlcv1.WorkflowRunStatus

	// (optional) include scheduled runs that are in the future
	IncludeFuture *bool

	// (optional) additional metadata for the workflow run
	AdditionalMetadata map[string]interface{} `validate:"omitempty"`
}

type ListTaskRunOpts added in v0.74.3

type ListTaskRunOpts struct {
	CreatedAfter time.Time

	Statuses []sqlcv1.V1ReadableStatusOlap

	WorkflowIds []uuid.UUID

	WorkerId *uuid.UUID

	StartedAfter time.Time

	FinishedBefore *time.Time

	AdditionalMetadata map[string]interface{}

	TriggeringEventExternalId *uuid.UUID

	Limit int64

	Offset int64

	IncludePayloads bool
}

type ListTenantInvitesOpts

type ListTenantInvitesOpts struct {
	// (optional) the status of the invite
	Status *string `validate:"omitempty,oneof=PENDING ACCEPTED REJECTED"`

	// (optional) whether the invite has expired
	Expired *bool `validate:"omitempty"`
}

type ListTickerOpts

type ListTickerOpts struct {
	// Set this to only return tickers whose heartbeat is more recent than this time
	LatestHeartbeatAfter *time.Time

	Active *bool
}

type ListWebhooksOpts added in v0.74.3

type ListWebhooksOpts struct {
	WebhookNames       []string                             `json:"webhook_names"`
	WebhookSourceNames []sqlcv1.V1IncomingWebhookSourceName `json:"webhook_source_names"`
	Limit              *int64                               `json:"limit" validate:"omitnil,min=1"`
	Offset             *int64                               `json:"offset" validate:"omitnil,min=0"`
}

type ListWorkersOpts

type ListWorkersOpts struct {
	Action *string `validate:"omitempty,actionId"`

	LastHeartbeatAfter *time.Time

	Assignable *bool
}

type ListWorkflowRunOpts added in v0.74.3

type ListWorkflowRunOpts struct {
	CreatedAfter time.Time

	Statuses []sqlcv1.V1ReadableStatusOlap

	WorkflowIds []uuid.UUID

	StartedAfter time.Time

	FinishedBefore *time.Time

	AdditionalMetadata map[string]interface{}

	Limit int64

	Offset int64

	ParentTaskExternalId *pgtype.UUID

	TriggeringEventExternalId *pgtype.UUID

	IncludePayloads bool
}

type ListWorkflowsOpts

type ListWorkflowsOpts struct {
	// (optional) number of workflows to skip
	Offset *int

	// (optional) number of workflows to return
	Limit *int

	// (optional) the workflow name to filter by
	Name *string
}

type ListWorkflowsResult

type ListWorkflowsResult struct {
	Rows  []*sqlcv1.Workflow
	Count int
}

type LogLineRepository added in v0.74.3

type LogLineRepository interface {
	ListLogLines(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, opts *ListLogsOpts) ([]*sqlcv1.V1LogLine, error)

	PutLog(ctx context.Context, tenantId string, opts *CreateLogLineOpts) error
}

type MatchData added in v0.74.3

type MatchData struct {
	// contains filtered or unexported fields
}

parses match aggregated data

func NewMatchData added in v0.74.3

func NewMatchData(mcAggregatedData []byte) (*MatchData, error)

func (*MatchData) Action added in v0.74.3

func (*MatchData) DataKeys added in v0.74.3

func (m *MatchData) DataKeys() []string

func (*MatchData) DataValueAsTaskOutputEvent added in v0.74.3

func (m *MatchData) DataValueAsTaskOutputEvent(key string) *TaskOutputEvent

Helper function for internal events

func (*MatchData) TriggerDataKeys added in v0.74.3

func (m *MatchData) TriggerDataKeys() []string

func (*MatchData) TriggerDataValue added in v0.74.3

func (m *MatchData) TriggerDataValue(key string) map[string]interface{}

type MatchRepository added in v0.74.3

type MatchRepository interface {
	RegisterSignalMatchConditions(ctx context.Context, tenantId string, eventMatches []ExternalCreateSignalMatchOpts) error

	ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
	ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)
}

type MatchRepositoryImpl added in v0.74.3

type MatchRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (MatchRepositoryImpl) DesiredWorkerId added in v0.74.3

func (s MatchRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (MatchRepositoryImpl) PopulateExternalIdsForWorkflow added in v0.74.3

func (s MatchRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*MatchRepositoryImpl) ProcessInternalEventMatches added in v0.74.3

func (m *MatchRepositoryImpl) ProcessInternalEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)

ProcessInternalEventMatches processes a list of internal events

func (*MatchRepositoryImpl) ProcessUserEventMatches added in v0.74.3

func (m *MatchRepositoryImpl) ProcessUserEventMatches(ctx context.Context, tenantId string, events []CandidateEventMatch) (*EventMatchResults, error)

ProcessUserEventMatches processes a list of user events

func (*MatchRepositoryImpl) RegisterSignalMatchConditions added in v0.74.3

func (m *MatchRepositoryImpl) RegisterSignalMatchConditions(ctx context.Context, tenantId string, signalMatches []ExternalCreateSignalMatchOpts) error

func (MatchRepositoryImpl) ToV1StepRunData added in v0.74.3

func (s MatchRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

type MessageQueueRepository

type MessageQueueRepository interface {
	// PubSub
	Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubSubMessage) error) error
	Notify(ctx context.Context, name string, payload string) error

	// Queues
	BindQueue(ctx context.Context, queue string, durable, autoDeleted, exclusive bool, exclusiveConsumer *string) error
	UpdateQueueLastActive(ctx context.Context, queue string) error
	CleanupQueues(ctx context.Context) error

	// Messages
	AddMessage(ctx context.Context, queue string, payload []byte) error
	ReadMessages(ctx context.Context, queue string, qos int) ([]*sqlcv1.ReadMessagesRow, error)
	AckMessage(ctx context.Context, id int64) error
	CleanupMessageQueueItems(ctx context.Context) error
}

type NoOpExternalStore added in v0.74.3

type NoOpExternalStore struct{}

func (*NoOpExternalStore) Retrieve added in v0.74.3

func (*NoOpExternalStore) Store added in v0.74.3

type NoOpIntervalSettingsRepository added in v0.74.3

type NoOpIntervalSettingsRepository struct{}

func (*NoOpIntervalSettingsRepository) ReadAllIntervals added in v0.74.3

func (r *NoOpIntervalSettingsRepository) ReadAllIntervals(ctx context.Context, operationId string) (map[string]time.Duration, error)

func (*NoOpIntervalSettingsRepository) ReadInterval added in v0.74.3

func (r *NoOpIntervalSettingsRepository) ReadInterval(ctx context.Context, operationId string, tenantId string) (time.Duration, error)

func (*NoOpIntervalSettingsRepository) SetInterval added in v0.74.3

func (r *NoOpIntervalSettingsRepository) SetInterval(ctx context.Context, operationId string, tenantId string, d time.Duration) (time.Duration, error)

type OAuthOpts

type OAuthOpts struct {
	Provider       string     `validate:"required,oneof=google github"`
	ProviderUserId string     `validate:"required,min=1"`
	AccessToken    []byte     `validate:"required,min=1"`
	RefreshToken   []byte     // optional
	ExpiresAt      *time.Time // optional
}

type OLAPCutoverBatchOutcome added in v0.74.3

type OLAPCutoverBatchOutcome struct {
	ShouldContinue bool
	NextPagination OLAPPaginationParams
}

type OLAPCutoverJobRunMetadata added in v0.74.3

type OLAPCutoverJobRunMetadata struct {
	ShouldRun      bool
	Pagination     OLAPPaginationParams
	PartitionDate  PartitionDate
	LeaseProcessId pgtype.UUID
}

type OLAPPaginationParams added in v0.74.3

type OLAPPaginationParams struct {
	LastTenantId   pgtype.UUID
	LastInsertedAt pgtype.Timestamptz
	LastExternalId pgtype.UUID
	Limit          int32
}

type OLAPPayloadToOffload added in v0.74.3

type OLAPPayloadToOffload struct {
	ExternalId          pgtype.UUID
	ExternalLocationKey string
}

type OLAPPayloadsToOffload added in v0.74.3

type OLAPPayloadsToOffload struct {
	Payloads []OLAPPayloadToOffload
}

type OLAPRepository added in v0.74.3

type OLAPRepository interface {
	UpdateTablePartitions(ctx context.Context) error
	SetReadReplicaPool(pool *pgxpool.Pool)

	ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)
	ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)
	ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*TaskWithPayloads, pgtype.UUID, error)

	ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*TaskWithPayloads, int, error)
	ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)
	ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)
	ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*TaskEventWithPayloads, error)
	ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)
	ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)
	CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error
	CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error
	CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error
	GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)
	UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)
	UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)
	ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)
	ListTasksByDAGId(ctx context.Context, tenantId string, dagIds []pgtype.UUID, includePayloads bool) ([]*TaskWithPayloads, map[int64]uuid.UUID, error)
	ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata, includePayloads bool) ([]*TaskWithPayloads, error)

	// ListTasksByExternalIds returns a list of tasks based on their external ids or the external id of their parent DAG.
	// In the case of a DAG, we flatten the result into the list of tasks which belong to that DAG.
	ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)

	GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)
	BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error
	ListEvents(ctx context.Context, opts sqlcv1.ListEventsParams) ([]*EventWithPayload, *int64, error)
	GetEvent(ctx context.Context, externalId string) (*sqlcv1.V1EventsOlap, error)
	GetEventWithPayload(ctx context.Context, externalId, tenantId string) (*EventWithPayload, error)
	ListEventKeys(ctx context.Context, tenantId string) ([]string, error)

	GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)
	GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)

	CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error
	StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error
	PutPayloads(ctx context.Context, tx sqlcv1.DBTX, tenantId TenantID, putPayloadOpts ...StoreOLAPPayloadOpts) (map[PayloadExternalId]ExternalPayloadLocationKey, error)
	ReadPayload(ctx context.Context, tenantId string, externalId pgtype.UUID) ([]byte, error)
	ReadPayloads(ctx context.Context, tenantId string, externalIds ...pgtype.UUID) (map[pgtype.UUID][]byte, error)

	AnalyzeOLAPTables(ctx context.Context) error
	OffloadPayloads(ctx context.Context, tenantId string, payloads []OffloadPayloadOpts) error

	PayloadStore() PayloadStoreRepository
	StatusUpdateBatchSizeLimits() StatusUpdateBatchSizeLimits

	ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)

	ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error

	CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)
	CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)
	ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error)
}

func NewOLAPRepositoryFromPool added in v0.74.3

func NewOLAPRepositoryFromPool(
	pool *pgxpool.Pool,
	l *zerolog.Logger,
	olapRetentionPeriod time.Duration,
	tenantLimitConfig limits.LimitConfigFile, enforceLimits bool,
	shouldPartitionEventsTables bool,
	payloadStoreOpts PayloadStoreRepositoryOpts,
	statusUpdateBatchSizeLimits StatusUpdateBatchSizeLimits,
	cacheDuration time.Duration,
) (OLAPRepository, func() error)

type OLAPRepositoryImpl added in v0.74.3

type OLAPRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (*OLAPRepositoryImpl) AnalyzeOLAPTables added in v0.74.3

func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error

func (*OLAPRepositoryImpl) BulkCreateEventsAndTriggers added in v0.74.3

func (r *OLAPRepositoryImpl) BulkCreateEventsAndTriggers(ctx context.Context, events sqlcv1.BulkCreateEventsParams, triggers []EventTriggersFromExternalId) error

func (*OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates added in v0.74.3

func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForDAGStatusUpdates(ctx context.Context) (int64, error)

func (*OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates added in v0.74.3

func (r *OLAPRepositoryImpl) CountOLAPTempTableSizeForTaskStatusUpdates(ctx context.Context) (int64, error)

func (*OLAPRepositoryImpl) CreateDAGs added in v0.74.3

func (r *OLAPRepositoryImpl) CreateDAGs(ctx context.Context, tenantId string, dags []*DAGWithData) error

func (*OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs added in v0.74.3

func (r *OLAPRepositoryImpl) CreateIncomingWebhookValidationFailureLogs(ctx context.Context, tenantId string, opts []CreateIncomingWebhookFailureLogOpts) error

func (*OLAPRepositoryImpl) CreateTaskEvents added in v0.74.3

func (r *OLAPRepositoryImpl) CreateTaskEvents(ctx context.Context, tenantId string, events []sqlcv1.CreateTaskEventsOLAPParams) error

func (*OLAPRepositoryImpl) CreateTasks added in v0.74.3

func (r *OLAPRepositoryImpl) CreateTasks(ctx context.Context, tenantId string, tasks []*V1TaskWithPayload) error

func (OLAPRepositoryImpl) DesiredWorkerId added in v0.74.3

func (s OLAPRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (*OLAPRepositoryImpl) GetDAGDurations added in v0.74.3

func (r *OLAPRepositoryImpl) GetDAGDurations(ctx context.Context, tenantId string, externalIds []pgtype.UUID, minInsertedAt pgtype.Timestamptz) (map[string]*sqlcv1.GetDagDurationsRow, error)

func (*OLAPRepositoryImpl) GetEvent added in v0.74.3

func (r *OLAPRepositoryImpl) GetEvent(ctx context.Context, externalId string) (*sqlcv1.V1EventsOlap, error)

func (*OLAPRepositoryImpl) GetEventWithPayload added in v0.74.3

func (r *OLAPRepositoryImpl) GetEventWithPayload(ctx context.Context, externalId, tenantId string) (*EventWithPayload, error)

func (*OLAPRepositoryImpl) GetTaskDurationsByTaskIds added in v0.74.3

func (r *OLAPRepositoryImpl) GetTaskDurationsByTaskIds(ctx context.Context, tenantId string, taskIds []int64, taskInsertedAts []pgtype.Timestamptz, readableStatuses []sqlcv1.V1ReadableStatusOlap) (map[int64]*sqlcv1.GetTaskDurationsByTaskIdsRow, error)

func (*OLAPRepositoryImpl) GetTaskPointMetrics added in v0.74.3

func (r *OLAPRepositoryImpl) GetTaskPointMetrics(ctx context.Context, tenantId string, startTimestamp *time.Time, endTimestamp *time.Time, bucketInterval time.Duration) ([]*sqlcv1.GetTaskPointMetricsRow, error)

func (*OLAPRepositoryImpl) GetTaskTimings added in v0.74.3

func (r *OLAPRepositoryImpl) GetTaskTimings(ctx context.Context, tenantId string, workflowRunId pgtype.UUID, depth int32) ([]*sqlcv1.PopulateTaskRunDataRow, map[string]int32, error)

func (*OLAPRepositoryImpl) ListEventKeys added in v0.74.3

func (r *OLAPRepositoryImpl) ListEventKeys(ctx context.Context, tenantId string) ([]string, error)

func (*OLAPRepositoryImpl) ListEvents added in v0.74.3

func (*OLAPRepositoryImpl) ListTaskRunEvents added in v0.74.3

func (r *OLAPRepositoryImpl) ListTaskRunEvents(ctx context.Context, tenantId string, taskId int64, taskInsertedAt pgtype.Timestamptz, limit, offset int64) ([]*sqlcv1.ListTaskEventsRow, error)

func (*OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId added in v0.74.3

func (r *OLAPRepositoryImpl) ListTaskRunEventsByWorkflowRunId(ctx context.Context, tenantId string, workflowRunId pgtype.UUID) ([]*TaskEventWithPayloads, error)

func (*OLAPRepositoryImpl) ListTasks added in v0.74.3

func (r *OLAPRepositoryImpl) ListTasks(ctx context.Context, tenantId string, opts ListTaskRunOpts) ([]*TaskWithPayloads, int, error)

func (*OLAPRepositoryImpl) ListTasksByDAGId added in v0.74.3

func (r *OLAPRepositoryImpl) ListTasksByDAGId(ctx context.Context, tenantId string, dagids []pgtype.UUID, includePayloads bool) ([]*TaskWithPayloads, map[int64]uuid.UUID, error)

func (*OLAPRepositoryImpl) ListTasksByExternalIds added in v0.74.3

func (r *OLAPRepositoryImpl) ListTasksByExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenTasksByExternalIdsRow, error)

func (*OLAPRepositoryImpl) ListTasksByIdAndInsertedAt added in v0.74.3

func (r *OLAPRepositoryImpl) ListTasksByIdAndInsertedAt(ctx context.Context, tenantId string, taskMetadata []TaskMetadata, includePayloads bool) ([]*TaskWithPayloads, error)

func (*OLAPRepositoryImpl) ListWorkflowRunDisplayNames added in v0.74.3

func (r *OLAPRepositoryImpl) ListWorkflowRunDisplayNames(ctx context.Context, tenantId pgtype.UUID, externalIds []pgtype.UUID) ([]*sqlcv1.ListWorkflowRunDisplayNamesRow, error)

func (*OLAPRepositoryImpl) ListWorkflowRunExternalIds added in v0.74.3

func (r *OLAPRepositoryImpl) ListWorkflowRunExternalIds(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]pgtype.UUID, error)

func (*OLAPRepositoryImpl) ListWorkflowRuns added in v0.74.3

func (r *OLAPRepositoryImpl) ListWorkflowRuns(ctx context.Context, tenantId string, opts ListWorkflowRunOpts) ([]*WorkflowRunData, int, error)

func (*OLAPRepositoryImpl) ListYesterdayRunCountsByStatus added in v0.74.3

func (r *OLAPRepositoryImpl) ListYesterdayRunCountsByStatus(ctx context.Context) (map[sqlcv1.V1ReadableStatusOlap]int64, error)

func (*OLAPRepositoryImpl) OffloadPayloads added in v0.74.3

func (r *OLAPRepositoryImpl) OffloadPayloads(ctx context.Context, tenantId string, payloads []OffloadPayloadOpts) error

func (*OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize added in v0.74.3

func (p *OLAPRepositoryImpl) OptimizeOLAPPayloadWindowSize(ctx context.Context, partitionDate PartitionDate, candidateBatchNumRows int32, pagination OLAPPaginationParams) (*int32, error)

func (*OLAPRepositoryImpl) PayloadStore added in v0.74.3

func (r *OLAPRepositoryImpl) PayloadStore() PayloadStoreRepository

func (*OLAPRepositoryImpl) PopulateEventData added in v0.74.3

func (r *OLAPRepositoryImpl) PopulateEventData(ctx context.Context, tenantId pgtype.UUID, eventExternalIds []pgtype.UUID) (map[pgtype.UUID]sqlcv1.PopulateEventDataRow, error)

func (OLAPRepositoryImpl) PopulateExternalIdsForWorkflow added in v0.74.3

func (s OLAPRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*OLAPRepositoryImpl) ProcessOLAPPayloadCutovers added in v0.74.3

func (p *OLAPRepositoryImpl) ProcessOLAPPayloadCutovers(ctx context.Context, externalStoreEnabled bool, inlineStoreTTL *time.Duration, externalCutoverBatchSize, externalCutoverNumConcurrentOffloads int32) error

func (*OLAPRepositoryImpl) PutPayloads added in v0.74.3

func (*OLAPRepositoryImpl) ReadDAG added in v0.74.3

func (r *OLAPRepositoryImpl) ReadDAG(ctx context.Context, dagExternalId string) (*sqlcv1.V1DagsOlap, error)

func (*OLAPRepositoryImpl) ReadPayload added in v0.74.3

func (r *OLAPRepositoryImpl) ReadPayload(ctx context.Context, tenantId string, externalId pgtype.UUID) ([]byte, error)

func (*OLAPRepositoryImpl) ReadPayloads added in v0.74.3

func (r *OLAPRepositoryImpl) ReadPayloads(ctx context.Context, tenantId string, externalIds ...pgtype.UUID) (map[pgtype.UUID][]byte, error)

func (*OLAPRepositoryImpl) ReadTaskRun added in v0.74.3

func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId string) (*sqlcv1.V1TasksOlap, error)

func (*OLAPRepositoryImpl) ReadTaskRunData added in v0.74.3

func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId pgtype.UUID, taskId int64, taskInsertedAt pgtype.Timestamptz, retryCount *int) (*TaskWithPayloads, pgtype.UUID, error)

func (*OLAPRepositoryImpl) ReadTaskRunMetrics added in v0.74.3

func (r *OLAPRepositoryImpl) ReadTaskRunMetrics(ctx context.Context, tenantId string, opts ReadTaskRunMetricsOpts) ([]TaskRunMetric, error)

func (*OLAPRepositoryImpl) ReadWorkflowRun added in v0.74.3

func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExternalId pgtype.UUID) (*V1WorkflowRunPopulator, error)

func (*OLAPRepositoryImpl) SetReadReplicaPool added in v0.74.3

func (r *OLAPRepositoryImpl) SetReadReplicaPool(pool *pgxpool.Pool)

func (*OLAPRepositoryImpl) StatusUpdateBatchSizeLimits added in v0.74.3

func (r *OLAPRepositoryImpl) StatusUpdateBatchSizeLimits() StatusUpdateBatchSizeLimits

func (*OLAPRepositoryImpl) StoreCELEvaluationFailures added in v0.74.3

func (r *OLAPRepositoryImpl) StoreCELEvaluationFailures(ctx context.Context, tenantId string, failures []CELEvaluationFailure) error

func (OLAPRepositoryImpl) ToV1StepRunData added in v0.74.3

func (s OLAPRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*OLAPRepositoryImpl) UpdateDAGStatuses added in v0.74.3

func (r *OLAPRepositoryImpl) UpdateDAGStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateDAGStatusRow, error)

func (*OLAPRepositoryImpl) UpdateTablePartitions added in v0.74.3

func (r *OLAPRepositoryImpl) UpdateTablePartitions(ctx context.Context) error

func (*OLAPRepositoryImpl) UpdateTaskStatuses added in v0.74.3

func (r *OLAPRepositoryImpl) UpdateTaskStatuses(ctx context.Context, tenantIds []string) (bool, []UpdateTaskStatusRow, error)

type OffloadPayloadOpts added in v0.74.3

type OffloadPayloadOpts struct {
	ExternalId          pgtype.UUID
	ExternalLocationKey string
}

type OffloadToExternalStoreOpts added in v0.74.3

type OffloadToExternalStoreOpts struct {
	TenantId   TenantID
	ExternalID PayloadExternalId
	InsertedAt pgtype.Timestamptz
	Payload    []byte
}

type PGHealthError added in v0.74.3

type PGHealthError string
const (
	PGHealthAlert PGHealthError = "alert"
	PGHealthWarn  PGHealthError = "warn"
	PGHealthOK    PGHealthError = "ok"
)

type PGHealthRepository added in v0.74.3

type PGHealthRepository interface {
	PGStatStatementsEnabled(ctx context.Context) (bool, error)
	TrackCountsEnabled(ctx context.Context) (bool, error)
	CheckBloat(ctx context.Context) (PGHealthError, int, error)
	GetBloatDetails(ctx context.Context) ([]*sqlcv1.CheckBloatRow, error)
	CheckLongRunningQueries(ctx context.Context) (PGHealthError, int, error)
	CheckQueryCache(ctx context.Context) (PGHealthError, int, error)
	CheckQueryCaches(ctx context.Context) ([]*sqlcv1.CheckQueryCachesRow, error)
	CheckLongRunningVacuum(ctx context.Context) (PGHealthError, int, error)
	CheckLastAutovacuumForPartitionedTables(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesRow, error)
	CheckLastAutovacuumForPartitionedTablesCoreDB(ctx context.Context) ([]*sqlcv1.CheckLastAutovacuumForPartitionedTablesCoreDBRow, error)
}

type PaginationParams added in v0.74.3

type PaginationParams struct {
	LastTenantID   pgtype.UUID
	LastInsertedAt pgtype.Timestamptz
	LastID         int64
	LastType       sqlcv1.V1PayloadType
}

type PartitionDate added in v0.74.3

type PartitionDate pgtype.Date

func (PartitionDate) String added in v0.74.3

func (d PartitionDate) String() string

type PayloadExternalId added in v0.74.3

type PayloadExternalId string

type PayloadLocation added in v0.74.3

type PayloadLocation string

type PayloadMetadata added in v0.74.3

type PayloadMetadata struct {
	TenantID   pgtype.UUID
	ID         int64
	InsertedAt pgtype.Timestamptz
	Type       sqlcv1.V1PayloadType
}

type PayloadStoreRepository added in v0.74.3

type PayloadStoreRepository interface {
	Store(ctx context.Context, tx sqlcv1.DBTX, payloads ...StorePayloadOpts) error
	Retrieve(ctx context.Context, tx sqlcv1.DBTX, opts ...RetrievePayloadOpts) (map[RetrievePayloadOpts][]byte, error)
	RetrieveFromExternal(ctx context.Context, keys ...ExternalPayloadLocationKey) (map[ExternalPayloadLocationKey][]byte, error)
	OverwriteExternalStore(store ExternalStore)
	DualWritesEnabled() bool
	TaskEventDualWritesEnabled() bool
	DagDataDualWritesEnabled() bool
	OLAPDualWritesEnabled() bool
	ExternalCutoverProcessInterval() time.Duration
	InlineStoreTTL() *time.Duration
	ExternalCutoverBatchSize() int32
	ExternalCutoverNumConcurrentOffloads() int32
	ExternalStoreEnabled() bool
	ExternalStore() ExternalStore
	ImmediateOffloadsEnabled() bool
	ProcessPayloadCutovers(ctx context.Context) error
}

func NewPayloadStoreRepository added in v0.74.3

func NewPayloadStoreRepository(
	pool *pgxpool.Pool,
	l *zerolog.Logger,
	queries *sqlcv1.Queries,
	opts PayloadStoreRepositoryOpts,
) PayloadStoreRepository

type PayloadStoreRepositoryOpts added in v0.74.3

type PayloadStoreRepositoryOpts struct {
	EnablePayloadDualWrites              bool
	EnableTaskEventPayloadDualWrites     bool
	EnableDagDataPayloadDualWrites       bool
	EnableOLAPPayloadDualWrites          bool
	ExternalCutoverProcessInterval       time.Duration
	ExternalCutoverBatchSize             int32
	ExternalCutoverNumConcurrentOffloads int32
	InlineStoreTTL                       *time.Duration
	EnableImmediateOffloads              bool
}

type PayloadUniqueKey added in v0.74.3

type PayloadUniqueKey struct {
	ID         int64
	InsertedAt pgtype.Timestamptz
	TenantId   pgtype.UUID
	Type       sqlcv1.V1PayloadType
}

type PlanLimitMap

type PlanLimitMap map[string][]Limit

type PubSubMessage

type PubSubMessage struct {
	QueueName string `json:"queue_name"`
	Payload   []byte `json:"payload"`
}

type QueueFactoryRepository

type QueueFactoryRepository interface {
	NewQueue(tenantId pgtype.UUID, queueName string) QueueRepository
}

type QueueMetric

type QueueMetric struct {
	// the total number of PENDING_ASSIGNMENT step runs in the queue
	PendingAssignment int `json:"pending_assignment"`

	// the total number of PENDING step runs in the queue
	Pending int `json:"pending"`

	// the total number of RUNNING step runs in the queue
	Running int `json:"running"`
}

type QueueRepository

type QueueRepository interface {
	ListQueueItems(ctx context.Context, limit int) ([]*sqlcv1.V1QueueItem, error)
	MarkQueueItemsProcessed(ctx context.Context, r *AssignResults) (succeeded []*AssignedItem, failed []*AssignedItem, err error)

	GetTaskRateLimits(ctx context.Context, queueItems []*sqlcv1.V1QueueItem) (map[int64]map[string]int32, error)
	RequeueRateLimitedItems(ctx context.Context, tenantId pgtype.UUID, queueName string) ([]*sqlcv1.RequeueRateLimitedQueueItemsRow, error)
	GetDesiredLabels(ctx context.Context, stepIds []pgtype.UUID) (map[string][]*sqlcv1.GetDesiredLabelsRow, error)
	Cleanup()
}

type RateLimitRepository

type RateLimitRepository interface {
	UpdateRateLimits(ctx context.Context, tenantId pgtype.UUID, updates map[string]int) ([]*sqlcv1.ListRateLimitsForTenantWithMutateRow, *time.Time, error)

	UpsertRateLimit(ctx context.Context, tenantId string, key string, opts *UpsertRateLimitOpts) (*sqlcv1.RateLimit, error)

	ListRateLimits(ctx context.Context, tenantId string, opts *ListRateLimitOpts) (*ListRateLimitsResult, error)
}

type RateLimitResult

type RateLimitResult struct {
	*sqlcv1.V1QueueItem

	ExceededKey    string
	ExceededUnits  int32
	ExceededVal    int32
	NextRefillAt   *time.Time
	TaskId         int64
	TaskInsertedAt pgtype.Timestamptz
	RetryCount     int32
}

type ReadTaskRunMetricsOpts added in v0.74.3

type ReadTaskRunMetricsOpts struct {
	CreatedAfter time.Time

	CreatedBefore *time.Time

	WorkflowIds []uuid.UUID

	ParentTaskExternalID *pgtype.UUID

	TriggeringEventExternalId *pgtype.UUID

	AdditionalMetadata map[string]interface{}
}

type ReadableTaskStatus added in v0.74.3

type ReadableTaskStatus string
const (
	READABLE_TASK_STATUS_QUEUED    ReadableTaskStatus = "QUEUED"
	READABLE_TASK_STATUS_RUNNING   ReadableTaskStatus = "RUNNING"
	READABLE_TASK_STATUS_COMPLETED ReadableTaskStatus = "COMPLETED"
	READABLE_TASK_STATUS_CANCELLED ReadableTaskStatus = "CANCELLED"
	READABLE_TASK_STATUS_FAILED    ReadableTaskStatus = "FAILED"
)

func StringToReadableStatus added in v0.74.3

func StringToReadableStatus(status string) ReadableTaskStatus

func (ReadableTaskStatus) EnumValue added in v0.74.3

func (s ReadableTaskStatus) EnumValue() int

type RefreshTimeoutBy

type RefreshTimeoutBy struct {
	TaskExternalId string `validate:"required,uuid"`

	IncrementTimeoutBy string `validate:"required,duration"`
}

type ReplayTaskOpts added in v0.74.3

type ReplayTaskOpts struct {
	// (required) the task id
	TaskId int64

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the external id
	ExternalId string

	// (required) the step id
	StepId string

	// (optional) the input bytes to the task, uses the existing input if not set
	Input *TaskInput

	// (required) the initial state for the task
	InitialState sqlcv1.V1TaskInitialState

	// (optional) the additional metadata for the task
	AdditionalMetadata []byte
}

type ReplayTasksResult added in v0.74.3

type ReplayTasksResult struct {
	ReplayedTasks []TaskIdInsertedAtRetryCount

	UpsertedTasks []*V1TaskWithPayload

	InternalEventResults *EventMatchResults
}

type Repository added in v0.74.3

type Repository interface {
	APIToken() APITokenRepository
	Dispatcher() DispatcherRepository
	Health() HealthRepository
	MessageQueue() MessageQueueRepository
	RateLimit() RateLimitRepository
	Triggers() TriggerRepository
	Tasks() TaskRepository
	Scheduler() SchedulerRepository
	Matches() MatchRepository
	OLAP() OLAPRepository
	OverwriteOLAPRepository(o OLAPRepository)
	Logs() LogLineRepository
	OverwriteLogsRepository(l LogLineRepository)
	Payloads() PayloadStoreRepository
	OverwriteExternalPayloadStore(o ExternalStore)
	Workers() WorkerRepository
	Workflows() WorkflowRepository
	Ticker() TickerRepository
	Filters() FilterRepository
	Webhooks() WebhookRepository
	Idempotency() IdempotencyRepository
	IntervalSettings() IntervalSettingsRepository
	PGHealth() PGHealthRepository
	SecurityCheck() SecurityCheckRepository
	Slack() SlackRepository
	SNS() SNSRepository
	TenantInvite() TenantInviteRepository
	TenantLimit() TenantLimitRepository
	TenantAlertingSettings() TenantAlertingRepository
	Tenant() TenantRepository
	User() UserRepository
	UserSession() UserSessionRepository
	WorkflowSchedules() WorkflowScheduleRepository
}

func NewRepository added in v0.74.3

func NewRepository(
	pool *pgxpool.Pool,
	l *zerolog.Logger,
	cacheDuration time.Duration,
	taskRetentionPeriod, olapRetentionPeriod time.Duration,
	maxInternalRetryCount int32,
	taskLimits TaskOperationLimits,
	payloadStoreOpts PayloadStoreRepositoryOpts,
	statusUpdateBatchSizeLimits StatusUpdateBatchSizeLimits,
	tenantLimitConfig limits.LimitConfigFile,
	enforceLimits bool,
) (Repository, func() error)

type RetriedTask added in v0.74.3

type RetriedTask struct {
	*TaskIdInsertedAtRetryCount

	IsAppError bool

	AppRetryCount int32

	RetryBackoffFactor pgtype.Float8

	RetryMaxBackoff pgtype.Int4
}

type RetrievePayloadOpts added in v0.74.3

type RetrievePayloadOpts struct {
	Id         int64
	InsertedAt pgtype.Timestamptz
	Type       sqlcv1.V1PayloadType
	TenantId   pgtype.UUID
}

type Run added in v0.74.3

type Run struct {
	Id         int64
	InsertedAt time.Time
	FilterId   *string
}

type RunConcurrencyResult added in v0.74.3

type RunConcurrencyResult struct {
	// The tasks which were enqueued
	Queued []TaskWithQueue

	// If the strategy involves cancelling a task, these are the tasks to cancel
	Cancelled []TaskWithCancelledReason

	// If the step has multiple concurrency strategies, these are the next ones to notify
	NextConcurrencyStrategies []int64
}

type RuntimeInfo

type RuntimeInfo struct {
	SdkVersion      *string         `validate:"omitempty"`
	Language        *contracts.SDKS `validate:"omitempty"`
	LanguageVersion *string         `validate:"omitempty"`
	Os              *string         `validate:"omitempty"`
	Extra           *string         `validate:"omitempty"`
}

type SDK added in v0.74.3

type SDK struct {
	OperatingSystem string
	Language        string
	LanguageVersion string
	SdkVersion      string
}

type SNSRepository

type SNSRepository interface {
	GetSNSIntegration(ctx context.Context, tenantId, topicArn string) (*sqlcv1.SNSIntegration, error)

	GetSNSIntegrationById(ctx context.Context, id string) (*sqlcv1.SNSIntegration, error)

	CreateSNSIntegration(ctx context.Context, tenantId string, opts *CreateSNSIntegrationOpts) (*sqlcv1.SNSIntegration, error)

	ListSNSIntegrations(ctx context.Context, tenantId string) ([]*sqlcv1.SNSIntegration, error)

	DeleteSNSIntegration(ctx context.Context, tenantId, id string) error
}

type ScheduledWorkflowMeta added in v0.73.104

type ScheduledWorkflowMeta struct {
	Id              string
	Method          sqlcv1.WorkflowTriggerScheduledRefMethods
	HasTriggeredRun bool
}

type ScheduledWorkflowUpdate added in v0.73.104

type ScheduledWorkflowUpdate struct {
	Id        string
	TriggerAt time.Time
}

type SchedulerRepository

type SchedulerRepository interface {
	Concurrency() ConcurrencyRepository
	Lease() LeaseRepository
	QueueFactory() QueueFactoryRepository
	RateLimit() RateLimitRepository
	Assignment() AssignmentRepository
}

type SecurityCheckRepository

type SecurityCheckRepository interface {
	GetIdent() (string, error)
}

type SlackRepository

type SlackRepository interface {
	UpsertSlackWebhook(ctx context.Context, tenantId string, opts *UpsertSlackWebhookOpts) (*sqlcv1.SlackAppWebhook, error)

	ListSlackWebhooks(ctx context.Context, tenantId string) ([]*sqlcv1.SlackAppWebhook, error)

	GetSlackWebhookById(ctx context.Context, id string) (*sqlcv1.SlackAppWebhook, error)

	DeleteSlackWebhook(ctx context.Context, tenantId string, id string) error
}

type StatusUpdateBatchSizeLimits added in v0.74.3

type StatusUpdateBatchSizeLimits struct {
	Task int32
	DAG  int32
}

type Sticky added in v0.74.3

type Sticky string
const (
	STICKY_HARD Sticky = "HARD"
	STICKY_SOFT Sticky = "SOFT"
	STICKY_NONE Sticky = "NONE"
)

type StoreOLAPPayloadOpts added in v0.74.3

type StoreOLAPPayloadOpts struct {
	ExternalId pgtype.UUID
	InsertedAt pgtype.Timestamptz
	Payload    []byte
}

type StorePayloadOpts added in v0.74.3

type StorePayloadOpts struct {
	Id         int64
	InsertedAt pgtype.Timestamptz
	ExternalId pgtype.UUID
	Type       sqlcv1.V1PayloadType
	Payload    []byte
	TenantId   string
}

type TaskEventWithPayloads added in v0.74.3

type TaskEventWithPayloads struct {
	*sqlcv1.ListTaskEventsForWorkflowRunRow
	OutputPayload []byte
}

type TaskIdEventKeyTuple added in v0.74.3

type TaskIdEventKeyTuple struct {
	Id int64 `validate:"required"`

	EventKey string `validate:"required"`
}

type TaskIdInsertedAtRetryCount added in v0.74.3

type TaskIdInsertedAtRetryCount struct {
	// (required) the external id
	Id int64 `validate:"required"`

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the retry count
	RetryCount int32
}

type TaskIdInsertedAtSignalKey added in v0.74.3

type TaskIdInsertedAtSignalKey struct {
	// (required) the external id
	Id int64 `validate:"required"`

	// (required) the inserted at time
	InsertedAt pgtype.Timestamptz

	// (required) the signal key for the event
	SignalKey string
}

type TaskInput added in v0.74.3

type TaskInput struct {
	Input map[string]interface{} `json:"input"`

	TriggerData *MatchData `json:"trigger_datas"`

	FilterPayload map[string]interface{} `json:"filter_payload"`
}

func (*TaskInput) Bytes added in v0.74.3

func (t *TaskInput) Bytes() []byte

type TaskMetadata added in v0.74.3

type TaskMetadata struct {
	TaskID         int64     `json:"task_id"`
	TaskInsertedAt time.Time `json:"task_inserted_at"`
}

func ParseTaskMetadata added in v0.74.3

func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error)

type TaskOperationLimits added in v0.74.3

type TaskOperationLimits struct {
	TimeoutLimit      int
	ReassignLimit     int
	RetryQueueLimit   int
	DurableSleepLimit int
}

type TaskOutputEvent added in v0.74.3

type TaskOutputEvent struct {
	IsFailure bool `json:"is_failure"`

	EventType sqlcv1.V1TaskEventType `json:"event_type"`

	TaskExternalId string `json:"task_external_id"`

	TaskId int64 `json:"task_id"`

	RetryCount int32 `json:"retry_count"`

	WorkerId *string `json:"worker_id"`

	Output []byte `json:"output"`

	ErrorMessage string `json:"error_message"`

	StepReadableID string `json:"step_readable_id"`
}

func NewCancelledTaskOutputEvent added in v0.74.3

func NewCancelledTaskOutputEvent(row *sqlcv1.ReleaseTasksRow) *TaskOutputEvent

func NewCancelledTaskOutputEventFromTask added in v0.74.3

func NewCancelledTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent

func NewCompletedTaskOutputEvent added in v0.74.3

func NewCompletedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, output []byte) *TaskOutputEvent

func NewFailedTaskOutputEvent added in v0.74.3

func NewFailedTaskOutputEvent(row *sqlcv1.ReleaseTasksRow, errorMsg string) *TaskOutputEvent

func NewFailedTaskOutputEventFromTask added in v0.74.3

func NewFailedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent

func NewSkippedTaskOutputEventFromTask added in v0.74.3

func NewSkippedTaskOutputEventFromTask(task *V1TaskWithPayload) *TaskOutputEvent

func (*TaskOutputEvent) Bytes added in v0.74.3

func (e *TaskOutputEvent) Bytes() []byte

func (*TaskOutputEvent) IsCancelled added in v0.74.3

func (e *TaskOutputEvent) IsCancelled() bool

func (*TaskOutputEvent) IsCompleted added in v0.74.3

func (e *TaskOutputEvent) IsCompleted() bool

func (*TaskOutputEvent) IsFailed added in v0.74.3

func (e *TaskOutputEvent) IsFailed() bool

type TaskRepository added in v0.74.3

type TaskRepository interface {
	EnsureTablePartitionsExist(ctx context.Context) (bool, error)
	UpdateTablePartitions(ctx context.Context) error

	// GetTaskByExternalId is a heavily cached method to return task metadata by its external id
	GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)

	// FlattenExternalIds is a non-cached method to look up all tasks in a workflow run by their external ids.
	// This is non-cacheable because tasks can be added to a workflow run as it executes.
	FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)

	CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)

	FailTasks(ctx context.Context, tenantId string, tasks []FailTaskOpts) (*FailTasksResponse, error)

	CancelTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error)

	ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)

	ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)

	ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)

	// ListTaskParentOutputs is a method to return the output of a task's parent and grandparent tasks. This is for v0 compatibility
	// with the v1 engine, and shouldn't be called from new v1 endpoints.
	ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)

	DefaultTaskActivityGauge(ctx context.Context, tenantId string) (int, error)

	ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)

	ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)

	ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)

	ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)

	GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)

	ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)

	RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)

	ReleaseSlot(ctx context.Context, tenantId string, externalId string) (*sqlcv1.V1TaskRuntime, error)

	ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*V1TaskEventWithPayload, error)

	// AnalyzeTaskTables runs ANALYZE on the task tables
	AnalyzeTaskTables(ctx context.Context) error

	// Cleanup makes sure to get rid of invalid old entries
	// Returns (shouldContinue, error) where shouldContinue indicates if there's more work
	Cleanup(ctx context.Context) (bool, error)

	GetTaskStats(ctx context.Context, tenantId string) (map[string]TaskStat, error)

	FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error)
	FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error)
}

type TaskRepositoryImpl added in v0.74.3

type TaskRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (*TaskRepositoryImpl) AnalyzeTaskTables added in v0.74.3

func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error

func (*TaskRepositoryImpl) CancelTasks added in v0.74.3

func (*TaskRepositoryImpl) Cleanup added in v0.74.3

func (r *TaskRepositoryImpl) Cleanup(ctx context.Context) (bool, error)

func (*TaskRepositoryImpl) CompleteTasks added in v0.74.3

func (r *TaskRepositoryImpl) CompleteTasks(ctx context.Context, tenantId string, tasks []CompleteTaskOpts) (*FinalizedTaskResponse, error)

func (*TaskRepositoryImpl) DefaultTaskActivityGauge added in v0.74.3

func (r *TaskRepositoryImpl) DefaultTaskActivityGauge(ctx context.Context, tenantId string) (int, error)

DefaultTaskActivityGauge is a heavily cached method that returns the number of queues that have had activity since the task retention period.

func (TaskRepositoryImpl) DesiredWorkerId added in v0.74.3

func (s TaskRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (*TaskRepositoryImpl) EnsureTablePartitionsExist added in v0.74.3

func (r *TaskRepositoryImpl) EnsureTablePartitionsExist(ctx context.Context) (bool, error)

func (*TaskRepositoryImpl) FailTasks added in v0.74.3

func (r *TaskRepositoryImpl) FailTasks(ctx context.Context, tenantId string, failureOpts []FailTaskOpts) (*FailTasksResponse, error)

func (*TaskRepositoryImpl) FindOldestRunningTaskInsertedAt added in v0.74.3

func (r *TaskRepositoryImpl) FindOldestRunningTaskInsertedAt(ctx context.Context) (*time.Time, error)

func (*TaskRepositoryImpl) FindOldestTaskInsertedAt added in v0.74.3

func (r *TaskRepositoryImpl) FindOldestTaskInsertedAt(ctx context.Context) (*time.Time, error)

func (*TaskRepositoryImpl) FlattenExternalIds added in v0.74.3

func (r *TaskRepositoryImpl) FlattenExternalIds(ctx context.Context, tenantId string, externalIds []string) ([]*sqlcv1.FlattenExternalIdsRow, error)

func (*TaskRepositoryImpl) GetQueueCounts added in v0.74.3

func (r *TaskRepositoryImpl) GetQueueCounts(ctx context.Context, tenantId string) (map[string]interface{}, error)

func (*TaskRepositoryImpl) GetTaskByExternalId added in v0.74.3

func (r *TaskRepositoryImpl) GetTaskByExternalId(ctx context.Context, tenantId, taskExternalId string, skipCache bool) (*sqlcv1.FlattenExternalIdsRow, error)

func (*TaskRepositoryImpl) GetTaskStats added in v0.74.3

func (r *TaskRepositoryImpl) GetTaskStats(ctx context.Context, tenantId string) (map[string]TaskStat, error)

func (*TaskRepositoryImpl) ListFinalizedWorkflowRuns added in v0.74.3

func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tenantId string, rootExternalIds []string) ([]*ListFinalizedWorkflowRunsResponse, error)

func (*TaskRepositoryImpl) ListSignalCompletedEvents added in v0.74.3

func (r *TaskRepositoryImpl) ListSignalCompletedEvents(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtSignalKey) ([]*V1TaskEventWithPayload, error)

func (*TaskRepositoryImpl) ListTaskMetas added in v0.74.3

func (r *TaskRepositoryImpl) ListTaskMetas(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.ListTaskMetasRow, error)

func (*TaskRepositoryImpl) ListTaskParentOutputs added in v0.74.3

func (r *TaskRepositoryImpl) ListTaskParentOutputs(ctx context.Context, tenantId string, tasks []*sqlcv1.V1Task) (map[int64][]*TaskOutputEvent, error)

func (*TaskRepositoryImpl) ListTasks added in v0.74.3

func (r *TaskRepositoryImpl) ListTasks(ctx context.Context, tenantId string, tasks []int64) ([]*sqlcv1.V1Task, error)

func (TaskRepositoryImpl) PopulateExternalIdsForWorkflow added in v0.74.3

func (s TaskRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*TaskRepositoryImpl) ProcessDurableSleeps added in v0.74.3

func (r *TaskRepositoryImpl) ProcessDurableSleeps(ctx context.Context, tenantId string) (*EventMatchResults, bool, error)

func (*TaskRepositoryImpl) ProcessTaskReassignments added in v0.74.3

func (r *TaskRepositoryImpl) ProcessTaskReassignments(ctx context.Context, tenantId string) (*FailTasksResponse, bool, error)

func (*TaskRepositoryImpl) ProcessTaskRetryQueueItems added in v0.74.3

func (r *TaskRepositoryImpl) ProcessTaskRetryQueueItems(ctx context.Context, tenantId string) ([]*sqlcv1.V1RetryQueueItem, bool, error)

func (*TaskRepositoryImpl) ProcessTaskTimeouts added in v0.74.3

func (r *TaskRepositoryImpl) ProcessTaskTimeouts(ctx context.Context, tenantId string) (*TimeoutTasksResponse, bool, error)

func (*TaskRepositoryImpl) RefreshTimeoutBy added in v0.74.3

func (r *TaskRepositoryImpl) RefreshTimeoutBy(ctx context.Context, tenantId string, opt RefreshTimeoutBy) (*sqlcv1.V1TaskRuntime, error)

func (*TaskRepositoryImpl) ReleaseSlot added in v0.74.3

func (r *TaskRepositoryImpl) ReleaseSlot(ctx context.Context, tenantId, externalId string) (*sqlcv1.V1TaskRuntime, error)

func (*TaskRepositoryImpl) ReplayTasks added in v0.74.3

func (r *TaskRepositoryImpl) ReplayTasks(ctx context.Context, tenantId string, tasks []TaskIdInsertedAtRetryCount) (*ReplayTasksResult, error)

func (TaskRepositoryImpl) ToV1StepRunData added in v0.74.3

func (s TaskRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*TaskRepositoryImpl) UpdateTablePartitions added in v0.74.3

func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error

type TaskRunMetric added in v0.74.3

type TaskRunMetric struct {
	Status string `json:"status"`
	Count  uint64 `json:"count"`
}

type TaskStat added in v0.74.3

type TaskStat struct {
	Queued  *TaskStatusStat `json:"queued,omitempty"`
	Running *TaskStatusStat `json:"running,omitempty"`
}

TaskStat represents the statistics for a single task step

type TaskStatusStat added in v0.74.3

type TaskStatusStat struct {
	Total       int64             `json:"total"`
	Oldest      *time.Time        `json:"oldest,omitempty"`
	Queues      map[string]int64  `json:"queues,omitempty"`
	Concurrency []ConcurrencyStat `json:"concurrency,omitempty"`
}

TaskStatusStat represents statistics for a specific task status (queued or running)

type TaskWithCancelledReason added in v0.74.3

type TaskWithCancelledReason struct {
	*TaskIdInsertedAtRetryCount

	CancelledReason string

	TaskExternalId string

	WorkflowRunId string
}

type TaskWithPayloads added in v0.74.3

type TaskWithPayloads struct {
	*sqlcv1.PopulateTaskRunDataRow
	InputPayload       []byte
	OutputPayload      []byte
	NumSpawnedChildren int64
}

type TaskWithQueue added in v0.74.3

type TaskWithQueue struct {
	*TaskIdInsertedAtRetryCount

	Queue string
}

type TenantAlertEmailGroupForSend

type TenantAlertEmailGroupForSend struct {
	TenantId pgtype.UUID `json:"tenantId"`
	Emails   []string    `validate:"required,dive,email,max=255"`
}

type TenantAlertingRepository

type TenantAlertingRepository interface {
	UpsertTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpsertTenantAlertingSettingsOpts) (*sqlcv1.TenantAlertingSettings, error)

	GetTenantAlertingSettings(ctx context.Context, tenantId string) (*GetTenantAlertingSettingsResponse, error)

	GetTenantResourceLimitState(ctx context.Context, tenantId string, resource string) (*sqlcv1.GetTenantResourceLimitRow, error)

	UpdateTenantAlertingSettings(ctx context.Context, tenantId string, opts *UpdateTenantAlertingSettingsOpts) error

	CreateTenantAlertGroup(ctx context.Context, tenantId string, opts *CreateTenantAlertGroupOpts) (*sqlcv1.TenantAlertEmailGroup, error)

	UpdateTenantAlertGroup(ctx context.Context, id string, opts *UpdateTenantAlertGroupOpts) (*sqlcv1.TenantAlertEmailGroup, error)

	ListTenantAlertGroups(ctx context.Context, tenantId string) ([]*sqlcv1.TenantAlertEmailGroup, error)

	GetTenantAlertGroupById(ctx context.Context, id string) (*sqlcv1.TenantAlertEmailGroup, error)

	DeleteTenantAlertGroup(ctx context.Context, tenantId string, id string) error
}

type TenantCallbackOpts

type TenantCallbackOpts[T any] struct {
	// contains filtered or unexported fields
}

func (*TenantCallbackOpts[T]) Run

func (o *TenantCallbackOpts[T]) Run(l *zerolog.Logger, tenantId string, v T)

type TenantID added in v0.74.3

type TenantID string

type TenantIdSDKTuple added in v0.74.3

type TenantIdSDKTuple struct {
	TenantId string
	SDK      SDK
}

type TenantInviteRepository

type TenantInviteRepository interface {
	// CreateTenantInvite creates a new tenant invite with the given options
	CreateTenantInvite(ctx context.Context, tenantId string, opts *CreateTenantInviteOpts) (*sqlcv1.TenantInviteLink, error)

	// GetTenantInvite returns the tenant invite with the given id
	GetTenantInvite(ctx context.Context, id string) (*sqlcv1.TenantInviteLink, error)

	// ListTenantInvitesByEmail returns the list of tenant invites for the given invitee email for invites
	// which are not expired
	ListTenantInvitesByEmail(ctx context.Context, email string) ([]*sqlcv1.ListTenantInvitesByEmailRow, error)

	// ListTenantInvitesByTenantId returns the list of tenant invites for the given tenant id
	ListTenantInvitesByTenantId(ctx context.Context, tenantId string, opts *ListTenantInvitesOpts) ([]*sqlcv1.TenantInviteLink, error)

	// UpdateTenantInvite updates the tenant invite with the given id
	UpdateTenantInvite(ctx context.Context, id string, opts *UpdateTenantInviteOpts) (*sqlcv1.TenantInviteLink, error)

	// DeleteTenantInvite deletes the tenant invite with the given id
	DeleteTenantInvite(ctx context.Context, id string) error
}

type TenantLimitConfig

type TenantLimitConfig struct {
	EnforceLimits bool
}

type TenantLimitRepository

type TenantLimitRepository interface {
	GetLimits(ctx context.Context, tenantId string) ([]*sqlcv1.TenantResourceLimit, error)

	// CanCreate checks if the tenant can create a resource
	CanCreate(ctx context.Context, resource sqlcv1.LimitResource, tenantId string, numberOfResources int32) (bool, int, error)

	// Create new Tenant Resource Limits for a tenant
	SelectOrInsertTenantLimits(ctx context.Context, tenantId string, plan *string) error

	// UpsertTenantLimits updates or inserts new tenant limits
	UpsertTenantLimits(ctx context.Context, tenantId string, plan *string) error

	// Resolve all tenant resource limits
	ResolveAllTenantResourceLimits(ctx context.Context) error

	// SetPlanLimitMap sets the plan limit map
	SetPlanLimitMap(planLimitMap PlanLimitMap) error

	DefaultLimits() []Limit

	Stop()

	Meter(ctx context.Context, resource sqlcv1.LimitResource, tenantId string, numberOfResources int32) (precommit func() error, postcommit func())
}

type TenantRepository added in v0.74.3

type TenantRepository interface {
	// CreateTenant creates a new tenant.
	CreateTenant(ctx context.Context, opts *CreateTenantOpts) (*sqlcv1.Tenant, error)

	// UpdateTenant updates an existing tenant in the db.
	UpdateTenant(ctx context.Context, tenantId string, opts *UpdateTenantOpts) (*sqlcv1.Tenant, error)

	// GetTenantByID returns the tenant with the given id
	GetTenantByID(ctx context.Context, tenantId string) (*sqlcv1.Tenant, error)

	// GetTenantBySlug returns the tenant with the given slug
	GetTenantBySlug(ctx context.Context, slug string) (*sqlcv1.Tenant, error)

	// CreateTenantMember creates a new member in the tenant
	CreateTenantMember(ctx context.Context, tenantId string, opts *CreateTenantMemberOpts) (*sqlcv1.PopulateTenantMembersRow, error)

	// GetTenantMemberByID returns the tenant member with the given id
	GetTenantMemberByID(ctx context.Context, memberId string) (*sqlcv1.PopulateTenantMembersRow, error)

	// GetTenantMemberByUserID returns the tenant member with the given user id
	GetTenantMemberByUserID(ctx context.Context, tenantId string, userId string) (*sqlcv1.PopulateTenantMembersRow, error)

	// GetTenantMemberByEmail returns the tenant member with the given email
	GetTenantMemberByEmail(ctx context.Context, tenantId string, email string) (*sqlcv1.PopulateTenantMembersRow, error)

	// ListTenantMembers returns the list of tenant members for the given tenant
	ListTenantMembers(ctx context.Context, tenantId string) ([]*sqlcv1.PopulateTenantMembersRow, error)

	// UpdateTenantMember updates the tenant member with the given id
	UpdateTenantMember(ctx context.Context, memberId string, opts *UpdateTenantMemberOpts) (*sqlcv1.PopulateTenantMembersRow, error)

	// DeleteTenantMember deletes the tenant member with the given id
	DeleteTenantMember(ctx context.Context, memberId string) error

	// GetQueueMetrics returns the queue metrics for the given tenant
	GetQueueMetrics(ctx context.Context, tenantId string, opts *GetQueueMetricsOpts) (*GetQueueMetricsResponse, error)

	// ListTenants lists all tenants in the instance
	ListTenants(ctx context.Context) ([]*sqlcv1.Tenant, error)

	// Gets the tenant corresponding to the "internal" tenant if it's assigned to this controller.
	// Returns nil if the tenant is not assigned to this controller.
	GetInternalTenantForController(ctx context.Context, controllerPartitionId string) (*sqlcv1.Tenant, error)

	// ListTenantsByPartition lists all tenants in the given partition
	ListTenantsByControllerPartition(ctx context.Context, controllerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)

	ListTenantsByWorkerPartition(ctx context.Context, workerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)

	ListTenantsBySchedulerPartition(ctx context.Context, schedulerPartitionId string, majorVersion sqlcv1.TenantMajorEngineVersion) ([]*sqlcv1.Tenant, error)

	// CreateEnginePartition creates a new partition for tenants within the engine
	CreateControllerPartition(ctx context.Context) (string, error)

	// UpdateControllerPartitionHeartbeat updates the heartbeat for the given partition. If the partition no longer exists,
	// it creates a new partition and returns the new partition id. Otherwise, it returns the existing partition id.
	UpdateControllerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteControllerPartition(ctx context.Context, id string) error

	RebalanceAllControllerPartitions(ctx context.Context) error

	RebalanceInactiveControllerPartitions(ctx context.Context) error

	CreateSchedulerPartition(ctx context.Context) (string, error)

	UpdateSchedulerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteSchedulerPartition(ctx context.Context, id string) error

	RebalanceAllSchedulerPartitions(ctx context.Context) error

	RebalanceInactiveSchedulerPartitions(ctx context.Context) error

	CreateTenantWorkerPartition(ctx context.Context) (string, error)

	UpdateWorkerPartitionHeartbeat(ctx context.Context, partitionId string) (string, error)

	DeleteTenantWorkerPartition(ctx context.Context, id string) error

	RebalanceAllTenantWorkerPartitions(ctx context.Context) error

	RebalanceInactiveTenantWorkerPartitions(ctx context.Context) error
}

type TenantScopedCallback

type TenantScopedCallback[T any] func(string, T) error

func (TenantScopedCallback[T]) Do

func (c TenantScopedCallback[T]) Do(l *zerolog.Logger, tenantId string, v T)

type TickerRepository added in v0.74.3

type TickerRepository interface {
	IsTenantAlertActive(ctx context.Context, tenantId string) (bool, time.Time, error)

	// CreateNewTicker creates a new ticker.
	CreateNewTicker(ctx context.Context, opts *CreateTickerOpts) (*sqlcv1.Ticker, error)

	// UpdateTicker updates a ticker.
	UpdateTicker(ctx context.Context, tickerId string, opts *UpdateTickerOpts) (*sqlcv1.Ticker, error)

	// ListTickers lists tickers.
	ListTickers(ctx context.Context, opts *ListTickerOpts) ([]*sqlcv1.Ticker, error)

	// DeactivateTicker deletes a ticker.
	DeactivateTicker(ctx context.Context, tickerId string) error

	// PollCronSchedules returns all cron schedules which should be managed by the ticker
	PollCronSchedules(ctx context.Context, tickerId string) ([]*sqlcv1.PollCronSchedulesRow, error)

	PollScheduledWorkflows(ctx context.Context, tickerId string) ([]*sqlcv1.PollScheduledWorkflowsRow, error)

	PollTenantAlerts(ctx context.Context, tickerId string) ([]*sqlcv1.PollTenantAlertsRow, error)

	PollExpiringTokens(ctx context.Context) ([]*sqlcv1.PollExpiringTokensRow, error)

	PollTenantResourceLimitAlerts(ctx context.Context) ([]*sqlcv1.TenantResourceLimitAlert, error)
}

type TimeoutTasksResponse added in v0.74.3

type TimeoutTasksResponse struct {
	*FailTasksResponse

	TimeoutTasks []*sqlcv1.ListTasksToTimeoutRow
}

type TriggerDecision added in v0.74.3

type TriggerDecision struct {
	ShouldTrigger bool
	FilterPayload []byte
	FilterId      *string
}

type TriggerFromEventsResult added in v0.74.3

type TriggerFromEventsResult struct {
	Tasks                 []*V1TaskWithPayload
	Dags                  []*DAGWithData
	EventExternalIdToRuns map[string][]*Run
	CELEvaluationFailures []CELEvaluationFailure
}

type TriggerRepository added in v0.74.3

type TriggerRepository interface {
	TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)

	TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)

	PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

	PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error
}

type TriggerRepositoryImpl added in v0.74.3

type TriggerRepositoryImpl struct {
	// contains filtered or unexported fields
}

func (TriggerRepositoryImpl) DesiredWorkerId added in v0.74.3

func (s TriggerRepositoryImpl) DesiredWorkerId(t *TaskInput) *string

func (TriggerRepositoryImpl) PopulateExternalIdsForWorkflow added in v0.74.3

func (s TriggerRepositoryImpl) PopulateExternalIdsForWorkflow(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

GenerateExternalIdsForWorkflow generates external ids and additional looks up child workflows and whether they already exist.

func (*TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts added in v0.74.3

func (r *TriggerRepositoryImpl) PreflightVerifyWorkflowNameOpts(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) error

func (TriggerRepositoryImpl) ToV1StepRunData added in v0.74.3

func (s TriggerRepositoryImpl) ToV1StepRunData(t *TaskInput) *V1StepRunData

func (*TriggerRepositoryImpl) TriggerFromEvents added in v0.74.3

func (r *TriggerRepositoryImpl) TriggerFromEvents(ctx context.Context, tenantId string, opts []EventTriggerOpts) (*TriggerFromEventsResult, error)

func (*TriggerRepositoryImpl) TriggerFromWorkflowNames added in v0.74.3

func (r *TriggerRepositoryImpl) TriggerFromWorkflowNames(ctx context.Context, tenantId string, opts []*WorkflowNameTriggerOpts) ([]*V1TaskWithPayload, []*DAGWithData, error)

type TriggerTaskData added in v0.74.3

type TriggerTaskData struct {
	// (required) the workflow name
	WorkflowName string `json:"workflow_name" validate:"required"`

	// (optional) the workflow run data
	Data []byte `json:"data"`

	// (optional) the workflow run metadata
	AdditionalMetadata []byte `json:"additional_metadata"`

	// (optional) the desired worker id
	DesiredWorkerId *string `json:"desired_worker_id"`

	// (optional) the parent external id
	ParentExternalId *string `json:"parent_external_id"`

	// (optional) the parent task id
	ParentTaskId *int64 `json:"parent_task_id"`

	// (optional) the parent inserted_at
	ParentTaskInsertedAt *time.Time `json:"parent_task_inserted_at"`

	// (optional) the child index
	ChildIndex *int64 `json:"child_index"`

	// (optional) the child key
	ChildKey *string `json:"child_key"`

	// (optional) the priority of the task
	Priority *int32 `json:"priority"`
}

type TriggeredBy added in v0.74.3

type TriggeredBy interface {
	ToMetadata([]byte) []byte
}

type TriggeredByEvent added in v0.74.3

type TriggeredByEvent struct {
	// contains filtered or unexported fields
}

func (*TriggeredByEvent) ToMetadata added in v0.74.3

func (t *TriggeredByEvent) ToMetadata(additionalMetadata []byte) []byte

type UnscopedCallback

type UnscopedCallback[T any] func(T) error

func (UnscopedCallback[T]) Do

func (c UnscopedCallback[T]) Do(l *zerolog.Logger, v T)

type UpdateCronOpts added in v0.73.35

type UpdateCronOpts struct {
	// (optional) a flag indicating whether or not the cron is enabled
	Enabled *bool
}

type UpdateDAGStatusRow added in v0.74.3

type UpdateDAGStatusRow struct {
	TenantId       pgtype.UUID
	DagId          int64
	DagInsertedAt  pgtype.Timestamptz
	ReadableStatus sqlcv1.V1ReadableStatusOlap
	ExternalId     pgtype.UUID
	WorkflowId     pgtype.UUID
}

type UpdateDispatcherOpts

type UpdateDispatcherOpts struct {
	LastHeartbeatAt *time.Time
}

type UpdateFilterOpts added in v0.74.3

type UpdateFilterOpts struct {
	Scope      *string `json:"scope"`
	Expression *string `json:"expression"`
	Payload    []byte  `json:"payload"`
}

type UpdateSessionOpts

type UpdateSessionOpts struct {
	UserId *string `validate:"omitempty,uuid"`

	Data []byte
}

type UpdateTaskStatusRow added in v0.74.3

type UpdateTaskStatusRow struct {
	TenantId       pgtype.UUID
	TaskId         int64
	TaskInsertedAt pgtype.Timestamptz
	ReadableStatus sqlcv1.V1ReadableStatusOlap
	ExternalId     pgtype.UUID
	LatestWorkerId pgtype.UUID
	WorkflowId     pgtype.UUID
	IsDAGTask      bool
}

type UpdateTenantAlertGroupOpts

type UpdateTenantAlertGroupOpts struct {
	Emails []string `validate:"required,dive,email,max=255"`
}

type UpdateTenantAlertingSettingsOpts

type UpdateTenantAlertingSettingsOpts struct {
	LastAlertedAt *time.Time
}

type UpdateTenantInviteOpts

type UpdateTenantInviteOpts struct {
	Status *string `validate:"omitempty,oneof=ACCEPTED REJECTED"`

	// (optional) the role of the invitee
	Role *string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}

type UpdateTenantMemberOpts

type UpdateTenantMemberOpts struct {
	Role *string `validate:"omitempty,oneof=OWNER ADMIN MEMBER"`
}

type UpdateTenantOpts

type UpdateTenantOpts struct {
	Name *string

	AnalyticsOptOut *bool `validate:"omitempty"`

	AlertMemberEmails *bool `validate:"omitempty"`

	Version *sqlcv1.NullTenantMajorEngineVersion `validate:"omitempty"`
}

type UpdateTickerOpts

type UpdateTickerOpts struct {
	LastHeartbeatAt *time.Time
}

type UpdateUserOpts

type UpdateUserOpts struct {
	EmailVerified *bool
	Name          *string

	// auth options
	Password *string    `validate:"omitempty,required_without=OAuth,excluded_with=OAuth"`
	OAuth    *OAuthOpts `validate:"omitempty,required_without=Password,excluded_with=Password"`
}

type UpdateWorkerOpts

type UpdateWorkerOpts struct {
	// The id of the dispatcher
	DispatcherId *string `validate:"omitempty,uuid"`

	// When the last worker heartbeat was
	LastHeartbeatAt *time.Time

	// If the worker is active and accepting new runs
	IsActive *bool

	// A list of actions this worker can run
	Actions []string `validate:"dive,actionId"`

	// If the worker is paused
	IsPaused *bool
}

type UpsertRateLimitOpts

type UpsertRateLimitOpts struct {
	// The rate limit max value
	Limit int

	// The rate limit duration
	Duration *string `validate:"omitnil,oneof=SECOND MINUTE HOUR DAY WEEK MONTH YEAR"`
}

type UpsertSlackWebhookOpts

type UpsertSlackWebhookOpts struct {
	TeamId string `validate:"required,min=1,max=255"`

	TeamName string `validate:"required,min=1,max=255"`

	ChannelId string `validate:"required,min=1,max=255"`

	ChannelName string `validate:"required,min=1,max=255"`

	WebhookURL []byte `validate:"required,min=1"`
}

type UpsertTenantAlertingSettingsOpts

type UpsertTenantAlertingSettingsOpts struct {
	MaxFrequency                    *string `validate:"omitnil,duration"`
	EnableExpiringTokenAlerts       *bool   `validate:"omitnil"`
	EnableWorkflowRunFailureAlerts  *bool   `validate:"omitnil"`
	EnableTenantResourceLimitAlerts *bool   `validate:"omitnil"`
}

type UpsertWorkerLabelOpts

type UpsertWorkerLabelOpts struct {
	Key      string
	IntValue *int32
	StrValue *string
}

type UserRepository

type UserRepository interface {
	RegisterCreateCallback(callback UnscopedCallback[*sqlcv1.User])

	// GetUserByID returns the user with the given id
	GetUserByID(ctx context.Context, id string) (*sqlcv1.User, error)

	// GetUserByEmail returns the user with the given email
	GetUserByEmail(ctx context.Context, email string) (*sqlcv1.User, error)

	// GetUserPassword returns the user password with the given id
	GetUserPassword(ctx context.Context, id string) (*sqlcv1.UserPassword, error)

	// CreateUser creates a new user with the given options
	CreateUser(ctx context.Context, opts *CreateUserOpts) (*sqlcv1.User, error)

	// UpdateUser updates the user with the given email
	UpdateUser(ctx context.Context, id string, opts *UpdateUserOpts) (*sqlcv1.User, error)

	// ListTenantMemberships returns the list of tenant memberships for the given user
	ListTenantMemberships(ctx context.Context, userId string) ([]*sqlcv1.PopulateTenantMembersRow, error)
}

type UserSessionRepository

type UserSessionRepository interface {
	Create(ctx context.Context, opts *CreateSessionOpts) (*sqlcv1.UserSession, error)
	Update(ctx context.Context, sessionId string, opts *UpdateSessionOpts) (*sqlcv1.UserSession, error)
	Delete(ctx context.Context, sessionId string) (*sqlcv1.UserSession, error)
	GetById(ctx context.Context, sessionId string) (*sqlcv1.UserSession, error)
}

UserSessionRepository represents the set of queries on the UserSession model

type V1StepRunData added in v0.74.3

type V1StepRunData struct {
	Input       map[string]interface{}            `json:"input"`
	TriggeredBy string                            `json:"triggered_by"`
	Parents     map[string]map[string]interface{} `json:"parents"`

	Triggers map[string]map[string]interface{} `json:"triggers"`

	// custom-set user data for the step
	UserData map[string]interface{} `json:"user_data"`

	// overrides set from the playground
	Overrides map[string]interface{} `json:"overrides"`

	// errors in upstream steps (only used in on-failure step)
	StepRunErrors map[string]string `json:"step_run_errors,omitempty"`
}

func (*V1StepRunData) Bytes added in v0.74.3

func (v1 *V1StepRunData) Bytes() []byte

type V1TaskEventWithPayload added in v0.74.3

type V1TaskEventWithPayload struct {
	*sqlcv1.V1TaskEvent
	Payload []byte `json:"payload"`
}

type V1TaskWithPayload added in v0.74.3

type V1TaskWithPayload struct {
	*sqlcv1.V1Task
	Payload []byte `json:"payload"`
}

type V1WorkflowRunPopulator added in v0.74.3

type V1WorkflowRunPopulator struct {
	WorkflowRun  *WorkflowRunData
	TaskMetadata []TaskMetadata
}

type WasSuccessfullyClaimed added in v0.74.3

type WasSuccessfullyClaimed bool

type WebhookRepository added in v0.74.3

type WebhookRepository interface {
	CreateWebhook(ctx context.Context, tenantId string, params CreateWebhookOpts) (*sqlcv1.V1IncomingWebhook, error)
	ListWebhooks(ctx context.Context, tenantId string, params ListWebhooksOpts) ([]*sqlcv1.V1IncomingWebhook, error)
	DeleteWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
	GetWebhook(ctx context.Context, tenantId, webhookId string) (*sqlcv1.V1IncomingWebhook, error)
	CanCreate(ctx context.Context, tenantId string, webhookLimit int32) (bool, error)
	UpdateWebhook(ctx context.Context, tenantId string, webhookId, newExpression string) (*sqlcv1.V1IncomingWebhook, error)
}

type WorkerRepository added in v0.74.3

type WorkerRepository interface {
	ListWorkers(tenantId string, opts *ListWorkersOpts) ([]*sqlcv1.ListWorkersWithSlotCountRow, error)
	GetWorkerById(workerId string) (*sqlcv1.GetWorkerByIdRow, error)
	ListWorkerState(tenantId, workerId string, maxRuns int) ([]*sqlcv1.ListSemaphoreSlotsWithStateForWorkerRow, error)
	CountActiveSlotsPerTenant() (map[string]int64, error)
	CountActiveWorkersPerTenant() (map[string]int64, error)
	ListActiveSDKsPerTenant() (map[TenantIdSDKTuple]int64, error)

	// GetWorkerActionsByWorkerId returns a list of actions for a worker
	GetWorkerActionsByWorkerId(tenantid string, workerId []string) (map[string][]string, error)

	// GetWorkerWorkflowsByWorkerId returns a list of workflows for a worker
	GetWorkerWorkflowsByWorkerId(tenantid string, workerId string) ([]*sqlcv1.Workflow, error)

	// ListWorkerLabels returns a list of labels config for a worker
	ListWorkerLabels(tenantId, workerId string) ([]*sqlcv1.ListWorkerLabelsRow, error)

	// CreateNewWorker creates a new worker for a given tenant.
	CreateNewWorker(ctx context.Context, tenantId string, opts *CreateWorkerOpts) (*sqlcv1.Worker, error)

	// UpdateWorker updates a worker for a given tenant.
	UpdateWorker(ctx context.Context, tenantId, workerId string, opts *UpdateWorkerOpts) (*sqlcv1.Worker, error)

	// UpdateWorker updates a worker in the
	// It will only update the worker if there is no lock on the worker, else it will skip.
	UpdateWorkerHeartbeat(ctx context.Context, tenantId, workerId string, lastHeartbeatAt time.Time) error

	// DeleteWorker removes the worker from the database
	DeleteWorker(ctx context.Context, tenantId, workerId string) error

	GetWorkerForEngine(ctx context.Context, tenantId, workerId string) (*sqlcv1.GetWorkerForEngineRow, error)

	UpdateWorkerActiveStatus(ctx context.Context, tenantId, workerId string, isActive bool, timestamp time.Time) (*sqlcv1.Worker, error)

	UpsertWorkerLabels(ctx context.Context, workerId pgtype.UUID, opts []UpsertWorkerLabelOpts) ([]*sqlcv1.WorkerLabel, error)

	DeleteOldWorkers(ctx context.Context, tenantId string, lastHeartbeatBefore time.Time) (bool, error)

	GetDispatcherIdsForWorkers(ctx context.Context, tenantId string, workerIds []string) (map[string][]string, error)
}

type WorkflowAndScope added in v0.74.3

type WorkflowAndScope struct {
	WorkflowId pgtype.UUID
	Scope      string
}

type WorkflowMetrics

type WorkflowMetrics struct {
	// the number of runs for a specific group key
	GroupKeyRunsCount int `json:"groupKeyRunsCount,omitempty"`

	// the total number of concurrency group keys
	GroupKeyCount int `json:"groupKeyCount,omitempty"`
}

type WorkflowNameTriggerOpts added in v0.74.3

type WorkflowNameTriggerOpts struct {
	*TriggerTaskData

	ExternalId string

	// (optional) The idempotency key to use for debouncing this task
	IdempotencyKey *IdempotencyKey

	// Whether to skip the creation of the child workflow
	ShouldSkip bool
}

type WorkflowRepository added in v0.74.3

type WorkflowRepository interface {
	ListWorkflowNamesByIds(ctx context.Context, tenantId string, workflowIds []pgtype.UUID) (map[pgtype.UUID]string, error)
	PutWorkflowVersion(ctx context.Context, tenantId string, opts *CreateWorkflowVersionOpts) (*sqlcv1.GetWorkflowVersionForEngineRow, error)
	GetWorkflowShape(ctx context.Context, workflowVersionId uuid.UUID) ([]*sqlcv1.GetWorkflowShapeRow, error)

	// ListWorkflows returns all workflows for a given tenant.
	ListWorkflows(tenantId string, opts *ListWorkflowsOpts) (*ListWorkflowsResult, error)

	// GetWorkflowById returns a workflow by its name. It will return db.ErrNotFound if the workflow does not exist.
	GetWorkflowById(ctx context.Context, workflowId string) (*sqlcv1.GetWorkflowByIdRow, error)

	// GetWorkflowVersionById returns a workflow version by its id. It will return db.ErrNotFound if the workflow
	// version does not exist.
	GetWorkflowVersionWithTriggers(ctx context.Context, tenantId, workflowVersionId string) (*sqlcv1.GetWorkflowVersionByIdRow,
		[]*sqlcv1.WorkflowTriggerCronRef,
		[]*sqlcv1.WorkflowTriggerEventRef,
		[]*sqlcv1.WorkflowTriggerScheduledRef,
		error)

	GetWorkflowVersionById(ctx context.Context, tenantId, workflowId string) (*sqlcv1.GetWorkflowVersionForEngineRow, error)

	// DeleteWorkflow deletes a workflow for a given tenant.
	DeleteWorkflow(ctx context.Context, tenantId, workflowId string) (*sqlcv1.Workflow, error)

	GetWorkflowByName(ctx context.Context, tenantId, workflowName string) (*sqlcv1.Workflow, error)

	GetLatestWorkflowVersion(ctx context.Context, tenantId, workflowId string) (*sqlcv1.GetWorkflowVersionForEngineRow, error)
}

type WorkflowRunData added in v0.74.3

type WorkflowRunData struct {
	AdditionalMetadata   []byte                      `json:"additional_metadata"`
	CreatedAt            pgtype.Timestamptz          `json:"created_at"`
	DisplayName          string                      `json:"display_name"`
	ErrorMessage         string                      `json:"error_message"`
	ExternalID           pgtype.UUID                 `json:"external_id"`
	FinishedAt           pgtype.Timestamptz          `json:"finished_at"`
	Input                []byte                      `json:"input"`
	InsertedAt           pgtype.Timestamptz          `json:"inserted_at"`
	Kind                 sqlcv1.V1RunKind            `json:"kind"`
	Output               []byte                      `json:"output,omitempty"`
	ParentTaskExternalId *pgtype.UUID                `json:"parent_task_external_id,omitempty"`
	ReadableStatus       sqlcv1.V1ReadableStatusOlap `json:"readable_status"`
	StepId               *pgtype.UUID                `json:"step_id,omitempty"`
	StartedAt            pgtype.Timestamptz          `json:"started_at"`
	TaskExternalId       *pgtype.UUID                `json:"task_external_id,omitempty"`
	TaskId               *int64                      `json:"task_id,omitempty"`
	TaskInsertedAt       *pgtype.Timestamptz         `json:"task_inserted_at,omitempty"`
	TenantID             pgtype.UUID                 `json:"tenant_id"`
	WorkflowID           pgtype.UUID                 `json:"workflow_id"`
	WorkflowVersionId    pgtype.UUID                 `json:"workflow_version_id"`
	RetryCount           *int                        `json:"retry_count,omitempty"`
}

type WorkflowScheduleRepository added in v0.74.3

type WorkflowScheduleRepository interface {
	// List ScheduledWorkflows lists workflows by scheduled trigger
	ListScheduledWorkflows(ctx context.Context, tenantId string, opts *ListScheduledWorkflowsOpts) ([]*sqlcv1.ListScheduledWorkflowsRow, int64, error)

	// DeleteScheduledWorkflow deletes a scheduled workflow run
	DeleteScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) error

	// GetScheduledWorkflow gets a scheduled workflow run
	GetScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string) (*sqlcv1.ListScheduledWorkflowsRow, error)

	// UpdateScheduledWorkflow updates a scheduled workflow run
	UpdateScheduledWorkflow(ctx context.Context, tenantId, scheduledWorkflowId string, triggerAt time.Time) error

	// ScheduledWorkflowMetaByIds returns minimal metadata for scheduled workflows by id.
	// Intended for bulk operations to avoid N+1 DB calls.
	ScheduledWorkflowMetaByIds(ctx context.Context, tenantId string, scheduledWorkflowIds []string) (map[string]ScheduledWorkflowMeta, error)

	// BulkDeleteScheduledWorkflows deletes scheduled workflows in bulk and returns deleted ids.
	BulkDeleteScheduledWorkflows(ctx context.Context, tenantId string, scheduledWorkflowIds []string) ([]string, error)

	// BulkUpdateScheduledWorkflows updates scheduled workflows in bulk and returns updated ids.
	BulkUpdateScheduledWorkflows(ctx context.Context, tenantId string, updates []ScheduledWorkflowUpdate) ([]string, error)

	CreateScheduledWorkflow(ctx context.Context, tenantId string, opts *CreateScheduledWorkflowRunForWorkflowOpts) (*sqlcv1.ListScheduledWorkflowsRow, error)

	// CreateCronWorkflow creates a cron trigger
	CreateCronWorkflow(ctx context.Context, tenantId string, opts *CreateCronWorkflowTriggerOpts) (*sqlcv1.ListCronWorkflowsRow, error)

	// List ScheduledWorkflows lists workflows by scheduled trigger
	ListCronWorkflows(ctx context.Context, tenantId string, opts *ListCronWorkflowsOpts) ([]*sqlcv1.ListCronWorkflowsRow, int64, error)

	// GetCronWorkflow gets a cron workflow run
	GetCronWorkflow(ctx context.Context, tenantId, cronWorkflowId string) (*sqlcv1.ListCronWorkflowsRow, error)

	// DeleteCronWorkflow deletes a cron workflow run
	DeleteCronWorkflow(ctx context.Context, tenantId, id string) error

	// UpdateCronWorkflow updates a cron workflow
	UpdateCronWorkflow(ctx context.Context, tenantId, id string, opts *UpdateCronOpts) error

	DeleteInvalidCron(ctx context.Context, id pgtype.UUID) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL