taskdb

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: AGPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAttachEnded = errors.New("attach ended")
)
View Source
var (
	ErrPayloadNotFound = errors.New("payload not found")
)

Functions

This section is empty.

Types

type AttachIterator

type AttachIterator interface {
	// Returns an array of task states. Returns nil when there are no more
	Next() ([]*TaskDBTaskState, error)
}

type BadgerAttachIterator added in v0.0.4

type BadgerAttachIterator struct {
	// contains filtered or unexported fields
}

func (*BadgerAttachIterator) Next added in v0.0.4

func (ai *BadgerAttachIterator) Next() ([]*TaskDBTaskState, error)

type BadgerDrainIterator added in v0.0.4

type BadgerDrainIterator struct {
	// contains filtered or unexported fields
}

func (*BadgerDrainIterator) Next added in v0.0.4

func (di *BadgerDrainIterator) Next() ([]*DrainTask, error)

type BadgerTaskDB added in v0.0.4

type BadgerTaskDB struct {
	// contains filtered or unexported fields
}

func NewBadgerTaskDB added in v0.0.4

func NewBadgerTaskDB() (*BadgerTaskDB, error)

func (*BadgerTaskDB) Attach added in v0.0.4

func (tdb *BadgerTaskDB) Attach() AttachIterator

func (*BadgerTaskDB) Delete added in v0.0.4

func (tdb *BadgerTaskDB) Delete(topicName, taskID string) WriteResult

func (*BadgerTaskDB) Drain added in v0.0.4

func (tdb *BadgerTaskDB) Drain() DrainIterator

func (*BadgerTaskDB) GetPayload added in v0.0.4

func (tdb *BadgerTaskDB) GetPayload(topicName, taskID string) (payload string, err error)

func (*BadgerTaskDB) PutPayload added in v0.0.4

func (tdb *BadgerTaskDB) PutPayload(topicName, taskID string, payload string) WriteResult

func (*BadgerTaskDB) PutState added in v0.0.4

func (tdb *BadgerTaskDB) PutState(state *TaskDBTaskState) WriteResult

type BadgerWriteResult added in v0.0.4

type BadgerWriteResult struct {
	// contains filtered or unexported fields
}

func (BadgerWriteResult) Get added in v0.0.4

func (wr BadgerWriteResult) Get() error

type DrainIterator

type DrainIterator interface {
	// Returns an array of task payloads that can be joined to their current states, and sent to other partitions. Returns nil when there are no more
	Next() ([]*DrainTask, error)
}

type DrainTask

type DrainTask struct {
	Topic   string
	ID      string
	Payload string
}

type FakeAttachIterator added in v0.0.2

type FakeAttachIterator struct{}

func (*FakeAttachIterator) Next added in v0.0.2

func (fai *FakeAttachIterator) Next() ([]*TaskDBTaskState, error)

type MemoryDrainIterator added in v0.0.2

type MemoryDrainIterator struct{}

func (*MemoryDrainIterator) Next added in v0.0.2

func (mdi *MemoryDrainIterator) Next() ([]*DrainTask, error)

type MemoryTaskDB added in v0.0.2

type MemoryTaskDB struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskDB added in v0.0.2

func NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) Attach added in v0.0.2

func (mtdb *MemoryTaskDB) Attach() AttachIterator

func (*MemoryTaskDB) Delete added in v0.0.2

func (mtdb *MemoryTaskDB) Delete(topicName, taskID string) WriteResult

func (*MemoryTaskDB) Drain added in v0.0.2

func (mtdb *MemoryTaskDB) Drain() DrainIterator

func (*MemoryTaskDB) GetPayload added in v0.0.2

func (mtdb *MemoryTaskDB) GetPayload(topicName, taskID string) (string, error)

func (*MemoryTaskDB) NewMemoryTaskDB added in v0.0.2

func (mtdb *MemoryTaskDB) NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) PutPayload added in v0.0.2

func (mtdb *MemoryTaskDB) PutPayload(topicName, taskID string, payload string) WriteResult

func (*MemoryTaskDB) PutState added in v0.0.2

func (mtdb *MemoryTaskDB) PutState(state *TaskDBTaskState) WriteResult

type MemoryWriteResult added in v0.0.2

type MemoryWriteResult struct{}

func (MemoryWriteResult) Get added in v0.0.2

func (mwr MemoryWriteResult) Get() error

type TaskDB

type TaskDB interface {
	// Acquires the TaskDB lock as needed, then returns an AttachIterator
	Attach() AttachIterator

	// A Task will be inserted into the task table, and its first state inserted
	PutPayload(topicName, taskID string, payload string) WriteResult

	// A new task state
	PutState(state *TaskDBTaskState) WriteResult

	// Retrieves the payload for a given task
	GetPayload(topicName, taskID string) (string, error)

	// Deletes all task states for a topic, and removes the topic from the task table. If no more tasks exist then the task will be removed from the task table
	Delete(topicName, taskID string) WriteResult

	// Returns an iterator that will read all payloads from the DB, so they can be drained into other partitions
	Drain() DrainIterator
}

type TaskDBTaskState

type TaskDBTaskState struct {
	Topic     string
	Partition string

	ID               string
	State            TaskState
	Version          int32
	DeliveryAttempts int32
	CreatedAt        time.Time
	Priority         int32
}

func NewTaskDBTaskState

func NewTaskDBTaskState(partition, topicName, taskID string, state TaskState, version, deliveryAttempts, priority int32, createdAt time.Time) *TaskDBTaskState

type TaskState

type TaskState int32
const (
	TASK_STATE_ENQUEUED TaskState = 0
	TASK_STATE_DELAYED  TaskState = 1
	TASK_STATE_INFLIGHT TaskState = 2
)

func (TaskState) String

func (ts TaskState) String() string

type WriteResult

type WriteResult interface {
	// Waits for the write to be committed to the TaskDB.
	// Should use a buffered channel to wait for a batch to be committed.
	// If the TaskDB does not batch it should still immediately write to
	// the buffered channel so that function calls are non-blocking and
	// return immediately
	Get() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL