eventprocessor

package
v0.7.0
Published: Apr 9, 2026 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Constants

const (
	EmbeddedTarget = "embedded"
)
const (
	// KeyVersionMismatchCode is returned when the key version in the event
	// doesn't match the current version task operator has cached. This triggers CMK to resync immediately.
	//nolint:godox
	// TODO: Replace with actual error code once provided
	KeyVersionMismatchCode = "KEY_VERSION_MISMATCH"
)

Placeholder error codes

Variables

var (
	ErrEventSendingFailed = errors.New("failed to send event")

	ErrNoPreviousEvent  = errors.New("no previous events found for selected item")
	ErrSystemProcessing = errors.New("system is still in processing state")

	ErrMissingKeyID = errors.New("keyID is required to create key event job")
)
var (
	ErrInvalidJobType            = errors.New("invalid job type")
	ErrUnsupportedJobType        = errors.New("unsupported job type")
	ErrTargetNotConfigured       = errors.New("target not configured for region")
	ErrKeyAccessMetadataNotFound = errors.New("key access metadata not found for system region")
	ErrPluginNotFound            = errors.New("plugin not found for key provider")
	ErrSettingKeyClaim           = errors.New("error setting key claim for system")
	ErrUnsupportedRegion         = errors.New("unsupported region")
	ErrNoConnectedRegionsForKey  = errors.New("no connected regions found for key")
	ErrNoTasksResolvedForJob     = errors.New("no tasks resolved for the job")
	ErrKeyRotateMismatchedKeyIDs = errors.New("system key rotate requires identical key IDs")
)
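
The variables above are sentinel errors, so callers would normally match them with errors.Is rather than by comparing message strings. The following is a minimal sketch of that pattern. It assumes the package wraps its sentinels with %w, which this page does not confirm; lookupTarget, isTargetNotConfigured, and the region-to-target map are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of one sentinel from the list above, so the sketch compiles on its own.
var ErrTargetNotConfigured = errors.New("target not configured for region")

// lookupTarget is a hypothetical helper; the region-to-target map is an assumption.
func lookupTarget(targets map[string]string, region string) (string, error) {
	target, ok := targets[region]
	if !ok {
		// %w keeps the sentinel in the error chain so errors.Is can find it.
		return "", fmt.Errorf("%w: %s", ErrTargetNotConfigured, region)
	}
	return target, nil
}

// isTargetNotConfigured reports whether err wraps the sentinel.
func isTargetNotConfigured(err error) bool {
	return errors.Is(err, ErrTargetNotConfigured)
}

func main() {
	targets := map[string]string{"eu-1": "embedded"}
	_, err := lookupTarget(targets, "us-1")
	fmt.Println(err)
	fmt.Println(isTargetNotConfigured(err))
}
```

Matching on the sentinel instead of the message text keeps callers working if the wording of an error changes.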

Functions

func GetOrbitalError added in v0.4.0

func GetOrbitalError(ctx context.Context, err error) string

GetOrbitalError returns the string format of an orbital error. If the error is not mapped to an orbital error, it is returned as a plain string.

func IsVersionMismatchError added in v0.7.0

func IsVersionMismatchError(errorMessage string) bool

IsVersionMismatchError checks if the error message contains a version mismatch indicator from task operator. This can happen when:

- Task operator detects a newer key version before CMK's scheduled detection runs
- Multiple rapid rotations occur and task operator is ahead of CMK
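
As an illustration only, a check of this kind could be a substring match on the placeholder code. The sketch below assumes that is how the indicator is detected; the page does not show the real implementation, and containsVersionMismatch is a hypothetical stand-in for IsVersionMismatchError.

```go
package main

import (
	"fmt"
	"strings"
)

// KeyVersionMismatchCode mirrors the placeholder constant documented on this page.
const KeyVersionMismatchCode = "KEY_VERSION_MISMATCH"

// containsVersionMismatch is a hypothetical stand-in for IsVersionMismatchError.
// It assumes the check is a plain substring match on the placeholder code.
func containsVersionMismatch(errorMessage string) bool {
	return strings.Contains(errorMessage, KeyVersionMismatchCode)
}

func main() {
	fmt.Println(containsVersionMismatch("KEY_VERSION_MISMATCH: cached version is stale"))
	fmt.Println(containsVersionMismatch("TIMEOUT: operator did not respond"))
}
```

A caller could use such a check to decide whether to trigger an immediate resync instead of waiting for the next scheduled detection run.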

Types

type CryptoReconciler

type CryptoReconciler struct {
	// contains filtered or unexported fields
}

CryptoReconciler is responsible for handling orbital jobs and managing the lifecycle of systems in CMK.

func NewCryptoReconciler

func NewCryptoReconciler(
	ctx context.Context,
	cfg *config.Config,
	repository repo.Repo,
	svcRegistry *cmkpluginregistry.Registry,
	clientsFactory clients.Factory,
	opts ...Option,
) (*CryptoReconciler, error)

NewCryptoReconciler creates a new CryptoReconciler instance.

func (*CryptoReconciler) CloseAmqpClients

func (c *CryptoReconciler) CloseAmqpClients(ctx context.Context)

func (*CryptoReconciler) GetHandlerByJobType added in v0.4.0

func (c *CryptoReconciler) GetHandlerByJobType(jobType string) (JobHandler, error)
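
The lookup is not described on this page, so the following is only a sketch of one plausible shape: a map from job type string to handler that returns a wrapped sentinel for unknown types. The handler interface is reduced to a single hypothetical Name method, and registry, namedHandler, and lookup are invented names.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the ErrUnsupportedJobType sentinel listed under Variables.
var errUnsupportedJobType = errors.New("unsupported job type")

// handler is a reduced, hypothetical stand-in for the JobHandler interface.
type handler interface {
	Name() string
}

// namedHandler is a trivial handler used only by this sketch.
type namedHandler string

func (n namedHandler) Name() string { return string(n) }

// registry maps job type strings to handlers (an assumed data structure).
type registry map[string]handler

// lookup returns the handler registered for jobType, or a wrapped sentinel error.
func (r registry) lookup(jobType string) (handler, error) {
	h, ok := r[jobType]
	if !ok {
		return nil, fmt.Errorf("%w: %q", errUnsupportedJobType, jobType)
	}
	return h, nil
}

func main() {
	r := registry{
		"SYSTEM_LINK": namedHandler("system-link"),
		"KEY_ENABLE":  namedHandler("key"),
	}
	if h, err := r.lookup("SYSTEM_LINK"); err == nil {
		fmt.Println(h.Name())
	}
	_, err := r.lookup("NOT_A_JOB")
	fmt.Println(errors.Is(err, errUnsupportedJobType))
}
```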

func (*CryptoReconciler) Start

func (c *CryptoReconciler) Start(ctx context.Context) error

Start starts the orbital manager.

type Event

type Event struct {
	Name  string
	Event func(ctx context.Context) (orbital.Job, error)
}

type EventFactory added in v0.3.0

type EventFactory struct {
	// contains filtered or unexported fields
}

func NewEventFactory added in v0.3.0

func NewEventFactory(
	ctx context.Context,
	cfg *config.Config,
	repository repo.Repo,
) (*EventFactory, error)

func (*EventFactory) CreateJob added in v0.3.0

func (f *EventFactory) CreateJob(ctx context.Context, event *model.Event) (orbital.Job, error)

func (*EventFactory) GetLastEvent added in v0.3.0

func (f *EventFactory) GetLastEvent(
	ctx context.Context,
	cmkItemID string,
) (*model.Event, error)

GetLastEvent returns the last event of an item.

func (*EventFactory) KeyDetach added in v0.3.0

func (f *EventFactory) KeyDetach(ctx context.Context, keyID string) (orbital.Job, error)

KeyDetach creates a job to detach a key. Context provided must have the tenant set.

func (*EventFactory) KeyDisable added in v0.3.0

func (f *EventFactory) KeyDisable(ctx context.Context, keyID string) (orbital.Job, error)

KeyDisable creates a job to disable a key. Context provided must have the tenant set.

func (*EventFactory) KeyEnable added in v0.3.0

func (f *EventFactory) KeyEnable(ctx context.Context, keyID string) (orbital.Job, error)

KeyEnable creates a job to enable a key. Context provided must have the tenant set.

func (*EventFactory) SendEvent added in v0.3.0

func (f *EventFactory) SendEvent(ctx context.Context, event Event) error

func (*EventFactory) SystemKeyRotate added in v0.7.0

func (f *EventFactory) SystemKeyRotate(
	ctx context.Context,
	system *model.System,
	keyID string,
) (orbital.Job, error)

SystemKeyRotate creates a job to rotate the key material for a system. This triggers re-encryption on the system with the new key version/material. The keyID remains the same, but the key material has changed.

Unlike LINK/UNLINK/SWITCH, this does NOT set the system to PROCESSING — rotation is an external event and the system remains CONNECTED while in flight. A model.Event row is still written (PreviousItemStatus) so that retry and unlink are possible if the job fails.
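
The status rule described above can be sketched as a small function. This is an illustration under stated assumptions, not the package's code: inFlightStatus is a hypothetical helper, and the status values are written as plain strings taken from the prose.

```go
package main

import "fmt"

const (
	statusConnected  = "CONNECTED"
	statusProcessing = "PROCESSING"
)

// inFlightStatus returns the status a system would show while a job of the
// given type is running, following the rule in the text: LINK, UNLINK, and
// SWITCH move the system to PROCESSING, while key rotation leaves it unchanged.
func inFlightStatus(jobType, current string) string {
	switch jobType {
	case "SYSTEM_LINK", "SYSTEM_UNLINK", "SYSTEM_SWITCH":
		return statusProcessing
	default:
		// SYSTEM_KEY_ROTATE (and anything unknown to this sketch) keeps the status.
		return current
	}
}

func main() {
	fmt.Println(inFlightStatus("SYSTEM_SWITCH", statusConnected))
	fmt.Println(inFlightStatus("SYSTEM_KEY_ROTATE", statusConnected))
}
```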

func (*EventFactory) SystemLink

func (f *EventFactory) SystemLink(ctx context.Context, system *model.System, keyID string) (orbital.Job, error)

SystemLink creates a job to link a system with a key. Context provided must have the tenant set.

func (*EventFactory) SystemSwitch added in v0.3.0

func (f *EventFactory) SystemSwitch(
	ctx context.Context,
	system *model.System,
	keyIDTo string,
	keyIDFrom string,
) (orbital.Job, error)

SystemSwitch creates a job to switch the key of a system from keyIDFrom to keyIDTo. Context provided must have the tenant set. The trigger can be KeyActionSetPrimary to indicate that the switch comes from a make primary key action.

func (*EventFactory) SystemSwitchNewPrimaryKey added in v0.4.0

func (f *EventFactory) SystemSwitchNewPrimaryKey(
	ctx context.Context,
	system *model.System,
	keyIDTo string,
	keyIDFrom string,
) (orbital.Job, error)

SystemSwitchNewPrimaryKey creates a job to switch the key of a system from keyIDFrom to keyIDTo, triggered by a new primary key being set. Context provided must have the tenant set.

func (*EventFactory) SystemUnlink

func (f *EventFactory) SystemUnlink(
	ctx context.Context,
	system *model.System,
	keyID string,
) (orbital.Job, error)

SystemUnlink creates a job to unlink a system from a key. Context provided must have the tenant set.

func (*EventFactory) SystemUnlinkDecommission added in v0.7.0

func (f *EventFactory) SystemUnlinkDecommission(
	ctx context.Context,
	system *model.System,
	keyID string,
) (orbital.Job, error)

SystemUnlinkDecommission creates a job to unlink a system, triggered by a decommission action. No internal tracking event model is created, since manual retry or cancel is not possible at this stage. If the event fails, the system is still set to DISCONNECTED. If the event is canceled, the system is set to FAILED and the decommission action is retried later.
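
The outcome handling described above can be summarized in a short sketch. The jobOutcome type and statusAfterDecommissionUnlink are hypothetical names. The failed and canceled cases follow the text; mapping a successful job to DISCONNECTED is an assumption, since the text does not state it.

```go
package main

import "fmt"

// jobOutcome is a hypothetical enumeration of terminal job states.
type jobOutcome int

const (
	outcomeDone jobOutcome = iota
	outcomeFailed
	outcomeCanceled
)

// statusAfterDecommissionUnlink maps a terminal job outcome to the system status.
// Failed -> DISCONNECTED and canceled -> FAILED come from the description above;
// done -> DISCONNECTED is assumed.
func statusAfterDecommissionUnlink(o jobOutcome) string {
	if o == outcomeCanceled {
		// FAILED lets a later decommission reconciliation pick the system up again.
		return "FAILED"
	}
	return "DISCONNECTED"
}

func main() {
	for _, o := range []jobOutcome{outcomeDone, outcomeFailed, outcomeCanceled} {
		fmt.Println(statusAfterDecommissionUnlink(o))
	}
}
```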

type JobHandler added in v0.4.0

type JobHandler interface {
	ResolveTasks(ctx context.Context, job orbital.Job) ([]orbital.TaskInfo, error)
	HandleJobConfirm(ctx context.Context, job orbital.Job) (orbital.JobConfirmerResult, error)
	HandleJobDoneEvent(ctx context.Context, job orbital.Job) error
	HandleJobFailedEvent(ctx context.Context, job orbital.Job) error
	HandleJobCanceledEvent(ctx context.Context, job orbital.Job) error
}

type JobType added in v0.4.0

type JobType string
const (
	JobTypeSystemLink               JobType = "SYSTEM_LINK"
	JobTypeSystemUnlink             JobType = "SYSTEM_UNLINK"
	JobTypeSystemUnlinkDecommission JobType = "SYSTEM_UNLINK_DECOMMISSION"
	JobTypeSystemSwitch             JobType = "SYSTEM_SWITCH"
	JobTypeSystemSwitchNewPK        JobType = "SYSTEM_SWITCH_NEW_PK"
	JobTypeSystemKeyRotate          JobType = "SYSTEM_KEY_ROTATE"
	JobTypeKeyEnable                JobType = "KEY_ENABLE"
	JobTypeKeyDisable               JobType = "KEY_DISABLE"
	JobTypeKeyDetach                JobType = "KEY_DETACH"
	JobTypeKeyDelete                JobType = "KEY_DELETE"
)

func (JobType) String added in v0.4.0

func (t JobType) String() string

type KeyActionJobData

type KeyActionJobData struct {
	TenantID string `json:"tenantID"`
	KeyID    string `json:"keyID"`
}

KeyActionJobData contains the data needed for a key action orbital job.

type KeyConfigActionJobData added in v0.4.0

type KeyConfigActionJobData struct {
	TenantID    string `json:"tenantID"`
	KeyConfigID string `json:"keyConfigID"`
}

KeyConfigActionJobData contains the data needed for a key configuration action orbital job.

type KeyDetachJobHandler added in v0.7.0

type KeyDetachJobHandler struct {
	// contains filtered or unexported fields
}

func NewKeyDetachJobHandler added in v0.7.0

func NewKeyDetachJobHandler(
	repo repo.Repo,
	cmkAuditor *auditor.Auditor,
	orbitalManager *orbital.Manager,
	taskResolver *KeyTaskInfoResolver,
) *KeyDetachJobHandler

func (*KeyDetachJobHandler) HandleJobCanceledEvent added in v0.7.0

func (h *KeyDetachJobHandler) HandleJobCanceledEvent(
	ctx context.Context,
	job orbital.Job,
) error

func (*KeyDetachJobHandler) HandleJobConfirm added in v0.7.0

func (h *KeyDetachJobHandler) HandleJobConfirm(
	ctx context.Context,
	job orbital.Job,
) (orbital.JobConfirmerResult, error)

func (*KeyDetachJobHandler) HandleJobDoneEvent added in v0.7.0

func (h *KeyDetachJobHandler) HandleJobDoneEvent(
	ctx context.Context,
	job orbital.Job,
) error

func (*KeyDetachJobHandler) HandleJobFailedEvent added in v0.7.0

func (h *KeyDetachJobHandler) HandleJobFailedEvent(
	ctx context.Context,
	job orbital.Job,
) error

func (*KeyDetachJobHandler) ResolveTasks added in v0.7.0

func (h *KeyDetachJobHandler) ResolveTasks(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type KeyJobHandler added in v0.4.0

type KeyJobHandler struct {
	// contains filtered or unexported fields
}

func NewKeyJobHandler added in v0.4.0

func NewKeyJobHandler(taskResolver *KeyTaskInfoResolver) *KeyJobHandler

func (*KeyJobHandler) HandleJobCanceledEvent added in v0.4.0

func (h *KeyJobHandler) HandleJobCanceledEvent(
	_ context.Context,
	_ orbital.Job,
) error

func (*KeyJobHandler) HandleJobConfirm added in v0.4.0

func (h *KeyJobHandler) HandleJobConfirm(
	_ context.Context,
	_ orbital.Job,
) (orbital.JobConfirmerResult, error)

func (*KeyJobHandler) HandleJobDoneEvent added in v0.4.0

func (h *KeyJobHandler) HandleJobDoneEvent(
	_ context.Context,
	_ orbital.Job,
) error

func (*KeyJobHandler) HandleJobFailedEvent added in v0.4.0

func (h *KeyJobHandler) HandleJobFailedEvent(
	_ context.Context,
	_ orbital.Job,
) error

func (*KeyJobHandler) ResolveTasks added in v0.4.0

func (h *KeyJobHandler) ResolveTasks(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type KeyTaskInfoResolver added in v0.4.0

type KeyTaskInfoResolver struct {
	// contains filtered or unexported fields
}

KeyTaskInfoResolver is responsible for resolving the necessary information to create a TaskInfo for key-related tasks such as enabling, disabling, and detaching keys.

func (*KeyTaskInfoResolver) Resolve added in v0.4.0

func (r *KeyTaskInfoResolver) Resolve(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type Option

type Option func(manager *orbital.Manager)

func WithConfirmJobAfter

func WithConfirmJobAfter(d time.Duration) Option

func WithExecInterval

func WithExecInterval(d time.Duration) Option

func WithMaxPendingReconciles added in v0.4.0

func WithMaxPendingReconciles(n uint64) Option
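
Option follows the functional options pattern: each With... function returns a closure that mutates the manager during construction. The sketch below shows the pattern with a local stand-in for orbital.Manager. The field names and newManager are assumptions, because the real manager's configuration fields are not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// manager is a hypothetical stand-in for orbital.Manager; field names are assumed.
type manager struct {
	confirmJobAfter      time.Duration
	execInterval         time.Duration
	maxPendingReconciles uint64
}

// option mirrors the shape of Option: a function that mutates the manager.
type option func(m *manager)

func withConfirmJobAfter(d time.Duration) option {
	return func(m *manager) { m.confirmJobAfter = d }
}

func withExecInterval(d time.Duration) option {
	return func(m *manager) { m.execInterval = d }
}

func withMaxPendingReconciles(n uint64) option {
	return func(m *manager) { m.maxPendingReconciles = n }
}

// newManager applies the options in order, so later options override earlier ones.
func newManager(opts ...option) *manager {
	m := &manager{}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newManager(
		withExecInterval(5*time.Second),
		withConfirmJobAfter(30*time.Second),
		withMaxPendingReconciles(10),
	)
	fmt.Println(m.execInterval, m.confirmJobAfter, m.maxPendingReconciles)
}
```

With this shape, NewCryptoReconciler can accept any number of options without changing its signature when new settings are added.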

type OrbitalError added in v0.4.0

type OrbitalError struct {
	Message string
	Code    string
}

func ParseOrbitalError added in v0.4.0

func ParseOrbitalError(errorMessage string) OrbitalError

ParseOrbitalError returns a parsed error in orbital format. If there is no code or there are no entries to parse, it sets the code to the default value. Parsing uses an expression that expects a ":" right after the error code, which is written in SCREAMING_SNAKE_CASE.
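
A minimal sketch of this parsing rule, assuming a regular expression of the form CODE: message. The pattern, the parsedError type, parseCodePrefixed, and the defaultCode value are all illustrative; the actual expression and default code used by the package are not shown on this page.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// parsedError is a local stand-in for OrbitalError.
type parsedError struct {
	Code    string
	Message string
}

// defaultCode is a hypothetical default; the real default value is not documented here.
const defaultCode = "UNKNOWN"

// codePattern matches a SCREAMING_SNAKE_CASE code followed directly by ":".
// (?s) lets the message part span multiple lines.
var codePattern = regexp.MustCompile(`(?s)^([A-Z][A-Z0-9]*(?:_[A-Z0-9]+)*):\s*(.*)$`)

// parseCodePrefixed splits "CODE: message" into its parts and falls back to
// the default code when no code prefix is present.
func parseCodePrefixed(msg string) parsedError {
	m := codePattern.FindStringSubmatch(strings.TrimSpace(msg))
	if m == nil {
		return parsedError{Code: defaultCode, Message: msg}
	}
	return parsedError{Code: m[1], Message: m[2]}
}

func main() {
	fmt.Printf("%+v\n", parseCodePrefixed("KEY_VERSION_MISMATCH: stale key version"))
	fmt.Printf("%+v\n", parseCodePrefixed("something went wrong"))
}
```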

func (*OrbitalError) DefaultError added in v0.4.0

func (e *OrbitalError) DefaultError() *OrbitalError

func (*OrbitalError) IsDefaultError added in v0.4.0

func (e *OrbitalError) IsDefaultError() bool

func (*OrbitalError) SetContext added in v0.4.0

func (e *OrbitalError) SetContext(context *map[string]any)

func (*OrbitalError) String added in v0.4.0

func (e *OrbitalError) String() string

type SystemActionJobData

type SystemActionJobData struct {
	SystemID  string `json:"systemID"`
	TenantID  string `json:"tenantID"`
	KeyIDTo   string `json:"keyIDTo"`
	KeyIDFrom string `json:"keyIDFrom"`
	Trigger   string `json:"trigger,omitempty"`
}

SystemActionJobData contains the data needed for a system action orbital job.
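
To show how the struct tags shape the job payload, the sketch below copies the struct locally and round-trips it through encoding/json. The encode and decode helpers and the sample IDs are illustrative, and whether the package serializes job data exactly this way is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SystemActionJobData is copied from the definition above so the sketch is self-contained.
type SystemActionJobData struct {
	SystemID  string `json:"systemID"`
	TenantID  string `json:"tenantID"`
	KeyIDTo   string `json:"keyIDTo"`
	KeyIDFrom string `json:"keyIDFrom"`
	Trigger   string `json:"trigger,omitempty"`
}

// encode serializes job data to a JSON string.
func encode(d SystemActionJobData) (string, error) {
	b, err := json.Marshal(d)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decode parses a JSON string back into job data.
func decode(s string) (SystemActionJobData, error) {
	var d SystemActionJobData
	err := json.Unmarshal([]byte(s), &d)
	return d, err
}

func main() {
	out, err := encode(SystemActionJobData{
		SystemID:  "sys-1",
		TenantID:  "tenant-a",
		KeyIDTo:   "key-2",
		KeyIDFrom: "key-1",
	})
	if err != nil {
		panic(err)
	}
	// Trigger is empty, so omitempty drops it from the output.
	fmt.Println(out)
}
```

Because Trigger carries omitempty, payloads for jobs without a trigger contain only the four ID fields.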

func GetSystemJobData added in v0.4.0

func GetSystemJobData(e *model.Event) (SystemActionJobData, error)

type SystemKeyRotateJobHandler added in v0.7.0

type SystemKeyRotateJobHandler struct {
	// contains filtered or unexported fields
}

SystemKeyRotateJobHandler handles SYSTEM_KEY_ROTATE events. This event notifies about key material rotation.

func NewSystemKeyRotateJobHandler added in v0.7.0

func NewSystemKeyRotateJobHandler(
	repo repo.Repo,
	cmkAuditor *auditor.Auditor,
	orbitalManager *orbital.Manager,
	taskResolver *SystemTaskInfoResolver,
) *SystemKeyRotateJobHandler

func (*SystemKeyRotateJobHandler) HandleJobCanceledEvent added in v0.7.0

func (h *SystemKeyRotateJobHandler) HandleJobCanceledEvent(ctx context.Context, job orbital.Job) error

func (*SystemKeyRotateJobHandler) HandleJobConfirm added in v0.7.0

func (*SystemKeyRotateJobHandler) HandleJobDoneEvent added in v0.7.0

func (h *SystemKeyRotateJobHandler) HandleJobDoneEvent(ctx context.Context, job orbital.Job) error

func (*SystemKeyRotateJobHandler) HandleJobFailedEvent added in v0.7.0

func (h *SystemKeyRotateJobHandler) HandleJobFailedEvent(ctx context.Context, job orbital.Job) error

func (*SystemKeyRotateJobHandler) ResolveTasks added in v0.7.0

func (h *SystemKeyRotateJobHandler) ResolveTasks(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type SystemLinkJobHandler added in v0.4.0

type SystemLinkJobHandler struct {
	// contains filtered or unexported fields
}

func NewSystemLinkJobHandler added in v0.4.0

func NewSystemLinkJobHandler(
	repo repo.Repo,
	registry registry.Service,
	cmkAuditor *auditor.Auditor,
	orbitalManager *orbital.Manager,
	taskResolver *SystemTaskInfoResolver,
) *SystemLinkJobHandler

func (*SystemLinkJobHandler) HandleJobCanceledEvent added in v0.4.0

func (h *SystemLinkJobHandler) HandleJobCanceledEvent(ctx context.Context, job orbital.Job) error

func (*SystemLinkJobHandler) HandleJobConfirm added in v0.4.0

func (h *SystemLinkJobHandler) HandleJobConfirm(
	ctx context.Context,
	job orbital.Job,
) (orbital.JobConfirmerResult, error)

func (*SystemLinkJobHandler) HandleJobDoneEvent added in v0.4.0

func (h *SystemLinkJobHandler) HandleJobDoneEvent(ctx context.Context, job orbital.Job) error

func (*SystemLinkJobHandler) HandleJobFailedEvent added in v0.4.0

func (h *SystemLinkJobHandler) HandleJobFailedEvent(ctx context.Context, job orbital.Job) error

func (*SystemLinkJobHandler) ResolveTasks added in v0.4.0

func (h *SystemLinkJobHandler) ResolveTasks(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type SystemSwitchJobHandler added in v0.4.0

type SystemSwitchJobHandler struct {
	// contains filtered or unexported fields
}

func NewSystemSwitchJobHandler added in v0.4.0

func NewSystemSwitchJobHandler(
	repo repo.Repo,
	registry registry.Service,
	cmkAuditor *auditor.Auditor,
	orbitalManager *orbital.Manager,
	taskResolver *SystemTaskInfoResolver,
) *SystemSwitchJobHandler

func (*SystemSwitchJobHandler) HandleJobCanceledEvent added in v0.4.0

func (h *SystemSwitchJobHandler) HandleJobCanceledEvent(ctx context.Context, job orbital.Job) error

func (*SystemSwitchJobHandler) HandleJobConfirm added in v0.4.0

func (h *SystemSwitchJobHandler) HandleJobConfirm(
	ctx context.Context,
	job orbital.Job,
) (orbital.JobConfirmerResult, error)

func (*SystemSwitchJobHandler) HandleJobDoneEvent added in v0.4.0

func (h *SystemSwitchJobHandler) HandleJobDoneEvent(ctx context.Context, job orbital.Job) error

func (*SystemSwitchJobHandler) HandleJobFailedEvent added in v0.4.0

func (h *SystemSwitchJobHandler) HandleJobFailedEvent(ctx context.Context, job orbital.Job) error

func (*SystemSwitchJobHandler) ResolveTasks added in v0.4.0

func (h *SystemSwitchJobHandler) ResolveTasks(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type SystemTaskInfoResolver added in v0.4.0

type SystemTaskInfoResolver struct {
	// contains filtered or unexported fields
}

SystemTaskInfoResolver is responsible for resolving the necessary information to create a TaskInfo for system-related tasks such as linking and unlinking systems.

func (*SystemTaskInfoResolver) Resolve added in v0.4.0

func (r *SystemTaskInfoResolver) Resolve(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)

type SystemUnlinkDecommissionJobHandler added in v0.7.0

type SystemUnlinkDecommissionJobHandler struct {
	// contains filtered or unexported fields
}

func NewSystemUnlinkDecommissionJobHandler added in v0.7.0

func NewSystemUnlinkDecommissionJobHandler(
	repo repo.Repo,
	registry registry.Service,
	cmkAuditor *auditor.Auditor,
	orbitalManager *orbital.Manager,
	taskResolver *SystemTaskInfoResolver,
) *SystemUnlinkDecommissionJobHandler

func (*SystemUnlinkDecommissionJobHandler) HandleJobCanceledEvent added in v0.7.0

func (h *SystemUnlinkDecommissionJobHandler) HandleJobCanceledEvent(ctx context.Context, job orbital.Job) error

HandleJobCanceledEvent puts the system into the failed state so that it can be processed again during the next decommission reconciliation loop.

func (*SystemUnlinkDecommissionJobHandler) HandleJobConfirm added in v0.7.0

func (*SystemUnlinkDecommissionJobHandler) HandleJobDoneEvent added in v0.7.0

func (h *SystemUnlinkDecommissionJobHandler) HandleJobDoneEvent(ctx context.Context, job orbital.Job) error

func (*SystemUnlinkDecommissionJobHandler) HandleJobFailedEvent added in v0.7.0

func (h *SystemUnlinkDecommissionJobHandler) HandleJobFailedEvent(ctx context.Context, job orbital.Job) error

func (*SystemUnlinkDecommissionJobHandler) ResolveTasks added in v0.7.0

type SystemUnlinkJobHandler added in v0.4.0

type SystemUnlinkJobHandler struct {
	// contains filtered or unexported fields
}

func NewSystemUnlinkJobHandler added in v0.4.0

func NewSystemUnlinkJobHandler(
	repo repo.Repo,
	registry registry.Service,
	cmkAuditor *auditor.Auditor,
	orbitalManager *orbital.Manager,
	taskResolver *SystemTaskInfoResolver,
) *SystemUnlinkJobHandler

func (*SystemUnlinkJobHandler) HandleJobCanceledEvent added in v0.4.0

func (h *SystemUnlinkJobHandler) HandleJobCanceledEvent(ctx context.Context, job orbital.Job) error

func (*SystemUnlinkJobHandler) HandleJobConfirm added in v0.4.0

func (h *SystemUnlinkJobHandler) HandleJobConfirm(
	ctx context.Context,
	job orbital.Job,
) (orbital.JobConfirmerResult, error)

func (*SystemUnlinkJobHandler) HandleJobDoneEvent added in v0.4.0

func (h *SystemUnlinkJobHandler) HandleJobDoneEvent(ctx context.Context, job orbital.Job) error

func (*SystemUnlinkJobHandler) HandleJobFailedEvent added in v0.4.0

func (h *SystemUnlinkJobHandler) HandleJobFailedEvent(ctx context.Context, job orbital.Job) error

func (*SystemUnlinkJobHandler) ResolveTasks added in v0.4.0

func (h *SystemUnlinkJobHandler) ResolveTasks(
	ctx context.Context,
	job orbital.Job,
) ([]orbital.TaskInfo, error)
