orbital

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

README

REUSE status

orbital

About this project

Orbital is an open-source framework crafted to synchronize resources seamlessly across system boundaries. Achieving eventual consistency, it offers real-time introspection to report resource states with ease. Simplifying resource management, it requires consumers to implement only a single functional operator, eliminating the need for on-site databases or meta state storage.

Requirements and Setup

Orbital is a library to reconcile state across service boundaries.

Support, Feedback, Contributing

This project is open to feature requests/suggestions, bug reports etc. via GitHub issues. Contribution and feedback are encouraged and always welcome. For more information about how to contribute, the project structure, as well as additional contribution information, see our Contribution Guidelines.

Security / Disclosure

If you find any bug that may be a security problem, please follow our instructions at in our security policy on how to report it. Please do not create GitHub issues for security-related doubts or problems.

Code of Conduct

We as members, contributors, and leaders pledge to make participation in our community a harassment-free experience for everyone. By participating in this project, you agree to abide by its Code of Conduct at all times.

Licensing

Copyright 2025 SAP SE or an SAP affiliate company and orbital contributors. Please see our LICENSE for copyright and license information. Detailed information including third-party components and their licensing/copyright information is available via the REUSE tool.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidEntityType = errors.New("invalid entity type")
	ErrMandatoryFields   = errors.New("mandatory fields")
)
View Source
var (
	ErrHandlerNil                 = errors.New("handler cannot be nil")
	ErrBufferSizeNegative         = errors.New("buffer size cannot be negative")
	ErrNumberOfWorkersNotPositive = errors.New("number of workers must be greater than 0")
)
View Source
var ErrMsgFailedTasks = "job has failed tasks"
View Source
var ErrMsgUnknownTaskType = "unknown task type"
View Source
var ErrRepoCreate = errors.New("failed to create entity")

ErrRepoCreate is returned when the repository fails to create an entity.

View Source
var ErrTaskResolverNotSet = errors.New("taskResolver not set")

Functions

func Decode

func Decode[T EntityTypes](entity Entity) (T, error)

Decode converts a store entity to a domain object.

func Decodes

func Decodes[T EntityTypes](entities ...Entity) ([]T, error)

Decodes converts a list of store entities to domain objects.

func Init

func Init(e *Entity)

Init ensures that the metadata of the entity is properly initialized. It sets default values for CreatedAt, UpdatedAt, and ID if they are not already set.

Types

type Codec

type Codec interface {
	EncodeTaskRequest(request TaskRequest) ([]byte, error)
	DecodeTaskRequest(bytes []byte) (TaskRequest, error)
	EncodeTaskResponse(response TaskResponse) ([]byte, error)
	DecodeTaskResponse(bytes []byte) (TaskResponse, error)
}

Codec defines the methods for encoding and decoding task requests and responses.

type Config

type Config struct {
	// JobsLimitNum is the maximum number of jobs to process at once.
	JobsLimitNum int
	// TaskLimitNum is the maximum number of tasks to process at once.
	TaskLimitNum int
	// ConfirmJobInterval is the interval between jobs scheduler attempts.
	ConfirmJobInterval time.Duration
	// JobConfig contains configuration for job processing.
	// ConfirmJobWorkerConfig holds the configuration for the job confirmation worker.
	ConfirmJobWorkerConfig WorkerConfig
	// CreateTasksWorkerConfig holds the configuration for the task creation worker.
	CreateTasksWorkerConfig WorkerConfig
	// ReconcilesWorkerConfig holds the configuration for the reconciliation worker.
	ReconcileWorkerConfig WorkerConfig
	// NotifyWorkerConfig holds the configuration for the notification worker.
	NotifyWorkerConfig WorkerConfig
	// ConfirmJobDelay is the delay before confirming a job.
	ConfirmJobDelay time.Duration
	// ConfirmJobTimeout is the timeout for jobs confirmation attempts.
	ConfirmJobTimeout time.Duration
	// CreateTasksInterval is the interval for task creation.
	CreateTasksInterval time.Duration
	// ProcessingJobDelay is the delay before processing jobs in reconcileTasks.
	ProcessingJobDelay time.Duration
}

Config contains configuration for job processing.

type Entity

type Entity struct {
	Name      query.EntityName
	ID        uuid.UUID
	CreatedAt int64
	UpdatedAt int64
	Values    map[string]any
}

Entity represents a generic entity stored in the data store. It includes metadata such as the entity's name, unique identifier,created and updated timestamps, and a map of all values.

func Encode

func Encode[T EntityTypes](entityType T) (Entity, error)

Encode converts a domain object to a store entity.

func Encodes

func Encodes[T EntityTypes](entityTypes ...T) ([]Entity, error)

Encodes converts a list of domain objects to store entities.

func TransformToEntities

func TransformToEntities(entityName query.EntityName, objs ...map[string]any) ([]Entity, error)

TransformToEntities transforms a list of maps into domain entities based on the provided entity name.

func TransformToEntity

func TransformToEntity(entityName query.EntityName, objs map[string]any) (Entity, error)

TransformToEntity transforms a map into a domain entity based on the provided entity name.

type EntityTypes

type EntityTypes interface {
	Job | Task | JobCursor | JobEvent
}

EntityTypes defines a type constraint for entities that can be used in the repository. It allows only types Job, Task, or JobCursor to satisfy this interface.

type FindResult

type FindResult struct {
	Entity Entity
	Exists bool
}

type Handler

type Handler func(ctx context.Context, request HandlerRequest) (HandlerResponse, error)

Handler is a function that takes a HandlerRequest, and returns a HandlerResponse and an error.

type HandlerRequest

type HandlerRequest struct {
	TaskID       uuid.UUID
	Type         string
	Data         []byte
	WorkingState []byte
}

HandlerRequest contains the fields extracted from orbital.TaskRequest that are relevant for the operator's processing.

type HandlerResponse

type HandlerResponse struct {
	WorkingState      []byte
	Result            Result
	ReconcileAfterSec int64
}

HandlerResponse contains the fields extracted from orbital.TaskResponse that can be modified by the operator during processing.

type Initiator

type Initiator interface {
	SendTaskRequest(ctx context.Context, request TaskRequest) error
	ReceiveTaskResponse(ctx context.Context) (TaskResponse, error)
}

Initiator defines the methods for sending task requests and receiving task responses.

type Job

type Job struct {
	ID           uuid.UUID
	Data         []byte
	Type         string
	Status       JobStatus
	ErrorMessage string
	UpdatedAt    int64
	CreatedAt    int64
}

Job is the translated domain object representing an event.

func NewJob

func NewJob(jobType string, data []byte) Job

NewJob creates a new Job instance with the given parameters.

type JobConfirmFunc

type JobConfirmFunc func(ctx context.Context, job Job) (JobConfirmResult, error)

JobConfirmFunc defines a function that determines whether a job can be confirmed. It returns a ConfirmResult struct with the confirmation result and an error if the process fails.

type JobConfirmResult

type JobConfirmResult struct {
	Confirmed bool
}

JobConfirmResult represents the result of a job confirmation operation.

type JobCursor

type JobCursor struct {
	ID        uuid.UUID
	Cursor    TaskResolverCursor
	UpdatedAt int64
	CreatedAt int64
}

JobCursor stores the cursor for the next taskResolver.

type JobEvent

type JobEvent struct {
	ID         uuid.UUID
	IsNotified bool
	UpdatedAt  int64
	CreatedAt  int64
}

JobEvent represents an event related to a job, used for tracking and processing job state changes.

type JobEventQuery

type JobEventQuery struct {
	ID                 uuid.UUID // Filter job events by their ID.
	IsNotified         *bool     // Filter job events by whether they have been notified.
	RetrievalModeQueue bool      // If true, enables queue-like retrieval mode.
	Limit              int       // Maximum number of job events to return.
	OrderByUpdatedAt   bool      // If true, orders job events by updated_at in ascending order.
}

JobEventQuery defines the parameters for querying job events from the repository.

type JobStatus

type JobStatus string

JobStatus represents the possible states of a Job.

const (
	JobStatusCreated    JobStatus = "CREATED"
	JobStatusConfirmed  JobStatus = "CONFIRMED"
	JobStatusResolving  JobStatus = "RESOLVING"
	JobStatusReady      JobStatus = "READY"
	JobStatusProcessing JobStatus = "PROCESSING"
	JobStatusDone       JobStatus = "DONE"
	JobStatusFailed     JobStatus = "FAILED"
	JobStatusAborted    JobStatus = "ABORTED"
	JobStatusCanceled   JobStatus = "CANCELED"
)

Possible job states.

type ListJobsQuery

type ListJobsQuery struct {
	Status             JobStatus   // Filter jobs by their status.
	StatusIn           []JobStatus // Filter jobs by a list of statuses.
	UpdatedAt          int64       // Filter jobs updated at or after this timestamp.
	CreatedAt          int64       // Filter jobs created at or after this timestamp.
	Limit              int         // Maximum number of jobs to return.
	RetrievalModeQueue bool        // If true, enables queue-like retrieval mode.
	OrderByUpdatedAt   bool        // If true, orders jobs by updated_at in descending order.
}

ListJobsQuery defines the parameters for querying jobs from the repository. It allows filtering by status, creation time, result limit, and enables queue-like retrieval mode when specified.

type ListResult

type ListResult struct {
	Entities []Entity
	Cursor   query.Cursor
	Exists   bool
}

type ListTasksQuery

type ListTasksQuery struct {
	JobID              uuid.UUID    // Filter tasks by the associated job ID.
	Status             TaskStatus   // Filter tasks by their status.
	StatusIn           []TaskStatus // Filter tasks by a list of statuses.
	CreatedAt          int64        // Filter tasks created at or after this timestamp.
	UpdatedAt          int64        // Filter tasks updated at or after this timestamp.
	Limit              int          // Maximum number of tasks to return.
	IsReconcileReady   bool         // If true, filters tasks that are ready to be reconciled.
	RetrievalModeQueue bool         // If true, enables queue-like retrieval mode.
	OrderByUpdatedAt   bool         // If true, orders jobs by updated_at in ascending order.
}

ListTasksQuery defines the parameters for querying tasks from the repository. It supports filtering by job ID, task status, creation timestamp, and result limit.

type Manager

type Manager struct {
	Config *Config

	JobTerminationEventFunc TerminationEventFunc
	// contains filtered or unexported fields
}

Manager is the interface for managing jobs, including their creation, state transitions, and lifecycle handling.

func NewManager

func NewManager(repo *Repository, taskResolver TaskResolverFunc, optFuncs ...ManagerOptsFunc) (*Manager, error)

NewManager creates a new Manager instance.

func (*Manager) GetJob

func (m *Manager) GetJob(ctx context.Context, jobID uuid.UUID) (Job, bool, error)

GetJob retrieves a job by its ID from the repository.

func (*Manager) ListTasks

func (m *Manager) ListTasks(ctx context.Context, query ListTasksQuery) ([]Task, error)

ListTasks retrieves tasks by query from the repository.

func (*Manager) PrepareJob

func (m *Manager) PrepareJob(ctx context.Context, job Job) (Job, error)

PrepareJob prepares a job by creating it in the repository with status CREATED.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the job manager to process jobs.

type ManagerOptsFunc

type ManagerOptsFunc func(mgr *Manager)

ManagerOptsFunc is a function type to configure Manager options.

func WithJobConfirmFunc

func WithJobConfirmFunc(f JobConfirmFunc) ManagerOptsFunc

WithJobConfirmFunc registers a function to confirm jobs.

func WithProcessingJobDelay

func WithProcessingJobDelay(d time.Duration) ManagerOptsFunc

WithProcessingJobDelay sets the delay before processing jobs in reconcileTasks.

func WithTargetClients

func WithTargetClients(targetToInitiators map[string]Initiator) ManagerOptsFunc

WithTargetClients set the map that maps the target string to its Initiator.

func WithTasksLimitNum

func WithTasksLimitNum(n int) ManagerOptsFunc

WithTasksLimitNum sets Managers JobsLimitNum param.

type Operator

type Operator struct {
	// contains filtered or unexported fields
}

Operator handles task requests and responses.

func NewOperator

func NewOperator(r Responder, opts ...Option) (*Operator, error)

NewOperator creates a new Operator instance with the given Responder and options.

func (*Operator) ListenAndRespond

func (o *Operator) ListenAndRespond(ctx context.Context)

ListenAndRespond starts listening for task requests and responding to them.

NOTE: Handlers must be registered before calling ListenAndRespond.

func (*Operator) RegisterHandler

func (o *Operator) RegisterHandler(taskType string, h Handler) error

RegisterHandler registers a handler for a specific task type. It returns an error if the handler is nil.

type Option

type Option func(*config) error

Option is a function that modifies the config parameter of the Operator.

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the buffer size for the requests channel. It returns an error if the size is negative.

func WithNumberOfWorkers

func WithNumberOfWorkers(num int) Option

WithNumberOfWorkers sets the number of workers for processing requests. It returns an error if the number is not positive.

type Repository

type Repository struct {
	Store Store
}

Repository provides methods to interact with the underlying data store. It encapsulates a Store implementation for data persistence operations.

func NewRepository

func NewRepository(store Store) *Repository

NewRepository creates a new Repository instance using the provided Store. It returns a pointer to the initialized Repository.

type Responder

type Responder interface {
	ReceiveTaskRequest(ctx context.Context) (TaskRequest, error)
	SendTaskResponse(ctx context.Context, response TaskResponse) error
}

Responder defines the methods for receiving task requests and sending task responses.

type Result

type Result string

Result represents the result of the operator's processing.

const (
	ResultFailed   Result = "FAILED"
	ResultContinue Result = "CONTINUE"
	ResultDone     Result = "DONE"
)

type Store

type Store interface {
	Create(ctx context.Context, r ...Entity) ([]Entity, error)
	Update(ctx context.Context, r ...Entity) ([]Entity, error)
	Find(ctx context.Context, q query.Query) (FindResult, error)
	List(ctx context.Context, q query.Query) (ListResult, error)
	Transaction(ctx context.Context, txFunc TransactionFunc) error
}

Store defines the interface for a data store that supports CRUD operations and transactions.

type Task

type Task struct {
	ID                uuid.UUID
	JobID             uuid.UUID
	Type              string
	Data              []byte
	WorkingState      []byte
	LastSentAt        int64
	SentCount         int64
	ReconcileAfterSec int64
	ETag              string
	Status            TaskStatus
	Target            string
	UpdatedAt         int64
	CreatedAt         int64
}

Task is a trackable unit derived from the Job.

type TaskInfo

type TaskInfo struct {
	// Data contains the byte data that needs to be sent.
	Data []byte
	// Type specifies the type of the data.
	Type string
	// Targets lists the target identifiers associated with job.
	Target string
}

TaskInfo represents the result of resolving a task.

type TaskRequest

type TaskRequest struct {
	TaskID       uuid.UUID `json:"taskId"`       // TaskID is used to identify the task.
	Type         string    `json:"type"`         // Type is the type of the task.
	Data         []byte    `json:"data"`         // Data is the static context for the task.
	WorkingState []byte    `json:"workingState"` // WorkingState is the current state of the task that the operator works upon.
	ETag         string    `json:"eTag"`         // ETag is used to identify the version of the TaskRequest.
}

TaskRequest is the request object that will be sent to the operator.

type TaskResolverCursor

type TaskResolverCursor string

TaskResolverCursor is the type for the next cursor.

type TaskResolverFunc

type TaskResolverFunc func(ctx context.Context, job Job, cursor TaskResolverCursor) (TaskResolverResult, error)

TaskResolverFunc is a function type that resolves targets for the creation tasks for a job and the cursor.

type TaskResolverResult

type TaskResolverResult struct {
	// TaskInfos contains the data to be sent for each target.
	TaskInfos []TaskInfo
	// Cursor provides information for pagination or continuation.
	Cursor TaskResolverCursor
	// Done indicates whether the resolution process is complete.
	Done bool
}

TaskResolverResult represents the response from resolving tasks.

type TaskResponse

type TaskResponse struct {
	TaskID            uuid.UUID `json:"taskId"`                 // TaskID is used to identify the task.
	Type              string    `json:"type"`                   // Type is the type of the task.
	WorkingState      []byte    `json:"workingState"`           // WorkingState is the state of the task that the operator updates.
	ETag              string    `json:"eTag"`                   // ETag is used to correlate the TaskResponse with the TaskRequest.
	Status            string    `json:"status"`                 // Status is the status of the task.
	ErrorMessage      string    `json:"errorMessage,omitempty"` // ErrorMessage contains the error message if the task fails.
	ReconcileAfterSec int64     `json:"reconcileAfterSec"`      // ReconcileAfterSec is the time in seconds after which the next TaskRequest should be queued again.
}

TaskResponse is the response object received from the operator.

type TaskStatus

type TaskStatus string

TaskStatus represents the status of the Task.

const (
	TaskStatusCreated    TaskStatus = "CREATED"
	TaskStatusProcessing TaskStatus = "PROCESSING"
	TaskStatusDone       TaskStatus = "DONE"
	TaskStatusFailed     TaskStatus = "FAILED"
)

Possible Task status.

type TerminationEventFunc

type TerminationEventFunc func(ctx context.Context, job Job) error

TerminationEventFunc defines a callback function type for sending job events.

type TransactionFunc

type TransactionFunc func(context.Context, Repository) error

TransactionFunc defines a function type for executing operations within a transaction.

type WorkerConfig

type WorkerConfig struct {
	// NoOfWorkers specifies the number of concurrent workers.
	NoOfWorkers int
	// ExecInterval is the interval between executions.
	ExecInterval time.Duration
	// Timeout is the maximum duration allowed for a single execution.
	Timeout time.Duration
}

WorkerConfig defines the configuration for a worker process.

Directories

Path Synopsis
client
examples
manager command
operator command
internal
store
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL