Documentation ¶
Overview ¶
Package cosmosdbstore is the Azure Cosmos DB (NoSQL API) implementation of protosource.Store, protosource.AggregateStore, and protosource.SnapshotTailStore. It is the cross-cloud counterpart of stores/dynamodbstore — same event sourcing semantics, same single-character attribute names ("a", "v", "d", "t"), same opaquedata-backed aggregates container with 20 GSI slot pairs.
Cosmos differs from DynamoDB in three ways that shape this code:
- There is no per-row conditional write. The version-uniqueness guarantee comes from Cosmos's rule that a document `id` must be unique within a logical partition: we set `id` to the string form of the version and use CreateItem semantics (via TransactionalBatch), so a duplicate version returns HTTP 409.
- TTL is relative (seconds remaining), not absolute (epoch). The on-wire document stores both: `t` keeps the absolute epoch (so the app-level TTL filter still works) and `ttl` carries Cosmos-native auto-purge.
- Transactional batches are partition-local (every operation must target the same partition key value) and capped at 100 ops, the same cap as Dynamo's TransactWriteItems, so the batching logic carries over directly.
Index ¶
- Constants
- Variables
- func EnsureContainers(ctx context.Context, db *azcosmos.DatabaseClient, ...) error
- func EnsureDatabase(ctx context.Context, client *azcosmos.Client, databaseID string) (*azcosmos.DatabaseClient, error)
- func IsConflict(err error) bool
- func ProvideOpaqueStore(client AggregatesContainerClient) *opaquecosmos.Store
- type AggregatesContainerClient
- type CosmosDBStore
- func (s *CosmosDBStore) Load(ctx context.Context, aggregateID string) (*historyv1.History, error)
- func (s *CosmosDBStore) LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)
- func (s *CosmosDBStore) Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error
- func (s *CosmosDBStore) SaveAggregate(ctx context.Context, aggregate proto.Message) error
- type EventsContainerClient
- type Option
Constants ¶
const ( DefaultEventsContainer = "events" DefaultAggregatesContainer = "aggregates" )
const NumGSIs = 20
NumGSIs is the number of GSI slot pairs projected onto the aggregates container. The opaquedata model preallocates 20 slots; with PAY-PER-REQUEST / serverless billing, empty slots cost nothing.
Variables ¶
var ProviderSet = wire.NewSet( ProvideOpaqueStore, ProvideStore, wire.Bind(new(protosource.Store), new(*CosmosDBStore)), wire.Bind(new(protosource.AggregateStore), new(*CosmosDBStore)), )
ProviderSet provides the Cosmos event store and the opaque store, and binds them to the protosource interfaces. The consumer must supply both container clients as the Wire-typed aliases EventsContainerClient and AggregatesContainerClient.
Functions ¶
func EnsureContainers ¶
func EnsureContainers(ctx context.Context, db *azcosmos.DatabaseClient, eventsContainer, aggregatesContainer string) error
EnsureContainers idempotently creates the events and aggregates containers inside the supplied database. Both containers are created with DefaultTimeToLive = -1, which enables TTL without a container-wide expiry, so only items that carry a per-item `ttl` expire. This mirrors the DynamoDB TTL-on-attribute behavior.
Partition keys:
- events: /a (aggregate ID)
- aggregates: /pk (opaquedata partition key)
Indexing falls back to Cosmos defaults (index all paths), which is enough for the predicate + single-property ORDER BY queries the framework issues. Composite indexes on (gsiNpk, gsiNsk) can be layered on later if a workload proves it needs them.
func EnsureDatabase ¶
func EnsureDatabase(ctx context.Context, client *azcosmos.Client, databaseID string) (*azcosmos.DatabaseClient, error)
EnsureDatabase idempotently creates the Cosmos database. If it already exists, no action is taken. A 409 Conflict from CreateDatabase is treated as success so concurrent startup/deploys can race safely.
func IsConflict ¶
func IsConflict(err error) bool
IsConflict reports whether err came from a duplicate-version write. Useful for callers that want to distinguish concurrent-writer collisions from transient infra failures. Mirrors errors.Is semantics; works on errors returned by Save.
func ProvideOpaqueStore ¶
func ProvideOpaqueStore(client AggregatesContainerClient) *opaquecosmos.Store
Types ¶
type AggregatesContainerClient ¶
type AggregatesContainerClient cosmosclient.ContainerClient
AggregatesContainerClient is a Wire-typed alias for the cosmosclient targeting the aggregates container.
type CosmosDBStore ¶
type CosmosDBStore struct {
// contains filtered or unexported fields
}
CosmosDBStore implements the protosource Store, AggregateStore, and SnapshotTailStore interfaces backed by Cosmos DB (NoSQL API).
func New ¶
func New(events cosmosclient.ContainerClient, opts ...Option) (*CosmosDBStore, error)
New creates a new CosmosDBStore. The events client targets the events container; the aggregates container is supplied via WithOpaqueStore as a separately wired OpaqueStore.
func ProvideStore ¶
func ProvideStore(client EventsContainerClient, opaqueStore *opaquecosmos.Store) (*CosmosDBStore, error)
func (*CosmosDBStore) Load ¶
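func (s *CosmosDBStore) Load(ctx context.Context, aggregateID string) (*historyv1.History, error)
Load retrieves the full event history for the given aggregate ID in ascending version order. The query is scoped to a single partition, which is the cheapest query pattern Cosmos supports.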
func (*CosmosDBStore) LoadTail ¶
func (s *CosmosDBStore) LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)
LoadTail returns the last n events for the given aggregate, ordered by version ascending. It queries in descending order with PageSizeHint = n and terminates early once n records are collected — analogous to Dynamo's `ScanIndexForward: false` + `Limit: n` pattern.
If n <= 0, an empty History is returned immediately.
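The collect-then-reverse step described above can be sketched on plain version numbers. `tail` is a hypothetical helper, and the input slice stands in for the pages a descending query would yield; the real method works on records and a Cosmos pager.

```go
package main

import "fmt"

// tail takes versions as a newest-first stream, stops after n items,
// and returns them oldest-first.
func tail(descending []int64, n int) []int64 {
	if n <= 0 {
		return []int64{} // mirrors the "empty History" rule for n <= 0
	}
	out := make([]int64, 0)
	for _, v := range descending {
		out = append(out, v)
		if len(out) == n {
			break // early termination: no need to read further pages
		}
	}
	// Reverse in place so the result is in ascending version order.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return out
}

func main() {
	fmt.Println(tail([]int64{9, 8, 7, 6, 5}, 3)) // prints [7 8 9]
}
```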
func (*CosmosDBStore) Save ¶
func (s *CosmosDBStore) Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error
Save stores records for the given aggregate ID. Each batch of up to 100 records is written atomically using a Cosmos transactional batch keyed on the aggregate ID partition. Duplicate versions surface as HTTP 409 conflicts (CreateItem rejects duplicate doc IDs within a partition).
When len(records) exceeds 100, Save splits the work into multiple transactions. Atomicity holds within each batch, not across batches — same semantics as the DynamoDB store.
Saving zero records is a no-op.
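The splitting rule can be sketched with a generic helper. `chunk` and `maxBatchOps` are hypothetical names chosen for the illustration; each returned slice would become one transactional batch, and a failure in a later batch does not undo earlier ones.

```go
package main

import "fmt"

// maxBatchOps is the per-batch operation cap described above.
const maxBatchOps = 100

// chunk splits items into consecutive slices of at most size elements.
// An empty input yields no batches, matching the zero-records no-op.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	records := make([]int, 250) // stand-in for 250 event records
	for i, batch := range chunk(records, maxBatchOps) {
		fmt.Printf("batch %d: %d records\n", i, len(batch))
	}
}
```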
func (*CosmosDBStore) SaveAggregate ¶
func (s *CosmosDBStore) SaveAggregate(ctx context.Context, aggregate proto.Message) error
SaveAggregate persists the materialized aggregate state via the OpaqueStore. The aggregate must implement opaquedata.AutoPKSK and an OpaqueStore must be configured via WithOpaqueStore. The aggregates container uses pk/sk keys with 20 GSI slot pairs for query access patterns.
type EventsContainerClient ¶
type EventsContainerClient cosmosclient.ContainerClient
EventsContainerClient is a Wire-typed alias for the cosmosclient targeting the events container. Consumers wire two named clients — one for events, one for aggregates — so the wire graph stays unambiguous.
type Option ¶
type Option func(*CosmosDBStore)
Option configures a CosmosDBStore.
func WithOpaqueStore ¶
func WithOpaqueStore(store opaquedata.OpaqueStore) Option
WithOpaqueStore sets the OpaqueStore used by SaveAggregate to persist materialized aggregates against the aggregates container. All aggregates must implement opaquedata.AutoPKSK to be materialized.
func WithTTL ¶
WithTTL sets a time-to-live duration for event records. Each saved event includes both `t` (absolute epoch — feeds the app-level TTL query filter) and `ttl` (relative seconds — feeds Cosmos automatic purge). The container must be created with DefaultTimeToLive = -1 (see containers.go) for per-item `ttl` to be honored.
A zero or negative duration disables TTL stamping (the default).