persistent

package
v0.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2024 | License: MIT | Imports: 8 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrTaskLost is a sentinel error carrying the message "task lost".
// Compare against it with errors.Is.
// NOTE(review): presumably returned when a task's lock is no longer held by
// the caller — confirm against the TaskRepository implementation.
var ErrTaskLost = errors.New("task lost")
View Source
// ErrWorkflowNotFound is a sentinel error carrying the message
// "workflow not found". Compare against it with errors.Is.
// NOTE(review): presumably returned by WorkflowRepository lookups when no row
// matches — confirm against the implementation.
var ErrWorkflowNotFound = errors.New("workflow not found")
View Source
// WorkflowTaskID is the string form of the all-zero UUID
// ("00000000-0000-0000-0000-000000000000"). It is computed once at package
// initialization; uuid.MustParse panics on malformed input, which cannot
// happen for this constant literal.
var WorkflowTaskID = uuid.MustParse("00000000-0000-0000-0000-000000000000").String() // reserved id for workflow task

Functions

This section is empty.

Types

type Event

// Event is a GORM model whose primary key is the pair
// (workflow_id, event_id). The workflow_id, held_by and visible_at columns
// all participate in the composite index idx_workflow_id_held_by_visible_at.
type Event struct {
	// WorkflowID maps to workflow_id (varchar(255)); part of the primary key
	// and of the composite index.
	WorkflowID string  `gorm:"column:workflow_id;type:varchar(255);primaryKey;index:idx_workflow_id_held_by_visible_at"`
	// EventID maps to event_id (uuid); part of the primary key.
	EventID    string  `gorm:"column:event_id;type:uuid;primaryKey"`
	// HeldBy maps to held_by (varchar(255)) and is nullable (pointer type).
	// NOTE(review): presumably nil means no holder currently owns the event —
	// confirm against the repository implementation.
	HeldBy     *string `gorm:"column:held_by;type:varchar(255);index:idx_workflow_id_held_by_visible_at"`
	// CreatedAt maps to created_at (bigint).
	// NOTE(review): timestamp unit is not visible here — TODO confirm.
	CreatedAt  int64   `gorm:"column:created_at;type:bigint"`
	// VisibleAt maps to visible_at (bigint); part of the composite index.
	// NOTE(review): presumably the earliest time the event may be fetched —
	// TODO confirm unit and semantics.
	VisibleAt  int64   `gorm:"column:visible_at;type:bigint;index:idx_workflow_id_held_by_visible_at"`
	// Payload maps to payload (bytea); its encoding is not shown here.
	Payload    []byte  `gorm:"column:payload;type:bytea"`
}

type EventRepository

// EventRepository is the persistence interface for Event rows. Every method
// takes a context.Context as its first argument. The notes below describe
// only what the signatures show; behavioral details live in the
// implementation returned by NewEventRepository.
type EventRepository interface {
	// InsertEvents stores the given events.
	InsertEvents(ctx context.Context, events []*Event) error
	// DeleteEventsByWorkflowID deletes events selected by workflowID.
	// NOTE(review): the int64 result is presumably the number of rows
	// affected — confirm.
	DeleteEventsByWorkflowID(ctx context.Context, workflowID string) (int64, error)
	// DeleteEventsByWorkflowIDAndHeldBy deletes events selected by workflowID
	// and heldBy.
	// NOTE(review): the int64 result is presumably the number of rows
	// affected — confirm.
	DeleteEventsByWorkflowIDAndHeldBy(ctx context.Context, workflowID string, heldBy string) (int64, error)
	// ReleaseEventsByWorkflowIDAndHeldBy releases events selected by
	// workflowID and heldBy.
	// NOTE(review): presumably clears held_by and returns the number of rows
	// affected — confirm.
	ReleaseEventsByWorkflowIDAndHeldBy(ctx context.Context, workflowID string, heldBy string) (int64, error)
	// GetAvailableWorkflowEventsAndLock returns events for workflowID.
	// NOTE(review): presumably marks the returned events as held by heldBy,
	// and previouslyHeldBy (nullable) lets a caller reclaim events held under
	// an earlier identity — TODO confirm both.
	GetAvailableWorkflowEventsAndLock(ctx context.Context, workflowID string, heldBy string, previouslyHeldBy *string) ([]*Event, error)
}

func NewEventRepository

// NewEventRepository returns an EventRepository constructed from the given
// *gorm.DB handle. The implementation is not shown in this listing.
func NewEventRepository(db *gorm.DB) EventRepository

type HistoryEvent

// HistoryEvent is a GORM model whose primary key is the pair
// (workflow_id, sequence_no).
type HistoryEvent struct {
	// WorkflowID maps to workflow_id (varchar(255)); part of the primary key.
	WorkflowID string `gorm:"column:workflow_id;type:varchar(255);primaryKey"`
	// SequenceNo maps to sequence_no (bigint); part of the primary key.
	// NOTE(review): presumably a per-workflow ordering number — confirm
	// whether it starts at 0 or 1.
	SequenceNo int64  `gorm:"column:sequence_no;type:bigint;primaryKey"`
	// Payload maps to payload (bytea); its encoding is not shown here.
	Payload    []byte `gorm:"column:payload;type:bytea"`
}

type HistoryEventRepository

// HistoryEventRepository is the persistence interface for HistoryEvent rows.
// The notes below describe only what the signatures show.
type HistoryEventRepository interface {
	// InsertHistoryEvents stores the given history events.
	InsertHistoryEvents(ctx context.Context, events []*HistoryEvent) error
	// GetWorkflowHistory returns the history events for workflowID.
	// NOTE(review): result ordering (presumably by sequence_no) is not
	// visible here — confirm.
	GetWorkflowHistory(ctx context.Context, workflowID string) ([]*HistoryEvent, error)
	// GetLastHistorySeqNo returns a sequence number for workflowID.
	// NOTE(review): presumably the highest stored sequence_no; the value
	// returned when no history exists is not visible here — confirm.
	GetLastHistorySeqNo(ctx context.Context, workflowID string) (int64, error)
}

func NewHistoryEventRepository

// NewHistoryEventRepository returns a HistoryEventRepository constructed from
// the given *gorm.DB handle. The implementation is not shown in this listing.
func NewHistoryEventRepository(db *gorm.DB) HistoryEventRepository

type Task

// Task is a GORM model whose primary key is the pair (workflow_id, task_id).
// The task_type, locked_by and visible_at columns all participate in the
// composite index idx_task_type_locked_by_visible_at.
type Task struct {
	// WorkflowID maps to workflow_id (varchar(255)); part of the primary key.
	WorkflowID    string  `gorm:"column:workflow_id;type:varchar(255);primaryKey"`
	// TaskID maps to task_id (uuid); part of the primary key.
	TaskID        string  `gorm:"column:task_id;type:uuid;primaryKey"`
	// TaskType maps to task_type (varchar(255)); part of the composite index.
	TaskType      string  `gorm:"column:task_type;type:varchar(255);index:idx_task_type_locked_by_visible_at"`
	// NumAttempted maps to num_attempted (integer).
	// NOTE(review): presumably counts how many times the task was picked up —
	// confirm where it is incremented.
	NumAttempted  int32   `gorm:"column:num_attempted;type:integer"`
	// LockedBy maps to locked_by (varchar(255)) and is nullable (pointer
	// type); part of the composite index.
	// NOTE(review): presumably nil means the task is unlocked — confirm.
	LockedBy      *string `gorm:"column:locked_by;type:varchar(255);index:idx_task_type_locked_by_visible_at"`
	// LockedAt maps to locked_at (bigint).
	// NOTE(review): timestamp unit is not visible here — TODO confirm.
	LockedAt      int64   `gorm:"column:locked_at;type:bigint"`
	// CreatedAt maps to created_at (bigint).
	CreatedAt     int64   `gorm:"column:created_at;type:bigint"`
	// VisibleAt maps to visible_at (bigint); part of the composite index.
	// NOTE(review): presumably the earliest time the task may be picked up —
	// TODO confirm.
	VisibleAt     int64   `gorm:"column:visible_at;type:bigint;index:idx_task_type_locked_by_visible_at"`
	// LastTouch maps to last_touch (bigint).
	// NOTE(review): presumably updated by TaskRepository.TouchTask and reset
	// by ResetTaskLastTouchTimestamp — confirm.
	LastTouch     int64   `gorm:"column:last_touch;type:bigint"`
	// Payload maps to payload (bytea); its encoding is not shown here.
	Payload       []byte  `gorm:"column:payload;type:bytea"`
	// ReleaseReason maps to release_reason (text) and is nullable.
	ReleaseReason *string `gorm:"column:release_reason;type:text"`
}

type TaskRepository

// TaskRepository is the persistence interface for Task rows. The notes below
// describe only what the signatures show; locking and expiry rules live in
// the implementation returned by NewTaskRepository.
type TaskRepository interface {
	// InsertTask stores a single task.
	InsertTask(ctx context.Context, task *Task) error
	// GetTask returns the task identified by (workflowID, taskID).
	GetTask(ctx context.Context, workflowID string, taskID string) (*Task, error)
	// InsertTasks stores the given tasks.
	InsertTasks(ctx context.Context, tasks []*Task) error
	// ReleaseTask releases the task identified by workflowID, taskID and
	// taskType on behalf of lockedBy. reason and nextScheduleTimestamp are
	// optional (nil allowed by type).
	// NOTE(review): presumably reason is stored in release_reason and
	// nextScheduleTimestamp becomes the new visible_at — confirm.
	ReleaseTask(ctx context.Context, workflowID string, taskID string, taskType task.TaskType, lockedBy string, reason *string, nextScheduleTimestamp *int64) error
	// DeleteTask deletes the task identified by workflowID, taskID and
	// taskType on behalf of lockedBy.
	// NOTE(review): presumably fails (possibly with ErrTaskLost) when the
	// task is not locked by lockedBy — confirm.
	DeleteTask(ctx context.Context, workflowID string, taskID string, taskType task.TaskType, lockedBy string) error
	// DeleteTaskUnsafe deletes the task identified by workflowID, taskID and
	// taskType; unlike DeleteTask it takes no lockedBy argument.
	DeleteTaskUnsafe(ctx context.Context, workflowID string, taskID string, taskType task.TaskType) error
	// GetAndLockAvailableTask returns a task of the given taskType for the
	// caller identified by lockedBy.
	// NOTE(review): the meaning of the *string result (possibly the previous
	// lock holder) and how lockExpirationDuration is applied are not visible
	// here — TODO confirm.
	GetAndLockAvailableTask(ctx context.Context, taskType task.TaskType, lockedBy string, lockExpirationDuration time.Duration) (*Task, *string, error)
	// ResetTaskLastTouchTimestamp operates on the task identified by
	// (workflowID, taskID).
	// NOTE(review): presumably resets last_touch; the reset value is not
	// visible here — confirm.
	ResetTaskLastTouchTimestamp(ctx context.Context, workflowID string, taskID string) error
	// TouchTask operates on the task identified by (workflowID, taskID).
	// NOTE(review): presumably sets last_touch to the current time — confirm.
	TouchTask(ctx context.Context, workflowID string, taskID string) error
}

func NewTaskRepository

// NewTaskRepository returns a TaskRepository constructed from the given
// *gorm.DB handle. The implementation is not shown in this listing.
func NewTaskRepository(db *gorm.DB) TaskRepository

type Workflow

// Workflow is a GORM model keyed by the id column. Pointer-typed fields map
// to nullable columns.
type Workflow struct {
	// ID maps to id (varchar(255)); the primary key.
	ID                   string  `gorm:"column:id;type:varchar(255);primaryKey"`
	// Name maps to name (varchar(255)).
	Name                 string  `gorm:"column:name;type:varchar(255)"`
	// Version maps to version (varchar(255)).
	Version              string  `gorm:"column:version;type:varchar(255)"`
	// CreatedAt maps to created_at (bigint).
	// NOTE(review): timestamp unit is not visible here — TODO confirm.
	CreatedAt            int64   `gorm:"column:created_at;type:bigint"`
	// StartAt maps to start_at (bigint) and is nullable.
	StartAt              *int64  `gorm:"column:start_at;type:bigint"`
	// CompletedAt maps to completed_at (bigint) and is nullable.
	// NOTE(review): presumably nil until the workflow finishes — confirm.
	CompletedAt          *int64  `gorm:"column:completed_at;type:bigint"`
	// CurrentRuntimeStatus maps to current_runtime_status (varchar(255)).
	// NOTE(review): the set of valid status strings is defined elsewhere.
	CurrentRuntimeStatus string  `gorm:"column:current_runtime_status;type:varchar(255)"`
	// Input maps to input (bytea); its encoding is not shown here.
	Input                []byte  `gorm:"column:input;type:bytea"`
	// ResultOutput maps to result_output (bytea); the pointer distinguishes
	// NULL from an empty byte slice.
	ResultOutput         *[]byte `gorm:"column:result_output;type:bytea"`
	// ResultError maps to result_error (text) and is nullable.
	ResultError          *string `gorm:"column:result_error;type:text"`
	// ParentWorkflowID maps to parent_workflow_id (varchar(255)) and is
	// nullable.
	// NOTE(review): presumably set only for child workflows — confirm.
	ParentWorkflowID     *string `gorm:"column:parent_workflow_id;type:varchar(255)"`
}

type WorkflowRepository

// WorkflowRepository is the persistence interface for Workflow rows. The
// notes below describe only what the signatures show.
type WorkflowRepository interface {
	// InsertWorkflow stores the given workflow.
	InsertWorkflow(ctx context.Context, workflow *Workflow) error
	// GetWorkflow returns the workflow identified by workflowID.
	// NOTE(review): presumably returns ErrWorkflowNotFound when no row
	// matches — confirm.
	GetWorkflow(ctx context.Context, workflowID string) (*Workflow, error)
	// UpdateWorkflow updates the workflow identified by workflowID using the
	// values in workflow.
	// NOTE(review): which columns are written (all vs. non-zero fields only)
	// is not visible here — confirm.
	UpdateWorkflow(ctx context.Context, workflowID string, workflow *Workflow) error
}

func NewWorkflowRepository

// NewWorkflowRepository returns a WorkflowRepository constructed from the
// given *gorm.DB handle. The implementation is not shown in this listing.
func NewWorkflowRepository(db *gorm.DB) WorkflowRepository

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL