store

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2026 License: BSD-2-Clause Imports: 13 Imported by: 0

Documentation

Overview

Package store provides observable in-memory stores and optional cross-broker synchronization.

Index

Constants

View Source
const (
	// StoreCreatedEvt is emitted when a store is created.
	StoreCreatedEvt = monitor.StoreCreatedEvt
	// StoreDestroyedEvt is emitted when a store is destroyed.
	StoreDestroyedEvt = monitor.StoreDestroyedEvt
	// StoreInitializedEvt is emitted when a store is initialized.
	StoreInitializedEvt = monitor.StoreInitializedEvt
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BusStore

type BusStore interface {
	// Get the name (the id) of the store.
	GetName() string
	// Add new or updates existing item in the store.
	Put(id string, value any, state any)
	PutContext(ctx context.Context, id string, value any, state any)
	// Returns an item from the store and a boolean flag
	// indicating whether the item exists
	Get(id string) (any, bool)
	// Shorten version of the Get() method, returns only the item value.
	GetValue(id string) any
	// Remove an item from the store. Returns true if the remove operation was successful.
	Remove(id string, state any) bool
	RemoveContext(ctx context.Context, id string, state any) bool
	// Return a slice containing all store items.
	AllValues() []any
	// Return a map with all items from the store.
	AllValuesAsMap() map[string]any
	// Return a map with all items from the store with the current store version.
	AllValuesAndVersion() (map[string]any, int64)
	// Subscribe to state changes for a specific object.
	OnChange(id string, state ...any) StoreStream
	// Subscribe to state changes for all objects
	OnAllChanges(state ...any) StoreStream
	// Notify when the store has been initialize (via populate() or initialize()
	WhenReady(readyFunction func())
	// Populate the store with a map of items and their ID's.
	Populate(items map[string]any) error
	// Mark the store as initialized and notify all watchers.
	Initialize()
	// Subscribe to mutation requests made via mutate() method.
	OnMutationRequest(mutationType ...any) MutationStoreStream
	// Send a mutation request to any subscribers handling mutations.
	Mutate(request any, requestType any,
		successHandler func(any), errorHandler func(any))
	MutateContext(ctx context.Context, request any, requestType any,
		successHandler func(any), errorHandler func(any))
	// Removes all items from the store and change its state to uninitialized".
	Reset()
	// Returns true if this is galactic store.
	IsGalactic() bool
	// Get the item type if such is specified during the creation of the
	// store
	GetItemType() reflect.Type
}

BusStore is a stateful in memory cache for objects. All state changes (any time the cache is modified) will broadcast that updated object to any subscribers of the BusStore for those specific objects or all objects of a certain type and state changes.

type Manager

type Manager interface {
	// Create a new Store, if the store already exists, then it will be returned.
	CreateStore(name string) BusStore
	// Create a new Store and use the itemType to deserialize item values when handling
	// incoming UpdateStoreRequest. If the store already exists, the method will return
	// the existing store instance.
	CreateStoreWithType(name string, itemType reflect.Type) BusStore
	// Get a reference to the existing store. Returns nil if the store doesn't exist.
	GetStore(name string) BusStore
	// Deletes a store.
	DestroyStore(name string) bool
	// Configure galactic store sync channel for a given connection.
	// Should be called before OpenGalacticStore() and OpenGalacticStoreWithItemType() APIs.
	ConfigureStoreSyncChannel(conn bridge.Connection, topicPrefix string, pubPrefix string) error
	// Open new galactic store
	OpenGalacticStore(name string, conn bridge.Connection) (BusStore, error)
	// Open new galactic store and deserialize items from server to itemType
	OpenGalacticStoreWithItemType(name string, conn bridge.Connection, itemType reflect.Type) (BusStore, error)
}

Manager controls all access to BusStores.

func NewManager

func NewManager(eventBus bus.EventBus) Manager

NewManager creates a store manager using the default logger.

func NewManagerWithLogger

func NewManagerWithLogger(eventBus bus.EventBus, logger *slog.Logger) Manager

NewManagerWithLogger creates a store manager using logger, or slog.Default when logger is nil.

type MutationRequest

type MutationRequest struct {
	Request        any
	RequestType    any
	SuccessHandler func(any)
	ErrorHandler   func(any)
}

MutationRequest is delivered to mutation subscribers when a store mutation is requested.

type MutationRequestHandlerFunction

type MutationRequestHandlerFunction func(mutationReq *MutationRequest)

MutationRequestHandlerFunction handles a store mutation request.

type MutationStoreStream

type MutationStoreStream interface {
	// Subscribe to the mutation requests stream.
	Subscribe(handler MutationRequestHandlerFunction) error
	// Unsubscribe from the stream.
	Unsubscribe() error
}

MutationStoreStream subscribes to store mutation requests.

type StoreChange

type StoreChange struct {
	Id             string // the id of the updated item
	Value          any    // the updated value of the item
	State          any    // state associated with this change
	IsDeleteChange bool   // true if the item was removed from the store
	StoreVersion   int64  // the store's version when this change was made
}

StoreChange describes a single store item change.

type StoreChangeHandlerFunction

type StoreChangeHandlerFunction func(change *StoreChange)

StoreChangeHandlerFunction handles committed store changes.

type StoreContentResponse

type StoreContentResponse struct {
	Items        map[string]any `json:"items"`
	ResponseType string         `json:"responseType"` // should be "storeContentResponse"
	StoreId      string         `json:"storeId"`
	StoreVersion int64          `json:"storeVersion"`
}

StoreContentResponse contains a complete store snapshot for sync clients.

func NewStoreContentResponse

func NewStoreContentResponse(
	storeId string, items map[string]any, storeVersion int64) *StoreContentResponse

NewStoreContentResponse creates a complete store snapshot response.

type StoreStream

type StoreStream interface {
	// Subscribe to the store changes stream.
	Subscribe(handler StoreChangeHandlerFunction) error
	// Unsubscribe from the stream.
	Unsubscribe() error
}

StoreStream subscribes to store changes.

type UpdateStoreResponse

type UpdateStoreResponse struct {
	ItemId       string `json:"itemId"`
	NewItemValue any    `json:"newItemValue"`
	ResponseType string `json:"responseType"` // should be "updateStoreResponse"
	StoreId      string `json:"storeId"`
	StoreVersion int64  `json:"storeVersion"`
}

UpdateStoreResponse contains a single item update for sync clients.

func NewUpdateStoreResponse

func NewUpdateStoreResponse(
	storeId string, itemId string, newValue any, storeVersion int64) *UpdateStoreResponse

NewUpdateStoreResponse creates a single item update response.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL