taskdb

package
v0.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2022 License: AGPL-3.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrPayloadNotFound is a sentinel error carrying the message
	// "payload not found". Compare against it with errors.Is.
	// NOTE(review): presumably returned by GetPayload when no payload is
	// stored for the requested topic/task — confirm against the
	// implementations.
	ErrPayloadNotFound = errors.New("payload not found")
)

Functions

This section is empty.

Types

type AttachIterator

// AttachIterator iterates over task states, one slice per call to Next.
// TaskDB.Attach returns an AttachIterator.
type AttachIterator interface {
	// Next returns a slice of task states. It returns nil when there are
	// no more.
	Next() ([]*TaskDBTaskState, error)
}

type DrainIterator

// DrainIterator iterates over task payloads, one slice per call to Next.
// TaskDB.Drain returns a DrainIterator.
type DrainIterator interface {
	// Next returns a slice of task payloads that can be joined to their
	// current states and sent to other partitions. It returns nil when
	// there are no more.
	Next() ([]*DrainTask, error)
}

type DrainTask

// DrainTask is a single element of the slices returned by
// DrainIterator.Next: a task payload together with the fields naming it.
type DrainTask struct {
	// Topic and ID presumably correspond to the topicName and taskID
	// arguments used throughout the TaskDB interface — TODO confirm.
	Topic string
	ID    string
	// Payload holds the task's payload bytes.
	Payload []byte
}

type FakeAttachIterator added in v0.0.2

// FakeAttachIterator is a field-less type whose pointer receiver provides a
// Next method with the signature required by AttachIterator.
// NOTE(review): the name suggests a stub/placeholder iterator; its Next body
// is not visible here — confirm what it returns.
type FakeAttachIterator struct{}

func (*FakeAttachIterator) Next added in v0.0.2

func (fai *FakeAttachIterator) Next() ([]*TaskDBTaskState, error)

type MemoryDrainIterator added in v0.0.2

// MemoryDrainIterator is a field-less type whose pointer receiver provides a
// Next method with the signature required by DrainIterator.
// NOTE(review): presumably the DrainIterator paired with MemoryTaskDB; its
// Next body is not visible here — confirm what it returns.
type MemoryDrainIterator struct{}

func (*MemoryDrainIterator) Next added in v0.0.2

func (mdi *MemoryDrainIterator) Next() ([]*DrainTask, error)

type MemoryTaskDB added in v0.0.2

// MemoryTaskDB is a TaskDB implementation: *MemoryTaskDB provides Attach,
// PutPayload, PutState, GetPayload, Delete and Drain with the signatures the
// TaskDB interface requires. Create one with NewMemoryTaskDB.
// NOTE(review): the name suggests state is held in memory only; the fields
// are unexported and not visible here — confirm.
type MemoryTaskDB struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskDB added in v0.0.2

func NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) Attach added in v0.0.2

func (mtdb *MemoryTaskDB) Attach() AttachIterator

func (*MemoryTaskDB) Delete added in v0.0.2

func (mtdb *MemoryTaskDB) Delete(topicName, taskID string) WriteResult

func (*MemoryTaskDB) Drain added in v0.0.2

func (mtdb *MemoryTaskDB) Drain() DrainIterator

func (*MemoryTaskDB) GetPayload added in v0.0.2

func (mtdb *MemoryTaskDB) GetPayload(topicName, taskID string) ([]byte, error)

func (*MemoryTaskDB) NewMemoryTaskDB added in v0.0.2

func (mtdb *MemoryTaskDB) NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) PutPayload added in v0.0.2

func (mtdb *MemoryTaskDB) PutPayload(topicName, taskID string, payload []byte) WriteResult

func (*MemoryTaskDB) PutState added in v0.0.2

func (mtdb *MemoryTaskDB) PutState(state *TaskDBTaskState) WriteResult

type MemoryWriteResult added in v0.0.2

// MemoryWriteResult is a field-less type whose value receiver provides a
// Get method with the signature required by WriteResult.
// NOTE(review): presumably the WriteResult returned by MemoryTaskDB's write
// methods — confirm.
type MemoryWriteResult struct{}

func (MemoryWriteResult) Get added in v0.0.2

func (mwr MemoryWriteResult) Get() error

type TaskDB

// TaskDB is the storage interface for task payloads and task states. Write
// operations return a WriteResult; reads over the whole store go through the
// AttachIterator and DrainIterator returned by Attach and Drain.
type TaskDB interface {
	// Attach acquires the TaskDB lock as needed, then returns an
	// AttachIterator.
	Attach() AttachIterator

	// PutPayload inserts a task into the task table and inserts its first
	// state.
	PutPayload(topicName, taskID string, payload []byte) WriteResult

	// PutState writes a new task state.
	PutState(state *TaskDBTaskState) WriteResult

	// GetPayload retrieves the payload for a given task.
	GetPayload(topicName, taskID string) ([]byte, error)

	// Delete deletes all task states for a topic, and removes the topic
	// from the task table. If no more tasks exist then the task will be
	// removed from the task table.
	// NOTE(review): this wording appears to mix up "topic" and "task" — the
	// signature targets a single (topicName, taskID) pair. Confirm the
	// intended semantics against the implementations and reword.
	Delete(topicName, taskID string) WriteResult

	// Drain returns an iterator that will read all payloads from the DB, so
	// they can be drained into other partitions.
	Drain() DrainIterator
}

type TaskDBTaskState

// TaskDBTaskState is the task-state record exchanged with a TaskDB: it is the
// argument to TaskDB.PutState and the element type of the slices returned by
// AttachIterator.Next. NewTaskDBTaskState takes one parameter per field.
type TaskDBTaskState struct {
	// Topic and Partition: presumably the topic the task belongs to and the
	// partition that owns it — TODO confirm.
	Topic     string
	Partition string

	// ID: presumably the taskID used by the TaskDB methods — TODO confirm.
	ID string
	// State is the task's TaskState value.
	State TaskState
	// NOTE(review): the meaning of Version, DeliveryAttempts and Priority
	// (e.g. ordering direction, what increments them) is not established
	// anywhere in this listing — confirm before relying on them.
	Version          int32
	DeliveryAttempts int32
	CreatedAt        time.Time
	Priority         int32
}

func NewTaskDBTaskState

func NewTaskDBTaskState(partition, topicName, taskID string, state TaskState, version, deliveryAttempts, priority int32, createdAt time.Time) *TaskDBTaskState

type TaskState

// TaskState enumerates the states a task can be in, stored as an int32.
// It has a String method.
type TaskState int32

// Known TaskState values. The numeric values are explicit (0, 1, 2).
// NOTE(review): the precise lifecycle meaning of each state is inferred from
// its name only — confirm. The ALL_CAPS names depart from Go's MixedCaps
// convention but are part of the exported API, so they are left unchanged.
const (
	TASK_STATE_ENQUEUED TaskState = 0
	TASK_STATE_DELAYED  TaskState = 1
	TASK_STATE_INFLIGHT TaskState = 2
)

func (TaskState) String

func (ts TaskState) String() string

type WriteResult

// WriteResult is the handle returned by the TaskDB write methods
// (PutPayload, PutState and Delete); call Get to learn the outcome.
type WriteResult interface {
	// Get waits for the write to be committed to the TaskDB and returns
	// the resulting error, if any.
	//
	// Guidance for implementations: use a buffered channel to wait for a
	// batch to be committed. If the TaskDB does not batch, it should still
	// immediately write to the buffered channel so that function calls are
	// non-blocking and return immediately.
	Get() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL