taskdb

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2022 License: AGPL-3.0 Imports: 2 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AttachIterator

// AttachIterator yields task states in batches. It is the type returned by
// TaskDB.Attach.
type AttachIterator interface {
	// Next returns the next batch of task states. It returns a nil slice when
	// there are no more states to read.
	// NOTE(review): what a non-nil error means for the iterator (e.g. whether
	// Next may be called again) is not specified here — confirm against an
	// implementation.
	Next() ([]*TaskDBTaskState, error)
}

type DrainIterator

// DrainIterator yields task payloads in batches. It is the type returned by
// TaskDB.Drain.
type DrainIterator interface {
	// Next returns the next batch of task payloads, which can be joined to
	// their current states and sent to other partitions. It returns a nil
	// slice when there are no more payloads to read.
	// NOTE(review): what a non-nil error means for the iterator is not
	// specified here — confirm against an implementation.
	Next() ([]*DrainTask, error)
}

type DrainTask

// DrainTask is the element type produced by DrainIterator.Next: a task
// payload together with the topic and ID that identify it.
type DrainTask struct {
	Topic   string // topic name; presumably the topic the task belongs to
	ID      string // task ID; presumably unique within Topic — TODO confirm
	Payload []byte // raw task payload bytes
}

type TaskDB

// TaskDB is the storage interface for tasks: it records task payloads and
// their states, and exposes iterators for reading them back.
type TaskDB interface {
	// Attach acquires the TaskDB lock as needed, then returns an
	// AttachIterator.
	Attach() AttachIterator

	// Enqueue inserts a task into the task table along with its first state.
	// The returned WriteResult reports when the write has been committed.
	Enqueue(state *TaskDBTaskState, payload []byte) WriteResult

	// PutState records a new task state.
	PutState(state *TaskDBTaskState) WriteResult

	// GetPayload retrieves the payload for the task identified by topicName
	// and taskID.
	GetPayload(topicName, taskID string) ([]byte, error)

	// Delete removes the states recorded for a task and its entry in the task
	// table.
	// NOTE(review): the original comment read "Deletes all task states for a
	// topic, and removes the topic from the task table. If no more tasks exist
	// then the task will be removed from the task table". The words "topic"
	// and "task" appear to be swapped there; since the method takes both a
	// topic name and a task ID it presumably deletes a single task — confirm
	// the intended semantics against an implementation.
	Delete(topicName, taskID string) WriteResult

	// Drain returns an iterator that reads all payloads from the DB so they
	// can be drained into other partitions.
	Drain() DrainIterator
}

type TaskDBTaskState

// TaskDBTaskState is a single state record for a task. It is the value
// accepted by TaskDB.Enqueue and TaskDB.PutState and yielded in batches by
// AttachIterator.Next.
type TaskDBTaskState struct {
	// Topic and Partition locate the task.
	// NOTE(review): the exact meaning of Partition is not visible here.
	Topic     string
	Partition string

	ID               string    // task ID
	State            TaskState // one of the TASK_STATE_* constants
	Version          int       // presumably a per-task state version counter — TODO confirm
	DeliveryAttempts int       // presumably the number of delivery attempts so far — TODO confirm
	CreatedAt        time.Time // creation timestamp; whether of the task or of this state is not visible here
	Priority         int       // task priority; ordering direction is not visible here
}

func NewTaskDBTaskState

// NewTaskDBTaskState returns a *TaskDBTaskState, presumably populated from the
// given arguments (the body is not shown in this listing).
// Note the parameter order: partition comes before topicName, and priority
// comes before createdAt, which differs from the field order of
// TaskDBTaskState.
func NewTaskDBTaskState(partition, topicName, taskID string, state TaskState, version, deliveryAttempts, priority int, createdAt time.Time) *TaskDBTaskState

type TaskState

// TaskState enumerates the states a task can be in.
type TaskState int

// The TaskState values are numbered 0 through 4 in declaration order. Append
// any new state at the end of this block so the existing numeric values stay
// unchanged.
const (
	TASK_STATE_QUEUED TaskState = iota
	TASK_STATE_DELAYED
	TASK_STATE_INFLIGHT
	TASK_STATE_ACKED
	TASK_STATE_NACKED
)

func (TaskState) String

// String returns a textual form of the task state. Its signature satisfies
// fmt.Stringer, so fmt verbs such as %v and %s will use it.
// NOTE(review): the exact strings returned are not shown in this listing.
func (ts TaskState) String() string

type WriteResult

// WriteResult is the handle returned by the TaskDB write methods (Enqueue,
// PutState, Delete) for observing the outcome of the write.
type WriteResult interface {
	// Get waits for the write to be committed to the TaskDB and returns the
	// resulting error, if any.
	//
	// Guidance for implementers: use a buffered channel to wait for a batch
	// to be committed. If the TaskDB does not batch, it should still write to
	// the buffered channel immediately, so that the write methods are
	// non-blocking and return immediately.
	Get() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL