objectstore

package
v0.8.2 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 26, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClaimForDeployment

func ClaimForDeployment(
	ctx context.Context,
	svc Service,
	keys statestore.KeyBuilder,
	st *statestore.State,
	instanceID string,
	expectedVersion int64,
	newStatus core.InstanceStatus,
) (int64, error)

ClaimForDeployment performs an atomic compare-and-swap on a single instance object in the Service. On success the instance's version is bumped, its status is set to newStatus, and the in-memory State is refreshed.

The CAS bypasses statestore.Storage deliberately — statestore.Storage has no ETag awareness, and the conditional write is what makes this safe for concurrent executions such as CI/CD runs sharing a bucket storage backend.

Returns state.InstanceNotFoundError when the instance object does not exist, and state.ErrVersionConflict when the persisted version does not match expectedVersion (either caught cheaply before the write, or via a 412 on IfMatch).
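
The compare-and-swap described above can be sketched roughly as follows. This is a self-contained illustration, not the package's implementation: memStore, putOptions, claim, errInstanceNotFound and errVersionConflict are hypothetical stand-ins, the context argument is omitted, and the stored payload is reduced to a bare version number.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

// Hypothetical stand-ins for state.InstanceNotFoundError and state.ErrVersionConflict.
var (
	errInstanceNotFound = errors.New("instance not found")
	errVersionConflict  = errors.New("version conflict")
)

// putOptions mirrors only the IfMatch field of PutOptions.
type putOptions struct {
	IfMatch string
}

// memStore is a hypothetical in-memory object store whose ETags are a
// monotonically increasing write counter.
type memStore struct {
	mu     sync.Mutex
	data   map[string][]byte
	etags  map[string]string
	writes int
}

func newMemStore() *memStore {
	return &memStore{data: map[string][]byte{}, etags: map[string]string{}}
}

func (m *memStore) get(key string) ([]byte, string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	data, ok := m.data[key]
	return data, m.etags[key], ok
}

// put rejects the write when IfMatch is set and differs from the stored ETag,
// which is the in-memory analogue of a 412 response.
func (m *memStore) put(key string, data []byte, opts *putOptions) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if opts != nil && opts.IfMatch != "" && m.etags[key] != opts.IfMatch {
		return errVersionConflict
	}
	m.writes++
	m.data[key] = data
	m.etags[key] = strconv.Itoa(m.writes)
	return nil
}

// claim bumps the version stored at key only if it still equals expected.
func claim(m *memStore, key string, expected int64) (int64, error) {
	raw, etag, ok := m.get(key)
	if !ok {
		return 0, errInstanceNotFound
	}
	current, err := strconv.ParseInt(string(raw), 10, 64)
	if err != nil {
		return 0, err
	}
	if current != expected {
		return 0, errVersionConflict // cheap check before the write
	}
	next := current + 1
	payload := []byte(strconv.FormatInt(next, 10))
	if err := m.put(key, payload, &putOptions{IfMatch: etag}); err != nil {
		return 0, err // another writer changed the object in between
	}
	return next, nil
}

func main() {
	store := newMemStore()
	if err := store.put("instances/a", []byte("1"), nil); err != nil {
		panic(err)
	}
	version, err := claim(store, "instances/a", 1)
	fmt.Println(version, err) // the first claim succeeds
	_, err = claim(store, "instances/a", 1)
	fmt.Println(errors.Is(err, errVersionConflict)) // a stale version is rejected
}
```

The two failure points in claim correspond to the two detection paths mentioned above: the version comparison before the write, and the conditional write itself.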

func InitialiseAndClaim

func InitialiseAndClaim(
	ctx context.Context,
	svc Service,
	keys statestore.KeyBuilder,
	st *statestore.State,
	instanceState state.InstanceState,
	newStatus core.InstanceStatus,
) (int64, error)

InitialiseAndClaim performs an atomic create-if-absent of a new instance object at version 1 with the given status. The conditional Put with IfNoneMatch: "*" is what serialises concurrent first-deploys of the same instance ID sharing a bucket.

When the instance has a name the companion name-lookup record is written alongside it so LookupInstanceIDByName can resolve under ModeLazy without enumerating the instances prefix. A concurrent different-ID-same-name collision on the name record is also mapped to state.ErrInstanceAlreadyExists; the caller can distinguish by a subsequent read if needed.

Returns state.ErrInstanceAlreadyExists when the instance object already exists (412 on IfNoneMatch).
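
As a rough illustration of why the conditional Put serialises first-deploys, the sketch below races several writers against a hypothetical in-memory store. memStore, putOptions, raceInitialise and errAlreadyExists are assumptions made for this example; a real provider Service would enforce the condition server-side.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyExists is a hypothetical stand-in for state.ErrInstanceAlreadyExists.
var errAlreadyExists = errors.New("instance already exists")

// putOptions mirrors only the IfNoneMatch field of PutOptions.
type putOptions struct {
	IfNoneMatch string
}

// memStore is a hypothetical in-memory object store.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{data: map[string][]byte{}}
}

// put refuses to overwrite an existing key when IfNoneMatch is "*".
func (m *memStore) put(key string, data []byte, opts *putOptions) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if opts != nil && opts.IfNoneMatch == "*" {
		if _, exists := m.data[key]; exists {
			return errAlreadyExists
		}
	}
	m.data[key] = data
	return nil
}

// raceInitialise starts n concurrent create-if-absent writes for the same key
// and returns how many of them succeeded.
func raceInitialise(m *memStore, key string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := m.put(key, []byte(`{"version":1}`), &putOptions{IfNoneMatch: "*"})
			if err == nil {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	store := newMemStore()
	// Exactly one of the eight writers creates the object.
	fmt.Println(raceInitialise(store, "instances/a", 8))
}
```

Every losing writer receives the already-exists error, so it can fall back to reading the winner's record instead of overwriting it.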

func NewAuthFailed

func NewAuthFailed(message string) error

NewAuthFailed returns an Error with ErrorReasonCodeAuthFailed.

func NewObjectNotFound

func NewObjectNotFound(message string) error

NewObjectNotFound returns an Error with ErrorReasonCodeObjectNotFound. Exported so provider-specific Service implementations can construct the canonical error shape from their SDK-native not-found signals.

func NewPreconditionFailed

func NewPreconditionFailed(message string) error

NewPreconditionFailed returns an Error with ErrorReasonCodePreconditionFailed. Used by provider Services when IfMatch / IfNoneMatch conditional writes fail.

func NewRateLimited

func NewRateLimited(message string) error

NewRateLimited returns an Error with ErrorReasonCodeRateLimited.

Types

type Error

type Error struct {
	ReasonCode ErrorReasonCode
	Err        error
}

Error is a custom error type for failures specific to the object store implementation of the state container.

func (*Error) Error

func (e *Error) Error() string

type ErrorReasonCode

type ErrorReasonCode string

ErrorReasonCode is an enum of possible error reasons that can be returned by the object store implementation.

const (
	// ErrorReasonCodeObjectNotFound is the error code that is used when
	// an object is not found in the object store.
	ErrorReasonCodeObjectNotFound ErrorReasonCode = "object_not_found"

	// ErrorReasonCodePreconditionFailed is the error code that is used when
	// a precondition for an operation is not met, such as an ETag mismatch.
	ErrorReasonCodePreconditionFailed ErrorReasonCode = "precondition_failed"

	// ErrorReasonCodeAuthFailed is the error code that is used when
	// authentication with the object store fails.
	ErrorReasonCodeAuthFailed ErrorReasonCode = "authentication_failed"

	// ErrorReasonCodeRateLimited is the error code that is used when
	// the object store rate limits the client.
	ErrorReasonCodeRateLimited ErrorReasonCode = "rate_limited"
)
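
A caller would typically branch on the reason code after unwrapping with errors.As. The sketch below redeclares a local copy of Error and two of the reason codes so it runs on its own; the message format of the Error method, the newObjectNotFound constructor body and the reasonOf helper are assumptions, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, redeclared so the sketch is runnable.
type ErrorReasonCode string

const (
	ErrorReasonCodeObjectNotFound ErrorReasonCode = "object_not_found"
	ErrorReasonCodeRateLimited    ErrorReasonCode = "rate_limited"
)

type Error struct {
	ReasonCode ErrorReasonCode
	Err        error
}

// The message format here is an assumption; the documentation does not state it.
func (e *Error) Error() string {
	return fmt.Sprintf("object store error (%s): %v", e.ReasonCode, e.Err)
}

// newObjectNotFound is a hypothetical equivalent of NewObjectNotFound.
func newObjectNotFound(message string) error {
	return &Error{
		ReasonCode: ErrorReasonCodeObjectNotFound,
		Err:        errors.New(message),
	}
}

// reasonOf extracts the reason code from err or anything it wraps.
func reasonOf(err error) (ErrorReasonCode, bool) {
	var storeErr *Error
	if errors.As(err, &storeErr) {
		return storeErr.ReasonCode, true
	}
	return "", false
}

func main() {
	wrapped := fmt.Errorf("loading instance: %w", newObjectNotFound("no such key"))
	code, ok := reasonOf(wrapped)
	fmt.Println(code, ok) // the code survives wrapping with %w
}
```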

type ObjectInfo

type ObjectInfo struct {
	// The key (or name) of the object in the storage service.
	Key string
	// The size of the object in bytes.
	Size int64
	// The ETag of the object, which is typically a hash of the object's content.
	ETag string
}

ObjectInfo contains metadata about an object stored in the object storage service.

type Option

type Option func(*StateContainer)

Option is a type for options that can be passed to LoadStateContainer.

func WithClock

func WithClock(clock commoncore.Clock) Option

WithClock sets the clock used by the events container.

func WithMaxEventPartitionSize

func WithMaxEventPartitionSize(maxEventPartitionSize int64) Option

WithMaxEventPartitionSize sets the maximum size of an event partition file in bytes. See statestore.WithMaxEventPartitionSize.

func WithMaxGuideFileSize

func WithMaxGuideFileSize(maxGuideFileSize int64) Option

WithMaxGuideFileSize sets a guide for the maximum size of a state chunk file in bytes. See statestore.WithMaxGuideFileSize.

func WithRecentlyQueuedEventsThreshold

func WithRecentlyQueuedEventsThreshold(thresholdSeconds int64) Option

WithRecentlyQueuedEventsThreshold sets the threshold in seconds for retrieving recently queued events for a stream when a starting event ID is not provided.
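
Option follows the functional options pattern. The sketch below shows how such options are usually applied over defaults; the container type, its field names and the default values are placeholders invented for this example, since the fields of StateContainer are unexported and its defaults are not documented here.

```go
package main

import "fmt"

// container is a hypothetical stand-in for StateContainer.
type container struct {
	maxGuideFileSize      int64
	maxEventPartitionSize int64
	recentThresholdSecs   int64
}

// option mirrors the shape of Option: a function that mutates the container.
type option func(*container)

func withMaxGuideFileSize(size int64) option {
	return func(c *container) { c.maxGuideFileSize = size }
}

func withMaxEventPartitionSize(size int64) option {
	return func(c *container) { c.maxEventPartitionSize = size }
}

func withRecentlyQueuedEventsThreshold(seconds int64) option {
	return func(c *container) { c.recentThresholdSecs = seconds }
}

// newContainer starts from placeholder defaults and applies each option in order,
// so later options override earlier ones.
func newContainer(opts ...option) *container {
	c := &container{
		maxGuideFileSize:      1 << 20,  // placeholder default, not the real one
		maxEventPartitionSize: 10 << 20, // placeholder default, not the real one
		recentThresholdSecs:   300,      // placeholder default, not the real one
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newContainer(
		withMaxGuideFileSize(512*1024),
		withRecentlyQueuedEventsThreshold(60),
	)
	fmt.Println(c.maxGuideFileSize, c.maxEventPartitionSize, c.recentThresholdSecs)
}
```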

type PutOptions

type PutOptions struct {
	// IfNoneMatch is used to specify that the Put operation should only succeed if the object does not exist (ETag "*")
	// or if the ETag does not match the provided value.
	IfNoneMatch string
	// IfMatch is used to specify that the Put operation should only succeed
	// if the ETag matches the provided value.
	IfMatch string
}

PutOptions defines options for the Put operation in the object storage service.

type Service

type Service interface {
	// Get retrieves the object data and its ETag for the given key.
	Get(ctx context.Context, key string) (data []byte, etag string, err error)
	// Put stores the object data with the given key and options, returning the new ETag.
	Put(ctx context.Context, key string, data []byte, opts *PutOptions) (etag string, err error)
	// Delete removes the object associated with the given key.
	Delete(ctx context.Context, key string) error
	// Head retrieves the size and ETag of the object for the given key without fetching the data.
	Head(ctx context.Context, key string) (*ObjectInfo, error)
	// List returns a list of ObjectInfo for objects that match the given prefix.
	List(ctx context.Context, prefix string) ([]*ObjectInfo, error)
}

Service is an interface for interacting with object storage services.

type ServiceLoader

type ServiceLoader struct {
	// contains filtered or unexported fields
}

ServiceLoader resolves one entity per call from the objectstore Service, materialising it into statestore.State only on cache miss. This is the seam that lets objectstore run under statestore.ModeLazy — processes only touch the working subset of state rather than bulk-loading the entire bucket on startup.
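
The load-on-cache-miss behaviour can be pictured with a small sketch. lazyCache and loadFunc are hypothetical names for this illustration; the real cache lives in statestore.State and stores typed entities rather than strings. The loader returns (value, found, error), matching the shape of the Load methods below.

```go
package main

import (
	"fmt"
	"sync"
)

// loadFunc resolves one entity by ID from the backing store.
type loadFunc func(id string) (value string, found bool, err error)

// lazyCache is a hypothetical cache that only calls load on a miss.
type lazyCache struct {
	mu      sync.Mutex
	entries map[string]string
	load    loadFunc
	loads   int // number of times the backing store was consulted
}

func newLazyCache(load loadFunc) *lazyCache {
	return &lazyCache{entries: map[string]string{}, load: load}
}

// get returns the cached value, loading and caching it on first access.
// Holding the lock during load keeps the sketch simple; a production cache
// would likely avoid blocking unrelated keys.
func (c *lazyCache) get(id string) (string, bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if value, ok := c.entries[id]; ok {
		return value, true, nil
	}
	c.loads++
	value, found, err := c.load(id)
	if err != nil || !found {
		return "", false, err
	}
	c.entries[id] = value
	return value, true, nil
}

func main() {
	backing := map[string]string{"inst-1": "deployed"}
	cache := newLazyCache(func(id string) (string, bool, error) {
		value, ok := backing[id]
		return value, ok, nil
	})
	cache.get("inst-1") // miss: loads from the backing map
	cache.get("inst-1") // hit: served from memory
	_, found, _ := cache.get("inst-2")
	fmt.Println(cache.loads, found)
}
```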

func NewServiceLoader

func NewServiceLoader(svc Service, keys statestore.KeyBuilder) *ServiceLoader

func (*ServiceLoader) LoadChangeset

func (l *ServiceLoader) LoadChangeset(
	ctx context.Context,
	id string,
) (*manage.Changeset, bool, error)

func (*ServiceLoader) LoadCleanupOperation

func (l *ServiceLoader) LoadCleanupOperation(
	ctx context.Context,
	id string,
) (*manage.CleanupOperation, bool, error)

func (*ServiceLoader) LoadEvent

func (l *ServiceLoader) LoadEvent(_ context.Context, _ string) (*manage.Event, bool, error)

LoadEvent returns (nil, false, nil). Events are partition-keyed rather than per-ID; resolving a single event by ID would require reading the relevant partition. Event lookup is not a hot path under ModeLazy; if it becomes one, add an event-ID index.

func (*ServiceLoader) LoadInstance

func (l *ServiceLoader) LoadInstance(
	ctx context.Context,
	id string,
) (*state.InstanceState, bool, error)

func (*ServiceLoader) LoadInstanceIDByName

func (l *ServiceLoader) LoadInstanceIDByName(
	ctx context.Context,
	name string,
) (string, bool, error)

func (*ServiceLoader) LoadLink

func (l *ServiceLoader) LoadLink(_ context.Context, _ string) (*state.LinkState, bool, error)

func (*ServiceLoader) LoadLinkDrift

func (l *ServiceLoader) LoadLinkDrift(
	ctx context.Context,
	id string,
) (*state.LinkDriftState, bool, error)

func (*ServiceLoader) LoadReconciliation

func (l *ServiceLoader) LoadReconciliation(
	ctx context.Context,
	id string,
) (*manage.ReconciliationResult, bool, error)

func (*ServiceLoader) LoadResource

func (l *ServiceLoader) LoadResource(
	_ context.Context,
	_ string,
) (*state.ResourceState, bool, error)

LoadResource and LoadLink intentionally return (nil, false, nil). Resources and links are persisted inline with the parent instance record, so their in-memory caches are warmed transitively via LoadInstance. A direct resource-or-link-by-ID lookup without a prior instance load won't resolve — callers must load the parent instance first.

func (*ServiceLoader) LoadResourceDrift

func (l *ServiceLoader) LoadResourceDrift(
	ctx context.Context,
	id string,
) (*state.ResourceDriftState, bool, error)

func (*ServiceLoader) LoadValidation

func (l *ServiceLoader) LoadValidation(
	ctx context.Context,
	id string,
) (*manage.BlueprintValidation, bool, error)

type ServiceStorage

type ServiceStorage struct {
	// contains filtered or unexported fields
}

ServiceStorage adapts an objectstore.Service to statestore.Storage so the shared persistence engine can back its writes with an object store. ETag semantics are intentionally hidden at this layer — the atomic claim path talks to the Service directly for IfMatch / IfNoneMatch.
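
One way to picture the adapter is the sketch below, which wraps a reduced service interface and drops ETags on the way through. It is an assumption-laden illustration: the context argument is omitted, mapService is a hypothetical in-memory backend, and mapping a not-found Head to (false, nil) in Exists is a guess at the behaviour rather than something the documentation states.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

var errObjectNotFound = errors.New("object not found")

type objectInfo struct {
	Key  string
	ETag string
}

// service is a reduced, context-free version of the Service interface.
type service interface {
	Get(key string) (data []byte, etag string, err error)
	Put(key string, data []byte) (etag string, err error)
	Head(key string) (*objectInfo, error)
	List(prefix string) ([]*objectInfo, error)
}

// storageAdapter exposes plain read/write calls and hides ETags.
type storageAdapter struct{ svc service }

func (s *storageAdapter) Read(key string) ([]byte, error) {
	data, _, err := s.svc.Get(key)
	return data, err
}

func (s *storageAdapter) Write(key string, data []byte) error {
	_, err := s.svc.Put(key, data)
	return err
}

// Exists treats a not-found Head as (false, nil); this mapping is an assumption.
func (s *storageAdapter) Exists(key string) (bool, error) {
	if _, err := s.svc.Head(key); err != nil {
		if errors.Is(err, errObjectNotFound) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}

func (s *storageAdapter) List(prefix string) ([]string, error) {
	infos, err := s.svc.List(prefix)
	if err != nil {
		return nil, err
	}
	keys := make([]string, 0, len(infos))
	for _, info := range infos {
		keys = append(keys, info.Key)
	}
	return keys, nil
}

// mapService is a hypothetical in-memory backend used to exercise the adapter.
type mapService map[string][]byte

func (m mapService) Get(key string) ([]byte, string, error) {
	data, ok := m[key]
	if !ok {
		return nil, "", errObjectNotFound
	}
	return data, "etag-" + key, nil
}

func (m mapService) Put(key string, data []byte) (string, error) {
	m[key] = data
	return "etag-" + key, nil
}

func (m mapService) Head(key string) (*objectInfo, error) {
	if _, ok := m[key]; !ok {
		return nil, errObjectNotFound
	}
	return &objectInfo{Key: key, ETag: "etag-" + key}, nil
}

func (m mapService) List(prefix string) ([]*objectInfo, error) {
	var infos []*objectInfo
	for key := range m {
		if strings.HasPrefix(key, prefix) {
			infos = append(infos, &objectInfo{Key: key, ETag: "etag-" + key})
		}
	}
	sort.Slice(infos, func(i, j int) bool { return infos[i].Key < infos[j].Key })
	return infos, nil
}

func main() {
	store := &storageAdapter{svc: mapService{}}
	if err := store.Write("state/a.json", []byte("{}")); err != nil {
		panic(err)
	}
	exists, _ := store.Exists("state/a.json")
	keys, _ := store.List("state/")
	fmt.Println(exists, keys)
}
```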

func NewServiceStorage

func NewServiceStorage(svc Service) *ServiceStorage

NewServiceStorage returns a statestore.Storage adapter over svc.

func (*ServiceStorage) Delete

func (s *ServiceStorage) Delete(ctx context.Context, key string) error

func (*ServiceStorage) Exists

func (s *ServiceStorage) Exists(ctx context.Context, key string) (bool, error)

func (*ServiceStorage) List

func (s *ServiceStorage) List(ctx context.Context, prefix string) ([]string, error)

func (*ServiceStorage) Read

func (s *ServiceStorage) Read(ctx context.Context, key string) ([]byte, error)

func (*ServiceStorage) Write

func (s *ServiceStorage) Write(ctx context.Context, key string, data []byte) error

type StateContainer

type StateContainer struct {
	// contains filtered or unexported fields
}

StateContainer is the object storage backed implementation of the blueprint state.Container interface. It composes the shared statestore engine with a Service-backed Storage adapter; ETag-aware concurrency primitives (ClaimForDeployment, InitialiseAndClaim) go direct to the Service so IfMatch / IfNoneMatch conditional writes serialise concurrent deploys sharing a bucket.

func LoadStateContainer

func LoadStateContainer(
	_ context.Context,
	svc Service,
	prefix string,
	logger core.Logger,
	opts ...Option,
) (*StateContainer, error)

LoadStateContainer constructs an object-storage-backed state container. svc provides the low-level object operations; prefix is prepended to every storage key (e.g. "bluelink-state/"). State runs under ModeLazy — entries materialise on demand from the Service so each process only touches the working subset of the shared bucket rather than eagerly walking it.

func (*StateContainer) Changesets

func (c *StateContainer) Changesets() manage.Changesets

func (*StateContainer) Children

func (c *StateContainer) Children() state.ChildrenContainer

func (*StateContainer) CleanupOperations

func (c *StateContainer) CleanupOperations() manage.CleanupOperations

func (*StateContainer) Events

func (c *StateContainer) Events() manage.Events

func (*StateContainer) Exports

func (c *StateContainer) Exports() state.ExportsContainer

func (*StateContainer) Instances

func (c *StateContainer) Instances() state.InstancesContainer

func (*StateContainer) Links

func (c *StateContainer) Links() state.LinksContainer

func (*StateContainer) Metadata

func (c *StateContainer) Metadata() state.MetadataContainer

func (*StateContainer) ReconciliationResults

func (c *StateContainer) ReconciliationResults() manage.ReconciliationResults

func (*StateContainer) Resources

func (c *StateContainer) Resources() state.ResourcesContainer

func (*StateContainer) Validation

func (c *StateContainer) Validation() manage.Validation

Directories

Path Synopsis
internal
stores
gcs
s3
