Documentation
¶
Index ¶
- Constants
- Variables
- func CancelJobConfirmer(reason string) jobConfirmerCanceled
- func CancelTaskResolver(reason string) taskResolverCanceled
- func CompleteJobConfirmer() jobConfirmerDone
- func CompleteTaskResolver() taskResolverDone
- func ContinueJobConfirmer() jobConfirmerProcessing
- func ContinueTaskResolver() taskResolverProcessing
- func Decode[T EntityTypes](entity Entity) (T, error)
- func Decodes[T EntityTypes](entities ...Entity) ([]T, error)
- func Init(e *Entity)
- type Codec
- type Config
- type Entity
- func Encode[T EntityTypes](entityType T) (Entity, error)
- func Encodes[T EntityTypes](entityTypes ...T) ([]Entity, error)
- func TransformToEntities(entityName query.EntityName, objs ...map[string]any) ([]Entity, error)
- func TransformToEntity(entityName query.EntityName, objs map[string]any) (Entity, error)
- type EntityTypes
- type FindResult
- type HandlerFunc
- type HandlerRequest
- type HandlerResponse
- type Initiator
- type InitiatorSignatureHandler
- type Job
- type JobConfirmFunc
- type JobConfirmerResult
- type JobConfirmerResultType
- type JobCursor
- type JobEvent
- type JobEventQuery
- type JobStatus
- type JobStatuses
- type JobTerminatedEventFunc
- type ListJobsQuery
- type ListResult
- type ListTasksQuery
- type Manager
- func (m *Manager) CancelJob(ctx context.Context, jobID uuid.UUID) error
- func (m *Manager) GetJob(ctx context.Context, jobID uuid.UUID) (Job, bool, error)
- func (m *Manager) ListTasks(ctx context.Context, query ListTasksQuery) ([]Task, error)
- func (m *Manager) PrepareJob(ctx context.Context, job Job) (Job, error)
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop(ctx context.Context) error
- type ManagerOptsFunc
- func WithJobCanceledEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
- func WithJobConfirmFunc(f JobConfirmFunc) ManagerOptsFunc
- func WithJobDoneEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
- func WithJobFailedEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
- func WithTargets(targets map[string]TargetManager) ManagerOptsFunc
- type MetaData
- type Operator
- type Option
- type Repository
- type Responder
- type ResponderSignatureHandler
- type Signature
- type Store
- type TargetManager
- type TargetOperator
- type Task
- type TaskInfo
- type TaskRequest
- type TaskRequestSigner
- type TaskRequestVerifier
- type TaskResolveFunc
- type TaskResolverCursor
- type TaskResolverResult
- type TaskResolverResultType
- type TaskResponse
- type TaskResponseSigner
- type TaskResponseVerifier
- type TaskStatus
- type TransactionFunc
- type WorkerConfig
- type WorkingState
- func (w *WorkingState) Add(key string, amount float64) float64
- func (w *WorkingState) Dec(key string) float64
- func (w *WorkingState) Delete(key string)
- func (w *WorkingState) DiscardChanges()
- func (w *WorkingState) Inc(key string) float64
- func (w *WorkingState) Set(key string, value any)
- func (w *WorkingState) Sub(key string, amount float64) float64
- func (w *WorkingState) Value(key string) (any, bool)
Constants ¶
const MessageSignatureKey = "X-Message-Signature"
MessageSignatureKey is the key used to store the message signature.
Variables ¶
var ( ErrManagerAlreadyStarted = errors.New("manager was already started") ErrManagerNotStarted = errors.New("manager was not started") ErrManagerInvalidConfig = errors.New("manager has invalid configuration") ErrTaskResolverNotSet = errors.New("taskResolver not set") ErrUnknownTaskResolverType = errors.New("unknown task resolver result type") ErrMsgFailedTasks = "job has failed tasks" ErrJobUnCancelable = errors.New("job cannot be canceled in its current state") ErrJobNotFound = errors.New("job not found") ErrJobAlreadyExists = errors.New("job already exists") ErrNoTargetManager = errors.New("no target manager provided") ErrNoClientForTarget = errors.New("no client for task target") ErrLoadingJob = errors.New("failed to load job") ErrUpdatingJob = errors.New("failed to update job") ErrTaskNotFound = errors.New("task not found") )
var ( ErrInvalidEntityType = errors.New("invalid entity type") ErrMandatoryFields = errors.New("mandatory fields") )
var ( ErrOperatorInvalidConfig = errors.New("invalid operator configuration") ErrHandlerNil = errors.New("handler cannot be nil") ErrBufferSizeNegative = errors.New("buffer size cannot be negative") ErrNumberOfWorkersNotPositive = errors.New("number of workers must be greater than 0") ErrUnknownTaskType = errors.New("unknown task type") )
var ( // ErrRepoCreate is returned when the repository fails to create an entity. ErrRepoCreate = errors.New("failed to create entity") // ErrEntityUniqueViolation is returned when an entity violates a unique constraint. ErrEntityUniqueViolation = errors.New("entity violates unique contraint") )
var ( ErrMissingMessageSignature = errors.New("message signature not found in metadata") ErrUnsupportedDataType = errors.New("signer unsupported data type") ErrSignerVerifierNil = errors.New("both signer and verifier cannot be nil") )
var ErrWorkingStateInvalid = errors.New("invalid working state")
ErrWorkingStateInvalid is returned when the working state is invalid.
Functions ¶
func CancelJobConfirmer ¶ added in v0.5.0
func CancelJobConfirmer(reason string) jobConfirmerCanceled
CancelJobConfirmer creates a result indicating that the job confirmer should cancel the job with a reason.
Example:
return orbital.CancelJobConfirmer("resource not available"), nil
func CancelTaskResolver ¶ added in v0.5.0
func CancelTaskResolver(reason string) taskResolverCanceled
CancelTaskResolver creates a result indicating the job should be canceled for the given reason.
Example:
return orbital.CancelTaskResolver("invalid job"), nil
func CompleteJobConfirmer ¶ added in v0.5.0
func CompleteJobConfirmer() jobConfirmerDone
CompleteJobConfirmer creates a result indicating that the job confirmer should mark the job as confirmed and ready for resolving tasks.
Example:
return orbital.CompleteJobConfirmer(), nil
func CompleteTaskResolver ¶ added in v0.5.0
func CompleteTaskResolver() taskResolverDone
CompleteTaskResolver creates a result indicating task resolution is complete.
Example:
return orbital.CompleteTaskResolver().
WithTaskInfo(finalBatchOfTasks), nil
func ContinueJobConfirmer ¶ added in v0.5.0
func ContinueJobConfirmer() jobConfirmerProcessing
ContinueJobConfirmer creates a result indicating that the job confirmer should continue confirming the job.
Example:
return orbital.ContinueJobConfirmer(), nil
func ContinueTaskResolver ¶ added in v0.5.0
func ContinueTaskResolver() taskResolverProcessing
ContinueTaskResolver creates a result indicating more resolution iterations are needed.
Example:
return orbital.ContinueTaskResolver().
WithTaskInfo(batchOfTasks).
WithCursor("page-2-token"), nil
func Decode ¶
func Decode[T EntityTypes](entity Entity) (T, error)
Decode converts a store entity to a domain object.
func Decodes ¶
func Decodes[T EntityTypes](entities ...Entity) ([]T, error)
Decodes converts a list of store entities to domain objects.
Types ¶
type Codec ¶
type Codec interface {
EncodeTaskRequest(request TaskRequest) ([]byte, error)
DecodeTaskRequest(bytes []byte) (TaskRequest, error)
EncodeTaskResponse(response TaskResponse) ([]byte, error)
DecodeTaskResponse(bytes []byte) (TaskResponse, error)
}
Codec defines the methods for encoding and decoding task requests and responses.
type Config ¶
type Config struct {
// TaskLimitNum is the maximum number of tasks to process at once.
TaskLimitNum int
// ConfirmJobAfter is the delay before confirming a job.
ConfirmJobAfter time.Duration
// ConfirmJobWorkerConfig holds the configuration for the job confirmation worker.
ConfirmJobWorkerConfig WorkerConfig
// CreateTasksWorkerConfig holds the configuration for the task creation worker.
CreateTasksWorkerConfig WorkerConfig
// ReconcileWorkerConfig holds the configuration for the reconciliation worker.
ReconcileWorkerConfig WorkerConfig
// NotifyWorkerConfig holds the configuration for the notification worker.
NotifyWorkerConfig WorkerConfig
// BackoffBaseIntervalSec is the base interval for exponential backoff in seconds.
// Default is 10 seconds.
BackoffBaseIntervalSec uint64
// BackoffMaxIntervalSec is the maximum interval for exponential backoff in seconds.
// Default is 10240 seconds (2 hours, 50 minutes, and 40 seconds).
BackoffMaxIntervalSec uint64
// MaxPendingReconciles is the maximum number of pending reconciles for a task before marking it as failed.
// Default is 10.
MaxPendingReconciles uint64
}
Config contains configuration for job processing.
type Entity ¶
type Entity struct {
Name query.EntityName
ID uuid.UUID
CreatedAt int64
UpdatedAt int64
Values map[string]any
}
Entity represents a generic entity stored in the data store. It includes metadata such as the entity's name, unique identifier, created and updated timestamps, and a map of all values.
func Encode ¶
func Encode[T EntityTypes](entityType T) (Entity, error)
Encode converts a domain object to a store entity.
func Encodes ¶
func Encodes[T EntityTypes](entityTypes ...T) ([]Entity, error)
Encodes converts a list of domain objects to store entities.
func TransformToEntities ¶
TransformToEntities transforms a list of maps into domain entities based on the provided entity name.
func TransformToEntity ¶
TransformToEntity transforms a map into a domain entity based on the provided entity name.
type EntityTypes ¶
EntityTypes defines a type constraint for entities that can be used in the repository. It allows only types Job, Task, or JobCursor to satisfy this interface.
type FindResult ¶
type HandlerFunc ¶ added in v0.5.0
type HandlerFunc func(ctx context.Context, request HandlerRequest, resp *HandlerResponse)
HandlerFunc processes a handler request and populates the handler response. Per default, the handler response will continue processing and the working state will be preserved.
type HandlerRequest ¶
type HandlerRequest struct {
TaskID uuid.UUID
TaskType string
TaskData []byte
TaskRawWorkingState []byte
TaskCreatedAt time.Time
TaskLastReconciledAt time.Time
}
HandlerRequest contains information extracted from orbital.TaskRequest that is relevant for the operator's processing.
type HandlerResponse ¶
type HandlerResponse struct {
// contains filtered or unexported fields
}
HandlerResponse is used by the handler to indicate the result of processing.
func (*HandlerResponse) Complete ¶ added in v0.5.0
func (r *HandlerResponse) Complete()
Complete indicates that the handler has processed the request and wants to mark the task as completed.
Note: This will terminate the processing of the task.
func (*HandlerResponse) ContinueAndWaitFor ¶ added in v0.5.0
func (r *HandlerResponse) ContinueAndWaitFor(duration time.Duration)
ContinueAndWaitFor indicates that the handler has processed the request and wants to continue processing after a defined duration.
Note: Duration will be converted to seconds and rounded down.
func (*HandlerResponse) Fail ¶ added in v0.5.0
func (r *HandlerResponse) Fail(reason string)
Fail indicates that the handler has processed the request and wants to mark the task as failed with a reason.
Note: This will terminate the processing of the task.
func (*HandlerResponse) UseRawWorkingState ¶ added in v0.5.0
func (r *HandlerResponse) UseRawWorkingState(raw []byte)
UseRawWorkingState allows the handler to set the rawWorkingState directly.
Note: If the WorkingState is used in the handler, the changes in the WorkingState take precedence over the rawWorkingState.
func (*HandlerResponse) WorkingState ¶
func (r *HandlerResponse) WorkingState() (*WorkingState, error)
WorkingState returns the WorkingState from the HandlerResponse. It returns an error if the decoding of the rawWorkingState fails.
The WorkingState is automatically encoded back into the orbital.TaskResponse. If the working state is not decoded, if the changes are discarded, or if there is an error during encoding, the rawWorkingState field will take precedence.
type Initiator ¶
type Initiator interface {
SendTaskRequest(ctx context.Context, request TaskRequest) error
ReceiveTaskResponse(ctx context.Context) (TaskResponse, error)
Close(ctx context.Context) error
}
Initiator defines the methods for sending task requests and receiving task responses.
type InitiatorSignatureHandler ¶ added in v0.4.0
type InitiatorSignatureHandler struct {
// contains filtered or unexported fields
}
InitiatorSignatureHandler manages signing and verification operations for initiators. It holds references to a signer for generating signatures and a verifier for validating them.
func NewInitiatorSignatureHandler ¶ added in v0.4.0
func NewInitiatorSignatureHandler(signer *jwtsigning.Signer, verifier *jwtsigning.Verifier) (*InitiatorSignatureHandler, error)
NewInitiatorSignatureHandler creates a new InitiatorSignatureHandler using the provided signer and verifier. Returns an error if both signer and verifier are nil.
func (*InitiatorSignatureHandler) Sign ¶ added in v0.4.0
func (m *InitiatorSignatureHandler) Sign(ctx context.Context, request TaskRequest) (Signature, error)
Sign generates a signature for the provided TaskRequest using the InitiatorSignatureHandler's signer. Returns the generated Signature and any error encountered during signing.
func (*InitiatorSignatureHandler) Verify ¶ added in v0.4.0
func (m *InitiatorSignatureHandler) Verify(ctx context.Context, response TaskResponse) error
Verify checks the signature of the provided TaskResponse using the InitiatorSignatureHandler's verifier. Returns an error if the signature is invalid.
type Job ¶
type Job struct {
ID uuid.UUID
ExternalID string
Data []byte
Type string
Status JobStatus
ErrorMessage string
UpdatedAt int64
CreatedAt int64
}
Job is the translated domain object representing an event.
func (Job) WithExternalID ¶ added in v0.2.0
WithExternalID allows setting an external identifier for the job.
type JobConfirmFunc ¶
type JobConfirmFunc func(ctx context.Context, job Job) (JobConfirmerResult, error)
JobConfirmFunc validates a job's readiness before task resolution. It can verify that the underlying resource is available and in the expected state.
Return one of:
- ContinueJobConfirmer() to continue confirming
- CompleteJobConfirmer() to mark the job as confirmed and ready for resolving tasks
- CancelJobConfirmer(reason) to cancel the job with a reason
Errors returned from this function will be treated as recoverable, meaning the confirmer can retry later.
Example:
func myConfirmer(ctx context.Context, job Job) (JobConfirmerResult, error) {
if resourceIsAvailableAndValid() {
return orbital.CompleteJobConfirmer(), nil
}
return orbital.ContinueJobConfirmer(), nil
}
type JobConfirmerResult ¶ added in v0.5.0
type JobConfirmerResult interface {
Type() JobConfirmerResultType
}
JobConfirmerResult represents the outcome of a job confirmation attempt.
type JobConfirmerResultType ¶ added in v0.5.0
type JobConfirmerResultType int
JobConfirmerResultType is the type for the job confirmer result.
type JobCursor ¶
type JobCursor struct {
ID uuid.UUID
Cursor TaskResolverCursor
UpdatedAt int64
CreatedAt int64
}
JobCursor stores the cursor for the next taskResolver.
type JobEvent ¶
JobEvent represents an event related to a job, used for tracking and processing job state changes.
type JobEventQuery ¶
type JobEventQuery struct {
ID uuid.UUID // Filter job events by their ID.
IsNotified *bool // Filter job events by whether they have been notified.
RetrievalModeQueue bool // If true, enables queue-like retrieval mode.
Limit int // Maximum number of job events to return.
OrderByUpdatedAt bool // If true, orders job events by updated_at in ascending order.
}
JobEventQuery defines the parameters for querying job events from the repository.
type JobStatus ¶
type JobStatus string
JobStatus represents the possible statuses of a Job.
const ( JobStatusCreated JobStatus = "CREATED" JobStatusConfirming JobStatus = "CONFIRMING" JobStatusConfirmed JobStatus = "CONFIRMED" JobStatusResolving JobStatus = "RESOLVING" JobStatusReady JobStatus = "READY" JobStatusProcessing JobStatus = "PROCESSING" JobStatusDone JobStatus = "DONE" JobStatusFailed JobStatus = "FAILED" JobStatusResolveCanceled JobStatus = "RESOLVE_CANCELED" JobStatusConfirmCanceled JobStatus = "CONFIRM_CANCELED" JobStatusUserCanceled JobStatus = "USER_CANCELED" )
Possible job statuses.
func TerminalStatuses ¶ added in v0.2.0
func TerminalStatuses() []JobStatus
TerminalStatuses returns the list of job statuses that are considered terminal.
type JobStatuses ¶ added in v0.2.0
type JobStatuses []JobStatus
JobStatuses is a slice of JobStatus values.
func TransientStatuses ¶ added in v0.2.0
func TransientStatuses() JobStatuses
TransientStatuses returns the list of job statuses that are considered transient.
func (JobStatuses) StringSlice ¶ added in v0.2.0
func (js JobStatuses) StringSlice() []string
StringSlice converts the JobStatuses to a slice of strings.
type JobTerminatedEventFunc ¶ added in v0.2.0
JobTerminatedEventFunc defines a callback function type for sending job events.
type ListJobsQuery ¶
type ListJobsQuery struct {
Status JobStatus // Filter jobs by their status.
StatusIn []JobStatus // Filter jobs by a list of statuses.
UpdatedAt int64 // Filter jobs updated at or after this timestamp.
CreatedAt int64 // Filter jobs created at or after this timestamp.
Limit int // Maximum number of jobs to return.
RetrievalModeQueue bool // If true, enables queue-like retrieval mode.
OrderByUpdatedAt bool // If true, orders jobs by updated_at in descending order.
}
ListJobsQuery defines the parameters for querying jobs from the repository. It allows filtering by status, creation time, result limit, and enables queue-like retrieval mode when specified.
type ListTasksQuery ¶
type ListTasksQuery struct {
JobID uuid.UUID // Filter tasks by the associated job ID.
Status TaskStatus // Filter tasks by their status.
StatusIn []TaskStatus // Filter tasks by a list of statuses.
CreatedAt int64 // Filter tasks created at or after this timestamp.
UpdatedAt int64 // Filter tasks updated at or after this timestamp.
Limit int // Maximum number of tasks to return.
IsReconcileReady bool // If true, filters tasks that are ready to be reconciled.
RetrievalModeQueue bool // If true, enables queue-like retrieval mode.
OrderByUpdatedAt bool // If true, orders tasks by updated_at in ascending order.
}
ListTasksQuery defines the parameters for querying tasks from the repository. It supports filtering by job ID, task status, creation timestamp, and result limit.
type Manager ¶
type Manager struct {
Config Config
// contains filtered or unexported fields
}
Manager is the interface for managing jobs, including their creation, state transitions, and lifecycle handling.
func NewManager ¶
func NewManager(repo *Repository, taskResolver TaskResolveFunc, optFuncs ...ManagerOptsFunc) (*Manager, error)
NewManager creates a new Manager instance.
func (*Manager) CancelJob ¶ added in v0.2.0
CancelJob cancels a job and associated running tasks. It updates the job status to "USER_CANCELED".
func (*Manager) PrepareJob ¶
PrepareJob prepares a job by creating it in the repository with status CREATED. It returns an error if a job with the same type and external ID in a non-terminal status already exists.
type ManagerOptsFunc ¶
type ManagerOptsFunc func(mgr *Manager)
ManagerOptsFunc is a function type to configure Manager options.
func WithJobCanceledEventFunc ¶ added in v0.2.0
func WithJobCanceledEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
WithJobCanceledEventFunc registers a function to send job canceled events.
func WithJobConfirmFunc ¶
func WithJobConfirmFunc(f JobConfirmFunc) ManagerOptsFunc
WithJobConfirmFunc registers a function to confirm jobs.
func WithJobDoneEventFunc ¶ added in v0.2.0
func WithJobDoneEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
WithJobDoneEventFunc registers a function to send job done events.
func WithJobFailedEventFunc ¶ added in v0.2.0
func WithJobFailedEventFunc(f JobTerminatedEventFunc) ManagerOptsFunc
WithJobFailedEventFunc registers a function to send job failed events.
func WithTargets ¶ added in v0.3.0
func WithTargets(targets map[string]TargetManager) ManagerOptsFunc
WithTargets sets the map that maps the target string to its TargetManager.
type MetaData ¶ added in v0.3.0
MetaData represents a set of key-value pairs containing metadata information. It is commonly used for storing additional attributes related to requests, responses, or signatures.
type Operator ¶
type Operator struct {
// contains filtered or unexported fields
}
Operator handles task requests and responses.
func NewOperator ¶
func NewOperator(target TargetOperator, opts ...Option) (*Operator, error)
NewOperator creates a new Operator instance with the given TargetOperator and options.
func (*Operator) ListenAndRespond ¶
ListenAndRespond starts listening for task requests and responding to them.
func (*Operator) RegisterHandler ¶
func (o *Operator) RegisterHandler(taskType string, h HandlerFunc) error
RegisterHandler registers a handler for a specific task type. It returns an error if the handler is nil.
type Option ¶
type Option func(*config) error
Option is a function that modifies the config parameter of the Operator.
func WithBufferSize ¶
WithBufferSize sets the buffer size for the requests channel. It returns an error if the size is negative.
func WithNumberOfWorkers ¶
WithNumberOfWorkers sets the number of workers for processing requests. It returns an error if the number is not positive.
type Repository ¶
type Repository struct {
Store Store
}
Repository provides methods to interact with the underlying data store. It encapsulates a Store implementation for data persistence operations.
func NewRepository ¶
func NewRepository(store Store) *Repository
NewRepository creates a new Repository instance using the provided Store. It returns a pointer to the initialized Repository.
type Responder ¶
type Responder interface {
ReceiveTaskRequest(ctx context.Context) (TaskRequest, error)
SendTaskResponse(ctx context.Context, response TaskResponse) error
Close(ctx context.Context) error
}
Responder defines the methods for receiving task requests and sending task responses.
type ResponderSignatureHandler ¶ added in v0.4.0
type ResponderSignatureHandler struct {
// contains filtered or unexported fields
}
ResponderSignatureHandler manages signing and verification operations for responders. It holds references to a signer for generating signatures and a verifier for validating them.
func NewResponderSignatureHandler ¶ added in v0.4.0
func NewResponderSignatureHandler(signer *jwtsigning.Signer, verifier *jwtsigning.Verifier) (*ResponderSignatureHandler, error)
NewResponderSignatureHandler creates a new ResponderSignatureHandler using the provided signer and verifier. Returns an error if both signer and verifier are nil.
func (*ResponderSignatureHandler) Sign ¶ added in v0.4.0
func (o *ResponderSignatureHandler) Sign(ctx context.Context, response TaskResponse) (Signature, error)
Sign generates a signature for the provided TaskResponse using the ResponderSignatureHandler's signer. Returns the generated Signature and any error encountered during signing.
func (*ResponderSignatureHandler) Verify ¶ added in v0.4.0
func (o *ResponderSignatureHandler) Verify(ctx context.Context, request TaskRequest) error
Verify checks the signature of the provided TaskRequest using the ResponderSignatureHandler's verifier. Returns an error if the signature is invalid.
type Signature ¶ added in v0.3.0
type Signature MetaData
Signature represents the metadata used for signing and verifying requests. It typically contains cryptographic information such as signatures or tokens.
type Store ¶
type Store interface {
Create(ctx context.Context, r ...Entity) ([]Entity, error)
Update(ctx context.Context, r ...Entity) ([]Entity, error)
Find(ctx context.Context, q query.Query) (FindResult, error)
List(ctx context.Context, q query.Query) (ListResult, error)
Transaction(ctx context.Context, txFunc TransactionFunc) error
}
Store defines the interface for a data store that supports CRUD operations and transactions.
type TargetManager ¶ added in v0.4.0
type TargetManager struct {
Client Initiator
Signer TaskRequestSigner
Verifier TaskResponseVerifier
MustCheckSignature bool
}
TargetManager holds the client and cryptographic implementation for initiating tasks. It provides access to the Initiator for communication, Signer and Verifier for signing and verification operations.
type TargetOperator ¶ added in v0.4.0
type TargetOperator struct {
Client Responder
Verifier TaskRequestVerifier
Signer TaskResponseSigner
MustCheckSignature bool
}
TargetOperator holds the client and cryptographic implementation for responding to tasks. It provides access to the Responder for communication, Signer and Verifier for signing and verification operations.
type Task ¶
type Task struct {
ID uuid.UUID
JobID uuid.UUID
Type string
Data []byte
WorkingState []byte
LastReconciledAt int64 // The last time the task was reconciled.
ReconcileCount uint64 // The number of times the task has been reconciled.
ReconcileAfterSec uint64 // The number of seconds after which the task should be reconciled.
TotalSentCount uint64
TotalReceivedCount uint64
ETag string
Status TaskStatus
Target string
ErrorMessage string
UpdatedAt int64
CreatedAt int64
}
Task is a trackable unit derived from the Job.
type TaskInfo ¶
type TaskInfo struct {
// Data contains the byte data that needs to be sent.
Data []byte
// Type specifies the type of the data.
Type string
// Target is the target identifier associated with the job.
Target string
}
TaskInfo represents the result of resolving a task.
type TaskRequest ¶
type TaskRequest struct {
TaskID uuid.UUID `json:"taskId"` // TaskID is used to identify the task.
Type string `json:"type"` // Type is the type of the task.
ExternalID string `json:"externalId"` // External ID serves as an identifier for a Job
Data []byte `json:"data"` // Data is the static context for the task.
WorkingState []byte `json:"workingState"` // WorkingState is the current state of the task that the operator works upon.
ETag string `json:"eTag"` // ETag is used to identify the version of the TaskRequest.
MetaData MetaData `json:"metaData"` // MetaData contains additional information about the TaskRequest.
TaskCreatedAt int64 `json:"taskCreatedAt"` // TaskCreatedAt is the timestamp when the task was created, represented as Unix time in seconds.
TaskLastReconciledAt int64 `json:"taskLastReconciledAt"` // TaskLastReconciledAt is the timestamp when the task was last reconciled, represented as Unix time in seconds.
}
TaskRequest is the request object that will be sent to the operator.
type TaskRequestSigner ¶ added in v0.3.0
type TaskRequestSigner interface {
// Sign signs the given TaskRequest and returns a Signature.
Sign(ctx context.Context, request TaskRequest) (Signature, error)
}
TaskRequestSigner defines an interface for signing TaskRequest objects. Implementations of this interface are responsible for generating a Signature for a given TaskRequest, typically for authentication or verification purposes.
type TaskRequestVerifier ¶ added in v0.3.0
type TaskRequestVerifier interface {
// Verify verifies the authenticity of the given TaskRequest.
Verify(ctx context.Context, request TaskRequest) error
}
TaskRequestVerifier defines an interface for verifying the authenticity of TaskRequest objects. Implementations of this interface are responsible for checking the validity and integrity of a given TaskRequest, typically using cryptographic methods.
type TaskResolveFunc ¶ added in v0.2.0
type TaskResolveFunc func(ctx context.Context, job Job, cursor TaskResolverCursor) (TaskResolverResult, error)
TaskResolveFunc resolves tasks for a job, potentially in multiple iterations.
Return one of:
- ContinueTaskResolver() to continue resolving (with optional tasks and cursor)
- CompleteTaskResolver() to finish resolving (with optional final tasks)
- CancelTaskResolver(reason) to abort the job
Errors returned from this function will be treated as recoverable, meaning the resolver can retry later.
Example:
func myResolver(ctx context.Context, job Job, cursor TaskResolverCursor) (TaskResolverResult, error) {
tasks := []TaskInfo{...} // resolve some tasks
if moreTasksExist {
return orbital.ContinueTaskResolver().WithTaskInfo(tasks).WithCursor("next-page-token"), nil
}
return orbital.CompleteTaskResolver().WithTaskInfo(tasks), nil
}
type TaskResolverCursor ¶
type TaskResolverCursor string
TaskResolverCursor is an opaque token used to resume or paginate task resolution. The resolver function controls its format and meaning.
type TaskResolverResult ¶
type TaskResolverResult interface {
Type() TaskResolverResultType
}
TaskResolverResult represents the outcome of a task resolution attempt.
type TaskResolverResultType ¶ added in v0.5.0
type TaskResolverResultType int
TaskResolverResultType is the type for the task resolver result.
type TaskResponse ¶
type TaskResponse struct {
TaskID uuid.UUID `json:"taskId"` // TaskID is used to identify the task.
Type string `json:"type"` // Type is the type of the task.
ExternalID string `json:"externalId"` // External ID serves as an identifier for a Job
WorkingState []byte `json:"workingState"` // WorkingState is the state of the task that the operator updates.
ETag string `json:"eTag"` // ETag is used to correlate the TaskResponse with the TaskRequest.
Status string `json:"status"` // Status is the status of the task.
ErrorMessage string `json:"errorMessage,omitempty"` // ErrorMessage contains the error message if the task fails.
ReconcileAfterSec uint64 `json:"reconcileAfterSec"` // ReconcileAfterSec is the time in seconds after which the next TaskRequest should be queued again.
MetaData MetaData `json:"metaData"` // MetaData contains additional information about the TaskResponse.
}
TaskResponse is the response object received from the operator.
func ExecuteHandler ¶ added in v0.5.0
func ExecuteHandler(ctx context.Context, h HandlerFunc, req TaskRequest) TaskResponse
ExecuteHandler transforms the TaskRequest into a HandlerRequest, executes the handler, and transforms the HandlerResponse back into a TaskResponse.
type TaskResponseSigner ¶ added in v0.3.0
type TaskResponseSigner interface {
// Sign signs the given TaskResponse and returns a Signature.
Sign(ctx context.Context, response TaskResponse) (Signature, error)
}
TaskResponseSigner defines an interface for signing TaskResponse objects. Implementations of this interface are responsible for generating a Signature for a given TaskResponse, typically for authentication or verification purposes.
type TaskResponseVerifier ¶ added in v0.3.0
type TaskResponseVerifier interface {
// Verify verifies the authenticity of the given TaskResponse.
Verify(ctx context.Context, response TaskResponse) error
}
TaskResponseVerifier defines an interface for verifying the authenticity of TaskResponse objects. Implementations of this interface are responsible for checking the validity and integrity of a given TaskResponse, typically using cryptographic methods.
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the status of the Task.
const ( TaskStatusCreated TaskStatus = "CREATED" TaskStatusProcessing TaskStatus = "PROCESSING" TaskStatusDone TaskStatus = "DONE" TaskStatusFailed TaskStatus = "FAILED" )
Possible task statuses.
type TransactionFunc ¶
type TransactionFunc func(context.Context, Repository) error
TransactionFunc defines a function type for executing operations within a transaction.
type WorkerConfig ¶
type WorkerConfig struct {
// NoOfWorkers specifies the number of concurrent workers.
NoOfWorkers int
// ExecInterval is the interval between executions.
ExecInterval time.Duration
// Timeout is the maximum duration allowed for a single execution.
Timeout time.Duration
}
WorkerConfig defines the configuration for a worker process.
type WorkingState ¶ added in v0.4.0
type WorkingState struct {
// contains filtered or unexported fields
}
WorkingState represents the working state of a task. It provides methods for storing arbitrary key-value pairs and convenience methods for tracking metrics.
func (*WorkingState) Add ¶ added in v0.4.0
func (w *WorkingState) Add(key string, amount float64) float64
Add adds the specified amount to a key and returns the new value.
func (*WorkingState) Dec ¶ added in v0.4.0
func (w *WorkingState) Dec(key string) float64
Dec decrements a key and returns the new value.
func (*WorkingState) Delete ¶ added in v0.4.0
func (w *WorkingState) Delete(key string)
Delete removes a key from the WorkingState.
func (*WorkingState) DiscardChanges ¶ added in v0.4.0
func (w *WorkingState) DiscardChanges()
DiscardChanges discards any changes made to the WorkingState.
func (*WorkingState) Inc ¶ added in v0.4.0
func (w *WorkingState) Inc(key string) float64
Inc increments the value of a key and returns the new value.
func (*WorkingState) Set ¶ added in v0.4.0
func (w *WorkingState) Set(key string, value any)
Set sets a key-value pair in the WorkingState.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
client
|
|
|
examples
|
|
|
embedded_client
command
|
|
|
manager
command
|
|
|
operator
command
|
|
|
internal
|
|
|
proto
|
|
|
store
|
|