store

package
v1.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 3, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsAlreadyExistsError

func IsAlreadyExistsError(err error) bool

func RunInTx added in v1.1.0

func RunInTx(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, txQueries *queries.Queries) error,
) error
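The page gives only the signature of RunInTx. The callback-in-transaction pattern it suggests can be sketched as below. This is a sketch under assumptions, not the package's implementation: the callback receives a plain *sql.Tx instead of *queries.Queries, the commit-on-nil and rollback-on-error behavior is assumed, and the fake driver (fakeDriver, fakeConn, fakeTx) exists only so the example runs without a database.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
)

// The fake driver below is hypothetical. It only counts commits and rollbacks.
type fakeDriver struct{}
type fakeConn struct{}
type fakeTx struct{}

var commits, rollbacks int

func (fakeDriver) Open(string) (driver.Conn, error) { return fakeConn{}, nil }
func (fakeConn) Prepare(string) (driver.Stmt, error) {
	return nil, errors.New("statements are not supported by the fake driver")
}
func (fakeConn) Close() error              { return nil }
func (fakeConn) Begin() (driver.Tx, error) { return fakeTx{}, nil }
func (fakeTx) Commit() error               { commits++; return nil }
func (fakeTx) Rollback() error             { rollbacks++; return nil }

func init() { sql.Register("fake", fakeDriver{}) }

// runInTx is a local stand-in for the documented RunInTx. It commits when fn
// returns nil and rolls back otherwise (assumed behavior).
func runInTx(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error {
	tx, err := db.BeginTx(ctx, opts)
	if err != nil {
		return err
	}
	if err := fn(ctx, tx); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return errors.Join(err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

// demo runs one successful and one failing callback and reports the counters.
func demo() (int, int) {
	commits, rollbacks = 0, 0
	db, err := sql.Open("fake", "")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	ctx := context.Background()
	_ = runInTx(ctx, db, nil, func(context.Context, *sql.Tx) error { return nil })
	_ = runInTx(ctx, db, nil, func(context.Context, *sql.Tx) error {
		return errors.New("boom")
	})
	return commits, rollbacks
}

func main() {
	c, r := demo()
	fmt.Println("commits:", c, "rollbacks:", r)
}
```

Keeping commit and rollback inside the helper means callers cannot forget either one; they only return an error to abort.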

Types

type AlreadyExistsError

type AlreadyExistsError struct {
	Err error
}

func NewAlreadyExistsError

func NewAlreadyExistsError(err error) *AlreadyExistsError

func (*AlreadyExistsError) Error

func (e *AlreadyExistsError) Error() string
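The signatures above suggest a wrapper error with a matching predicate. A minimal sketch of that pattern follows, assuming the predicate is built on errors.As and that Error delegates to the wrapped error; the page confirms neither. The types are local stand-ins that mirror the documented signatures, and errDuplicate and insertTwice are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// AlreadyExistsError mirrors the documented struct. It is a local stand-in,
// not the package's own type.
type AlreadyExistsError struct {
	Err error
}

func NewAlreadyExistsError(err error) *AlreadyExistsError {
	return &AlreadyExistsError{Err: err}
}

// Error delegates to the wrapped error (assumed behavior).
func (e *AlreadyExistsError) Error() string {
	if e.Err == nil {
		return "already exists"
	}
	return e.Err.Error()
}

// IsAlreadyExistsError reports whether err, or any error it wraps, is an
// *AlreadyExistsError. Using errors.As here is an assumption.
func IsAlreadyExistsError(err error) bool {
	var target *AlreadyExistsError
	return errors.As(err, &target)
}

// errDuplicate is a hypothetical low-level error used for the demonstration.
var errDuplicate = errors.New("duplicate key")

// insertTwice is a hypothetical caller that hits a uniqueness conflict and
// adds context with %w before returning.
func insertTwice() error {
	return fmt.Errorf("insert failed: %w", NewAlreadyExistsError(errDuplicate))
}

func main() {
	fmt.Println(IsAlreadyExistsError(insertTwice())) // prints "true"
	fmt.Println(IsAlreadyExistsError(errDuplicate))  // prints "false"
}
```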

type BackfillGroupMessage added in v1.1.0

type BackfillGroupMessage struct {
	ID       int64
	Data     []byte
	IsCommit bool
}

BackfillGroupMessage represents a single row from the group_message table that requires backfilling of its is_commit field. The Data field is used for validation, and IsCommit is the inferred result populated by the classification step.

type Backfiller added in v1.1.0

type Backfiller interface {
	Run()
	Close()
}

Backfiller defines the interface for a background backfill process. It can be started with Run() and gracefully stopped with Close().
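One way a type could satisfy this interface is sketched below. It assumes that Run starts a goroutine and returns immediately, and that Close cancels a context and waits for the goroutine to exit; the page does not say whether the real implementations behave this way. tickBackfiller and demo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Backfiller mirrors the documented interface.
type Backfiller interface {
	Run()
	Close()
}

// tickBackfiller is a hypothetical implementation that only counts loop passes.
type tickBackfiller struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	passes atomic.Int64
}

func newTickBackfiller(parent context.Context) *tickBackfiller {
	ctx, cancel := context.WithCancel(parent)
	return &tickBackfiller{ctx: ctx, cancel: cancel}
}

// Run starts the background loop and returns immediately (assumed behavior).
func (b *tickBackfiller) Run() {
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-b.ctx.Done():
				return
			case <-ticker.C:
				b.passes.Add(1) // a real backfiller would process a batch here
			}
		}
	}()
}

// Close cancels the loop and blocks until the goroutine has exited.
func (b *tickBackfiller) Close() {
	b.cancel()
	b.wg.Wait()
}

// demo runs the backfiller until it has made at least one pass, closes it,
// and reports whether the pass counter stayed unchanged afterwards.
func demo() (passes int64, stable bool) {
	b := newTickBackfiller(context.Background())
	var bf Backfiller = b
	bf.Run()
	for b.passes.Load() == 0 {
		time.Sleep(time.Millisecond)
	}
	bf.Close()
	passes = b.passes.Load()
	time.Sleep(5 * time.Millisecond)
	return passes, passes == b.passes.Load()
}

func main() {
	passes, stable := demo()
	fmt.Println("passes > 0:", passes > 0, "stable after Close:", stable)
}
```

Waiting on the WaitGroup inside Close is what makes the stop graceful: once Close returns, no pass is still in flight.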

type IdentityUpdate

type IdentityUpdate struct {
	Kind               IdentityUpdateKind
	InstallationKey    []byte
	CredentialIdentity []byte
	TimestampNs        uint64
}

type IdentityUpdateKind

type IdentityUpdateKind int

const (
	Create IdentityUpdateKind = iota
	Revoke
)

type IdentityUpdateList

type IdentityUpdateList []IdentityUpdate

IdentityUpdateList implements sort.Interface (Len, Less, Swap), so it can be passed to sort.Sort.

func (IdentityUpdateList) Len

func (a IdentityUpdateList) Len() int

func (IdentityUpdateList) Less

func (a IdentityUpdateList) Less(i, j int) bool

func (IdentityUpdateList) Swap

func (a IdentityUpdateList) Swap(i, j int)
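The three methods are what sort.Sort requires. The page does not state the ordering that Less implements; the sketch below assumes ascending TimestampNs purely for illustration. The types are local copies of the documented declarations, and sortedTimestamps is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

type IdentityUpdateKind int

const (
	Create IdentityUpdateKind = iota
	Revoke
)

type IdentityUpdate struct {
	Kind               IdentityUpdateKind
	InstallationKey    []byte
	CredentialIdentity []byte
	TimestampNs        uint64
}

type IdentityUpdateList []IdentityUpdate

func (a IdentityUpdateList) Len() int      { return len(a) }
func (a IdentityUpdateList) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// Less orders by TimestampNs ascending. This ordering is an assumption.
func (a IdentityUpdateList) Less(i, j int) bool {
	return a[i].TimestampNs < a[j].TimestampNs
}

// sortedTimestamps sorts a small list in place and returns its timestamps.
func sortedTimestamps() []uint64 {
	updates := IdentityUpdateList{
		{Kind: Revoke, TimestampNs: 30},
		{Kind: Create, TimestampNs: 10},
		{Kind: Create, TimestampNs: 20},
	}
	sort.Sort(updates)

	out := make([]uint64, len(updates))
	for i, u := range updates {
		out[i] = u.TimestampNs
	}
	return out
}

func main() {
	fmt.Println(sortedTimestamps()) // prints "[10 20 30]"
}
```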

type InstallationsBackfiller added in v1.2.0

type InstallationsBackfiller struct {
	// contains filtered or unexported fields
}

InstallationsBackfiller is responsible for:

  - Appending to key_packages the installations that have not been appended yet.
  - Updating the is_appended status of those installations.

The backfill process runs in a loop and periodically:

  1. Starts a transaction
  2. Selects up to N unprocessed installations (e.g., where is_appended IS NULL), skipping locked rows to guarantee that no two workers process the same row
  3. Inserts the key package into the key_packages table
  4. Updates the is_appended status of the installation

If no work is available, it sleeps briefly before retrying.
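The four steps can be sketched with database/sql as follows. This is an illustration under assumptions, not the package's code: the SQL is written for PostgreSQL, the installations columns id and key_package are guessed names, and backfillBatch is a hypothetical helper. Only the table names, is_appended, and the (installation_id, key_package) conflict target come from this page.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Hypothetical queries. Column names other than is_appended, installation_id,
// and key_package are assumptions.
const (
	selectUnprocessed = `
SELECT id, key_package
FROM installations
WHERE is_appended IS NULL
ORDER BY id
LIMIT $1
FOR UPDATE SKIP LOCKED`

	insertKeyPackage = `
INSERT INTO key_packages (installation_id, key_package)
VALUES ($1, $2)
ON CONFLICT (installation_id, key_package) DO NOTHING`

	markAppended = `
UPDATE installations SET is_appended = true WHERE id = $1`
)

type installation struct {
	id         []byte
	keyPackage []byte
}

// backfillBatch performs one pass and returns the number of rows it handled.
func backfillBatch(ctx context.Context, db *sql.DB, limit int) (int, error) {
	tx, err := db.BeginTx(ctx, nil) // step 1: start a transaction
	if err != nil {
		return 0, err
	}
	// After a successful Commit this returns sql.ErrTxDone, which is ignored.
	defer tx.Rollback()

	// Step 2: lock up to limit unprocessed rows; rows locked by other workers
	// are skipped instead of waited on.
	rows, err := tx.QueryContext(ctx, selectUnprocessed, limit)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	var batch []installation
	for rows.Next() {
		var in installation
		if err := rows.Scan(&in.id, &in.keyPackage); err != nil {
			return 0, err
		}
		batch = append(batch, in)
	}
	if err := rows.Err(); err != nil {
		return 0, err
	}

	for _, in := range batch {
		// Step 3: insert the key package; duplicates are ignored.
		if _, err := tx.ExecContext(ctx, insertKeyPackage, in.id, in.keyPackage); err != nil {
			return 0, err
		}
		// Step 4: mark the installation as appended.
		if _, err := tx.ExecContext(ctx, markAppended, in.id); err != nil {
			return 0, err
		}
	}
	return len(batch), tx.Commit()
}

func main() {
	// Running backfillBatch needs a PostgreSQL connection and a registered
	// driver, so this only prints the locking query.
	fmt.Println(selectUnprocessed)
}
```

A caller would invoke backfillBatch in a loop and sleep whenever it returns zero rows.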

Because of how the installations table works, HA is guaranteed:

  - Rows are inserted into installations via CreateOrUpdateInstallation, which has been modified to set is_appended to true and to insert the key package into the key_packages table. This guarantees that any new or updated installation makes its way into the key_packages table.
  - Meanwhile, the backfiller selects installations with a NULL is_appended, inserts them into key_packages, and sets is_appended to true.
  - InsertKeyPackage guarantees that nothing happens on conflict with (installation_id, key_package).
  - UpdateIsAppendedStatus guarantees that the is_appended status is updated to true.

Important: no final ordering of key_packages is needed, because this design guarantees that the newest key package is always inserted in the last position in key_packages.
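The idempotence argument can be checked with a small in-memory model. Everything below is a simplification for illustration: memDB and its methods are hypothetical stand-ins for the two tables and the documented operations, rows are processed sequentially, and row locking is not modeled.

```go
package main

import "fmt"

type keyPackageRow struct{ installationID, keyPackage string }

type installationRow struct {
	keyPackage string
	isAppended *bool // nil models SQL NULL
}

// memDB is a hypothetical in-memory stand-in for the two tables.
type memDB struct {
	installations map[string]*installationRow
	keyPackages   []keyPackageRow // append-only, so the last entry is the newest
}

// insertKeyPackage appends unless the (installation_id, key_package) pair is
// already present, which models "on conflict, nothing happens".
func (db *memDB) insertKeyPackage(id, kp string) {
	for _, r := range db.keyPackages {
		if r.installationID == id && r.keyPackage == kp {
			return
		}
	}
	db.keyPackages = append(db.keyPackages, keyPackageRow{id, kp})
}

// createOrUpdateInstallation models the write path: it stores the row with
// is_appended set to true and inserts the key package.
func (db *memDB) createOrUpdateInstallation(id, kp string) {
	appended := true
	db.installations[id] = &installationRow{keyPackage: kp, isAppended: &appended}
	db.insertKeyPackage(id, kp)
}

// backfill models one backfiller pass over rows whose is_appended is NULL.
func (db *memDB) backfill() {
	for id, row := range db.installations {
		if row.isAppended != nil {
			continue
		}
		db.insertKeyPackage(id, row.keyPackage)
		appended := true
		row.isAppended = &appended
	}
}

// scenario backfills a legacy row twice, then updates the installation twice.
func scenario() []keyPackageRow {
	db := &memDB{installations: map[string]*installationRow{
		"a": {keyPackage: "kp1"}, // legacy row with is_appended NULL
	}}
	db.backfill()
	db.backfill() // second pass finds nothing to do
	db.createOrUpdateInstallation("a", "kp2")
	db.createOrUpdateInstallation("a", "kp2") // retry adds no duplicate
	return db.keyPackages
}

func main() {
	fmt.Println(scenario()) // prints "[{a kp1} {a kp2}]"
}
```

In this model repeated backfill passes and repeated writes leave exactly one row per (installation_id, key_package) pair, and the newest key package ends up last.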

func NewInstallationsBackfiller added in v1.2.0

func NewInstallationsBackfiller(ctx context.Context, db *sql.DB,
	log *zap.Logger,
) *InstallationsBackfiller

func (*InstallationsBackfiller) Close added in v1.2.0

func (b *InstallationsBackfiller) Close()

func (*InstallationsBackfiller) Run added in v1.2.0

func (b *InstallationsBackfiller) Run()

type IsCommitBackfiller added in v1.1.0

type IsCommitBackfiller struct {
	DB *sql.DB

	WG sync.WaitGroup
	// contains filtered or unexported fields
}

IsCommitBackfiller is responsible for populating the `is_commit` column in the `group_message` table. This field is inferred from message data using a validation service.

In high-availability (HA) deployments where multiple instances of this service may be running concurrently, the backfiller is designed to be safe and non-conflicting. It relies on transactional locking and SKIP LOCKED semantics (or a manual locking scheme) to ensure that no two workers process the same rows. This avoids duplicate work and race conditions without requiring external coordination.

The backfill process runs in a loop and periodically:

  1. Starts a transaction
  2. Selects up to N unprocessed group messages (e.g., where is_commit IS NULL)
  3. Uses the MLSValidationService to classify each message
  4. Updates the corresponding rows with the is_commit result

If no work is available, it sleeps briefly before retrying.
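The loop described above can be sketched with the database and the validation service hidden behind small abstractions. batchStore, classifier, memStore, and runLoop are hypothetical names; the transaction and row locking from step 1 are assumed to live inside the store and are not shown. message mirrors the documented BackfillGroupMessage fields.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// message mirrors the documented BackfillGroupMessage fields.
type message struct {
	ID       int64
	Data     []byte
	IsCommit bool
}

// batchStore is a hypothetical abstraction over the transactional steps.
type batchStore interface {
	// FetchUnprocessed returns up to limit rows whose is_commit is NULL.
	FetchUnprocessed(ctx context.Context, limit int) ([]message, error)
	// SaveResults persists the inferred is_commit values.
	SaveResults(ctx context.Context, msgs []message) error
}

// classifier stands in for the call to the validation service.
type classifier func(ctx context.Context, data []byte) (bool, error)

// runLoop processes batches until ctx is cancelled or an error occurs.
func runLoop(ctx context.Context, store batchStore, classify classifier, limit int, idle time.Duration) error {
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		msgs, err := store.FetchUnprocessed(ctx, limit)
		if err != nil {
			return err
		}
		if len(msgs) == 0 {
			// No work available: sleep briefly, but wake up on cancellation.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(idle):
			}
			continue
		}
		for i := range msgs {
			isCommit, err := classify(ctx, msgs[i].Data)
			if err != nil {
				return err
			}
			msgs[i].IsCommit = isCommit
		}
		if err := store.SaveResults(ctx, msgs); err != nil {
			return err
		}
	}
}

// memStore is an in-memory fake used only to exercise the loop.
type memStore struct {
	pending []message
	done    map[int64]bool
	onIdle  func() // called when the store runs out of work
}

func (s *memStore) FetchUnprocessed(_ context.Context, limit int) ([]message, error) {
	if len(s.pending) == 0 {
		s.onIdle()
		return nil, nil
	}
	n := limit
	if n > len(s.pending) {
		n = len(s.pending)
	}
	batch := append([]message(nil), s.pending[:n]...)
	s.pending = s.pending[n:]
	return batch, nil
}

func (s *memStore) SaveResults(_ context.Context, msgs []message) error {
	for _, m := range msgs {
		s.done[m.ID] = m.IsCommit
	}
	return nil
}

// demo classifies three messages in batches of two and stops when idle.
func demo() map[int64]bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	store := &memStore{
		pending: []message{
			{ID: 1, Data: []byte("commit")},
			{ID: 2, Data: []byte("app")},
			{ID: 3, Data: []byte("commit")},
		},
		done:   map[int64]bool{},
		onIdle: cancel,
	}
	classify := func(_ context.Context, data []byte) (bool, error) {
		return string(data) == "commit", nil
	}
	_ = runLoop(ctx, store, classify, 2, time.Millisecond)
	return store.done
}

func main() {
	fmt.Println(demo()) // prints "map[1:true 2:false 3:true]"
}
```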

func NewIsCommitBackfiller added in v1.1.0

func NewIsCommitBackfiller(ctx context.Context, db *sql.DB,
	log *zap.Logger, validationService mlsvalidate.MLSValidationService,
) *IsCommitBackfiller

func (*IsCommitBackfiller) Close added in v1.1.0

func (b *IsCommitBackfiller) Close()

func (*IsCommitBackfiller) Run added in v1.1.0

func (b *IsCommitBackfiller) Run()

Run orchestrates the backfill pipeline.

type ReadMlsStore added in v1.1.0

type ReadMlsStore interface {
	GetInboxLogs(
		ctx context.Context,
		req *identity.GetIdentityUpdatesRequest,
	) (*identity.GetIdentityUpdatesResponse, error)
	GetInboxIds(
		ctx context.Context,
		req *identity.GetInboxIdsRequest,
	) (*identity.GetInboxIdsResponse, error)
	FetchKeyPackages(
		ctx context.Context,
		installationIds [][]byte,
	) ([]queries.FetchKeyPackagesRow, error)
	QueryGroupMessagesV1(
		ctx context.Context,
		query *mlsv1.QueryGroupMessagesRequest,
	) (*mlsv1.QueryGroupMessagesResponse, error)
	QueryWelcomeMessagesV1(
		ctx context.Context,
		query *mlsv1.QueryWelcomeMessagesRequest,
	) (*mlsv1.QueryWelcomeMessagesResponse, error)
	QueryCommitLog(
		ctx context.Context,
		query *mlsv1.QueryCommitLogRequest,
	) (*mlsv1.QueryCommitLogResponse, error)
	GetNewestGroupMessage(
		ctx context.Context,
		groupIds [][]byte,
	) ([]*queries.GetNewestGroupMessageRow, error)
	GetNewestGroupMessageMetadata(
		ctx context.Context,
		groupIds [][]byte,
	) ([]*queries.GetNewestGroupMessageMetadataRow, error)
	Queries() *queries.Queries
}

The ReadMlsStore interface is used for all queries and works whether it is connected to the primary DB or a read replica.

type ReadStore added in v1.1.0

type ReadStore struct {
	// contains filtered or unexported fields
}

func NewReadStore added in v1.1.0

func NewReadStore(log *zap.Logger, db *bun.DB) *ReadStore

func (*ReadStore) FetchKeyPackages added in v1.1.0

func (s *ReadStore) FetchKeyPackages(
	ctx context.Context,
	installationIds [][]byte,
) ([]queries.FetchKeyPackagesRow, error)

func (*ReadStore) GetInboxIds added in v1.1.0

func (*ReadStore) GetInboxLogs added in v1.1.0

func (*ReadStore) GetNewestGroupMessage added in v1.3.0

func (s *ReadStore) GetNewestGroupMessage(
	ctx context.Context,
	groupIds [][]byte,
) ([]*queries.GetNewestGroupMessageRow, error)

GetNewestGroupMessage returns a slice of the same length as groupIds, in the same order. A group ID with no messages is represented by a nil entry.
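The positional contract (one entry per requested group, nil where nothing was found) can be sketched as below. The row type is a hypothetical stand-in for queries.GetNewestGroupMessageRow, whose fields are not shown on this page, and alignToInput is an illustrative helper, not the package's implementation.

```go
package main

import "fmt"

// row is a hypothetical stand-in for queries.GetNewestGroupMessageRow.
type row struct {
	GroupID []byte
	ID      int64
}

// alignToInput returns one entry per requested group ID, in request order.
// Groups that have no matching row get a nil entry.
func alignToInput(groupIDs [][]byte, found []row) []*row {
	byGroup := make(map[string]*row, len(found))
	for i := range found {
		byGroup[string(found[i].GroupID)] = &found[i]
	}
	out := make([]*row, len(groupIDs))
	for i, id := range groupIDs {
		out[i] = byGroup[string(id)] // nil when the group has no messages
	}
	return out
}

// demo requests groups a, b, c while only c and a have messages.
func demo() []*row {
	groupIDs := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	found := []row{
		{GroupID: []byte("c"), ID: 7},
		{GroupID: []byte("a"), ID: 3},
	}
	return alignToInput(groupIDs, found)
}

func main() {
	for i, r := range demo() {
		if r == nil {
			fmt.Println(i, "no messages")
			continue
		}
		fmt.Println(i, r.ID)
	}
}
```

Callers of a function with this contract need a nil check on every entry before dereferencing it.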

func (*ReadStore) GetNewestGroupMessageMetadata added in v1.3.0

func (s *ReadStore) GetNewestGroupMessageMetadata(
	ctx context.Context,
	groupIds [][]byte,
) ([]*queries.GetNewestGroupMessageMetadataRow, error)

func (*ReadStore) Queries added in v1.1.0

func (s *ReadStore) Queries() *queries.Queries

func (*ReadStore) QueryCommitLog added in v1.1.0

func (*ReadStore) QueryGroupMessagesV1 added in v1.1.0

func (*ReadStore) QueryWelcomeMessagesV1 added in v1.1.0

type ReadWriteMlsStore added in v1.1.0

type ReadWriteMlsStore interface {
	ReadMlsStore
	CreateOrUpdateInstallation(ctx context.Context, installationId []byte, keyPackage []byte) error
	PublishIdentityUpdate(
		ctx context.Context,
		req *identity.PublishIdentityUpdateRequest,
		validationService mlsvalidate.MLSValidationService,
	) (*identity.PublishIdentityUpdateResponse, error)
	InsertGroupMessage(
		ctx context.Context,
		groupId []byte,
		data []byte,
		senderHmac []byte,
		shouldPush bool,
		isCommit bool,
	) (*queries.GroupMessage, error)
	InsertWelcomeMessage(
		ctx context.Context,
		installationId []byte,
		data []byte,
		hpkePublicKey []byte,
		algorithm types.WrapperAlgorithm,
		welcomeMetadata []byte,
	) (*queries.WelcomeMessage, error)
	InsertWelcomePointerMessage(
		ctx context.Context,
		installationKey []byte,
		welcomePointerData []byte,
		hpkePublicKey []byte,
		wrapperAlgorithm types.WrapperAlgorithm,
	) (*queries.WelcomeMessage, error)
	InsertCommitLog(
		ctx context.Context,
		groupId []byte,
		serialized_entry []byte,
		serialized_signature []byte,
	) (queries.CommitLogV2, error)
}

The ReadWriteMlsStore is a superset of the ReadMlsStore interface and is used for all writes. It must be connected to the primary DB, and will use the primary for both reads and writes.
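The superset relationship comes from interface embedding, which lets read-only code accept either kind of store. A reduced sketch follows: readStore and readWriteStore are hypothetical one-method stand-ins for the documented interfaces, and primaryStore and replicaStore are illustrative types that ignore replication lag.

```go
package main

import "fmt"

// readStore is a reduced stand-in for ReadMlsStore with one hypothetical method.
type readStore interface {
	Count() int
}

// readWriteStore stands in for ReadWriteMlsStore. It embeds the read
// interface, so every read-write store is also a valid read store.
type readWriteStore interface {
	readStore
	Insert(msg string)
}

// primaryStore models a store bound to the primary DB.
type primaryStore struct{ msgs []string }

func (p *primaryStore) Count() int        { return len(p.msgs) }
func (p *primaryStore) Insert(msg string) { p.msgs = append(p.msgs, msg) }

// replicaStore models a store bound to a read replica. It has no write
// methods; replication lag is ignored in this model.
type replicaStore struct{ primary *primaryStore }

func (r *replicaStore) Count() int { return r.primary.Count() }

// report depends only on the read interface, so it accepts either store.
func report(s readStore) string {
	return fmt.Sprintf("%d message(s)", s.Count())
}

// demo writes through the read-write store and reads through both stores.
func demo() (string, string) {
	p := &primaryStore{}
	var rw readWriteStore = p
	rw.Insert("hello")
	return report(rw), report(&replicaStore{primary: p})
}

func main() {
	fromPrimary, fromReplica := demo()
	fmt.Println(fromPrimary) // prints "1 message(s)"
	fmt.Println(fromReplica) // prints "1 message(s)"
}
```

Passing a replicaStore where a readWriteStore is required fails to compile, so the type system keeps writes on the primary.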

type Store

type Store struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, log *zap.Logger, db *bun.DB) (*Store, error)

func (*Store) CreateOrUpdateInstallation

func (s *Store) CreateOrUpdateInstallation(
	ctx context.Context,
	installationId []byte,
	keyPackage []byte,
) error

CreateOrUpdateInstallation creates the installation and its last-resort key package.

func (*Store) FetchKeyPackages

func (s *Store) FetchKeyPackages(
	ctx context.Context,
	installationIds [][]byte,
) ([]queries.FetchKeyPackagesRow, error)

func (*Store) GetInboxIds

func (*Store) GetInboxLogs

func (*Store) GetNewestGroupMessage added in v1.3.0

func (s *Store) GetNewestGroupMessage(
	ctx context.Context,
	groupIds [][]byte,
) ([]*queries.GetNewestGroupMessageRow, error)

func (*Store) GetNewestGroupMessageMetadata added in v1.3.0

func (s *Store) GetNewestGroupMessageMetadata(
	ctx context.Context,
	groupIds [][]byte,
) ([]*queries.GetNewestGroupMessageMetadataRow, error)

func (*Store) InsertCommitLog added in v1.0.1

func (s *Store) InsertCommitLog(
	ctx context.Context,
	groupId []byte,
	serialized_entry []byte,
	serialized_signature []byte,
) (queries.CommitLogV2, error)

func (*Store) InsertGroupMessage

func (s *Store) InsertGroupMessage(
	ctx context.Context,
	groupId []byte,
	data []byte,
	senderHmac []byte,
	shouldPush bool,
	isCommit bool,
) (*queries.GroupMessage, error)

func (*Store) InsertWelcomeMessage

func (s *Store) InsertWelcomeMessage(
	ctx context.Context,
	installationId []byte,
	data []byte,
	hpkePublicKey []byte,
	wrapperAlgorithm types.WrapperAlgorithm,
	welcomeMetadata []byte,
) (*queries.WelcomeMessage, error)

func (*Store) InsertWelcomePointerMessage added in v1.3.0

func (s *Store) InsertWelcomePointerMessage(
	ctx context.Context,
	installationKey []byte,
	welcomePointerData []byte,
	hpkePublicKey []byte,
	wrapperAlgorithm types.WrapperAlgorithm,
) (*queries.WelcomeMessage, error)

func (*Store) Queries added in v1.1.0

func (s *Store) Queries() *queries.Queries

func (*Store) QueryCommitLog added in v1.0.1

func (s *Store) QueryCommitLog(
	ctx context.Context,
	query *mlsv1.QueryCommitLogRequest,
) (*mlsv1.QueryCommitLogResponse, error)

func (*Store) QueryGroupMessagesV1

func (s *Store) QueryGroupMessagesV1(
	ctx context.Context,
	req *mlsv1.QueryGroupMessagesRequest,
) (*mlsv1.QueryGroupMessagesResponse, error)

func (*Store) QueryWelcomeMessagesV1

func (s *Store) QueryWelcomeMessagesV1(
	ctx context.Context,
	req *mlsv1.QueryWelcomeMessagesRequest,
) (*mlsv1.QueryWelcomeMessagesResponse, error)

func (*Store) RunInRepeatableReadTx

func (s *Store) RunInRepeatableReadTx(
	ctx context.Context,
	numRetries int,
	fn func(ctx context.Context, txQueries *queries.Queries) error,
) error

type StoreOptions

type StoreOptions struct {
	DbConnectionString       string        `long:"db-connection-string"        description:"Connection string for MLS DB"`
	DbReaderConnectionString string        `` /* 141-byte string literal not displayed */
	ReadTimeout              time.Duration `` /* 144-byte string literal not displayed */
	WriteTimeout             time.Duration `` /* 144-byte string literal not displayed */
	MaxOpenConns             int           `` /* 143-byte string literal not displayed */
}
