Documentation
¶
Index ¶
- Variables
- func Decode[T EntityTypes](entity Entity) (T, error)
- func Decodes[T EntityTypes](entities ...Entity) ([]T, error)
- func Init(e *Entity)
- type Codec
- type Config
- type Entity
- func Encode[T EntityTypes](entityType T) (Entity, error)
- func Encodes[T EntityTypes](entityTypes ...T) ([]Entity, error)
- func TransformToEntities(entityName query.EntityName, objs ...map[string]any) ([]Entity, error)
- func TransformToEntity(entityName query.EntityName, objs map[string]any) (Entity, error)
- type EntityTypes
- type FindResult
- type Handler
- type HandlerRequest
- type HandlerResponse
- type Initiator
- type Job
- type JobConfirmFunc
- type JobConfirmResult
- type JobCursor
- type JobEvent
- type JobEventQuery
- type JobStatus
- type JobStatuses
- type JobTerminatedEventFunc
- type ListJobsQuery
- type ListResult
- type ListTasksQuery
- type Manager
- func (m *Manager) CancelJob(ctx context.Context, jobID uuid.UUID) error
- func (m *Manager) GetJob(ctx context.Context, jobID uuid.UUID) (Job, bool, error)
- func (m *Manager) ListTasks(ctx context.Context, query ListTasksQuery) ([]Task, error)
- func (m *Manager) PrepareJob(ctx context.Context, job Job) (Job, error)
- func (m *Manager) Start(ctx context.Context) error
- type ManagerOptsFunc
- func WithJobCanceledEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
- func WithJobConfirmFunc(f JobConfirmFunc) ManagerOptsFunc
- func WithJobDoneEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
- func WithJobFailedEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
- func WithTargetClients(targetToInitiators map[string]Initiator) ManagerOptsFunc
- type Operator
- type Option
- type Repository
- type Responder
- type Result
- type Store
- type Task
- type TaskInfo
- type TaskRequest
- type TaskResolveFunc
- type TaskResolverCursor
- type TaskResolverResult
- type TaskResponse
- type TaskStatus
- type TransactionFunc
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrTaskResolverNotSet = errors.New("taskResolver not set") ErrMsgFailedTasks = "job has failed tasks" ErrJobUnCancelable = errors.New("job cannot be canceled in its current state") ErrJobNotFound = errors.New("job not found") ErrJobAlreadyExists = errors.New("job already exists") ErrNoClientForTarget = errors.New("no client for task target") ErrLoadingJob = errors.New("failed to load job") ErrUpdatingJob = errors.New("failed to update job") )
var ( ErrInvalidEntityType = errors.New("invalid entity type") ErrMandatoryFields = errors.New("mandatory fields") )
var ( ErrHandlerNil = errors.New("handler cannot be nil") ErrBufferSizeNegative = errors.New("buffer size cannot be negative") ErrNumberOfWorkersNotPositive = errors.New("number of workers must be greater than 0") )
var ( // ErrRepoCreate is returned when the repository fails to create an entity. ErrRepoCreate = errors.New("failed to create entity") // ErrEntityUniqueViolation is returned when an entity violates a unique constraint. ErrEntityUniqueViolation = errors.New("entity violates unique contraint") )
var ErrMsgUnknownTaskType = "unknown task type"
Functions ¶
func Decode ¶
func Decode[T EntityTypes](entity Entity) (T, error)
Decode converts a store entity to a domain object.
func Decodes ¶
func Decodes[T EntityTypes](entities ...Entity) ([]T, error)
Decodes converts a list of store entities to domain objects.
Types ¶
type Codec ¶
type Codec interface {
EncodeTaskRequest(request TaskRequest) ([]byte, error)
DecodeTaskRequest(bytes []byte) (TaskRequest, error)
EncodeTaskResponse(response TaskResponse) ([]byte, error)
DecodeTaskResponse(bytes []byte) (TaskResponse, error)
}
Codec defines the methods for encoding and decoding task requests and responses.
type Config ¶
type Config struct {
// TaskLimitNum is the maximum number of tasks to process at once.
TaskLimitNum int
// ConfirmJobAfter is the delay before confirming a job.
ConfirmJobAfter time.Duration
// ConfirmJobWorkerConfig holds the configuration for the job confirmation worker.
ConfirmJobWorkerConfig WorkerConfig
// CreateTasksWorkerConfig holds the configuration for the task creation worker.
CreateTasksWorkerConfig WorkerConfig
// ReconcileWorkerConfig holds the configuration for the reconciliation worker.
ReconcileWorkerConfig WorkerConfig
// NotifyWorkerConfig holds the configuration for the notification worker.
NotifyWorkerConfig WorkerConfig
// BackoffBaseIntervalSec is the base interval for exponential backoff in seconds.
// Default is 10 seconds.
BackoffBaseIntervalSec int64
// BackoffMaxIntervalSec is the maximum interval for exponential backoff in seconds.
// Default is 10240 seconds (2 hours, 50 minutes, and 40 seconds).
BackoffMaxIntervalSec int64
// MaxReconcileCount is the maximum number of times a task can be reconciled.
// Default is 10.
MaxReconcileCount int64
}
Config contains configuration for job processing.
type Entity ¶
type Entity struct {
Name query.EntityName
ID uuid.UUID
CreatedAt int64
UpdatedAt int64
Values map[string]any
}
Entity represents a generic entity stored in the data store. It includes metadata such as the entity's name, unique identifier, created and updated timestamps, and a map of all values.
func Encode ¶
func Encode[T EntityTypes](entityType T) (Entity, error)
Encode converts a domain object to a store entity.
func Encodes ¶
func Encodes[T EntityTypes](entityTypes ...T) ([]Entity, error)
Encodes converts a list of domain objects to store entities.
func TransformToEntities ¶
TransformToEntities transforms a list of maps into domain entities based on the provided entity name.
func TransformToEntity ¶
TransformToEntity transforms a map into a domain entity based on the provided entity name.
type EntityTypes ¶
EntityTypes defines a type constraint for entities that can be used in the repository. It allows only types Job, Task, or JobCursor to satisfy this interface.
type FindResult ¶
type Handler ¶
type Handler func(ctx context.Context, request HandlerRequest) (HandlerResponse, error)
Handler is a function that takes a HandlerRequest, and returns a HandlerResponse and an error.
type HandlerRequest ¶
HandlerRequest contains the fields extracted from orbital.TaskRequest that are relevant for the operator's processing.
type HandlerResponse ¶
HandlerResponse contains the fields extracted from orbital.TaskResponse that can be modified by the operator during processing.
type Initiator ¶
type Initiator interface {
SendTaskRequest(ctx context.Context, request TaskRequest) error
ReceiveTaskResponse(ctx context.Context) (TaskResponse, error)
}
Initiator defines the methods for sending task requests and receiving task responses.
type Job ¶
type Job struct {
ID uuid.UUID
ExternalID string
Data []byte
Type string
Status JobStatus
ErrorMessage string
UpdatedAt int64
CreatedAt int64
}
Job is the translated domain object representing an event.
func (Job) WithExternalID ¶ added in v0.2.0
WithExternalID allows setting an external identifier for the job.
type JobConfirmFunc ¶
type JobConfirmFunc func(ctx context.Context, job Job) (JobConfirmResult, error)
JobConfirmFunc defines a function that determines whether a job can be confirmed. It returns a JobConfirmResult struct with the confirmation result and an error if the process fails.
type JobConfirmResult ¶
type JobConfirmResult struct {
// Done indicates whether the confirming process is complete.
Done bool
// IsCanceled indicates whether the job needs to be canceled.
IsCanceled bool
// CanceledErrorMessage provides an error message if the job is canceled.
CanceledErrorMessage string
}
JobConfirmResult represents the result of a job confirmation operation.
type JobCursor ¶
type JobCursor struct {
ID uuid.UUID
Cursor TaskResolverCursor
UpdatedAt int64
CreatedAt int64
}
JobCursor stores the cursor for the next taskResolver.
type JobEvent ¶
JobEvent represents an event related to a job, used for tracking and processing job state changes.
type JobEventQuery ¶
type JobEventQuery struct {
ID uuid.UUID // Filter job events by their ID.
IsNotified *bool // Filter job events by whether they have been notified.
RetrievalModeQueue bool // If true, enables queue-like retrieval mode.
Limit int // Maximum number of job events to return.
OrderByUpdatedAt bool // If true, orders job events by updated_at in ascending order.
}
JobEventQuery defines the parameters for querying job events from the repository.
type JobStatus ¶
type JobStatus string
JobStatus represents the possible statuses of a Job.
const ( JobStatusCreated JobStatus = "CREATED" JobStatusConfirming JobStatus = "CONFIRMING" JobStatusConfirmed JobStatus = "CONFIRMED" JobStatusResolving JobStatus = "RESOLVING" JobStatusReady JobStatus = "READY" JobStatusProcessing JobStatus = "PROCESSING" JobStatusDone JobStatus = "DONE" JobStatusFailed JobStatus = "FAILED" JobStatusResolveCanceled JobStatus = "RESOLVE_CANCELED" JobStatusConfirmCanceled JobStatus = "CONFIRM_CANCELED" JobStatusUserCanceled JobStatus = "USER_CANCELED" )
Possible job statuses.
func TerminalStatuses ¶ added in v0.2.0
func TerminalStatuses() []JobStatus
TerminalStatuses returns the list of job statuses that are considered terminal.
type JobStatuses ¶ added in v0.2.0
type JobStatuses []JobStatus
JobStatuses is a slice of JobStatus values.
func TransientStatuses ¶ added in v0.2.0
func TransientStatuses() JobStatuses
TransientStatuses returns the list of job statuses that are considered transient.
func (JobStatuses) StringSlice ¶ added in v0.2.0
func (js JobStatuses) StringSlice() []string
StringSlice converts the JobStatuses to a slice of strings.
type JobTerminatedEventFunc ¶ added in v0.2.0
JobTerminatedEventFunc defines a callback function type for sending job events.
type ListJobsQuery ¶
type ListJobsQuery struct {
Status JobStatus // Filter jobs by their status.
StatusIn []JobStatus // Filter jobs by a list of statuses.
UpdatedAt int64 // Filter jobs updated at or after this timestamp.
CreatedAt int64 // Filter jobs created at or after this timestamp.
Limit int // Maximum number of jobs to return.
RetrievalModeQueue bool // If true, enables queue-like retrieval mode.
OrderByUpdatedAt bool // If true, orders jobs by updated_at in descending order.
}
ListJobsQuery defines the parameters for querying jobs from the repository. It allows filtering by status, creation time, result limit, and enables queue-like retrieval mode when specified.
type ListTasksQuery ¶
type ListTasksQuery struct {
JobID uuid.UUID // Filter tasks by the associated job ID.
Status TaskStatus // Filter tasks by their status.
StatusIn []TaskStatus // Filter tasks by a list of statuses.
CreatedAt int64 // Filter tasks created at or after this timestamp.
UpdatedAt int64 // Filter tasks updated at or after this timestamp.
Limit int // Maximum number of tasks to return.
IsReconcileReady bool // If true, filters tasks that are ready to be reconciled.
RetrievalModeQueue bool // If true, enables queue-like retrieval mode.
OrderByUpdatedAt bool // If true, orders tasks by updated_at in ascending order.
}
ListTasksQuery defines the parameters for querying tasks from the repository. It supports filtering by job ID, task status, creation timestamp, and result limit.
type Manager ¶
type Manager struct {
Config Config
// contains filtered or unexported fields
}
Manager is responsible for managing jobs, including their creation, state transitions, and lifecycle handling.
func NewManager ¶
func NewManager(repo *Repository, taskResolver TaskResolveFunc, optFuncs ...ManagerOptsFunc) (*Manager, error)
NewManager creates a new Manager instance.
func (*Manager) CancelJob ¶ added in v0.2.0
CancelJob cancels a job and associated running tasks. It updates the job status to "USER_CANCELED".
func (*Manager) PrepareJob ¶
PrepareJob prepares a job by creating it in the repository with status CREATED. It returns an error if a job with the same type and external ID in a non-terminal status already exists.
type ManagerOptsFunc ¶
type ManagerOptsFunc func(mgr *Manager)
ManagerOptsFunc is a function type to configure Manager options.
func WithJobCanceledEventFunc ¶ added in v0.2.0
func WithJobCanceledEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
WithJobCanceledEventFunc registers a function to send job canceled events.
func WithJobConfirmFunc ¶
func WithJobConfirmFunc(f JobConfirmFunc) ManagerOptsFunc
WithJobConfirmFunc registers a function to confirm jobs.
func WithJobDoneEventFunc ¶ added in v0.2.0
func WithJobDoneEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
WithJobDoneEventFunc registers a function to send job done events.
func WithJobFailedEventFunc ¶ added in v0.2.0
func WithJobFailedEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
WithJobFailedEventFunc registers a function to send job failed events.
func WithTargetClients ¶
func WithTargetClients(targetToInitiators map[string]Initiator) ManagerOptsFunc
WithTargetClients sets the map that maps the target string to its Initiator.
type Operator ¶
type Operator struct {
// contains filtered or unexported fields
}
Operator handles task requests and responses.
func NewOperator ¶
NewOperator creates a new Operator instance with the given Responder and options.
func (*Operator) ListenAndRespond ¶
ListenAndRespond starts listening for task requests and responding to them.
NOTE: Handlers must be registered before calling ListenAndRespond.
type Option ¶
type Option func(*config) error
Option is a function that modifies the config parameter of the Operator.
func WithBufferSize ¶
WithBufferSize sets the buffer size for the requests channel. It returns an error if the size is negative.
func WithNumberOfWorkers ¶
WithNumberOfWorkers sets the number of workers for processing requests. It returns an error if the number is not positive.
type Repository ¶
type Repository struct {
Store Store
}
Repository provides methods to interact with the underlying data store. It encapsulates a Store implementation for data persistence operations.
func NewRepository ¶
func NewRepository(store Store) *Repository
NewRepository creates a new Repository instance using the provided Store. It returns a pointer to the initialized Repository.
type Responder ¶
type Responder interface {
ReceiveTaskRequest(ctx context.Context) (TaskRequest, error)
SendTaskResponse(ctx context.Context, response TaskResponse) error
}
Responder defines the methods for receiving task requests and sending task responses.
type Store ¶
type Store interface {
Create(ctx context.Context, r ...Entity) ([]Entity, error)
Update(ctx context.Context, r ...Entity) ([]Entity, error)
Find(ctx context.Context, q query.Query) (FindResult, error)
List(ctx context.Context, q query.Query) (ListResult, error)
Transaction(ctx context.Context, txFunc TransactionFunc) error
}
Store defines the interface for a data store that supports create, update, find, and list operations, as well as transactions.
type Task ¶
type Task struct {
ID uuid.UUID
JobID uuid.UUID
Type string
Data []byte
WorkingState []byte
LastReconciledAt int64 // The last time the task was reconciled.
ReconcileCount int64 // The number of times the task has been reconciled.
ReconcileAfterSec int64 // The number of seconds after which the task should be reconciled.
TotalSentCount int64
TotalReceivedCount int64
ETag string
Status TaskStatus
Target string
ErrorMessage string
UpdatedAt int64
CreatedAt int64
}
Task is a trackable unit derived from the Job.
type TaskInfo ¶
type TaskInfo struct {
// Data contains the byte data that needs to be sent.
Data []byte
// Type specifies the type of the data.
Type string
// Target is the target identifier associated with the job.
Target string
}
TaskInfo represents the result of resolving a task.
type TaskRequest ¶
type TaskRequest struct {
TaskID uuid.UUID `json:"taskId"` // TaskID is used to identify the task.
Type string `json:"type"` // Type is the type of the task.
Data []byte `json:"data"` // Data is the static context for the task.
WorkingState []byte `json:"workingState"` // WorkingState is the current state of the task that the operator works upon.
ETag string `json:"eTag"` // ETag is used to identify the version of the TaskRequest.
}
TaskRequest is the request object that will be sent to the operator.
type TaskResolveFunc ¶ added in v0.2.0
type TaskResolveFunc func(ctx context.Context, job Job, cursor TaskResolverCursor) (TaskResolverResult, error)
TaskResolveFunc is a function type that resolves the targets for which tasks are created, given a job and a cursor.
type TaskResolverCursor ¶
type TaskResolverCursor string
TaskResolverCursor is the type for the next cursor.
type TaskResolverResult ¶
type TaskResolverResult struct {
// TaskInfos contains the data to be sent for each target.
TaskInfos []TaskInfo
// Cursor provides information for pagination or continuation.
Cursor TaskResolverCursor
// Done indicates whether the resolution process is complete.
Done bool
// IsCanceled indicates whether the job needs to be canceled.
IsCanceled bool
// CanceledErrorMessage provides an error message if the job is canceled.
CanceledErrorMessage string
}
TaskResolverResult represents the response from resolving tasks.
type TaskResponse ¶
type TaskResponse struct {
TaskID uuid.UUID `json:"taskId"` // TaskID is used to identify the task.
Type string `json:"type"` // Type is the type of the task.
WorkingState []byte `json:"workingState"` // WorkingState is the state of the task that the operator updates.
ETag string `json:"eTag"` // ETag is used to correlate the TaskResponse with the TaskRequest.
Status string `json:"status"` // Status is the status of the task.
ErrorMessage string `json:"errorMessage,omitempty"` // ErrorMessage contains the error message if the task fails.
ReconcileAfterSec int64 `json:"reconcileAfterSec"` // ReconcileAfterSec is the time in seconds after which the next TaskRequest should be queued again.
}
TaskResponse is the response object received from the operator.
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the status of the Task.
const ( TaskStatusCreated TaskStatus = "CREATED" TaskStatusProcessing TaskStatus = "PROCESSING" TaskStatusDone TaskStatus = "DONE" TaskStatusFailed TaskStatus = "FAILED" )
Possible task statuses.
type TransactionFunc ¶
type TransactionFunc func(context.Context, Repository) error
TransactionFunc defines a function type for executing operations within a transaction.
type WorkerConfig ¶
type WorkerConfig struct {
// NoOfWorkers specifies the number of concurrent workers.
NoOfWorkers int
// ExecInterval is the interval between executions.
ExecInterval time.Duration
// Timeout is the maximum duration allowed for a single execution.
Timeout time.Duration
}
WorkerConfig defines the configuration for a worker process.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
client
|
|
|
examples
|
|
|
embedded_client
command
|
|
|
manager
command
|
|
|
operator
command
|
|
|
internal
|
|
|
proto
|
|
|
store
|
|