offline

package module
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2025 License: BSD-3-Clause Imports: 18 Imported by: 1

README

go-offline

Minimalist and opinionated offline task (job) coordination system.

Documentation

Documentation is incomplete at this time.

Example

$> go run cmd/example/main.go -from 1847571929 -to 1763588199 -offline-database-uri 'mem://offline/Id'
1658531597231591424

Documentation

Overview

Package offline provides methods processing long-running tasks offline.

Index

Constants

This section is empty.

Variables

View Source
var SNOWFLAKE_NODE_ID = int64(575)

Functions

func NewJobId

func NewJobId(ctx context.Context) (int64, error)

NewJobId returns a unique identifier to be associated with a publication.

func ProcessJob added in v0.0.3

func ProcessJob(ctx context.Context, opts *ProcessJobOptions) error

func RegisterDatabase

func RegisterDatabase(ctx context.Context, scheme string, init_func DatabaseInitializationFunc) error

RegisterDatabase registers 'scheme' as a key pointing to 'init_func' in an internal lookup table used to create new `Database` instances by the `NewDatabase` method.

func RegisterQueue

func RegisterQueue(ctx context.Context, scheme string, init_func QueueInitializationFunc) error

RegisterQueue registers 'scheme' as a key pointing to 'init_func' in an internal lookup table used to create new `Queue` instances by the `NewQueue` method.

func Schemes

func Schemes() []string

Schemes returns the list of schemes that have been registered.

Types

type Database

type Database interface {
	// AddJob adds a `Job` instance to the database.
	AddJob(context.Context, *Job) error
	// Retrieve a specific `Job` instance from the database using its unique identifier.
	GetJob(context.Context, int64) (*Job, error)
	// Update a specific `Job` instance in the database.
	UpdateJob(context.Context, *Job) error
	// Remove a specific `Job` instance from the database.
	RemoveJob(context.Context, *Job) error
	// Prune zero or more `Job` instances matching a specific `Status` type and created date from the database.
	PruneJobs(context.Context, Status, int64) error
	// List all of the `Jobs` in the database.
	ListJobs(context.Context, ListJobsCallback) error
	// Close closes any underlying database connections
	Close(context.Context) error
}

type Database is an interface for storing and retrieving `Job` instance for future processing. Importantly the interface does not define any logic for how or when jobs are processed.

func NewDatabase

func NewDatabase(ctx context.Context, uri string) (Database, error)

NewDatabase returns a new `Database` instance configured by 'uri'. The value of 'uri' is parsed as a `url.URL` and its scheme is used as the key for a corresponding `DatabaseInitializationFunc` function used to instantiate the new `Database`. It is assumed that the scheme (and initialization function) have been registered by the `RegisterDatabase` method.

func NewDocstoreDatabase added in v0.3.2

func NewDocstoreDatabase(ctx context.Context, uri string) (Database, error)

func NewSyncMapDatabase

func NewSyncMapDatabase(ctx context.Context, uri string) (Database, error)

NewSyncMapDatabase returns a new `SyncMapDatabase` instance configured by 'uri' which is expected to take the form of:

syncmap://

type DatabaseInitializationFunc

type DatabaseInitializationFunc func(ctx context.Context, uri string) (Database, error)

DatabaseInitializationFunc is a function defined by individual database package and used to create an instance of that database

type DocstoreDatabase added in v0.3.2

type DocstoreDatabase struct {
	Database
	// contains filtered or unexported fields
}

func (*DocstoreDatabase) AddJob added in v0.3.2

func (db *DocstoreDatabase) AddJob(ctx context.Context, job *Job) error

func (*DocstoreDatabase) Close added in v0.3.2

func (db *DocstoreDatabase) Close(ctx context.Context) error

func (*DocstoreDatabase) GetJob added in v0.3.2

func (db *DocstoreDatabase) GetJob(ctx context.Context, id int64) (*Job, error)

func (*DocstoreDatabase) ListJobs added in v0.3.2

func (db *DocstoreDatabase) ListJobs(ctx context.Context, cb ListJobsCallback) error

func (*DocstoreDatabase) PruneJobs added in v0.3.2

func (db *DocstoreDatabase) PruneJobs(ctx context.Context, status Status, lastmodified int64) error

func (*DocstoreDatabase) RemoveJob added in v0.3.2

func (db *DocstoreDatabase) RemoveJob(ctx context.Context, job *Job) error

func (*DocstoreDatabase) UpdateJob added in v0.3.2

func (db *DocstoreDatabase) UpdateJob(ctx context.Context, job *Job) error

type Job

type Job struct {
	Id           int64  `json:"id"`
	Type         string `json:"type"`
	Status       Status `json:"status"`
	Creator      string `json:"creator"`
	Created      int64  `json:"created"`
	LastModified int64  `json:"lastmodified"`
	Instructions string `json:"instructions"`
	Results      string `json:"results,omitempty"`
	Error        string `json:"error,omitempty"`
}

func NewJob

func NewJob(ctx context.Context, creator string, job_type string, instructions string) (*Job, error)

func ScheduleJob added in v0.0.8

