store

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2025 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsAlreadyExistsError

func IsAlreadyExistsError(err error) bool

func RunInTx added in v1.1.0

func RunInTx(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, txQueries *queries.Queries) error,
) error

Types

type AlreadyExistsError

type AlreadyExistsError struct {
	Err error
}

func NewAlreadyExistsError

func NewAlreadyExistsError(err error) *AlreadyExistsError

func (*AlreadyExistsError) Error

func (e *AlreadyExistsError) Error() string

type BackfillGroupMessage added in v1.1.0

type BackfillGroupMessage struct {
	ID       int64
	Data     []byte
	IsCommit bool
}

BackfillGroupMessage represents a single row from the group_message table that requires backfilling of its is_commit field. The Data field is used for validation, and IsCommit is the inferred result populated by the classification step.

type Backfiller added in v1.1.0

type Backfiller interface {
	Run()
	Close()
}

Backfiller defines the interface for a background backfill process. It can be started with Run() and gracefully stopped with Close().

type IdentityUpdate

type IdentityUpdate struct {
	Kind               IdentityUpdateKind
	InstallationKey    []byte
	CredentialIdentity []byte
	TimestampNs        uint64
}

type IdentityUpdateKind

type IdentityUpdateKind int
const (
	Create IdentityUpdateKind = iota
	Revoke
)

type IdentityUpdateList

type IdentityUpdateList []IdentityUpdate

IdentityUpdateList implements sort.Interface (Len, Less, and Swap) so that a list of identity updates can be sorted with sort.Sort.

func (IdentityUpdateList) Len

func (a IdentityUpdateList) Len() int

func (IdentityUpdateList) Less

func (a IdentityUpdateList) Less(i, j int) bool

func (IdentityUpdateList) Swap

func (a IdentityUpdateList) Swap(i, j int)

type IsCommitBackfiller added in v1.1.0

type IsCommitBackfiller struct {
	DB *sql.DB

	WG sync.WaitGroup
	// contains filtered or unexported fields
}

IsCommitBackfiller is responsible for populating the `is_commit` column in the `group_message` table. This field is inferred from message data using a validation service.

In high-availability (HA) deployments where multiple instances of this service may be running concurrently, the backfiller is designed to be safe and non-conflicting. It relies on transactional locking and SKIP LOCKED semantics (or a manual locking scheme) to ensure that no two workers process the same rows. This avoids duplicate work and race conditions without requiring external coordination.

The backfill process runs in a loop and periodically performs the following steps:

  1. Starts a transaction
  2. Selects up to N unprocessed group messages (e.g., where is_commit IS NULL)
  3. Uses the MLSValidationService to classify each message
  4. Updates the corresponding rows with the is_commit result

If no work is available, it sleeps briefly before retrying.

func NewIsCommitBackfiller added in v1.1.0

func NewIsCommitBackfiller(ctx context.Context, db *sql.DB,
	log *zap.Logger, validationService mlsvalidate.MLSValidationService,
) *IsCommitBackfiller

func (*IsCommitBackfiller) Close added in v1.1.0

func (b *IsCommitBackfiller) Close()

func (*IsCommitBackfiller) Run added in v1.1.0

func (b *IsCommitBackfiller) Run()

Run orchestrates the backfill pipeline.

type ReadMlsStore added in v1.1.0

type ReadMlsStore interface {
	GetInboxLogs(
		ctx context.Context,
		req *identity.GetIdentityUpdatesRequest,
	) (*identity.GetIdentityUpdatesResponse, error)
	GetInboxIds(
		ctx context.Context,
		req *identity.GetInboxIdsRequest,
	) (*identity.GetInboxIdsResponse, error)
	FetchKeyPackages(
		ctx context.Context,
		installationIds [][]byte,
	) ([]queries.FetchKeyPackagesRow, error)
	QueryGroupMessagesV1(
		ctx context.Context,
		query *mlsv1.QueryGroupMessagesRequest,
	) (*mlsv1.QueryGroupMessagesResponse, error)
	QueryWelcomeMessagesV1(
		ctx context.Context,
		query *mlsv1.QueryWelcomeMessagesRequest,
	) (*mlsv1.QueryWelcomeMessagesResponse, error)
	QueryCommitLog(
		ctx context.Context,
		query *mlsv1.QueryCommitLogRequest,
	) (*mlsv1.QueryCommitLogResponse, error)
	Queries() *queries.Queries
}

The ReadMlsStore interface is used for all queries and will work whether connected to the primary DB or a read replica.

type ReadStore added in v1.1.0

type ReadStore struct {
	// contains filtered or unexported fields
}

func NewReadStore added in v1.1.0

func NewReadStore(log *zap.Logger, db *bun.DB) *ReadStore

func (*ReadStore) FetchKeyPackages added in v1.1.0

func (s *ReadStore) FetchKeyPackages(
	ctx context.Context,
	installationIds [][]byte,
) ([]queries.FetchKeyPackagesRow, error)

func (*ReadStore) GetInboxIds added in v1.1.0

func (*ReadStore) GetInboxLogs added in v1.1.0

func (*ReadStore) Queries added in v1.1.0

func (s *ReadStore) Queries() *queries.Queries

func (*ReadStore) QueryCommitLog added in v1.1.0

func (*ReadStore) QueryGroupMessagesV1 added in v1.1.0

func (*ReadStore) QueryWelcomeMessagesV1 added in v1.1.0

type ReadWriteMlsStore added in v1.1.0

type ReadWriteMlsStore interface {
	ReadMlsStore
	CreateOrUpdateInstallation(ctx context.Context, installationId []byte, keyPackage []byte) error
	PublishIdentityUpdate(
		ctx context.Context,
		req *identity.PublishIdentityUpdateRequest,
		validationService mlsvalidate.MLSValidationService,
	) (*identity.PublishIdentityUpdateResponse, error)
	InsertGroupMessage(
		ctx context.Context,
		groupId []byte,
		data []byte,
		senderHmac []byte,
		shouldPush bool,
		isCommit bool,
	) (*queries.GroupMessage, error)
	InsertWelcomeMessage(
		ctx context.Context,
		installationId []byte,
		data []byte,
		hpkePublicKey []byte,
		algorithm types.WrapperAlgorithm,
		welcomeMetadata []byte,
	) (*queries.WelcomeMessage, error)
	InsertCommitLog(
		ctx context.Context,
		groupId []byte,
		encrypted_entry []byte,
	) (queries.CommitLog, error)
}

The ReadWriteMlsStore is a superset of the ReadMlsStore interface and is used for all writes. It must be connected to the primary DB, and will use the primary for both reads and writes.

type Store

type Store struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, log *zap.Logger, db *bun.DB) (*Store, error)

func (*Store) CreateOrUpdateInstallation

func (s *Store) CreateOrUpdateInstallation(
	ctx context.Context,
	installationId []byte,
	keyPackage []byte,
) error

CreateOrUpdateInstallation creates the installation and last-resort key package.

func (*Store) FetchKeyPackages

func (s *Store) FetchKeyPackages(
	ctx context.Context,
	installationIds [][]byte,
) ([]queries.FetchKeyPackagesRow, error)

func (*Store) GetInboxIds

func (*Store) GetInboxLogs

func (*Store) InsertCommitLog added in v1.0.1

func (s *Store) InsertCommitLog(
	ctx context.Context,
	groupId []byte,
	encrypted_entry []byte,
) (queries.CommitLog, error)

func (*Store) InsertGroupMessage

func (s *Store) InsertGroupMessage(
	ctx context.Context,
	groupId []byte,
	data []byte,
	senderHmac []byte,
	shouldPush bool,
	isCommit bool,
) (*queries.GroupMessage, error)

func (*Store) InsertWelcomeMessage

func (s *Store) InsertWelcomeMessage(
	ctx context.Context,
	installationId []byte,
	data []byte,
	hpkePublicKey []byte,
	wrapperAlgorithm types.WrapperAlgorithm,
	welcomeMetadata []byte,
) (*queries.WelcomeMessage, error)

func (*Store) Queries added in v1.1.0

func (s *Store) Queries() *queries.Queries

func (*Store) QueryCommitLog added in v1.0.1

func (s *Store) QueryCommitLog(
	ctx context.Context,
	query *mlsv1.QueryCommitLogRequest,
) (*mlsv1.QueryCommitLogResponse, error)

func (*Store) QueryGroupMessagesV1

func (s *Store) QueryGroupMessagesV1(
	ctx context.Context,
	req *mlsv1.QueryGroupMessagesRequest,
) (*mlsv1.QueryGroupMessagesResponse, error)

func (*Store) QueryWelcomeMessagesV1

func (s *Store) QueryWelcomeMessagesV1(
	ctx context.Context,
	req *mlsv1.QueryWelcomeMessagesRequest,
) (*mlsv1.QueryWelcomeMessagesResponse, error)

func (*Store) RunInRepeatableReadTx

func (s *Store) RunInRepeatableReadTx(
	ctx context.Context,
	numRetries int,
	fn func(ctx context.Context, txQueries *queries.Queries) error,
) error

type StoreOptions

type StoreOptions struct {
	DbConnectionString       string        `long:"db-connection-string"        description:"Connection string for MLS DB"`
	DbReaderConnectionString string        `` /* 141-byte string literal not displayed */
	ReadTimeout              time.Duration `` /* 144-byte string literal not displayed */
	WriteTimeout             time.Duration `` /* 144-byte string literal not displayed */
	MaxOpenConns             int           `` /* 143-byte string literal not displayed */
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL