taskdb

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2022 License: AGPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAttachEnded = errors.New("attach ended")
)
View Source
var (
	ErrPayloadNotFound = errors.New("payload not found")
)

Functions

This section is empty.

Types

type AttachIterator

type AttachIterator interface {
	// Returns an array of task states. Returns nil when there are no more, safe to call multiple times when empty
	Next() ([]*TaskDBTaskState, error)
}

type BadgerAttachIterator added in v0.0.4

type BadgerAttachIterator struct {
	// contains filtered or unexported fields
}

func (*BadgerAttachIterator) Next added in v0.0.4

func (ai *BadgerAttachIterator) Next() ([]*TaskDBTaskState, error)

type BadgerDrainIterator added in v0.0.4

type BadgerDrainIterator struct {
	// contains filtered or unexported fields
}

func (*BadgerDrainIterator) Next added in v0.0.4

func (di *BadgerDrainIterator) Next() ([]*DrainTask, error)

type BadgerTaskDB added in v0.0.4

type BadgerTaskDB struct {
	// contains filtered or unexported fields
}

func NewBadgerTaskDB added in v0.0.4

func NewBadgerTaskDB(partition string) (*BadgerTaskDB, error)

func (*BadgerTaskDB) Attach added in v0.0.4

func (tdb *BadgerTaskDB) Attach() AttachIterator

func (*BadgerTaskDB) Delete added in v0.0.4

func (tdb *BadgerTaskDB) Delete(topicName, taskID string) WriteResult

func (*BadgerTaskDB) Drain added in v0.0.4

func (tdb *BadgerTaskDB) Drain() DrainIterator

func (*BadgerTaskDB) GetPayload added in v0.0.4

func (tdb *BadgerTaskDB) GetPayload(topicName, taskID string) (payload string, err error)

func (*BadgerTaskDB) PutPayload added in v0.0.4

func (tdb *BadgerTaskDB) PutPayload(topicName, taskID string, payload string) WriteResult

func (*BadgerTaskDB) PutState added in v0.0.4

func (tdb *BadgerTaskDB) PutState(state *TaskDBTaskState) WriteResult

type BadgerTaskStateWithID added in v0.0.7

type BadgerTaskStateWithID struct {
	State   *TaskDBTaskState
	ID      []byte
	Payload string
}

type BadgerWriteResult added in v0.0.4

type BadgerWriteResult struct {
	// contains filtered or unexported fields
}

func (BadgerWriteResult) Get added in v0.0.4

func (wr BadgerWriteResult) Get() error

type DrainIterator

type DrainIterator interface {
	// Returns an array of task payloads that can be joined to their current states, and sent to other partitions. Returns nil when there are no more, safe to call multiple times when empty
	Next() ([]*DrainTask, error)
}

type DrainTask

type DrainTask struct {
	Topic    string
	Priority int32
	Payload  string
}

type FakeAttachIterator added in v0.0.2

type FakeAttachIterator struct{}

func (*FakeAttachIterator) Next added in v0.0.2

func (fai *FakeAttachIterator) Next() ([]*TaskDBTaskState, error)

type MemoryDrainIterator added in v0.0.2

type MemoryDrainIterator struct {
	// contains filtered or unexported fields
}

func (*MemoryDrainIterator) Next added in v0.0.2

func (di *MemoryDrainIterator) Next() ([]*DrainTask, error)

type MemoryTaskDB added in v0.0.2

type MemoryTaskDB struct {
	// contains filtered or unexported fields
}

func NewMemoryTaskDB added in v0.0.2

func NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) Attach added in v0.0.2

func (mtdb *MemoryTaskDB) Attach() AttachIterator

func (*MemoryTaskDB) Delete added in v0.0.2

func (mtdb *MemoryTaskDB) Delete(topicName, taskID string) WriteResult

func (*MemoryTaskDB) Drain added in v0.0.2

func (mtdb *MemoryTaskDB) Drain() DrainIterator

func (*MemoryTaskDB) GetPayload added in v0.0.2

func (mtdb *MemoryTaskDB) GetPayload(topicName, taskID string) (string, error)

func (*MemoryTaskDB) NewMemoryTaskDB added in v0.0.2

func (mtdb *MemoryTaskDB) NewMemoryTaskDB() (*MemoryTaskDB, error)

func (*MemoryTaskDB) PutPayload added in v0.0.2

func (mtdb *MemoryTaskDB) PutPayload(topicName, taskID string, payload string) WriteResult

func (*MemoryTaskDB) PutState added in v0.0.2

func (mtdb *MemoryTaskDB) PutState(state *TaskDBTaskState) WriteResult

type MemoryWriteResult added in v0.0.2

type MemoryWriteResult struct{}

func (MemoryWriteResult) Get added in v0.0.2

func (mwr MemoryWriteResult) Get() error

type TaskDB

type TaskDB interface {
	// Acquires the TaskDB lock as needed, then returns an AttachIterator
	Attach() AttachIterator

	// A Task will be inserted into the task table, and its first state inserted
	PutPayload(topicName, taskID string, payload string) WriteResult

	// A new task state
	PutState(state *TaskDBTaskState) WriteResult

	// Retrieves the payload for a given task
	GetPayload(topicName, taskID string) (string, error)

	// Deletes all task states for a topic, and removes the topic from the task table. If no more tasks exist then the task will be removed from the task table
	Delete(topicName, taskID string) WriteResult

	// Returns an iterator that will read all payloads from the DB, so they can be drained into other partitions
	Drain() DrainIterator
}

type TaskDBTaskState

type TaskDBTaskState struct {
	Topic     string
	Partition string

	ID               string
	State            TaskState
	Version          int32
	DeliveryAttempts int32
	CreatedAt        time.Time
	Priority         int32
}

func NewTaskDBTaskState

func NewTaskDBTaskState(partition, topicName, taskID string, state TaskState, version, deliveryAttempts, priority int32, createdAt time.Time) *TaskDBTaskState

type TaskState

type TaskState int32
const (
	TASK_STATE_ENQUEUED TaskState = 0
	TASK_STATE_DELAYED  TaskState = 1
	TASK_STATE_INFLIGHT TaskState = 2
)

func (TaskState) String

func (ts TaskState) String() string

type WriteResult

type WriteResult interface {
	// Waits for the write to be committed to the TaskDB.
	// Should use a buffered channel to wait for a batch to be committed.
	// If the TaskDB does not batch it should still immediately write to
	// the buffered channel so that function calls are non-blocking and
	// return immediately
	Get() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL