Documentation
¶
Index ¶
- Constants
- Variables
- func ErrInstanceNameTaken(name string) error
- func Load(ctx context.Context, state *State, storage Storage, cfg Config, prefix string) error
- type CategoryConfig
- type ChangesetsContainer
- type ChildrenContainer
- func (c *ChildrenContainer) Attach(ctx context.Context, parentInstanceID string, childInstanceID string, ...) error
- func (c *ChildrenContainer) Detach(ctx context.Context, instanceID string, childName string) error
- func (c *ChildrenContainer) Get(ctx context.Context, instanceID string, childName string) (state.InstanceState, error)
- func (c *ChildrenContainer) SaveDependencies(ctx context.Context, instanceID string, childName string, ...) error
- type ClaimFunc
- type CleanupOperationsContainer
- func (c *CleanupOperationsContainer) Get(ctx context.Context, id string) (*manage.CleanupOperation, error)
- func (c *CleanupOperationsContainer) GetLatestByType(ctx context.Context, cleanupType manage.CleanupType) (*manage.CleanupOperation, error)
- func (c *CleanupOperationsContainer) Save(ctx context.Context, operation *manage.CleanupOperation) error
- func (c *CleanupOperationsContainer) Update(ctx context.Context, operation *manage.CleanupOperation) error
- type Config
- type EntityLoader
- type Error
- type ErrorReasonCode
- type EventIndexLocation
- type EventsContainer
- func (c *EventsContainer) Cleanup(ctx context.Context, thresholdDate time.Time) (int64, error)
- func (c *EventsContainer) Get(ctx context.Context, id string) (manage.Event, error)
- func (c *EventsContainer) GetLastEventID(ctx context.Context, channelType, channelID string) (string, error)
- func (c *EventsContainer) Save(ctx context.Context, event *manage.Event) error
- func (c *EventsContainer) Stream(ctx context.Context, params *manage.EventStreamParams, ...) (chan struct{}, error)
- type EventsContainerOption
- type ExportsContainer
- func (c *ExportsContainer) Get(ctx context.Context, instanceID string, exportName string) (state.ExportState, error)
- func (c *ExportsContainer) GetAll(ctx context.Context, instanceID string) (map[string]*state.ExportState, error)
- func (c *ExportsContainer) Remove(ctx context.Context, instanceID string, exportName string) (state.ExportState, error)
- func (c *ExportsContainer) RemoveAll(ctx context.Context, instanceID string) (map[string]*state.ExportState, error)
- func (c *ExportsContainer) Save(ctx context.Context, instanceID string, exportName string, ...) error
- func (c *ExportsContainer) SaveAll(ctx context.Context, instanceID string, exports map[string]*state.ExportState) error
- type IndexLocation
- type InitialiseAndClaimFunc
- type InstanceNameRecord
- type InstancesContainer
- func (c *InstancesContainer) ClaimForDeployment(ctx context.Context, instanceID string, expectedVersion int64, ...) (int64, error)
- func (c *InstancesContainer) Get(ctx context.Context, instanceID string) (state.InstanceState, error)
- func (c *InstancesContainer) GetBatch(ctx context.Context, instanceIDsOrNames []string) ([]state.InstanceState, error)
- func (c *InstancesContainer) InitialiseAndClaim(ctx context.Context, instanceState state.InstanceState, ...) (int64, error)
- func (c *InstancesContainer) List(ctx context.Context, params state.ListInstancesParams) (state.ListInstancesResult, error)
- func (c *InstancesContainer) LookupIDByName(ctx context.Context, instanceName string) (string, error)
- func (c *InstancesContainer) Remove(ctx context.Context, instanceID string) (state.InstanceState, error)
- func (c *InstancesContainer) Save(ctx context.Context, instanceState state.InstanceState) error
- func (c *InstancesContainer) SaveBatch(ctx context.Context, instances []state.InstanceState) error
- func (c *InstancesContainer) UpdateStatus(ctx context.Context, instanceID string, statusInfo state.InstanceStatusInfo) error
- type KeyBuilder
- func (k KeyBuilder) Changeset(changesetID string) string
- func (k KeyBuilder) ChangesetChunk(chunkNumber int) string
- func (k KeyBuilder) ChangesetIndex() string
- func (k KeyBuilder) CleanupOperation(operationID string) string
- func (k KeyBuilder) EventIndex() string
- func (k KeyBuilder) EventPartition(partitionName string) string
- func (k KeyBuilder) Instance(instanceID string) string
- func (k KeyBuilder) InstanceByName(name string) string
- func (k KeyBuilder) InstanceChunk(chunkNumber int) string
- func (k KeyBuilder) InstanceIndex() string
- func (k KeyBuilder) LinkDrift(linkID string) string
- func (k KeyBuilder) LinkDriftChunk(chunkNumber int) string
- func (k KeyBuilder) LinkDriftIndex() string
- func (k KeyBuilder) ReconciliationResult(resultID string) string
- func (k KeyBuilder) ReconciliationResultChunk(chunkNumber int) string
- func (k KeyBuilder) ReconciliationResultIndex() string
- func (k KeyBuilder) ResourceDrift(resourceID string) string
- func (k KeyBuilder) ResourceDriftChunk(chunkNumber int) string
- func (k KeyBuilder) ResourceDriftIndex() string
- func (k KeyBuilder) Validation(validationID string) string
- func (k KeyBuilder) ValidationChunk(chunkNumber int) string
- func (k KeyBuilder) ValidationIndex() string
- type Layout
- type LinksContainer
- func (c *LinksContainer) Get(ctx context.Context, linkID string) (state.LinkState, error)
- func (c *LinksContainer) GetByName(ctx context.Context, instanceID string, linkName string) (state.LinkState, error)
- func (c *LinksContainer) GetDrift(ctx context.Context, linkID string) (state.LinkDriftState, error)
- func (c *LinksContainer) ListWithResourceDataMappings(ctx context.Context, instanceID string, resourceName string) ([]state.LinkState, error)
- func (c *LinksContainer) Remove(ctx context.Context, linkID string) (state.LinkState, error)
- func (c *LinksContainer) RemoveDrift(ctx context.Context, linkID string) (state.LinkDriftState, error)
- func (c *LinksContainer) Save(ctx context.Context, linkState state.LinkState) error
- func (c *LinksContainer) SaveDrift(ctx context.Context, driftState state.LinkDriftState) error
- func (c *LinksContainer) UpdateStatus(ctx context.Context, linkID string, statusInfo state.LinkStatusInfo) error
- type LoadMode
- type MetadataContainer
- func (c *MetadataContainer) Get(ctx context.Context, instanceID string) (map[string]*core.MappingNode, error)
- func (c *MetadataContainer) Remove(ctx context.Context, instanceID string) (map[string]*core.MappingNode, error)
- func (c *MetadataContainer) Save(ctx context.Context, instanceID string, metadata map[string]*core.MappingNode) error
- type NameRecordReserver
- type PersistedInstanceState
- type Persister
- func (p *Persister) CleanupChangesets(ctx context.Context, thresholdDate time.Time) (map[string]*manage.Changeset, error)
- func (p *Persister) CleanupReconciliationResults(ctx context.Context, thresholdDate time.Time) (map[string]*manage.ReconciliationResult, error)
- func (p *Persister) CleanupValidations(ctx context.Context, thresholdDate time.Time) (map[string]*manage.BlueprintValidation, error)
- func (p *Persister) CreateChangeset(ctx context.Context, cs *manage.Changeset) error
- func (p *Persister) CreateCleanupOperation(ctx context.Context, op *manage.CleanupOperation) error
- func (p *Persister) CreateInstance(ctx context.Context, instance *state.InstanceState) error
- func (p *Persister) CreateLinkDrift(ctx context.Context, drift *state.LinkDriftState) error
- func (p *Persister) CreateReconciliationResult(ctx context.Context, r *manage.ReconciliationResult) error
- func (p *Persister) CreateResourceDrift(ctx context.Context, drift *state.ResourceDriftState) error
- func (p *Persister) CreateValidation(ctx context.Context, v *manage.BlueprintValidation) error
- func (p *Persister) GetEventIndexEntry(eventID string) *EventIndexLocation
- func (p *Persister) RemoveChangeset(ctx context.Context, changesetID string) error
- func (p *Persister) RemoveCleanupOperation(ctx context.Context, operationID string) error
- func (p *Persister) RemoveInstance(ctx context.Context, instance *state.InstanceState) error
- func (p *Persister) RemoveLinkDrift(ctx context.Context, drift *state.LinkDriftState) error
- func (p *Persister) RemoveReconciliationResult(ctx context.Context, resultID string) error
- func (p *Persister) RemoveResourceDrift(ctx context.Context, drift *state.ResourceDriftState) error
- func (p *Persister) RemoveValidation(ctx context.Context, validationID string) error
- func (p *Persister) SaveEventPartition(ctx context.Context, partitionName string, partition []*manage.Event, ...) error
- func (p *Persister) UpdateChangeset(ctx context.Context, cs *manage.Changeset) error
- func (p *Persister) UpdateCleanupOperation(ctx context.Context, op *manage.CleanupOperation) error
- func (p *Persister) UpdateEventPartitionsForRemovals(ctx context.Context, partitions map[string][]*manage.Event, ...) error
- func (p *Persister) UpdateInstance(ctx context.Context, instance *state.InstanceState) error
- func (p *Persister) UpdateLinkDrift(ctx context.Context, drift *state.LinkDriftState) error
- func (p *Persister) UpdateReconciliationResult(ctx context.Context, r *manage.ReconciliationResult) error
- func (p *Persister) UpdateResourceDrift(ctx context.Context, drift *state.ResourceDriftState) error
- func (p *Persister) UpdateValidation(ctx context.Context, v *manage.BlueprintValidation) error
- type PersisterOption
- type ReconciliationResultsContainer
- func (c *ReconciliationResultsContainer) Cleanup(ctx context.Context, thresholdDate time.Time) (int64, error)
- func (c *ReconciliationResultsContainer) Get(ctx context.Context, id string) (*manage.ReconciliationResult, error)
- func (c *ReconciliationResultsContainer) GetAllByChangesetID(ctx context.Context, changesetID string) ([]*manage.ReconciliationResult, error)
- func (c *ReconciliationResultsContainer) GetAllByInstanceID(ctx context.Context, instanceID string) ([]*manage.ReconciliationResult, error)
- func (c *ReconciliationResultsContainer) GetLatestByChangesetID(ctx context.Context, changesetID string) (*manage.ReconciliationResult, error)
- func (c *ReconciliationResultsContainer) GetLatestByInstanceID(ctx context.Context, instanceID string) (*manage.ReconciliationResult, error)
- func (c *ReconciliationResultsContainer) Save(ctx context.Context, result *manage.ReconciliationResult) error
- type ResourcesContainer
- func (c *ResourcesContainer) Get(ctx context.Context, resourceID string) (state.ResourceState, error)
- func (c *ResourcesContainer) GetByName(ctx context.Context, instanceID string, resourceName string) (state.ResourceState, error)
- func (c *ResourcesContainer) GetDrift(ctx context.Context, resourceID string) (state.ResourceDriftState, error)
- func (c *ResourcesContainer) Remove(ctx context.Context, resourceID string) (state.ResourceState, error)
- func (c *ResourcesContainer) RemoveDrift(ctx context.Context, resourceID string) (state.ResourceDriftState, error)
- func (c *ResourcesContainer) Save(ctx context.Context, resourceState state.ResourceState) error
- func (c *ResourcesContainer) SaveDrift(ctx context.Context, driftState state.ResourceDriftState) error
- func (c *ResourcesContainer) UpdateStatus(ctx context.Context, resourceID string, statusInfo state.ResourceStatusInfo) error
- type Scope
- type State
- func (s *State) EachInstance(visit func(*state.InstanceState) bool)
- func (s *State) Instance(instanceID string) (*state.InstanceState, bool)
- func (s *State) Lock()
- func (s *State) LookupChangeset(ctx context.Context, id string) (*manage.Changeset, bool, error)
- func (s *State) LookupCleanupOperation(ctx context.Context, id string) (*manage.CleanupOperation, bool, error)
- func (s *State) LookupEvent(ctx context.Context, id string) (*manage.Event, bool, error)
- func (s *State) LookupInstance(ctx context.Context, id string) (*state.InstanceState, bool, error)
- func (s *State) LookupInstanceIDByName(ctx context.Context, name string) (string, bool, error)
- func (s *State) LookupLink(ctx context.Context, id string) (*state.LinkState, bool, error)
- func (s *State) LookupLinkDrift(ctx context.Context, id string) (*state.LinkDriftState, bool, error)
- func (s *State) LookupReconciliation(ctx context.Context, id string) (*manage.ReconciliationResult, bool, error)
- func (s *State) LookupResource(ctx context.Context, id string) (*state.ResourceState, bool, error)
- func (s *State) LookupResourceDrift(ctx context.Context, id string) (*state.ResourceDriftState, bool, error)
- func (s *State) LookupValidation(ctx context.Context, id string) (*manage.BlueprintValidation, bool, error)
- func (s *State) RLock()
- func (s *State) RUnlock()
- func (s *State) RebuildNameLookup()
- func (s *State) RemoveInstanceFromMemory(instanceID string)
- func (s *State) SetInstanceInMemory(instance *state.InstanceState)
- func (s *State) Unlock()
- type StateOption
- func WithEntityLoader(l EntityLoader) StateOption
- func WithSharedChangesetIndex(index map[string]*IndexLocation) StateOption
- func WithSharedChangesets(changesets map[string]*manage.Changeset) StateOption
- func WithSharedCleanupOps(ops map[string]*manage.CleanupOperation) StateOption
- func WithSharedEventIndex(index map[string]*EventIndexLocation) StateOption
- func WithSharedEvents(events map[string]*manage.Event) StateOption
- func WithSharedInstanceIndex(index map[string]*IndexLocation) StateOption
- func WithSharedInstances(instances map[string]*state.InstanceState) StateOption
- func WithSharedLinkDrift(drift map[string]*state.LinkDriftState) StateOption
- func WithSharedLinkDriftIndex(index map[string]*IndexLocation) StateOption
- func WithSharedLinks(links map[string]*state.LinkState) StateOption
- func WithSharedMutex(mu *sync.RWMutex) StateOption
- func WithSharedPartitionEvents(partitionEvents map[string][]*manage.Event) StateOption
- func WithSharedReconciliationIndex(index map[string]*IndexLocation) StateOption
- func WithSharedReconciliations(results map[string]*manage.ReconciliationResult) StateOption
- func WithSharedResourceDrift(drift map[string]*state.ResourceDriftState) StateOption
- func WithSharedResourceDriftIndex(index map[string]*IndexLocation) StateOption
- func WithSharedResources(resources map[string]*state.ResourceState) StateOption
- func WithSharedValidationIndex(index map[string]*IndexLocation) StateOption
- func WithSharedValidations(validations map[string]*manage.BlueprintValidation) StateOption
- type Storage
- type ValidationsContainer
- func (c *ValidationsContainer) Cleanup(ctx context.Context, thresholdDate time.Time) (int64, error)
- func (c *ValidationsContainer) Get(ctx context.Context, id string) (*manage.BlueprintValidation, error)
- func (c *ValidationsContainer) Save(ctx context.Context, validation *manage.BlueprintValidation) error
Constants ¶
const ( // DefaultMaxGuideFileSize is the default maximum size of a state chunk file // in bytes. Used by the persister as a guide for chunk rollover — if a // single record exceeds this size, it will not be split across multiple // files. Actual file sizes are often slightly larger than this guide. DefaultMaxGuideFileSize int64 = 1048576 // DefaultMaxEventPartitionSize is the default maximum size of an event // partition file in bytes. When saving a new event would push the partition // past this size, the persister returns an error for the save operation. DefaultMaxEventPartitionSize int64 = 10485760 )
const MaxCleanupOperationsPerType = 50
MaxCleanupOperationsPerType caps how many cleanup operations of a given type are retained. Older entries are evicted from in-memory state and storage when the cap is exceeded.
Variables ¶
var ErrNotFound = errors.New("statestore: key not found")
ErrNotFound is returned by Storage implementations when a target key does not exist. Backends map their native not-found signals onto this sentinel.
Functions ¶
func ErrInstanceNameTaken ¶
ErrInstanceNameTaken constructs the sentinel error backends return when an atomic name-stub reservation fails because another instance already claims the name.
func Load ¶
Load prepares state for use according to cfg.Mode.
Under ModeLazy state starts empty; entries materialise on demand via state's EntityLoader (set at construction via WithEntityLoader). Load is a no-op in this mode.
Under ModeEager Load walks every key under prefix in storage, classifies each by its canonical KeyBuilder filename, unmarshals into state's maps, re-wires the instance parent/child pointer graph, and rebuilds the name-lookup cache. Missing storage (empty dir / empty bucket) is not an error — state is left with its initialised-but-empty maps.
Types ¶
type CategoryConfig ¶
CategoryConfig defines the layout and scope for a category of state entities.
type ChangesetsContainer ¶
type ChangesetsContainer struct {
// contains filtered or unexported fields
}
ChangesetsContainer implements manage.Changesets against a shared statestore.State and Persister.
func NewChangesetsContainer ¶
func NewChangesetsContainer(st *State, persister *Persister, logger core.Logger) *ChangesetsContainer
func (*ChangesetsContainer) Cleanup ¶
func (c *ChangesetsContainer) Cleanup( ctx context.Context, thresholdDate time.Time, ) (int64, error)
Cleanup removes changesets older than thresholdDate and returns the count deleted. The persister reconstructs on-disk chunks from scratch; the returned lookup replaces the in-memory map.
type ChildrenContainer ¶
type ChildrenContainer struct {
// contains filtered or unexported fields
}
ChildrenContainer implements state.ChildrenContainer against a shared statestore.State and Persister.
func NewChildrenContainer ¶
func NewChildrenContainer(st *State, persister *Persister, logger core.Logger) *ChildrenContainer
func (*ChildrenContainer) Get ¶
func (c *ChildrenContainer) Get( ctx context.Context, instanceID string, childName string, ) (state.InstanceState, error)
func (*ChildrenContainer) SaveDependencies ¶
func (c *ChildrenContainer) SaveDependencies( ctx context.Context, instanceID string, childName string, dependencies *state.DependencyInfo, ) error
type ClaimFunc ¶
type ClaimFunc func( ctx context.Context, instanceID string, expectedVersion int64, newStatus core.InstanceStatus, ) (int64, error)
ClaimFunc is the backend-specific compare-and-swap used by InstancesContainer.ClaimForDeployment. Memfile supplies an in-memory mutex-based CAS; objectstore supplies an ETag-based CAS via its Service. Implementations must either apply the transition (status change + version bump) and return the new version, or return state.ErrVersionConflict with the current persisted version.
func SingleProcessClaimFunc ¶
SingleProcessClaimFunc returns a ClaimFunc suitable for backends that run in a single deploy-engine process: an in-memory compare-and-swap under the state's write lock. Appropriate for memfile-style backends that own the state exclusively. Multi-writer backends (e.g. shared object stores) must supply their own ClaimFunc backed by a distributed CAS primitive — the in-memory mutex offers no protection against concurrent writers in other processes.
type CleanupOperationsContainer ¶
type CleanupOperationsContainer struct {
// contains filtered or unexported fields
}
CleanupOperationsContainer implements manage.CleanupOperations. Storage-backed via the Persister; in-memory rolling window enforced internally and mirrored to Storage on eviction.
func NewCleanupOperationsContainer ¶
func NewCleanupOperationsContainer(st *State, persister *Persister, logger core.Logger) *CleanupOperationsContainer
func (*CleanupOperationsContainer) Get ¶
func (c *CleanupOperationsContainer) Get( ctx context.Context, id string, ) (*manage.CleanupOperation, error)
func (*CleanupOperationsContainer) GetLatestByType ¶
func (c *CleanupOperationsContainer) GetLatestByType( ctx context.Context, cleanupType manage.CleanupType, ) (*manage.CleanupOperation, error)
func (*CleanupOperationsContainer) Save ¶
func (c *CleanupOperationsContainer) Save( ctx context.Context, operation *manage.CleanupOperation, ) error
func (*CleanupOperationsContainer) Update ¶
func (c *CleanupOperationsContainer) Update( ctx context.Context, operation *manage.CleanupOperation, ) error
type Config ¶
type Config struct {
// Mode selects eager vs lazy cache population for State. Defaults to
// ModeEager (zero value) for backwards compatibility with memfile.
Mode LoadMode
// WriteNameRecords toggles per-entity name-lookup record writes alongside
// instance creates/updates/removes. Required for ModeLazy so
// LookupInstanceIDByName can avoid enumerating all instances. Memfile
// (ModeEager) leaves this false and resolves names from its in-memory map.
WriteNameRecords bool
Instances CategoryConfig
Resources CategoryConfig
Links CategoryConfig
ResourceDrift CategoryConfig
LinkDrift CategoryConfig
Events CategoryConfig
Changesets CategoryConfig
Validations CategoryConfig
ReconciliationResults CategoryConfig
CleanupOperations CategoryConfig
}
Config defines the layout and scope for each category of state entities in a state container.
type EntityLoader ¶
type EntityLoader interface {
LoadInstance(ctx context.Context, id string) (*state.InstanceState, bool, error)
LoadInstanceIDByName(ctx context.Context, name string) (string, bool, error)
LoadResource(ctx context.Context, id string) (*state.ResourceState, bool, error)
LoadResourceDrift(ctx context.Context, id string) (*state.ResourceDriftState, bool, error)
LoadLink(ctx context.Context, id string) (*state.LinkState, bool, error)
LoadLinkDrift(ctx context.Context, id string) (*state.LinkDriftState, bool, error)
LoadEvent(ctx context.Context, id string) (*manage.Event, bool, error)
LoadChangeset(ctx context.Context, id string) (*manage.Changeset, bool, error)
LoadValidation(ctx context.Context, id string) (*manage.BlueprintValidation, bool, error)
LoadReconciliation(ctx context.Context, id string) (*manage.ReconciliationResult, bool, error)
LoadCleanupOperation(ctx context.Context, id string) (*manage.CleanupOperation, bool, error)
}
EntityLoader fetches a single entity from Storage when a State cache miss occurs under ModeLazy. The concrete implementation lives in each backend: memfile supplies a no-op loader (ModeEager); objectstore supplies one that reads and unmarshals objects from its Service.
Every method returns (entity, found, error):
- entity non-nil, found true, err nil — entity materialised
- entity nil, found false, err nil — entity does not exist
- entity nil, found false, err non-nil — transient error; caller decides
type Error ¶
type Error struct {
ReasonCode ErrorReasonCode
Err error
}
Error is the structured error type returned by statestore. Callers can distinguish cases by inspecting ReasonCode, typically via errors.As:
var sErr *statestore.Error
if errors.As(err, &sErr) && sErr.ReasonCode == statestore.ErrorReasonCodeMalformedStateFile {
...
}
Error also implements Unwrap so errors.Is against wrapped sentinels works.
type ErrorReasonCode ¶
type ErrorReasonCode string
ErrorReasonCode enumerates the distinct error conditions statestore can surface. Add new values when introducing a new failure mode rather than overloading an existing one.
const ( // ErrorReasonCodeMalformedStateFile indicates a persisted chunk file or // index file is corrupted, truncated, or out of sync with the index — // e.g. the stored IndexInChunk points beyond the end of the chunk. ErrorReasonCodeMalformedStateFile ErrorReasonCode = "malformed_state_file" // ErrorReasonCodeMalformedState indicates the in-memory state is // inconsistent — for example, a resource's InstanceID references an // instance that no longer exists. ErrorReasonCodeMalformedState ErrorReasonCode = "malformed_state" // ErrorReasonCodeMaxEventPartitionSizeExceeded indicates a save-event // operation was refused because accepting the event would push the // partition's on-disk size past its configured maximum. ErrorReasonCodeMaxEventPartitionSizeExceeded ErrorReasonCode = "max_event_partition_size_exceeded" // ErrorReasonCodeUnknownLayout indicates a CategoryConfig carries a // Layout value the persister does not recognise. Likely a programmer // error at container-construction time. ErrorReasonCodeUnknownLayout ErrorReasonCode = "unknown_layout" // ErrorReasonCodeInstanceNameTaken indicates a CreateInstance call // tried to write a name-lookup record for a name already reserved by // another instance. Only surfaces when Config.WriteNameRecords is set // and a NameRecordReserver is installed (objectstore with conditional // writes). Single-process backends like memfile never produce this. ErrorReasonCodeInstanceNameTaken ErrorReasonCode = "instance_name_taken" )
type EventIndexLocation ¶
type EventIndexLocation struct {
Partition string `json:"partition"`
IndexInPartition int `json:"indexInPartition"`
}
EventIndexLocation records the position of an event within its channel's partition file. Used by the Events category under ScopePerChannel.
type EventsContainer ¶
type EventsContainer struct {
// contains filtered or unexported fields
}
EventsContainer implements manage.Events against a shared statestore.State and Persister. Listener channels for Stream live in the container; events are broadcast to them after persistence completes.
func NewEventsContainer ¶
func NewEventsContainer( st *State, persister *Persister, logger core.Logger, opts ...EventsContainerOption, ) *EventsContainer
func (*EventsContainer) GetLastEventID ¶
type EventsContainerOption ¶
type EventsContainerOption func(*EventsContainer)
EventsContainerOption configures an EventsContainer at construction.
func WithEventsClock ¶
func WithEventsClock(clock commoncore.Clock) EventsContainerOption
WithEventsClock injects a clock for deterministic testing of the recently- queued events window.
func WithEventsRecentlyQueuedThreshold ¶
func WithEventsRecentlyQueuedThreshold(d time.Duration) EventsContainerOption
WithEventsRecentlyQueuedThreshold sets the window used when a Stream call doesn't provide a StartingEventID — events created within this duration of "now" are sent to the new listener.
type ExportsContainer ¶
type ExportsContainer struct {
// contains filtered or unexported fields
}
ExportsContainer implements state.ExportsContainer against a shared statestore.State and Persister. Backend-agnostic; every backend consumes it unchanged.
func NewExportsContainer ¶
func NewExportsContainer(st *State, persister *Persister, logger core.Logger) *ExportsContainer
func (*ExportsContainer) Get ¶
func (c *ExportsContainer) Get( ctx context.Context, instanceID string, exportName string, ) (state.ExportState, error)
func (*ExportsContainer) GetAll ¶
func (c *ExportsContainer) GetAll( ctx context.Context, instanceID string, ) (map[string]*state.ExportState, error)
func (*ExportsContainer) Remove ¶
func (c *ExportsContainer) Remove( ctx context.Context, instanceID string, exportName string, ) (state.ExportState, error)
func (*ExportsContainer) RemoveAll ¶
func (c *ExportsContainer) RemoveAll( ctx context.Context, instanceID string, ) (map[string]*state.ExportState, error)
func (*ExportsContainer) Save ¶
func (c *ExportsContainer) Save( ctx context.Context, instanceID string, exportName string, export state.ExportState, ) error
func (*ExportsContainer) SaveAll ¶
func (c *ExportsContainer) SaveAll( ctx context.Context, instanceID string, exports map[string]*state.ExportState, ) error
type IndexLocation ¶
type IndexLocation struct {
ChunkNumber int `json:"chunkNumber"`
IndexInChunk int `json:"indexInChunk"`
}
IndexLocation records the position of an entity within a chunk file. Used for LayoutChunked categories where the index file maps entity IDs to (chunk number, position-in-chunk) pairs.
type InitialiseAndClaimFunc ¶
type InitialiseAndClaimFunc func( ctx context.Context, instanceState state.InstanceState, newStatus core.InstanceStatus, ) (int64, error)
InitialiseAndClaimFunc is the backend-specific atomic-create-at-version-1 primitive used by InstancesContainer.InitialiseAndClaim. Memfile supplies an in-memory check-and-insert under the state's write lock; objectstore supplies a conditional Put with IfNoneMatch: "*". Implementations must either insert the instance at version 1 with the given status and return 1, or return state.ErrInstanceAlreadyExists without mutating existing state.
func SingleProcessInitialiseAndClaimFunc ¶
func SingleProcessInitialiseAndClaimFunc(st *State, persister *Persister) InitialiseAndClaimFunc
SingleProcessInitialiseAndClaimFunc returns an InitialiseAndClaimFunc suitable for backends that run in a single deploy-engine process: an in-memory check-and-insert under the state's write lock. Appropriate for memfile-style backends that own the state exclusively. Multi-writer backends (e.g. shared object stores) must supply their own InitialiseAndClaimFunc backed by an atomic create-if-absent primitive (e.g. IfNoneMatch:"*") — the in-memory mutex offers no protection against concurrent writers in other processes.
type InstanceNameRecord ¶
InstanceNameRecord is the minimal JSON shape written to instances_by_name/ when Config.WriteNameRecords is true. Gives a single read to resolve a name to an id under ModeLazy.
type InstancesContainer ¶
type InstancesContainer struct {
// contains filtered or unexported fields
}
InstancesContainer implements state.InstancesContainer against a shared statestore.State and Persister. Concurrency-critical operations (ClaimForDeployment, InitialiseAndClaim) delegate to backend-supplied funcs; everything else is backend-agnostic.
func NewInstancesContainer ¶
func NewInstancesContainer( st *State, persister *Persister, claim ClaimFunc, initialiseAndClaim InitialiseAndClaimFunc, logger core.Logger, ) *InstancesContainer
func (*InstancesContainer) ClaimForDeployment ¶
func (c *InstancesContainer) ClaimForDeployment( ctx context.Context, instanceID string, expectedVersion int64, newStatus core.InstanceStatus, ) (int64, error)
ClaimForDeployment delegates to the backend-supplied ClaimFunc. Memfile's ClaimFunc performs an in-memory mutex-based CAS; objectstore's performs an ETag-based CAS via its Service.
func (*InstancesContainer) Get ¶
func (c *InstancesContainer) Get( ctx context.Context, instanceID string, ) (state.InstanceState, error)
func (*InstancesContainer) GetBatch ¶
func (c *InstancesContainer) GetBatch( ctx context.Context, instanceIDsOrNames []string, ) ([]state.InstanceState, error)
func (*InstancesContainer) InitialiseAndClaim ¶
func (c *InstancesContainer) InitialiseAndClaim( ctx context.Context, instanceState state.InstanceState, newStatus core.InstanceStatus, ) (int64, error)
InitialiseAndClaim delegates to the backend-supplied InitialiseAndClaimFunc. Memfile performs an in-memory check-and-insert under the state's write lock; objectstore performs a conditional Put with IfNoneMatch: "*".
func (*InstancesContainer) List ¶
func (c *InstancesContainer) List( ctx context.Context, params state.ListInstancesParams, ) (state.ListInstancesResult, error)
func (*InstancesContainer) LookupIDByName ¶
func (*InstancesContainer) Remove ¶
func (c *InstancesContainer) Remove( ctx context.Context, instanceID string, ) (state.InstanceState, error)
func (*InstancesContainer) Save ¶
func (c *InstancesContainer) Save( ctx context.Context, instanceState state.InstanceState, ) error
func (*InstancesContainer) SaveBatch ¶
func (c *InstancesContainer) SaveBatch( ctx context.Context, instances []state.InstanceState, ) error
func (*InstancesContainer) UpdateStatus ¶
func (c *InstancesContainer) UpdateStatus( ctx context.Context, instanceID string, statusInfo state.InstanceStatusInfo, ) error
type KeyBuilder ¶
type KeyBuilder struct {
// contains filtered or unexported fields
}
KeyBuilder produces the canonical storage keys for each persisted category, prefixed by a storage-level root (memfile's state directory or objectstore's bucket-level prefix). Each method emits one fixed key format — layout dispatch (chunked vs per-entity) currently lives in Persister, not here.
When a category grows scope-aware keys (e.g. ScopePerInstance prefixing resource chunks with "instances/<id>/"), those method variants belong on KeyBuilder so the scope decision lives in one place.
func NewKeyBuilder ¶
func NewKeyBuilder(prefix string) KeyBuilder
NewKeyBuilder returns a KeyBuilder with the given storage-level prefix. The prefix may be empty (memfile passes its state directory here; objectstore passes its bucket-level prefix).
func (KeyBuilder) Changeset ¶
func (k KeyBuilder) Changeset(changesetID string) string
Changeset returns the per-entity storage key for a changeset.
func (KeyBuilder) ChangesetChunk ¶
func (k KeyBuilder) ChangesetChunk(chunkNumber int) string
ChangesetChunk returns the chunked storage key for a changeset chunk file at the given chunk number.
func (KeyBuilder) ChangesetIndex ¶
func (k KeyBuilder) ChangesetIndex() string
ChangesetIndex returns the storage key for the changeset chunk-index file.
func (KeyBuilder) CleanupOperation ¶
func (k KeyBuilder) CleanupOperation(operationID string) string
CleanupOperation returns the per-entity storage key for a cleanup operation.
func (KeyBuilder) EventIndex ¶
func (k KeyBuilder) EventIndex() string
EventIndex returns the storage key for the global event index, which maps event IDs to (partition, indexInPartition).
func (KeyBuilder) EventPartition ¶
func (k KeyBuilder) EventPartition(partitionName string) string
EventPartition returns the storage key for a single channel's event partition file. Events are grouped into per-channel partitions rather than chunked globally, so the partition name (e.g. "changesets_<id>") fully identifies the file.
func (KeyBuilder) Instance ¶
func (k KeyBuilder) Instance(instanceID string) string
Instance returns the per-entity storage key for a single instance record.
func (KeyBuilder) InstanceByName ¶
func (k KeyBuilder) InstanceByName(name string) string
InstanceByName returns the storage key for an instance's name-lookup record — a tiny object containing {id, name} written alongside the main instance object under Config.WriteNameRecords. Enables O(1) name → id resolution under ModeLazy without enumerating the instances prefix.
func (KeyBuilder) InstanceChunk ¶
func (k KeyBuilder) InstanceChunk(chunkNumber int) string
InstanceChunk returns the chunked storage key for the instance chunk file at the given chunk number.
func (KeyBuilder) InstanceIndex ¶
func (k KeyBuilder) InstanceIndex() string
InstanceIndex returns the storage key for the instance chunk-index file.
func (KeyBuilder) LinkDrift ¶
func (k KeyBuilder) LinkDrift(linkID string) string
LinkDrift returns the per-entity storage key for a link-drift record.
func (KeyBuilder) LinkDriftChunk ¶
func (k KeyBuilder) LinkDriftChunk(chunkNumber int) string
LinkDriftChunk returns the chunked storage key for a link-drift chunk file at the given chunk number.
func (KeyBuilder) LinkDriftIndex ¶
func (k KeyBuilder) LinkDriftIndex() string
LinkDriftIndex returns the storage key for the link-drift chunk-index file.
func (KeyBuilder) ReconciliationResult ¶
func (k KeyBuilder) ReconciliationResult(resultID string) string
ReconciliationResult returns the per-entity storage key for a reconciliation result.
func (KeyBuilder) ReconciliationResultChunk ¶
func (k KeyBuilder) ReconciliationResultChunk(chunkNumber int) string
ReconciliationResultChunk returns the chunked storage key for a reconciliation-result chunk file at the given chunk number.
func (KeyBuilder) ReconciliationResultIndex ¶
func (k KeyBuilder) ReconciliationResultIndex() string
ReconciliationResultIndex returns the storage key for the reconciliation-result chunk-index file.
func (KeyBuilder) ResourceDrift ¶
func (k KeyBuilder) ResourceDrift(resourceID string) string
ResourceDrift returns the per-entity storage key for a resource-drift record.
func (KeyBuilder) ResourceDriftChunk ¶
func (k KeyBuilder) ResourceDriftChunk(chunkNumber int) string
ResourceDriftChunk returns the chunked storage key for a resource-drift chunk file at the given chunk number.
func (KeyBuilder) ResourceDriftIndex ¶
func (k KeyBuilder) ResourceDriftIndex() string
ResourceDriftIndex returns the storage key for the resource-drift chunk-index file.
func (KeyBuilder) Validation ¶
func (k KeyBuilder) Validation(validationID string) string
Validation returns the per-entity storage key for a blueprint validation.
func (KeyBuilder) ValidationChunk ¶
func (k KeyBuilder) ValidationChunk(chunkNumber int) string
ValidationChunk returns the chunked storage key for a blueprint-validation chunk file at the given chunk number.
func (KeyBuilder) ValidationIndex ¶
func (k KeyBuilder) ValidationIndex() string
ValidationIndex returns the storage key for the blueprint-validation chunk-index file.
type Layout ¶
type Layout int
Layout determines how entities are organised on disk or in a cloud object store.
const ( // LayoutChunked packs many entities into JSON chunk files with a shared index file. // Small per-file I/O overhead. fewer writes per mutation. LayoutChunked Layout = iota // LayoutPerEntity stores each entity as its own JSON object keyed by ID. // Predictable keys enable per-entity ETag CAS at the storage layer for backends // that support it, and avoids cross-run chunk contention for globally shared // entity types (e.g. changesets, validations). LayoutPerEntity )
type LinksContainer ¶
type LinksContainer struct {
// contains filtered or unexported fields
}
LinksContainer implements state.LinksContainer against a shared statestore.State and Persister. Maintains a container-local resourceDataMappingIDs cache (instance → resourceName → []linkID) so ListWithResourceDataMappings avoids re-scanning links on every call.
func NewLinksContainer ¶
func NewLinksContainer(st *State, persister *Persister, logger core.Logger) *LinksContainer
func (*LinksContainer) GetDrift ¶
func (c *LinksContainer) GetDrift( ctx context.Context, linkID string, ) (state.LinkDriftState, error)
func (*LinksContainer) ListWithResourceDataMappings ¶
func (*LinksContainer) RemoveDrift ¶
func (c *LinksContainer) RemoveDrift( ctx context.Context, linkID string, ) (state.LinkDriftState, error)
func (*LinksContainer) SaveDrift ¶
func (c *LinksContainer) SaveDrift( ctx context.Context, driftState state.LinkDriftState, ) error
func (*LinksContainer) UpdateStatus ¶
func (c *LinksContainer) UpdateStatus( ctx context.Context, linkID string, statusInfo state.LinkStatusInfo, ) error
type LoadMode ¶
type LoadMode int
LoadMode picks how State behaves on a lookup that misses the cache.
const ( // ModeEager populates the full State at construction time. Cache misses // imply the entity does not exist. Appropriate for memfile (local, // bounded data) and postgres-style backends where the database is the // live source of truth. ModeEager LoadMode = iota // ModeLazy leaves State empty at construction and materialises entities // on demand via EntityLoader. Appropriate for objectstore backends where // the state set can be large and a given process only touches a small // working subset. ModeLazy )
type MetadataContainer ¶
type MetadataContainer struct {
// contains filtered or unexported fields
}
MetadataContainer implements state.MetadataContainer against a shared statestore.State and Persister. Moved here from memfile during phase 3 of the statestore migration — logic is backend-agnostic and is consumed by every backend unchanged.
func NewMetadataContainer ¶
func NewMetadataContainer(st *State, persister *Persister, logger core.Logger) *MetadataContainer
NewMetadataContainer constructs a MetadataContainer bound to the given state and persister. logger may be nil — the container never logs on a hot path; debug messages only fire in Save / Remove.
func (*MetadataContainer) Get ¶
func (c *MetadataContainer) Get( ctx context.Context, instanceID string, ) (map[string]*core.MappingNode, error)
func (*MetadataContainer) Remove ¶
func (c *MetadataContainer) Remove( ctx context.Context, instanceID string, ) (map[string]*core.MappingNode, error)
func (*MetadataContainer) Save ¶
func (c *MetadataContainer) Save( ctx context.Context, instanceID string, metadata map[string]*core.MappingNode, ) error
type NameRecordReserver ¶
NameRecordReserver is an optional hook called by CreateInstance before any main-record I/O to atomically reserve an instance name. Backends that support conditional writes (objectstore via IfNoneMatch: "*") supply an implementation; single-process backends leave it nil and the plain write path applies. Returning ErrInstanceNameTaken from the reserver aborts the CreateInstance call with that same error surfaced to the caller.
type PersistedInstanceState ¶
type PersistedInstanceState struct {
InstanceID string `json:"id"`
InstanceName string `json:"name"`
Status core.InstanceStatus `json:"status"`
LastStatusUpdateTimestamp int `json:"lastStatusUpdateTimestamp,omitempty"`
LastDeployedTimestamp int `json:"lastDeployedTimestamp"`
LastDeployAttemptTimestamp int `json:"lastDeployAttemptTimestamp"`
ResourceIDs map[string]string `json:"resourceIds"`
Resources map[string]*state.ResourceState `json:"resources"`
Links map[string]*state.LinkState `json:"links"`
Metadata map[string]*core.MappingNode `json:"metadata"`
Exports map[string]*state.ExportState `json:"exports"`
// A mapping of child blueprint names to their blueprint instance IDs.
ChildBlueprints map[string]string `json:"childBlueprints"`
ChildDependencies map[string]*state.DependencyInfo `json:"childDependencies,omitempty"`
Durations *state.InstanceCompletionDuration `json:"durations,omitempty"`
Version int64 `json:"version"`
}
PersistedInstanceState is the on-disk / in-object-store JSON representation of a blueprint instance. It differs from state.InstanceState in that child blueprints are stored as ID references rather than being embedded — the loader re-wires the object graph on startup.
This shape is part of statestore's public surface because backends that implement ETag-based atomic claims (objectstore) JSON-unmarshal directly into it when performing a compare-and-swap against a single instance object.
func NewPersistedInstanceState ¶
func NewPersistedInstanceState(instance *state.InstanceState) *PersistedInstanceState
NewPersistedInstanceState returns the persistence representation of an in-memory instance. Child blueprints are flattened to an ID-reference map — callers are responsible for persisting each child blueprint separately.
func (*PersistedInstanceState) ToInstanceState ¶
func (p *PersistedInstanceState) ToInstanceState() *state.InstanceState
ToInstanceState returns a live state.InstanceState populated from the persisted shape. Child blueprints are left as an empty map — the loader re-wires the parent/child object graph after all instances are loaded.
type Persister ¶
type Persister struct {
// contains filtered or unexported fields
}
Persister manages durable persistence of state data via the Storage seam. Sub-container callers hold State.Lock for the in-memory mutation; Persister acquires its own mu internally for chunk-file I/O and index updates.
func NewPersister ¶
func NewPersister( state *State, storage Storage, conf Config, prefix string, opts ...PersisterOption, ) *Persister
NewPersister constructs a Persister bound to the given State and Storage. prefix is prepended to every storage key (memfile: stateDir; objectstore: bucket-level prefix). It may be empty.
func (*Persister) CleanupChangesets ¶
func (p *Persister) CleanupChangesets( ctx context.Context, thresholdDate time.Time, ) (map[string]*manage.Changeset, error)
CleanupChangesets removes changesets older than thresholdDate and re-persists the remaining entries. Returns a lookup of retained changesets keyed by ID so callers can refresh their in-memory view.
func (*Persister) CleanupReconciliationResults ¶
func (p *Persister) CleanupReconciliationResults( ctx context.Context, thresholdDate time.Time, ) (map[string]*manage.ReconciliationResult, error)
CleanupReconciliationResults removes reconciliation results older than thresholdDate and re-persists the remaining entries.
func (*Persister) CleanupValidations ¶
func (p *Persister) CleanupValidations( ctx context.Context, thresholdDate time.Time, ) (map[string]*manage.BlueprintValidation, error)
CleanupValidations removes blueprint validations older than thresholdDate and re-persists the remaining entries.
func (*Persister) CreateChangeset ¶
func (*Persister) CreateCleanupOperation ¶
CreateCleanupOperation persists a newly-recorded cleanup operation. Only LayoutPerEntity is supported — cleanup operations are low-volume globally-shared entities where per-object keys give the simplest story for concurrent CI/CD runs.
func (*Persister) CreateInstance ¶
CreateInstance persists a newly-added instance. When Config.WriteNameRecords is true, the name-lookup record is reserved FIRST (via NameRecordReserver if installed, else a plain write) so duplicate-name creates are rejected before any main-record I/O. On main-record failure the name record is best-effort rolled back.
func (*Persister) CreateLinkDrift ¶
func (*Persister) CreateReconciliationResult ¶
func (*Persister) CreateResourceDrift ¶
func (*Persister) CreateValidation ¶
func (*Persister) GetEventIndexEntry ¶
func (p *Persister) GetEventIndexEntry(eventID string) *EventIndexLocation
GetEventIndexEntry returns the indexed location of an event (or nil if not found). Acquires the persister mutex so readers see a consistent view of the event index while a write is in progress.
func (*Persister) RemoveChangeset ¶
func (*Persister) RemoveCleanupOperation ¶
RemoveCleanupOperation deletes a cleanup operation's persisted record. A missing target is not an error (matches other Remove semantics).
func (*Persister) RemoveInstance ¶
RemoveInstance removes an instance's persisted record, its index entry, and its name-lookup record (if any).
func (*Persister) RemoveLinkDrift ¶
func (*Persister) RemoveReconciliationResult ¶
func (*Persister) RemoveResourceDrift ¶
func (*Persister) RemoveValidation ¶
func (*Persister) SaveEventPartition ¶
func (p *Persister) SaveEventPartition( ctx context.Context, partitionName string, partition []*manage.Event, eventToSave *manage.Event, indexInPartition int, ) error
SaveEventPartition writes the full partition for a channel and records the event's position in the event index. Returns ErrorReasonCodeMaxEventPartitionSizeExceeded if the serialised partition would exceed the configured maximum.
func (*Persister) UpdateChangeset ¶
func (*Persister) UpdateCleanupOperation ¶
UpdateCleanupOperation rewrites an existing cleanup operation.
func (*Persister) UpdateEventPartitionsForRemovals ¶
func (p *Persister) UpdateEventPartitionsForRemovals( ctx context.Context, partitions map[string][]*manage.Event, removedPartitions []string, removedEvents []string, ) error
UpdateEventPartitionsForRemovals persists the post-cleanup state of event partitions. It deletes files for fully-removed partitions, overwrites the remaining partitions, and drops the removed event IDs from the event index.
func (*Persister) UpdateInstance ¶
UpdateInstance rewrites an existing instance in place. If the instance's name has changed and Config.WriteNameRecords is true, the stale record is left in place (self-corrected on lookup) and a fresh record is written for the current name.
func (*Persister) UpdateLinkDrift ¶
func (*Persister) UpdateReconciliationResult ¶
func (*Persister) UpdateResourceDrift ¶
func (*Persister) UpdateValidation ¶
type PersisterOption ¶
type PersisterOption func(*Persister)
PersisterOption configures a Persister at construction.
func WithLogger ¶
func WithLogger(logger core.Logger) PersisterOption
WithLogger sets the persister's logger. Defaults to a no-op logger.
func WithMaxEventPartitionSize ¶
func WithMaxEventPartitionSize(maxEventPartitionSize int64) PersisterOption
WithMaxEventPartitionSize sets the max bytes in a single event partition. Default: DefaultMaxEventPartitionSize (10 MiB). Saving an event that would push the partition past this size returns an error.
func WithMaxGuideFileSize ¶
func WithMaxGuideFileSize(maxGuideFileSize int64) PersisterOption
WithMaxGuideFileSize sets the chunk-rollover guide (bytes). Default: DefaultMaxGuideFileSize (1 MiB). A single record exceeding this size will not be split across chunks; actual file sizes may be larger.
func WithNameRecordReserver ¶
func WithNameRecordReserver(r NameRecordReserver) PersisterOption
WithNameRecordReserver installs an atomic name-record reservation hook. Only meaningful when Config.WriteNameRecords is also true.
type ReconciliationResultsContainer ¶
type ReconciliationResultsContainer struct {
// contains filtered or unexported fields
}
ReconciliationResultsContainer implements manage.ReconciliationResults against a shared statestore.State and Persister. Maintains in-container secondary indexes (changesetID → []resultID, instanceID → []resultID) sorted by Created desc. Indexes are rebuilt on Cleanup and kept current on Save; they assume eager-mode population for correctness and become best-effort (only covering materialised entries) under ModeLazy.
func NewReconciliationResultsContainer ¶
func NewReconciliationResultsContainer(st *State, persister *Persister, logger core.Logger) *ReconciliationResultsContainer
NewReconciliationResultsContainer builds secondary indexes from whatever reconciliation results are currently in state (typically the full set under ModeEager).
func (*ReconciliationResultsContainer) Get ¶
func (c *ReconciliationResultsContainer) Get( ctx context.Context, id string, ) (*manage.ReconciliationResult, error)
func (*ReconciliationResultsContainer) GetAllByChangesetID ¶
func (c *ReconciliationResultsContainer) GetAllByChangesetID( ctx context.Context, changesetID string, ) ([]*manage.ReconciliationResult, error)
func (*ReconciliationResultsContainer) GetAllByInstanceID ¶
func (c *ReconciliationResultsContainer) GetAllByInstanceID( ctx context.Context, instanceID string, ) ([]*manage.ReconciliationResult, error)
func (*ReconciliationResultsContainer) GetLatestByChangesetID ¶
func (c *ReconciliationResultsContainer) GetLatestByChangesetID( ctx context.Context, changesetID string, ) (*manage.ReconciliationResult, error)
func (*ReconciliationResultsContainer) GetLatestByInstanceID ¶
func (c *ReconciliationResultsContainer) GetLatestByInstanceID( ctx context.Context, instanceID string, ) (*manage.ReconciliationResult, error)
func (*ReconciliationResultsContainer) Save ¶
func (c *ReconciliationResultsContainer) Save( ctx context.Context, result *manage.ReconciliationResult, ) error
type ResourcesContainer ¶
type ResourcesContainer struct {
// contains filtered or unexported fields
}
ResourcesContainer implements state.ResourcesContainer against a shared statestore.State and Persister. Instance mutations (Save/UpdateStatus/ Remove) flow through Persister.UpdateInstance; drift mutations flow through Persister.CreateResourceDrift / UpdateResourceDrift / RemoveResourceDrift.
func NewResourcesContainer ¶
func NewResourcesContainer(st *State, persister *Persister, logger core.Logger) *ResourcesContainer
func (*ResourcesContainer) Get ¶
func (c *ResourcesContainer) Get( ctx context.Context, resourceID string, ) (state.ResourceState, error)
func (*ResourcesContainer) GetByName ¶
func (c *ResourcesContainer) GetByName( ctx context.Context, instanceID string, resourceName string, ) (state.ResourceState, error)
func (*ResourcesContainer) GetDrift ¶
func (c *ResourcesContainer) GetDrift( ctx context.Context, resourceID string, ) (state.ResourceDriftState, error)
func (*ResourcesContainer) Remove ¶
func (c *ResourcesContainer) Remove( ctx context.Context, resourceID string, ) (state.ResourceState, error)
func (*ResourcesContainer) RemoveDrift ¶
func (c *ResourcesContainer) RemoveDrift( ctx context.Context, resourceID string, ) (state.ResourceDriftState, error)
func (*ResourcesContainer) Save ¶
func (c *ResourcesContainer) Save( ctx context.Context, resourceState state.ResourceState, ) error
func (*ResourcesContainer) SaveDrift ¶
func (c *ResourcesContainer) SaveDrift( ctx context.Context, driftState state.ResourceDriftState, ) error
func (*ResourcesContainer) UpdateStatus ¶
func (c *ResourcesContainer) UpdateStatus( ctx context.Context, resourceID string, statusInfo state.ResourceStatusInfo, ) error
type Scope ¶
type Scope int
const ( // ScopeGlobal puts all entities for the category in one namespace, // shared across all blueprint instances. ScopeGlobal Scope = iota // ScopePerInstance prefixes all keys with "instances/{instanceID}/". // Two runs deploying different instances touch disjoint namespaces // meaning no chunk contention even without CAS on chunk writes. ScopePerInstance // ScopePerChannel prefixes keys with the event channel identifier // (e.g. "channels/{channelType}/{channelID}/"). This is the // partitioning that existed in the `memfile` implementation // before the introduction of the `statestore` package. ScopePerChannel )
type State ¶
type State struct {
// contains filtered or unexported fields
}
State is the in-memory representation of all state data managed by the state store.
func NewState ¶
func NewState(opts ...StateOption) *State
NewState returns a new, empty State instance. Options let callers share specific maps or the mutex with an external owner (memfile's legacy state during the incremental migration).
func (*State) EachInstance ¶
func (s *State) EachInstance(visit func(*state.InstanceState) bool)
EachInstance iterates over all instances under an RLock held internally. The visitor must not mutate the instance. Return false to stop early.
func (*State) Instance ¶
func (s *State) Instance(instanceID string) (*state.InstanceState, bool)
Instance returns the in-memory instance pointer. The caller must already hold at least an RLock on the State; the returned pointer is shared and must not be mutated without holding the write lock. For durable updates, go through the Persister.
func (*State) Lock ¶
func (s *State) Lock()
Lock helpers for callers that need to hold the State lock across a sequence of operations. Prefer the focused accessors below where possible.
func (*State) LookupChangeset ¶
LookupChangeset returns a changeset by ID.
func (*State) LookupCleanupOperation ¶
func (s *State) LookupCleanupOperation(ctx context.Context, id string) (*manage.CleanupOperation, bool, error)
LookupCleanupOperation returns a cleanup operation by ID.
func (*State) LookupEvent ¶
LookupEvent returns an event by ID.
func (*State) LookupInstance ¶
LookupInstance returns the instance for id. Under ModeEager this is a pure cache read (loader is the noop). Under ModeLazy, a miss triggers loader.LoadInstance, which populates the cache before returning.
func (*State) LookupInstanceIDByName ¶
LookupInstanceIDByName resolves a name to an instance ID. Under ModeEager the nameLookup map is populated at load time and kept in sync by SetInstanceInMemory / RemoveInstanceFromMemory. Under ModeLazy a miss hits the loader, which typically reads an `instances_by_name/<name>.json` stub.
func (*State) LookupLink ¶
LookupLink returns a link by ID.
func (*State) LookupLinkDrift ¶
func (s *State) LookupLinkDrift(ctx context.Context, id string) (*state.LinkDriftState, bool, error)
LookupLinkDrift returns a link-drift entry by link ID.
func (*State) LookupReconciliation ¶
func (s *State) LookupReconciliation(ctx context.Context, id string) (*manage.ReconciliationResult, bool, error)
LookupReconciliation returns a reconciliation result by ID.
func (*State) LookupResource ¶
LookupResource returns a resource by ID, materialising it on cache miss.
func (*State) LookupResourceDrift ¶
func (s *State) LookupResourceDrift(ctx context.Context, id string) (*state.ResourceDriftState, bool, error)
LookupResourceDrift returns a resource-drift entry by resource ID.
func (*State) LookupValidation ¶
func (s *State) LookupValidation(ctx context.Context, id string) (*manage.BlueprintValidation, bool, error)
LookupValidation returns a blueprint validation by ID.
func (*State) RebuildNameLookup ¶
func (s *State) RebuildNameLookup()
RebuildNameLookup regenerates the name → id cache from the current instances map. Intended for ModeEager backends that populate State's instances pointer-share-style (e.g. memfile's loadStateFromDir) without going through SetInstanceInMemory.
func (*State) RemoveInstanceFromMemory ¶
RemoveInstanceFromMemory removes an instance from the in-memory map and its name-lookup entry. Acquires the write lock internally.
func (*State) SetInstanceInMemory ¶
func (s *State) SetInstanceInMemory(instance *state.InstanceState)
SetInstanceInMemory replaces (or inserts) an instance in the in-memory map without triggering any persistence. Also keeps the name-lookup cache in sync. Intended for:
- the loader, populating State from Storage on startup, and
- backend flows that handled durability out of band (e.g. objectstore's ClaimForDeployment via a direct ETag CAS on the Service).
Acquires the write lock internally.
type StateOption ¶
type StateOption func(*State)
StateOption configures a State at construction.
func WithEntityLoader ¶
func WithEntityLoader(l EntityLoader) StateOption
WithEntityLoader injects the loader used to materialise entities on cache miss under ModeLazy. Memfile passes a noopLoader (or omits the option); objectstore supplies its Service-backed loader.
func WithSharedChangesetIndex ¶
func WithSharedChangesetIndex(index map[string]*IndexLocation) StateOption
WithSharedChangesetIndex replaces State's changeset chunk-index map.
func WithSharedChangesets ¶
func WithSharedChangesets(changesets map[string]*manage.Changeset) StateOption
WithSharedChangesets replaces State's changesets map.
func WithSharedCleanupOps ¶
func WithSharedCleanupOps(ops map[string]*manage.CleanupOperation) StateOption
WithSharedCleanupOps replaces State's cleanup-operations map.
func WithSharedEventIndex ¶
func WithSharedEventIndex(index map[string]*EventIndexLocation) StateOption
WithSharedEventIndex replaces State's event index map.
func WithSharedEvents ¶
func WithSharedEvents(events map[string]*manage.Event) StateOption
WithSharedEvents replaces State's events map.
func WithSharedInstanceIndex ¶
func WithSharedInstanceIndex(index map[string]*IndexLocation) StateOption
WithSharedInstanceIndex replaces State's instance chunk-index map with the given pointer. Same sharing semantics as WithSharedInstances.
func WithSharedInstances ¶
func WithSharedInstances(instances map[string]*state.InstanceState) StateOption
WithSharedInstances replaces State's instance map with the given pointer. The caller retains ownership; changes made via either reference are visible through the other.
func WithSharedLinkDrift ¶
func WithSharedLinkDrift(drift map[string]*state.LinkDriftState) StateOption
WithSharedLinkDrift replaces State's link-drift map.
func WithSharedLinkDriftIndex ¶
func WithSharedLinkDriftIndex(index map[string]*IndexLocation) StateOption
WithSharedLinkDriftIndex replaces State's link-drift chunk-index map.
func WithSharedLinks ¶
func WithSharedLinks(links map[string]*state.LinkState) StateOption
WithSharedLinks replaces State's link map with the given pointer.
func WithSharedMutex ¶
func WithSharedMutex(mu *sync.RWMutex) StateOption
WithSharedMutex replaces State's internal RWMutex with the given pointer. Used by memfile during its incremental migration so legacy sub-containers and statestore.Persister lock against the same primitive.
func WithSharedPartitionEvents ¶
func WithSharedPartitionEvents(partitionEvents map[string][]*manage.Event) StateOption
WithSharedPartitionEvents replaces State's partition events map.
func WithSharedReconciliationIndex ¶
func WithSharedReconciliationIndex(index map[string]*IndexLocation) StateOption
WithSharedReconciliationIndex replaces State's reconciliation chunk-index map.
func WithSharedReconciliations ¶
func WithSharedReconciliations(results map[string]*manage.ReconciliationResult) StateOption
WithSharedReconciliations replaces State's reconciliation results map.
func WithSharedResourceDrift ¶
func WithSharedResourceDrift(drift map[string]*state.ResourceDriftState) StateOption
WithSharedResourceDrift replaces State's resource-drift map with the given pointer. Same sharing semantics as WithSharedInstances.
func WithSharedResourceDriftIndex ¶
func WithSharedResourceDriftIndex(index map[string]*IndexLocation) StateOption
WithSharedResourceDriftIndex replaces State's resource-drift chunk-index map with the given pointer. Same sharing semantics as WithSharedInstances.
func WithSharedResources ¶
func WithSharedResources(resources map[string]*state.ResourceState) StateOption
WithSharedResources replaces State's resource map with the given pointer.
func WithSharedValidationIndex ¶
func WithSharedValidationIndex(index map[string]*IndexLocation) StateOption
WithSharedValidationIndex replaces State's validation chunk-index map.
func WithSharedValidations ¶
func WithSharedValidations(validations map[string]*manage.BlueprintValidation) StateOption
WithSharedValidations replaces State's validations map.
type Storage ¶
type Storage interface {
// Read returns the full contents of the object at the given key.
// Returns ErrNotFound if the key does not exist.
Read(ctx context.Context, key string) ([]byte, error)
// Write stores data at the given key, overwriting any existing data.
Write(ctx context.Context, key string, data []byte) error
// Delete removes the object at key.
// Returns ErrNotFound if the key does not exist.
Delete(ctx context.Context, key string) error
// List returns all keys under the given prefix, recursively.
// An empty prefix lists every key. Order is down to the implementation.
List(ctx context.Context, prefix string) ([]string, error)
// Exists reports whether key is present without reading its contents.
Exists(ctx context.Context, key string) (bool, error)
}
Storage is a context aware persistence state container implementations can provide to a statestore backed implementation. Keys are opaque forward-slash-separated strings; The storage implementation maps them to files on a local file system or objects in a remote object store.
type ValidationsContainer ¶
type ValidationsContainer struct {
// contains filtered or unexported fields
}
ValidationsContainer implements manage.Validation against a shared statestore.State and Persister.
func NewValidationsContainer ¶
func NewValidationsContainer(st *State, persister *Persister, logger core.Logger) *ValidationsContainer
func (*ValidationsContainer) Get ¶
func (c *ValidationsContainer) Get( ctx context.Context, id string, ) (*manage.BlueprintValidation, error)
func (*ValidationsContainer) Save ¶
func (c *ValidationsContainer) Save( ctx context.Context, validation *manage.BlueprintValidation, ) error
Source Files
¶
- chunking.go
- config.go
- consts.go
- container_changesets.go
- container_children.go
- container_cleanup_operations.go
- container_events.go
- container_exports.go
- container_instances.go
- container_links.go
- container_metadata.go
- container_reconciliation_results.go
- container_resources.go
- container_validations.go
- copy.go
- errors.go
- keys.go
- load_eager.go
- loader.go
- lookup.go
- persisted.go
- persister.go
- state.go
- storage.go