cosmosdbstore

package
v0.3.5
Published: May 16, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Overview

Package cosmosdbstore is the Azure Cosmos DB (NoSQL API) implementation of protosource.Store, protosource.AggregateStore, and protosource.SnapshotTailStore. It is the cross-cloud counterpart of stores/dynamodbstore — same event sourcing semantics, same single-character attribute names ("a", "v", "d", "t"), same opaquedata-backed aggregates container with 20 GSI slot pairs.

Cosmos differs from DynamoDB in three ways that shape this code:

  1. There is no per-row conditional write. The version-uniqueness guarantee comes from Cosmos's rule that document `id` must be unique within a logical partition; we set `id` to the string form of the version (via strconv) and use CreateItem semantics (via TransactionalBatch) so a duplicate version returns HTTP 409.
  2. TTL is relative (seconds remaining), not absolute (epoch). The on-wire document stores both: `t` keeps the absolute epoch (so the app-level TTL filter still works) and `ttl` carries Cosmos-native auto-purge.
  3. Transactional batches are partition-local (every operation must target the same partition key) and capped at 100 ops. Dynamo's TransactWriteItems has the same 100-op cap, so the batching logic carries over directly.

Constants

const (
	DefaultEventsContainer     = "events"
	DefaultAggregatesContainer = "aggregates"
)
const NumGSIs = 20

NumGSIs is the number of GSI slot pairs projected onto the aggregates container. The opaquedata model preallocates 20 slots; with PAY-PER-REQUEST / serverless billing, empty slots cost nothing.

Variables

ProviderSet provides the Cosmos event store and the opaque store, and binds them to the protosource interfaces. The consumer must supply both container clients as the typed aliases EventsContainerClient and AggregatesContainerClient.

Functions

func EnsureContainers

func EnsureContainers(ctx context.Context, db *azcosmos.DatabaseClient, eventsContainer, aggregatesContainer string) error

EnsureContainers idempotently creates the events and aggregates containers inside the supplied database. Both containers are created with DefaultTimeToLive = -1, which means items never expire by default but a per-item `ttl` is honored, mirroring the DynamoDB TTL-on-attribute behavior.

Partition keys:

  • events: /a (aggregate ID)
  • aggregates: /pk (opaquedata partition key)

Indexing falls back to Cosmos defaults (index all paths), which is enough for the predicate + single-property ORDER BY queries the framework issues. Composite indexes on (gsiNpk, gsiNsk) can be layered on later if a workload proves it needs them.

func EnsureDatabase

func EnsureDatabase(ctx context.Context, client *azcosmos.Client, databaseID string) (*azcosmos.DatabaseClient, error)

EnsureDatabase idempotently creates the Cosmos database. If it already exists, no action is taken. A 409 Conflict from CreateDatabase is treated as success so concurrent startups or deploys can race safely.
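The 409-as-success rule can be sketched without the Azure SDK. In the sketch below, `statusError`, `isConflict`, and `ensure` are hypothetical stand-ins; the real code presumably inspects the status code on the SDK's response error instead.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// statusError is a HYPOTHETICAL stand-in for an SDK error that carries
// an HTTP status code.
type statusError struct{ StatusCode int }

func (e *statusError) Error() string {
	return fmt.Sprintf("http status %d", e.StatusCode)
}

// isConflict reports whether err, or any error it wraps, is a 409.
func isConflict(err error) bool {
	var se *statusError
	return errors.As(err, &se) && se.StatusCode == http.StatusConflict
}

// ensure runs create and treats "already exists" as success, so two
// processes racing to create the same resource both return nil.
func ensure(create func() error) error {
	if err := create(); err != nil && !isConflict(err) {
		return err
	}
	return nil
}

func main() {
	raced := func() error { return &statusError{StatusCode: http.StatusConflict} }
	broken := func() error { return &statusError{StatusCode: http.StatusInternalServerError} }
	fmt.Println(ensure(raced) == nil, ensure(broken) == nil)
}
```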

func IsConflict

func IsConflict(err error) bool

IsConflict reports whether err came from a duplicate-version write. Useful for callers that want to distinguish concurrent-writer collisions from transient infra failures. Mirrors errors.Is semantics; works on errors returned by Save.
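One way a caller might use a conflict check like IsConflict is an optimistic retry loop. The sketch below swaps in an in-memory store and a sentinel error so it runs standalone; `memStore`, `errConflict`, and `appendWithRetry` are hypothetical names, and a real caller would reload the aggregate's history and re-run its command rather than just bump the version.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a HYPOTHETICAL sentinel standing in for the error that
// a duplicate-version write would produce.
var errConflict = errors.New("version conflict")

// memStore is an in-memory stand-in that rejects duplicate versions per
// aggregate, like create-only semantics on a unique id.
type memStore struct{ seen map[string]map[int64]bool }

func newMemStore() *memStore { return &memStore{seen: map[string]map[int64]bool{}} }

func (m *memStore) save(agg string, version int64) error {
	if m.seen[agg] == nil {
		m.seen[agg] = map[int64]bool{}
	}
	if m.seen[agg][version] {
		return errConflict
	}
	m.seen[agg][version] = true
	return nil
}

// next returns the first unused version, assuming versions start at 1
// and are contiguous.
func (m *memStore) next(agg string) int64 { return int64(len(m.seen[agg])) + 1 }

// appendWithRetry retries only on conflicts; any other error is returned
// immediately because retrying would not help.
func appendWithRetry(m *memStore, agg string, version int64, maxAttempts int) (int64, error) {
	for i := 0; i < maxAttempts; i++ {
		err := m.save(agg, version)
		if err == nil {
			return version, nil
		}
		if !errors.Is(err, errConflict) {
			return 0, err
		}
		version = m.next(agg) // another writer took this version; refresh
	}
	return 0, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, errConflict)
}

func main() {
	m := newMemStore()
	_ = m.save("order-1", 1) // a concurrent writer got there first
	v, err := appendWithRetry(m, "order-1", 1, 3)
	fmt.Println(v, err)
}
```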

func ProvideOpaqueStore

func ProvideOpaqueStore(client AggregatesContainerClient) *opaquecosmos.Store

Types

type AggregatesContainerClient

type AggregatesContainerClient cosmosclient.ContainerClient

AggregatesContainerClient is a Wire-typed alias for the cosmosclient targeting the aggregates container.

type CosmosDBStore

type CosmosDBStore struct {
	// contains filtered or unexported fields
}

CosmosDBStore implements the protosource Store, AggregateStore, and SnapshotTailStore interfaces backed by Cosmos DB (NoSQL API).

func New

func New(events cosmosclient.ContainerClient, opts ...Option) (*CosmosDBStore, error)

New creates a new CosmosDBStore. The events client targets the events container; the aggregates container is supplied via WithOpaqueStore as a separately wired OpaqueStore.

func ProvideStore

func ProvideStore(client EventsContainerClient, opaqueStore *opaquecosmos.Store) (*CosmosDBStore, error)

func (*CosmosDBStore) Load

func (s *CosmosDBStore) Load(ctx context.Context, aggregateID string) (*historyv1.History, error)

Load retrieves the full event history for the given aggregate ID in ascending version order. The query is partition-scoped (single partition), the cheapest read pattern Cosmos supports.

func (*CosmosDBStore) LoadTail

func (s *CosmosDBStore) LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)

LoadTail returns the last n events for the given aggregate, ordered by version ascending. It queries in descending order with PageSizeHint = n and terminates early once n records are collected — analogous to Dynamo's `ScanIndexForward: false` + `Limit: n` pattern.

If n <= 0, an empty History is returned immediately.
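The descending-query-then-reverse step can be sketched on plain version numbers. The helper name `tail` and the use of int64 versions are assumptions for illustration; the real method works on records and pages through query results.

```go
package main

import "fmt"

// tail takes versions in the order a descending query would return them
// and yields the newest n in ascending order.
func tail(desc []int64, n int) []int64 {
	if n <= 0 {
		return nil // mirrors the "empty History" rule for n <= 0
	}
	if n > len(desc) {
		n = len(desc)
	}
	out := make([]int64, n)
	for i := 0; i < n; i++ {
		out[n-1-i] = desc[i] // fill back to front to restore ascending order
	}
	return out
}

func main() {
	fmt.Println(tail([]int64{9, 8, 7, 6, 5}, 3))
}
```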

func (*CosmosDBStore) Save

func (s *CosmosDBStore) Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error

Save stores records for the given aggregate ID. Each batch of up to 100 records is written atomically using a Cosmos transactional batch keyed on the aggregate ID partition. Duplicate versions surface as HTTP 409 conflicts (CreateItem rejects duplicate doc IDs within a partition).

When len(records) exceeds 100, Save splits the work into multiple transactions. Atomicity holds within each batch, not across batches — same semantics as the DynamoDB store.

Saving zero records is a no-op.
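The splitting rule described above amounts to chunking the record slice. The following is a minimal sketch, assuming a generic helper named `chunk` (hypothetical, not part of the package), that shows how 250 records would fall into three batches.

```go
package main

import "fmt"

// maxBatchOps is the transactional batch cap stated in the text above.
const maxBatchOps = 100

// chunk splits items into consecutive slices of at most size elements.
// Each returned slice would map to one atomic batch; nothing ties the
// batches to each other, so a later one can fail after an earlier one
// has committed.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for len(items) > 0 {
		n := size
		if n > len(items) {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

func main() {
	for i, b := range chunk(make([]int, 250), maxBatchOps) {
		fmt.Printf("batch %d: %d records\n", i, len(b))
	}
}
```

Callers that need all-or-nothing behavior across more than 100 records would have to handle a partial write themselves, for example by retrying from the first version that was not stored.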

func (*CosmosDBStore) SaveAggregate

func (s *CosmosDBStore) SaveAggregate(ctx context.Context, aggregate proto.Message) error

SaveAggregate persists the materialized aggregate state via the OpaqueStore. The aggregate must implement opaquedata.AutoPKSK and an OpaqueStore must be configured via WithOpaqueStore. The aggregates container uses pk/sk keys with 20 GSI slot pairs for query access patterns.

type EventsContainerClient

type EventsContainerClient cosmosclient.ContainerClient

EventsContainerClient is a Wire-typed alias for the cosmosclient targeting the events container. Consumers wire two named clients — one for events, one for aggregates — so the wire graph stays unambiguous.
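Wire matches providers to consumers by Go type, so two values of the same interface type would be ambiguous. The sketch below shows why distinct named types resolve that. All names in it are hypothetical stand-ins, and the map keyed by reflect.Type is only a toy model of type-based lookup, not how Wire itself is implemented.

```go
package main

import (
	"fmt"
	"reflect"
)

// containerClient is a HYPOTHETICAL stand-in for the shared client interface.
type containerClient interface{ ContainerID() string }

// Two named types over the same interface: identical method sets,
// distinct type identities.
type eventsClient containerClient
type aggregatesClient containerClient

type fakeContainer struct{ id string }

func (f fakeContainer) ContainerID() string { return f.id }

// typeOf returns the reflect.Type for T, including interface types.
func typeOf[T any]() reflect.Type { return reflect.TypeOf((*T)(nil)).Elem() }

// distinctKeys reports whether the two named types are different keys.
func distinctKeys() bool {
	return typeOf[eventsClient]() != typeOf[aggregatesClient]()
}

func main() {
	// A toy injector: one binding per type, no ambiguity.
	bindings := map[reflect.Type]containerClient{
		typeOf[eventsClient]():     fakeContainer{id: "events"},
		typeOf[aggregatesClient](): fakeContainer{id: "aggregates"},
	}
	fmt.Println(distinctKeys(), bindings[typeOf[eventsClient]()].ContainerID())
}
```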

type Option

type Option func(*CosmosDBStore)

Option configures a CosmosDBStore.

func WithOpaqueStore

func WithOpaqueStore(store opaquedata.OpaqueStore) Option

WithOpaqueStore sets the OpaqueStore used by SaveAggregate to persist materialized aggregates against the aggregates container. All aggregates must implement opaquedata.AutoPKSK to be materialized.

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL sets a time-to-live duration for event records. Each saved event includes both `t` (absolute epoch — feeds the app-level TTL query filter) and `ttl` (relative seconds — feeds Cosmos automatic purge). The container must be created with DefaultTimeToLive = -1 (see containers.go) for per-item `ttl` to be honored.

A zero or negative duration disables TTL stamping (the default).