func ScheduleJob(ctx context.Context, offline_db Database, offline_q Queue, creator string, job_type string, instructions string) (*Job, error)

func (*Job) AsStatusResponse

func (job *Job) AsStatusResponse() *JobStatusResponse

func (*Job) String

func (job *Job) String() string

type JobStatusResponse

type JobStatusResponse struct {
	JobId        string `json:"job_id"` // because JavaScript 58-bit integer hoohah
	Status       string `json:"status"`
	LastModified int64  `json:"lastmodified"`
	Results      string `json:"results,omitempty"`
	Error        string `json:"error,omitempty"`
}

type ListJobsCallback

type ListJobsCallback func(context.Context, *Job) error

type ListJobsCallback is a function type for custom processing of jobs.

type NullQueue

type NullQueue struct {
	Queue
}

func (*NullQueue) Close

func (q *NullQueue) Close(ctx context.Context) error

func (*NullQueue) ScheduleJob

func (q *NullQueue) ScheduleJob(ctx context.Context, job_id int64) error

type ProcessJobCallbackFunc added in v0.0.3

type ProcessJobCallbackFunc func(context.Context, *Job) (string, error)

type ProcessJobOptions added in v0.0.3

type ProcessJobOptions struct {
	Database Database
	Callback ProcessJobCallbackFunc
	JobId    int64
}

type PubSubQueue added in v0.3.2

type PubSubQueue struct {
	Queue
	// contains filtered or unexported fields
}

func (*PubSubQueue) Close added in v0.3.2

func (q *PubSubQueue) Close(ctx context.Context) error

func (*PubSubQueue) ScheduleJob added in v0.3.2

func (q *PubSubQueue) ScheduleJob(ctx context.Context, job_id int64) error

type Queue

type Queue interface {
	ScheduleJob(context.Context, int64) error
	Close(context.Context) error
}

func NewNullQueue

func NewNullQueue(ctx context.Context, uri string) (Queue, error)

func NewPubSubQueue added in v0.3.2

func NewPubSubQueue(ctx context.Context, uri string) (Queue, error)

func NewQueue

func NewQueue(ctx context.Context, uri string) (Queue, error)

NewQueue returns a new `Queue` instance configured by 'uri'. The value of 'uri' is parsed as a `url.URL` and its scheme is used as the key for a corresponding `QueueInitializationFunc` function used to instantiate the new `Queue`. It is assumed that the scheme (and initialization function) have been registered by the `RegisterQueue` method.

func NewSlogQueue added in v0.0.19

func NewSlogQueue(ctx context.Context, uri string) (Queue, error)

type QueueInitializationFunc

type QueueInitializationFunc func(ctx context.Context, uri string) (Queue, error)

QueueInitializationFunc is a function defined by individual queue package and used to create an instance of that queue

type SlogQueue added in v0.0.19

type SlogQueue struct {
	Queue
}

func (*SlogQueue) Close added in v0.0.19

func (q *SlogQueue) Close(ctx context.Context) error

func (*SlogQueue) ScheduleJob added in v0.0.19

func (q *SlogQueue) ScheduleJob(ctx context.Context, job_id int64) error

type Status

type Status int
const (
	Pending Status = iota
	Queued
	Processing
	Completed
	Failed
)

func (Status) String

func (s Status) String() string

type SyncMapDatabase

type SyncMapDatabase struct {
	Database
	// contains filtered or unexported fields
}

SyncMapDatabase implements the `Database` interface storing and retrieving jobs from an internal `sync.Map` instance.

func (*SyncMapDatabase) AddJob

func (db *SyncMapDatabase) AddJob(ctx context.Context, job *Job) error

AddJob() stores 'job' in 'db'.

func (*SyncMapDatabase) Close added in v0.0.11

func (db *SyncMapDatabase) Close(ctx context.Context) error

func (*SyncMapDatabase) GetJob

func (db *SyncMapDatabase) GetJob(ctx context.Context, job_id int64) (*Job, error)

GetJob() returns a `Job` instance identified by 'job_id' from 'db'.

func (*SyncMapDatabase) ListJobs

func (db *SyncMapDatabase) ListJobs(ctx context.Context, list_cb ListJobsCallback) error

ListJobs() iterates through all the jobs in 'db' passing each to 'list_cb'.

func (*SyncMapDatabase) PruneJobs

func (db *SyncMapDatabase) PruneJobs(ctx context.Context, status Status, lastmodified int64) error

PruneJobs() removed jobs in 'db' whose status matches 'status' and whose last modified time is less that 'lastmodified'.

func (*SyncMapDatabase) RemoveJob

func (db *SyncMapDatabase) RemoveJob(ctx context.Context, job *Job) error

RemoveJob() removes 'job' from 'db'.

func (*SyncMapDatabase) UpdateJob

func (db *SyncMapDatabase) UpdateJob(ctx context.Context, job *Job) error

UpdateJob() stores the updated version of 'job' in 'db'.

Directories

Path Synopsis
app
cmd
add-job command
get-job command
job-server command
remove-job command
schedule-job command
api
Package api provides methods for creating API-related HTTP handlers
Package api provides methods for creating API-related HTTP handlers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL