orbital

v0.5.0
Published: Feb 19, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

orbital

About this project

Orbital is an open-source framework crafted to synchronize resources seamlessly across system boundaries. Achieving eventual consistency, it offers real-time introspection to report resource states with ease. Simplifying resource management, it requires consumers to implement only a single functional operator, eliminating the need for on-site databases or meta state storage.

Requirements and Setup

Orbital is a library to reconcile state across service boundaries.

Support, Feedback, Contributing

This project is open to feature requests/suggestions, bug reports etc. via GitHub issues. Contribution and feedback are encouraged and always welcome. For more information about how to contribute, the project structure, as well as additional contribution information, see our Contribution Guidelines.

Security / Disclosure

If you find any bug that may be a security problem, please follow the instructions in our security policy on how to report it. Please do not create GitHub issues for security-related doubts or problems.

Code of Conduct

We as members, contributors, and leaders pledge to make participation in our community a harassment-free experience for everyone. By participating in this project, you agree to abide by its Code of Conduct at all times.

Licensing

Copyright 2025 SAP SE or an SAP affiliate company and orbital contributors. Please see our LICENSE for copyright and license information. Detailed information including third-party components and their licensing/copyright information is available via the REUSE tool.

Documentation

Constants

const MessageSignatureKey = "X-Message-Signature"

MessageSignatureKey is the key used to store the message signature.

Variables

var (
	ErrManagerAlreadyStarted   = errors.New("manager was already started")
	ErrManagerNotStarted       = errors.New("manager was not started")
	ErrManagerInvalidConfig    = errors.New("manager has invalid configuration")
	ErrTaskResolverNotSet      = errors.New("taskResolver not set")
	ErrUnknownTaskResolverType = errors.New("unknown task resolver result type")
	ErrMsgFailedTasks          = "job has failed tasks"
	ErrJobUnCancelable         = errors.New("job cannot be canceled in its current state")
	ErrJobNotFound             = errors.New("job not found")
	ErrJobAlreadyExists        = errors.New("job already exists")
	ErrNoTargetManager         = errors.New("no target manager provided")
	ErrNoClientForTarget       = errors.New("no client for task target")
	ErrLoadingJob              = errors.New("failed to load job")
	ErrUpdatingJob             = errors.New("failed to update job")
	ErrTaskNotFound            = errors.New("task not found")
)
var (
	ErrInvalidEntityType = errors.New("invalid entity type")
	ErrMandatoryFields   = errors.New("mandatory fields")
)
var (
	ErrOperatorInvalidConfig      = errors.New("invalid operator configuration")
	ErrHandlerNil                 = errors.New("handler cannot be nil")
	ErrBufferSizeNegative         = errors.New("buffer size cannot be negative")
	ErrNumberOfWorkersNotPositive = errors.New("number of workers must be greater than 0")
	ErrUnknownTaskType            = errors.New("unknown task type")
)
var (
	// ErrRepoCreate is returned when the repository fails to create an entity.
	ErrRepoCreate = errors.New("failed to create entity")
	// ErrEntityUniqueViolation is returned when an entity violates a unique constraint.
	ErrEntityUniqueViolation = errors.New("entity violates unique contraint")
)
var (
	ErrMissingMessageSignature = errors.New("message signature not found in metadata")
	ErrUnsupportedDataType     = errors.New("signer unsupported data type")
	ErrSignerVerifierNil       = errors.New("both signer and verifier cannot be nil")
)
var ErrWorkingStateInvalid = errors.New("invalid working state")

ErrWorkingStateInvalid is returned when the working state is invalid.
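
The sentinel errors above are created with errors.New, so callers would normally match them with errors.Is rather than by comparing message strings. The following is a minimal sketch of that pattern. To stay runnable without the orbital module it uses a local stand-in, errJobNotFound, in place of orbital.ErrJobNotFound, and loadJob is a hypothetical helper that is not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errJobNotFound is a local stand-in for orbital.ErrJobNotFound so that this
// sketch compiles without the orbital module.
var errJobNotFound = errors.New("job not found")

// loadJob is a hypothetical helper that wraps the sentinel with extra context.
func loadJob(id string) error {
	return fmt.Errorf("loading job %q: %w", id, errJobNotFound)
}

// isNotFound reports whether err is, or wraps, the sentinel error.
func isNotFound(err error) bool {
	return errors.Is(err, errJobNotFound)
}

func main() {
	err := loadJob("42")
	fmt.Println("error:", err)
	fmt.Println("is not-found:", isNotFound(err))
}
```

Because the match goes through errors.Is, it keeps working when intermediate layers add context with %w.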

Functions

func CancelJobConfirmer added in v0.5.0

func CancelJobConfirmer(reason string) jobConfirmerCanceled

CancelJobConfirmer creates a result indicating that the job confirmer should cancel the job with a reason.

Example:

return orbital.CancelJobConfirmer("resource not available"), nil

func CancelTaskResolver added in v0.5.0

func CancelTaskResolver(reason string) taskResolverCanceled

CancelTaskResolver creates a result indicating the job should be canceled for the given reason.

Example:

return orbital.CancelTaskResolver("invalid job"), nil

func CompleteJobConfirmer added in v0.5.0

func CompleteJobConfirmer() jobConfirmerDone

CompleteJobConfirmer creates a result indicating that the job confirmer should mark the job as confirmed and ready for resolving tasks.

Example:

return orbital.CompleteJobConfirmer(), nil

func CompleteTaskResolver added in v0.5.0

func CompleteTaskResolver() taskResolverDone

CompleteTaskResolver creates a result indicating task resolution is complete.

Example:

return orbital.CompleteTaskResolver().
    WithTaskInfo(finalBatchOfTasks), nil

func ContinueJobConfirmer added in v0.5.0

func ContinueJobConfirmer() jobConfirmerProcessing

ContinueJobConfirmer creates a result indicating that the job confirmer should continue confirming the job.

Example:

return orbital.ContinueJobConfirmer(), nil

func ContinueTaskResolver added in v0.5.0

func ContinueTaskResolver() taskResolverProcessing

ContinueTaskResolver creates a result indicating more resolution iterations are needed.

Example:

return orbital.ContinueTaskResolver().
    WithTaskInfo(batchOfTasks).
    WithCursor("page-2-token"), nil

func Decode

func Decode[T EntityTypes](entity Entity) (T, error)

Decode converts a store entity to a domain object.

func Decodes

func Decodes[T EntityTypes](entities ...Entity) ([]T, error)

Decodes converts a list of store entities to domain objects.

func Init

func Init(e *Entity)

Init ensures that the metadata of the entity is properly initialized. It sets default values for CreatedAt, UpdatedAt, ID and ExternalID if they are not already set.

Types

type Codec

type Codec interface {
	EncodeTaskRequest(request TaskRequest) ([]byte, error)
	DecodeTaskRequest(bytes []byte) (TaskRequest, error)
	EncodeTaskResponse(response TaskResponse) ([]byte, error)
	DecodeTaskResponse(bytes []byte) (TaskResponse, error)
}

Codec defines the methods for encoding and decoding task requests and responses.
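
Since TaskRequest and TaskResponse carry JSON struct tags, a JSON-based implementation is one plausible way to satisfy this interface. The sketch below is an illustration only: the codec interface, TaskRequest, and TaskResponse are trimmed local stand-ins (the task ID is a plain string here instead of uuid.UUID), and jsonCodec is a hypothetical name, not a type provided by the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskRequest and TaskResponse are trimmed local stand-ins for the orbital
// types of the same name; the real structs carry more fields.
type TaskRequest struct {
	TaskID string `json:"taskId"`
	Type   string `json:"type"`
	Data   []byte `json:"data"`
}

type TaskResponse struct {
	TaskID string `json:"taskId"`
	Status string `json:"status"`
}

// codec mirrors the method set of the Codec interface for the stand-in types.
type codec interface {
	EncodeTaskRequest(request TaskRequest) ([]byte, error)
	DecodeTaskRequest(bytes []byte) (TaskRequest, error)
	EncodeTaskResponse(response TaskResponse) ([]byte, error)
	DecodeTaskResponse(bytes []byte) (TaskResponse, error)
}

// jsonCodec is a hypothetical implementation backed by encoding/json.
type jsonCodec struct{}

// Compile-time check that jsonCodec has all four methods.
var _ codec = jsonCodec{}

func (jsonCodec) EncodeTaskRequest(r TaskRequest) ([]byte, error) { return json.Marshal(r) }

func (jsonCodec) DecodeTaskRequest(b []byte) (TaskRequest, error) {
	var r TaskRequest
	err := json.Unmarshal(b, &r)
	return r, err
}

func (jsonCodec) EncodeTaskResponse(r TaskResponse) ([]byte, error) { return json.Marshal(r) }

func (jsonCodec) DecodeTaskResponse(b []byte) (TaskResponse, error) {
	var r TaskResponse
	err := json.Unmarshal(b, &r)
	return r, err
}

func main() {
	c := jsonCodec{}
	raw, err := c.EncodeTaskRequest(TaskRequest{TaskID: "t-1", Type: "sync", Data: []byte("payload")})
	if err != nil {
		panic(err)
	}
	back, err := c.DecodeTaskRequest(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.TaskID, back.Type, string(back.Data))
}
```

Note that encoding/json writes []byte fields as base64 strings, so the Data payload survives the round trip even when it is not valid UTF-8.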

type Config

type Config struct {
	// TaskLimitNum is the maximum number of tasks to process at once.
	TaskLimitNum int
	// ConfirmJobAfter is the delay before confirming a job.
	ConfirmJobAfter time.Duration
	// ConfirmJobWorkerConfig holds the configuration for the job confirmation worker.
	ConfirmJobWorkerConfig WorkerConfig
	// CreateTasksWorkerConfig holds the configuration for the task creation worker.
	CreateTasksWorkerConfig WorkerConfig
	// ReconcileWorkerConfig holds the configuration for the reconciliation worker.
	ReconcileWorkerConfig WorkerConfig
	// NotifyWorkerConfig holds the configuration for the notification worker.
	NotifyWorkerConfig WorkerConfig
	// BackoffBaseIntervalSec is the base interval for exponential backoff in seconds.
	// Default is 10 seconds.
	BackoffBaseIntervalSec uint64
	// BackoffMaxIntervalSec is the maximum interval for exponential backoff in seconds.
	// Default is 10240 seconds (2 hours, 50 minutes, and 40 seconds).
	BackoffMaxIntervalSec uint64
	// MaxPendingReconciles is the maximum number of pending reconciles for a task before marking it as failed.
	// Default is 10.
	MaxPendingReconciles uint64
}

Config contains configuration for job processing.
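
The two backoff defaults are related: 10240 is 10 multiplied by 2 to the power of 10. The sketch below shows how a capped exponential backoff behaves with those values. The doubling-per-attempt rule and the backoffSec helper are assumptions made for illustration; the documentation above gives only the defaults, not the exact formula the manager uses.

```go
package main

import "fmt"

// backoffSec returns base * 2^attempt, capped at max. The doubling rule is an
// assumption of this sketch, not a documented formula of the package.
func backoffSec(base, max, attempt uint64) uint64 {
	d := base
	for i := uint64(0); i < attempt; i++ {
		// Doubling would exceed the cap (this check also avoids overflow).
		if d > max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	const base, max = 10, 10240 // defaults from the Config comments
	for attempt := uint64(0); attempt <= 11; attempt++ {
		fmt.Printf("attempt %2d: wait %5d s\n", attempt, backoffSec(base, max, attempt))
	}
}
```

Under this assumption the wait time reaches the maximum on the tenth doubling and stays there for every later attempt.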

type Entity

type Entity struct {
	Name      query.EntityName
	ID        uuid.UUID
	CreatedAt int64
	UpdatedAt int64
	Values    map[string]any
}

Entity represents a generic entity stored in the data store. It includes metadata such as the entity's name, unique identifier, created and updated timestamps, and a map of all values.

func Encode

func Encode[T EntityTypes](entityType T) (Entity, error)

Encode converts a domain object to a store entity.

func Encodes

func Encodes[T EntityTypes](entityTypes ...T) ([]Entity, error)

Encodes converts a list of domain objects to store entities.

func TransformToEntities

func TransformToEntities(entityName query.EntityName, objs ...map[string]any) ([]Entity, error)

TransformToEntities transforms a list of maps into domain entities based on the provided entity name.

func TransformToEntity

func TransformToEntity(entityName query.EntityName, objs map[string]any) (Entity, error)

TransformToEntity transforms a map into a domain entity based on the provided entity name.

type EntityTypes

type EntityTypes interface {
	Job | Task | JobCursor | JobEvent
}

EntityTypes defines a type constraint for entities that can be used in the repository. It allows only types Job, Task, JobCursor, or JobEvent to satisfy this interface.

type FindResult

type FindResult struct {
	Entity Entity
	Exists bool
}

type HandlerFunc added in v0.5.0

type HandlerFunc func(ctx context.Context, request HandlerRequest, resp *HandlerResponse)

HandlerFunc processes a handler request and populates the handler response. By default, the handler response will continue processing and the working state will be preserved.

type HandlerRequest

type HandlerRequest struct {
	TaskID               uuid.UUID
	TaskType             string
	TaskData             []byte
	TaskRawWorkingState  []byte
	TaskCreatedAt        time.Time
	TaskLastReconciledAt time.Time
}

HandlerRequest contains the information extracted from orbital.TaskRequest that is relevant for the operator's processing.

type HandlerResponse

type HandlerResponse struct {
	// contains filtered or unexported fields
}

HandlerResponse is used by the handler to indicate the result of processing.

func (*HandlerResponse) Complete added in v0.5.0

func (r *HandlerResponse) Complete()

Complete indicates that the handler has processed the request and wants to mark the task as completed.

Note: This will terminate the processing of the task.

func (*HandlerResponse) ContinueAndWaitFor added in v0.5.0

func (r *HandlerResponse) ContinueAndWaitFor(duration time.Duration)

ContinueAndWaitFor indicates that the handler has processed the request and wants to continue processing after a defined duration.

Note: Duration will be converted to seconds and rounded down.
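
The rounding rule in the note above can be sketched as follows. wholeSeconds is a hypothetical helper, not part of the package, and treating negative durations as zero is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// wholeSeconds truncates d to whole seconds, mirroring the "converted to
// seconds and rounded down" note. Clamping negative values to zero is an
// assumption made for this illustration.
func wholeSeconds(d time.Duration) uint64 {
	if d <= 0 {
		return 0
	}
	return uint64(d / time.Second)
}

func main() {
	for _, d := range []time.Duration{
		999 * time.Millisecond,
		1500 * time.Millisecond,
		2 * time.Minute,
	} {
		fmt.Printf("%v -> %d s\n", d, wholeSeconds(d))
	}
}
```

Under this reading, any duration shorter than one second becomes zero, so a handler that wants a real delay should pass at least one second.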

func (*HandlerResponse) Fail added in v0.5.0

func (r *HandlerResponse) Fail(reason string)

Fail indicates that the handler has processed the request and wants to mark the task as failed with a reason.

Note: This will terminate the processing of the task.

func (*HandlerResponse) UseRawWorkingState added in v0.5.0

func (r *HandlerResponse) UseRawWorkingState(raw []byte)

UseRawWorkingState allows the handler to set the rawWorkingState directly.

Note: If the WorkingState is used in the handler, the changes in the WorkingState take precedence over the rawWorkingState.

func (*HandlerResponse) WorkingState

func (r *HandlerResponse) WorkingState() (*WorkingState, error)

WorkingState returns the WorkingState from the HandlerResponse. It returns an error if the decoding of the rawWorkingState fails.

The WorkingState is automatically encoded back into the orbital.TaskResponse. If the working state is not decoded, if the changes are discarded, or if there is an error during encoding, the rawWorkingState field will take precedence.

type Initiator

type Initiator interface {
	SendTaskRequest(ctx context.Context, request TaskRequest) error
	ReceiveTaskResponse(ctx context.Context) (TaskResponse, error)
	Close(ctx context.Context) error
}

Initiator defines the methods for sending task requests and receiving task responses.

type InitiatorSignatureHandler added in v0.4.0

type InitiatorSignatureHandler struct {
	// contains filtered or unexported fields
}

InitiatorSignatureHandler manages signing and verification operations for initiators. It holds references to a signer for generating signatures and a verifier for validating them.

func NewInitiatorSignatureHandler added in v0.4.0

func NewInitiatorSignatureHandler(signer *jwtsigning.Signer, verifier *jwtsigning.Verifier) (*InitiatorSignatureHandler, error)

NewInitiatorSignatureHandler creates a new InitiatorSignatureHandler using the provided signer and verifier. Returns an error if both signer and verifier are nil.

func (*InitiatorSignatureHandler) Sign added in v0.4.0

Sign generates a signature for the provided TaskRequest using the InitiatorSignatureHandler's signer. Returns the generated Signature and any error encountered during signing.

func (*InitiatorSignatureHandler) Verify added in v0.4.0

func (m *InitiatorSignatureHandler) Verify(ctx context.Context, response TaskResponse) error

Verify checks the signature of the provided TaskResponse using the InitiatorSignatureHandler's verifier. Returns an error if the signature is invalid.

type Job

type Job struct {
	ID           uuid.UUID
	ExternalID   string
	Data         []byte
	Type         string
	Status       JobStatus
	ErrorMessage string
	UpdatedAt    int64
	CreatedAt    int64
}

Job is the translated domain object representing an event.

func NewJob

func NewJob(jobType string, data []byte) Job

NewJob creates a new Job instance with the given parameters.

func (Job) WithExternalID added in v0.2.0

func (j Job) WithExternalID(id string) Job

WithExternalID sets an external identifier for the job and returns the updated Job.

type JobConfirmFunc

type JobConfirmFunc func(ctx context.Context, job Job) (JobConfirmerResult, error)

JobConfirmFunc validates a job's readiness before task resolution. It can verify that the underlying resource is available and in the expected state.

Return one of:

  • ContinueJobConfirmer() to continue confirming
  • CompleteJobConfirmer() to mark the job as confirmed and ready for resolving tasks
  • CancelJobConfirmer(reason) to cancel the job with a reason

Errors returned from this function are treated as recoverable, meaning the confirmer can retry later.

Example:

func myConfirmer(ctx context.Context, job orbital.Job) (orbital.JobConfirmerResult, error) {
    if resourceIsAvailableAndValid() {
        return orbital.CompleteJobConfirmer(), nil
    }
    return orbital.ContinueJobConfirmer(), nil
}

type JobConfirmerResult added in v0.5.0

type JobConfirmerResult interface {
	Type() JobConfirmerResultType
}

JobConfirmerResult represents the outcome of a job confirmation attempt.

type JobConfirmerResultType added in v0.5.0

type JobConfirmerResultType int

JobConfirmerResultType is the type for the job confirmer result.

type JobCursor

type JobCursor struct {
	ID        uuid.UUID
	Cursor    TaskResolverCursor
	UpdatedAt int64
	CreatedAt int64
}

JobCursor stores the cursor for the next taskResolver.

type JobEvent

type JobEvent struct {
	ID         uuid.UUID
	IsNotified bool
	UpdatedAt  int64
	CreatedAt  int64
}

JobEvent represents an event related to a job, used for tracking and processing job state changes.

type JobEventQuery

type JobEventQuery struct {
	ID                 uuid.UUID // Filter job events by their ID.
	IsNotified         *bool     // Filter job events by whether they have been notified.
	RetrievalModeQueue bool      // If true, enables queue-like retrieval mode.
	Limit              int       // Maximum number of job events to return.
	OrderByUpdatedAt   bool      // If true, orders job events by updated_at in ascending order.
}

JobEventQuery defines the parameters for querying job events from the repository.

type JobStatus

type JobStatus string

JobStatus represents the possible statuses of a Job.

const (
	JobStatusCreated         JobStatus = "CREATED"
	JobStatusConfirming      JobStatus = "CONFIRMING"
	JobStatusConfirmed       JobStatus = "CONFIRMED"
	JobStatusResolving       JobStatus = "RESOLVING"
	JobStatusReady           JobStatus = "READY"
	JobStatusProcessing      JobStatus = "PROCESSING"
	JobStatusDone            JobStatus = "DONE"
	JobStatusFailed          JobStatus = "FAILED"
	JobStatusResolveCanceled JobStatus = "RESOLVE_CANCELED"
	JobStatusConfirmCanceled JobStatus = "CONFIRM_CANCELED"
	JobStatusUserCanceled    JobStatus = "USER_CANCELED"
)

Possible job statuses.

func TerminalStatuses added in v0.2.0

func TerminalStatuses() []JobStatus

TerminalStatuses returns the list of job statuses that are considered terminal.

type JobStatuses added in v0.2.0

type JobStatuses []JobStatus

JobStatuses is a slice of JobStatus values.

func TransientStatuses added in v0.2.0

func TransientStatuses() JobStatuses

TransientStatuses returns the list of job statuses that are considered transient.

func (JobStatuses) StringSlice added in v0.2.0

func (js JobStatuses) StringSlice() []string

StringSlice converts the JobStatuses to a slice of strings.

type JobTerminatedEventFunc added in v0.2.0

type JobTerminatedEventFunc func(ctx context.Context, job Job) error

JobTerminatedEventFunc defines a callback function type for sending job events.

type ListJobsQuery

type ListJobsQuery struct {
	Status             JobStatus   // Filter jobs by their status.
	StatusIn           []JobStatus // Filter jobs by a list of statuses.
	UpdatedAt          int64       // Filter jobs updated at or after this timestamp.
	CreatedAt          int64       // Filter jobs created at or after this timestamp.
	Limit              int         // Maximum number of jobs to return.
	RetrievalModeQueue bool        // If true, enables queue-like retrieval mode.
	OrderByUpdatedAt   bool        // If true, orders jobs by updated_at in descending order.
}

ListJobsQuery defines the parameters for querying jobs from the repository. It allows filtering by status, creation time, result limit, and enables queue-like retrieval mode when specified.

type ListResult

type ListResult struct {
	Entities []Entity
	Cursor   query.Cursor
	Exists   bool
}

type ListTasksQuery

type ListTasksQuery struct {
	JobID              uuid.UUID    // Filter tasks by the associated job ID.
	Status             TaskStatus   // Filter tasks by their status.
	StatusIn           []TaskStatus // Filter tasks by a list of statuses.
	CreatedAt          int64        // Filter tasks created at or after this timestamp.
	UpdatedAt          int64        // Filter tasks updated at or after this timestamp.
	Limit              int          // Maximum number of tasks to return.
	IsReconcileReady   bool         // If true, filters tasks that are ready to be reconciled.
	RetrievalModeQueue bool         // If true, enables queue-like retrieval mode.
	OrderByUpdatedAt   bool         // If true, orders tasks by updated_at in ascending order.
}

ListTasksQuery defines the parameters for querying tasks from the repository. It supports filtering by job ID, task status, creation timestamp, and result limit.

type Manager

type Manager struct {
	Config Config
	// contains filtered or unexported fields
}

Manager manages jobs, including their creation, state transitions, and lifecycle handling.

func NewManager

func NewManager(repo *Repository, taskResolver TaskResolveFunc, optFuncs ...ManagerOptsFunc) (*Manager, error)

NewManager creates a new Manager instance.

func (*Manager) CancelJob added in v0.2.0

func (m *Manager) CancelJob(ctx context.Context, jobID uuid.UUID) error

CancelJob cancels a job and associated running tasks. It updates the job status to "USER_CANCELED".

func (*Manager) GetJob

func (m *Manager) GetJob(ctx context.Context, jobID uuid.UUID) (Job, bool, error)

GetJob retrieves a job by its ID from the repository.

func (*Manager) ListTasks

func (m *Manager) ListTasks(ctx context.Context, query ListTasksQuery) ([]Task, error)

ListTasks retrieves tasks by query from the repository.

func (*Manager) PrepareJob

func (m *Manager) PrepareJob(ctx context.Context, job Job) (Job, error)

PrepareJob prepares a job by creating it in the repository with status CREATED. It returns an error if a job with the same type and external ID in a non-terminal status already exists.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts the job manager to process jobs.

func (*Manager) Stop added in v0.4.0

func (m *Manager) Stop(ctx context.Context) error

type ManagerOptsFunc

type ManagerOptsFunc func(mgr *Manager)

ManagerOptsFunc is a function type to configure Manager options.

func WithJobCanceledEventFunc added in v0.2.0

func WithJobCanceledEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc

WithJobCanceledEventFunc registers a function to send job canceled events.

func WithJobConfirmFunc

func WithJobConfirmFunc(f JobConfirmFunc) ManagerOptsFunc

WithJobConfirmFunc registers a function to confirm jobs.

func WithJobDoneEventFunc added in v0.2.0

func WithJobDoneEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc

WithJobDoneEventFunc registers a function to send job done events.

func WithJobFailedEventFunc added in v0.2.0

func WithJobFailedEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc

WithJobFailedEventFunc registers a function to send job failed events.

func WithTargets added in v0.3.0

func WithTargets(targets map[string]TargetManager) ManagerOptsFunc

WithTargets sets the map that maps the target string to its TargetManager.

type MetaData added in v0.3.0

type MetaData map[string]string

MetaData represents a set of key-value pairs containing metadata information. It is commonly used for storing additional attributes related to requests, responses, or signatures.
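
As an illustration of how a signature could travel in this map, the sketch below looks up the value stored under the MessageSignatureKey constant and reports a missing entry. The lowercase names are local stand-ins that copy the documented constant and error message, and signatureFrom is a hypothetical helper; treating an empty value as missing is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the documented constant, error, and type.
const messageSignatureKey = "X-Message-Signature"

var errMissingMessageSignature = errors.New("message signature not found in metadata")

type metaData map[string]string

// signatureFrom is a hypothetical helper that returns the signature stored in
// the metadata. An empty value is treated as missing (an assumption).
func signatureFrom(md metaData) (string, error) {
	sig, ok := md[messageSignatureKey]
	if !ok || sig == "" {
		return "", errMissingMessageSignature
	}
	return sig, nil
}

func main() {
	signed := metaData{messageSignatureKey: "header.payload.signature"}
	fmt.Println(signatureFrom(signed))

	// Reading from a nil map is safe in Go and simply finds nothing.
	fmt.Println(signatureFrom(nil))
}
```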

type Operator

type Operator struct {
	// contains filtered or unexported fields
}

Operator handles task requests and responses.

func NewOperator

func NewOperator(target TargetOperator, opts ...Option) (*Operator, error)

NewOperator creates a new Operator instance with the given TargetOperator and options.

func (*Operator) ListenAndRespond

func (o *Operator) ListenAndRespond(ctx context.Context)

ListenAndRespond starts listening for task requests and responding to them.

func (*Operator) RegisterHandler

func (o *Operator) RegisterHandler(taskType string, h HandlerFunc) error

RegisterHandler registers a handler for a specific task type. It returns an error if the handler is nil.

type Option

type Option func(*config) error

Option is a function that modifies the config parameter of the Operator.

func WithBufferSize

func WithBufferSize(size int) Option

WithBufferSize sets the buffer size for the requests channel. It returns an error if the size is negative.

func WithNumberOfWorkers

func WithNumberOfWorkers(num int) Option

WithNumberOfWorkers sets the number of workers for processing requests. It returns an error if the number is not positive.
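
Option follows the functional-options pattern, with the twist that each option can reject its input by returning an error. The sketch below reproduces that shape with local stand-ins so it runs on its own. The config fields, the default values, and newConfig are assumptions for illustration; only the validation rules and error messages come from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented sentinel errors.
var (
	errBufferSizeNegative = errors.New("buffer size cannot be negative")
	errWorkersNotPositive = errors.New("number of workers must be greater than 0")
)

// config is a hypothetical stand-in for the operator's unexported config.
type config struct {
	bufferSize int
	workers    int
}

// option mirrors the shape of Option: it mutates the config or returns an error.
type option func(*config) error

func withBufferSize(size int) option {
	return func(c *config) error {
		if size < 0 {
			return errBufferSizeNegative
		}
		c.bufferSize = size
		return nil
	}
}

func withNumberOfWorkers(num int) option {
	return func(c *config) error {
		if num <= 0 {
			return errWorkersNotPositive
		}
		c.workers = num
		return nil
	}
}

// newConfig applies the options in order and stops at the first error.
// The defaults used here are made up for the example.
func newConfig(opts ...option) (config, error) {
	c := config{bufferSize: 100, workers: 1}
	for _, opt := range opts {
		if err := opt(&c); err != nil {
			return config{}, err
		}
	}
	return c, nil
}

func main() {
	fmt.Println(newConfig(withBufferSize(8), withNumberOfWorkers(4)))
	fmt.Println(newConfig(withNumberOfWorkers(0)))
}
```

Returning the error from the option itself keeps validation next to the setting it guards, so the constructor only has to loop and propagate.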

type Repository

type Repository struct {
	Store Store
}

Repository provides methods to interact with the underlying data store. It encapsulates a Store implementation for data persistence operations.

func NewRepository

func NewRepository(store Store) *Repository

NewRepository creates a new Repository instance using the provided Store. It returns a pointer to the initialized Repository.

type Responder

type Responder interface {
	ReceiveTaskRequest(ctx context.Context) (TaskRequest, error)
	SendTaskResponse(ctx context.Context, response TaskResponse) error
	Close(ctx context.Context) error
}

Responder defines the methods for receiving task requests and sending task responses.

type ResponderSignatureHandler added in v0.4.0

type ResponderSignatureHandler struct {
	// contains filtered or unexported fields
}

ResponderSignatureHandler manages signing and verification operations for responders. It holds references to a signer for generating signatures and a verifier for validating them.

func NewResponderSignatureHandler added in v0.4.0

func NewResponderSignatureHandler(signer *jwtsigning.Signer, verifier *jwtsigning.Verifier) (*ResponderSignatureHandler, error)

NewResponderSignatureHandler creates a new ResponderSignatureHandler using the provided signer and verifier. Returns an error if both signer and verifier are nil.

func (*ResponderSignatureHandler) Sign added in v0.4.0

Sign generates a signature for the provided TaskResponse using the ResponderSignatureHandler's signer. Returns the generated Signature and any error encountered during signing.

func (*ResponderSignatureHandler) Verify added in v0.4.0

func (o *ResponderSignatureHandler) Verify(ctx context.Context, request TaskRequest) error

Verify checks the signature of the provided TaskRequest using the ResponderSignatureHandler's verifier. Returns an error if the signature is invalid.

type Signature added in v0.3.0

type Signature MetaData

Signature represents the metadata used for signing and verifying requests. It typically contains cryptographic information such as signatures or tokens.

type Store

type Store interface {
	Create(ctx context.Context, r ...Entity) ([]Entity, error)
	Update(ctx context.Context, r ...Entity) ([]Entity, error)
	Find(ctx context.Context, q query.Query) (FindResult, error)
	List(ctx context.Context, q query.Query) (ListResult, error)
	Transaction(ctx context.Context, txFunc TransactionFunc) error
}

Store defines the interface for a data store that supports CRUD operations and transactions.

type TargetManager added in v0.4.0

type TargetManager struct {
	Client             Initiator
	Signer             TaskRequestSigner
	Verifier           TaskResponseVerifier
	MustCheckSignature bool
}

TargetManager holds the client and cryptographic implementation for initiating tasks. It provides access to the Initiator for communication, Signer and Verifier for signing and verification operations.

type TargetOperator added in v0.4.0

type TargetOperator struct {
	Client             Responder
	Verifier           TaskRequestVerifier
	Signer             TaskResponseSigner
	MustCheckSignature bool
}

TargetOperator holds the client and cryptographic implementation for responding to tasks. It provides access to the Responder for communication, Signer and Verifier for signing and verification operations.

type Task

type Task struct {
	ID                 uuid.UUID
	JobID              uuid.UUID
	Type               string
	Data               []byte
	WorkingState       []byte
	LastReconciledAt   int64  // The last time the task was reconciled.
	ReconcileCount     uint64 // The number of times the task has been reconciled.
	ReconcileAfterSec  uint64 // The number of seconds after which the task should be reconciled.
	TotalSentCount     uint64
	TotalReceivedCount uint64
	ETag               string
	Status             TaskStatus
	Target             string
	ErrorMessage       string
	UpdatedAt          int64
	CreatedAt          int64
}

Task is a trackable unit derived from the Job.

type TaskInfo

type TaskInfo struct {
	// Data contains the byte data that needs to be sent.
	Data []byte
	// Type specifies the type of the data.
	Type string
	// Target is the target identifier associated with the job.
	Target string
}

TaskInfo represents the result of resolving a task.

type TaskRequest

type TaskRequest struct {
	TaskID               uuid.UUID `json:"taskId"`               // TaskID is used to identify the task.
	Type                 string    `json:"type"`                 // Type is the type of the task.
	ExternalID           string    `json:"externalId"`           // External ID serves as an identifier for a Job
	Data                 []byte    `json:"data"`                 // Data is the static context for the task.
	WorkingState         []byte    `json:"workingState"`         // WorkingState is the current state of the task that the operator works upon.
	ETag                 string    `json:"eTag"`                 // ETag is used to identify the version of the TaskRequest.
	MetaData             MetaData  `json:"metaData"`             // MetaData contains additional information about the TaskRequest.
	TaskCreatedAt        int64     `json:"taskCreatedAt"`        // TaskCreatedAt is the timestamp when the task was created, represented as Unix time in seconds.
	TaskLastReconciledAt int64     `json:"taskLastReconciledAt"` // TaskLastReconciledAt is the timestamp when the task was last reconciled, represented as Unix time in seconds.
}

TaskRequest is the request object that will be sent to the operator.

type TaskRequestSigner added in v0.3.0

type TaskRequestSigner interface {
	// Sign signs the given TaskRequest and returns a Signature.
	Sign(ctx context.Context, request TaskRequest) (Signature, error)
}

TaskRequestSigner defines an interface for signing TaskRequest objects. Implementations of this interface are responsible for generating a Signature for a given TaskRequest, typically for authentication or verification purposes.

type TaskRequestVerifier added in v0.3.0

type TaskRequestVerifier interface {
	// Verify verifies the authenticity of the given TaskRequest.
	Verify(ctx context.Context, request TaskRequest) error
}

TaskRequestVerifier defines an interface for verifying the authenticity of TaskRequest objects. Implementations of this interface are responsible for checking the validity and integrity of a given TaskRequest, typically using cryptographic methods.

type TaskResolveFunc added in v0.2.0

type TaskResolveFunc func(ctx context.Context, job Job, cursor TaskResolverCursor) (TaskResolverResult, error)

TaskResolveFunc resolves tasks for a job, potentially in multiple iterations.

Return one of:

  • ContinueTaskResolver() to continue resolving (with optional tasks and cursor)
  • CompleteTaskResolver() to finish resolving (with optional final tasks)
  • CancelTaskResolver(reason) to abort the job

Errors returned from this function are treated as recoverable, meaning the resolver can retry later.

Example:

func myResolver(ctx context.Context, job orbital.Job, cursor orbital.TaskResolverCursor) (orbital.TaskResolverResult, error) {
    tasks := []orbital.TaskInfo{...} // resolve some tasks
    if moreTasksExist {
        return orbital.ContinueTaskResolver().WithTaskInfo(tasks).WithCursor("next-page-token"), nil
    }
    return orbital.CompleteTaskResolver().WithTaskInfo(tasks), nil
}

type TaskResolverCursor

type TaskResolverCursor string

TaskResolverCursor is an opaque token used to resume or paginate task resolution. The resolver function controls its format and meaning.
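
Because the resolver owns the cursor format, one simple choice is to encode an offset into it. The sketch below pages through a list of targets that way. The cursor type is a local stand-in for TaskResolverCursor, and resolvePage and resolveAll are hypothetical helpers: resolveAll only imitates a caller that feeds each returned cursor into the next call, which is an assumption about how iterations are chained.

```go
package main

import (
	"fmt"
	"strconv"
)

// cursor is a local stand-in for orbital.TaskResolverCursor.
type cursor string

// resolvePage returns one page of targets starting at the offset encoded in c,
// the cursor for the next page, and whether more pages remain.
func resolvePage(all []string, c cursor, pageSize int) ([]string, cursor, bool, error) {
	if pageSize < 1 {
		return nil, "", false, fmt.Errorf("page size must be positive, got %d", pageSize)
	}
	start := 0
	if c != "" {
		n, err := strconv.Atoi(string(c))
		if err != nil || n < 0 || n > len(all) {
			return nil, "", false, fmt.Errorf("invalid cursor %q", string(c))
		}
		start = n
	}
	end := start + pageSize
	if end >= len(all) {
		return all[start:], "", false, nil // last page: no cursor needed
	}
	return all[start:end], cursor(strconv.Itoa(end)), true, nil
}

// resolveAll imitates repeated resolver invocations until no pages remain.
func resolveAll(all []string, pageSize int) ([]string, int, error) {
	var out []string
	var c cursor
	calls := 0
	for {
		page, next, more, err := resolvePage(all, c, pageSize)
		if err != nil {
			return nil, calls, err
		}
		calls++
		out = append(out, page...)
		if !more {
			return out, calls, nil
		}
		c = next
	}
}

func main() {
	targets := []string{"eu-1", "eu-2", "us-1", "us-2", "ap-1"}
	resolved, calls, err := resolveAll(targets, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(resolved, "in", calls, "calls")
}
```

In a real resolver, the "more pages" branch would correspond to returning ContinueTaskResolver with the next cursor, and the final page to CompleteTaskResolver.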

type TaskResolverResult

type TaskResolverResult interface {
	Type() TaskResolverResultType
}

TaskResolverResult represents the outcome of a task resolution attempt.

type TaskResolverResultType added in v0.5.0

type TaskResolverResultType int

TaskResolverResultType is the type for the task resolver result.

type TaskResponse

type TaskResponse struct {
	TaskID            uuid.UUID `json:"taskId"`                 // TaskID is used to identify the task.
	Type              string    `json:"type"`                   // Type is the type of the task.
	ExternalID        string    `json:"externalId"`             // External ID serves as an identifier for a Job
	WorkingState      []byte    `json:"workingState"`           // WorkingState is the state of the task that the operator updates.
	ETag              string    `json:"eTag"`                   // ETag is used to correlate the TaskResponse with the TaskRequest.
	Status            string    `json:"status"`                 // Status is the status of the task.
	ErrorMessage      string    `json:"errorMessage,omitempty"` // ErrorMessage contains the error message if the task fails.
	ReconcileAfterSec uint64    `json:"reconcileAfterSec"`      // ReconcileAfterSec is the time in seconds after which the next TaskRequest should be queued again.
	MetaData          MetaData  `json:"metaData"`               // MetaData contains additional information about the TaskResponse.
}

TaskResponse is the response object received from the operator.

func ExecuteHandler added in v0.5.0

func ExecuteHandler(ctx context.Context, h HandlerFunc, req TaskRequest) TaskResponse

ExecuteHandler transforms the TaskRequest into a HandlerRequest, executes the handler, and transforms the HandlerResponse back into a TaskResponse.

type TaskResponseSigner added in v0.3.0

type TaskResponseSigner interface {
	// Sign signs the given TaskResponse and returns a Signature.
	Sign(ctx context.Context, response TaskResponse) (Signature, error)
}

TaskResponseSigner defines an interface for signing TaskResponse objects. Implementations of this interface are responsible for generating a Signature for a given TaskResponse, typically for authentication or verification purposes.

type TaskResponseVerifier added in v0.3.0

type TaskResponseVerifier interface {
	// Verify verifies the authenticity of the given TaskResponse.
	Verify(ctx context.Context, response TaskResponse) error
}

TaskResponseVerifier defines an interface for verifying the authenticity of TaskResponse objects. Implementations of this interface are responsible for checking the validity and integrity of a given TaskResponse, typically using cryptographic methods.

type TaskStatus

type TaskStatus string

TaskStatus represents the status of the Task.

const (
	TaskStatusCreated    TaskStatus = "CREATED"
	TaskStatusProcessing TaskStatus = "PROCESSING"
	TaskStatusDone       TaskStatus = "DONE"
	TaskStatusFailed     TaskStatus = "FAILED"
)

Possible task statuses.

type TransactionFunc

type TransactionFunc func(context.Context, Repository) error

TransactionFunc defines a function type for executing operations within a transaction.

type WorkerConfig

type WorkerConfig struct {
	// NoOfWorkers specifies the number of concurrent workers.
	NoOfWorkers int
	// ExecInterval is the interval between executions.
	ExecInterval time.Duration
	// Timeout is the maximum duration allowed for a single execution.
	Timeout time.Duration
}

WorkerConfig defines the configuration for a worker process.

type WorkingState added in v0.4.0

type WorkingState struct {
	// contains filtered or unexported fields
}

WorkingState represents the working state of a task. It provides methods for storing arbitrary key-value pairs and convenience methods for tracking metrics.

func (*WorkingState) Add added in v0.4.0

func (w *WorkingState) Add(key string, amount float64) float64

Add adds the specified amount to a key and returns the new value.

func (*WorkingState) Dec added in v0.4.0

func (w *WorkingState) Dec(key string) float64

Dec decrements a key and returns the new value.

func (*WorkingState) Delete added in v0.4.0

func (w *WorkingState) Delete(key string)

Delete removes a key from the WorkingState.

func (*WorkingState) DiscardChanges added in v0.4.0

func (w *WorkingState) DiscardChanges()

DiscardChanges discards any changes made to the WorkingState.

func (*WorkingState) Inc added in v0.4.0

func (w *WorkingState) Inc(key string) float64

Inc increments the value of a key and returns the new value.

func (*WorkingState) Set added in v0.4.0

func (w *WorkingState) Set(key string, value any)

Set sets a key-value pair in the WorkingState.

func (*WorkingState) Sub added in v0.4.0

func (w *WorkingState) Sub(key string, amount float64) float64

Sub subtracts the specified amount from a key and returns the new value.

func (*WorkingState) Value added in v0.4.0

func (w *WorkingState) Value(key string) (any, bool)

Value gets the value for a key from the WorkingState.
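
To make the counter-style methods above concrete, here is a simplified stand-in that supports Set, Value, Add, Inc, and Dec on a plain map. It is not the package's implementation: the workingState type, its internal map, and the choice to treat a missing or non-numeric value as zero are all assumptions of this sketch.

```go
package main

import "fmt"

// workingState is a simplified local stand-in for orbital.WorkingState.
type workingState struct {
	values map[string]any
}

// Set stores a key-value pair, creating the map on first use.
func (w *workingState) Set(key string, value any) {
	if w.values == nil {
		w.values = map[string]any{}
	}
	w.values[key] = value
}

// Value returns the stored value and whether the key exists.
func (w *workingState) Value(key string) (any, bool) {
	v, ok := w.values[key]
	return v, ok
}

// Add adds amount to the number stored under key and returns the new value.
// A missing or non-float64 value counts as 0 (an assumption of this sketch).
func (w *workingState) Add(key string, amount float64) float64 {
	cur, _ := w.values[key].(float64)
	next := cur + amount
	w.Set(key, next)
	return next
}

// Inc and Dec are shorthand for adding and subtracting one.
func (w *workingState) Inc(key string) float64 { return w.Add(key, 1) }
func (w *workingState) Dec(key string) float64 { return w.Add(key, -1) }

func main() {
	var ws workingState
	ws.Set("phase", "provisioning")
	ws.Inc("attempts")
	ws.Inc("attempts")

	attempts, _ := ws.Value("attempts")
	phase, _ := ws.Value("phase")
	fmt.Println(phase, attempts)
}
```

A handler could use a counter like this to give up after a fixed number of attempts, for example by calling Fail once the value returned by Inc passes a threshold.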

Directories

Path Synopsis
client
examples
embedded_client command
manager command
operator command
internal
proto
store
sql
