Documentation
¶
Index ¶
- func IsAlreadyExistsError(err error) bool
- func RunInTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) error
- type AlreadyExistsError
- type BackfillGroupMessage
- type Backfiller
- type IdentityUpdate
- type IdentityUpdateKind
- type IdentityUpdateList
- type InstallationsBackfiller
- type IsCommitBackfiller
- type ReadMlsStore
- type ReadStore
- func (s *ReadStore) FetchKeyPackages(ctx context.Context, installationIds [][]byte) ([]queries.FetchKeyPackagesRow, error)
- func (s *ReadStore) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error)
- func (s *ReadStore) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error)
- func (s *ReadStore) GetNewestGroupMessage(ctx context.Context, groupIds [][]byte) ([]*queries.GetNewestGroupMessageRow, error)
- func (s *ReadStore) GetNewestGroupMessageMetadata(ctx context.Context, groupIds [][]byte) ([]*queries.GetNewestGroupMessageMetadataRow, error)
- func (s *ReadStore) Queries() *queries.Queries
- func (s *ReadStore) QueryCommitLog(ctx context.Context, req *mlsv1.QueryCommitLogRequest) (*mlsv1.QueryCommitLogResponse, error)
- func (s *ReadStore) QueryGroupMessagesV1(ctx context.Context, req *mlsv1.QueryGroupMessagesRequest) (*mlsv1.QueryGroupMessagesResponse, error)
- func (s *ReadStore) QueryWelcomeMessagesV1(ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest) (*mlsv1.QueryWelcomeMessagesResponse, error)
- type ReadWriteMlsStore
- type Store
- func (s *Store) CreateOrUpdateInstallation(ctx context.Context, installationId []byte, keyPackage []byte) error
- func (s *Store) FetchKeyPackages(ctx context.Context, installationIds [][]byte) ([]queries.FetchKeyPackagesRow, error)
- func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error)
- func (s *Store) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error)
- func (s *Store) GetNewestGroupMessage(ctx context.Context, groupIds [][]byte) ([]*queries.GetNewestGroupMessageRow, error)
- func (s *Store) GetNewestGroupMessageMetadata(ctx context.Context, groupIds [][]byte) ([]*queries.GetNewestGroupMessageMetadataRow, error)
- func (s *Store) InsertCommitLog(ctx context.Context, groupId []byte, serialized_entry []byte, ...) (queries.CommitLogV2, error)
- func (s *Store) InsertGroupMessage(ctx context.Context, groupId []byte, data []byte, senderHmac []byte, ...) (*queries.GroupMessage, error)
- func (s *Store) InsertWelcomeMessage(ctx context.Context, installationId []byte, data []byte, hpkePublicKey []byte, ...) (*queries.WelcomeMessage, error)
- func (s *Store) InsertWelcomePointerMessage(ctx context.Context, installationKey []byte, welcomePointerData []byte, ...) (*queries.WelcomeMessage, error)
- func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, ...) (*identity.PublishIdentityUpdateResponse, error)
- func (s *Store) Queries() *queries.Queries
- func (s *Store) QueryCommitLog(ctx context.Context, query *mlsv1.QueryCommitLogRequest) (*mlsv1.QueryCommitLogResponse, error)
- func (s *Store) QueryGroupMessagesV1(ctx context.Context, req *mlsv1.QueryGroupMessagesRequest) (*mlsv1.QueryGroupMessagesResponse, error)
- func (s *Store) QueryWelcomeMessagesV1(ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest) (*mlsv1.QueryWelcomeMessagesResponse, error)
- func (s *Store) RunInRepeatableReadTx(ctx context.Context, numRetries int, ...) error
- type StoreOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsAlreadyExistsError ¶
Types ¶
type AlreadyExistsError ¶
type AlreadyExistsError struct {
Err error
}
func NewAlreadyExistsError ¶
func NewAlreadyExistsError(err error) *AlreadyExistsError
func (*AlreadyExistsError) Error ¶
func (e *AlreadyExistsError) Error() string
type BackfillGroupMessage ¶ added in v1.1.0
BackfillGroupMessage represents a single row from the group_message table that requires backfilling of its is_commit field. The Data field is used for validation, and IsCommit is the inferred result populated by the classification step.
type Backfiller ¶ added in v1.1.0
type Backfiller interface {
Run()
Close()
}
Backfiller defines the interface for a background backfill process. It can be started with Run() and gracefully stopped with Close().
type IdentityUpdate ¶
type IdentityUpdate struct {
Kind IdentityUpdateKind
InstallationKey []byte
CredentialIdentity []byte
TimestampNs uint64
}
type IdentityUpdateKind ¶
type IdentityUpdateKind int
const ( Create IdentityUpdateKind = iota Revoke )
type IdentityUpdateList ¶
type IdentityUpdateList []IdentityUpdate
Add the required methods to make a valid sort.Sort interface
func (IdentityUpdateList) Len ¶
func (a IdentityUpdateList) Len() int
func (IdentityUpdateList) Less ¶
func (a IdentityUpdateList) Less(i, j int) bool
func (IdentityUpdateList) Swap ¶
func (a IdentityUpdateList) Swap(i, j int)
type InstallationsBackfiller ¶ added in v1.2.0
type InstallationsBackfiller struct {
// contains filtered or unexported fields
}
InstallationsBackfiller is responsible for: - Appending to key_packages installations that have not been appended yet. - Updating the is_appended status of installations that have not been appended yet.
The backfill process runs in a loop and periodically:
- Starts a transaction
- Selects up to N unprocessed installations (e.g., where is_appended IS NULL) Skipping locked rows, to guarantee that no two workers process the same row.
- Inserts the key package into the key_packages table
- Updates the is_appended status of the installation
If no work is available, it sleeps briefly before retrying.
Because of how installations table works, HA is guaranteed: - Rows are inserted into installations via CreateOrUpdateInstallation, which has been modified accordingly to update the is_appended status to true, and to insert the key package into the key_packages table. This guarantees any new or updated installations makes its way into the key_packages table. - Meanwhile, the backfiller works by selecting installations with a NULL is_appended, inserting them into key_packages and updating the is_appended status to true. - InsertKeyPackage guarantees that on conflict with (installation_id, key_package) nothing happens. - UpdateIsAppendedStatus guarantees that the is_appended status is updated to true.
Important! Final ordering in key_packages is not needed, as this design guarantees that the newest key package is always inserted in last position in key_packages.
func NewInstallationsBackfiller ¶ added in v1.2.0
func (*InstallationsBackfiller) Close ¶ added in v1.2.0
func (b *InstallationsBackfiller) Close()
func (*InstallationsBackfiller) Run ¶ added in v1.2.0
func (b *InstallationsBackfiller) Run()
type IsCommitBackfiller ¶ added in v1.1.0
type IsCommitBackfiller struct {
DB *sql.DB
WG sync.WaitGroup
// contains filtered or unexported fields
}
IsCommitBackfiller is responsible for populating the `is_commit` column in the `group_message` table. This field is inferred from message data using a validation service.
In high-availability (HA) deployments where multiple instances of this service may be running concurrently, the backfiller is designed to be safe and non-conflicting. It relies on transactional locking and SKIP LOCKED semantics (or a manual locking scheme) to ensure that no two workers process the same rows. This avoids duplicate work and race conditions without requiring external coordination.
The backfill process runs in a loop and periodically:
- Starts a transaction
- Selects up to N unprocessed group messages (e.g., where is_commit IS NULL)
- Uses the MLSValidationService to classify each message
- Updates the corresponding rows with the is_commit result
If no work is available, it sleeps briefly before retrying.
func NewIsCommitBackfiller ¶ added in v1.1.0
func NewIsCommitBackfiller(ctx context.Context, db *sql.DB, log *zap.Logger, validationService mlsvalidate.MLSValidationService, ) *IsCommitBackfiller
func (*IsCommitBackfiller) Close ¶ added in v1.1.0
func (b *IsCommitBackfiller) Close()
func (*IsCommitBackfiller) Run ¶ added in v1.1.0
func (b *IsCommitBackfiller) Run()
Run orchestrates the pipeline
type ReadMlsStore ¶ added in v1.1.0
type ReadMlsStore interface {
GetInboxLogs(
ctx context.Context,
req *identity.GetIdentityUpdatesRequest,
) (*identity.GetIdentityUpdatesResponse, error)
GetInboxIds(
ctx context.Context,
req *identity.GetInboxIdsRequest,
) (*identity.GetInboxIdsResponse, error)
FetchKeyPackages(
ctx context.Context,
installationIds [][]byte,
) ([]queries.FetchKeyPackagesRow, error)
QueryGroupMessagesV1(
ctx context.Context,
query *mlsv1.QueryGroupMessagesRequest,
) (*mlsv1.QueryGroupMessagesResponse, error)
QueryWelcomeMessagesV1(
ctx context.Context,
query *mlsv1.QueryWelcomeMessagesRequest,
) (*mlsv1.QueryWelcomeMessagesResponse, error)
QueryCommitLog(
ctx context.Context,
query *mlsv1.QueryCommitLogRequest,
) (*mlsv1.QueryCommitLogResponse, error)
GetNewestGroupMessage(
ctx context.Context,
groupIds [][]byte,
) ([]*queries.GetNewestGroupMessageRow, error)
GetNewestGroupMessageMetadata(
ctx context.Context,
groupIds [][]byte,
) ([]*queries.GetNewestGroupMessageMetadataRow, error)
Queries() *queries.Queries
}
The ReadMlsStore interface is used for all queries, and will work whether connected to the primary DB or a read replica
type ReadStore ¶ added in v1.1.0
type ReadStore struct {
// contains filtered or unexported fields
}
func (*ReadStore) FetchKeyPackages ¶ added in v1.1.0
func (*ReadStore) GetInboxIds ¶ added in v1.1.0
func (s *ReadStore) GetInboxIds( ctx context.Context, req *identity.GetInboxIdsRequest, ) (*identity.GetInboxIdsResponse, error)
func (*ReadStore) GetInboxLogs ¶ added in v1.1.0
func (s *ReadStore) GetInboxLogs( ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest, ) (*identity.GetIdentityUpdatesResponse, error)
func (*ReadStore) GetNewestGroupMessage ¶ added in v1.3.0
func (s *ReadStore) GetNewestGroupMessage( ctx context.Context, groupIds [][]byte, ) ([]*queries.GetNewestGroupMessageRow, error)
Returns an array of the same length as the input groupIds in the original order. GroupIDs with no messages are represented by a nil value
func (*ReadStore) GetNewestGroupMessageMetadata ¶ added in v1.3.0
func (*ReadStore) QueryCommitLog ¶ added in v1.1.0
func (s *ReadStore) QueryCommitLog( ctx context.Context, req *mlsv1.QueryCommitLogRequest, ) (*mlsv1.QueryCommitLogResponse, error)
func (*ReadStore) QueryGroupMessagesV1 ¶ added in v1.1.0
func (s *ReadStore) QueryGroupMessagesV1( ctx context.Context, req *mlsv1.QueryGroupMessagesRequest, ) (*mlsv1.QueryGroupMessagesResponse, error)
func (*ReadStore) QueryWelcomeMessagesV1 ¶ added in v1.1.0
func (s *ReadStore) QueryWelcomeMessagesV1( ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest, ) (*mlsv1.QueryWelcomeMessagesResponse, error)
type ReadWriteMlsStore ¶ added in v1.1.0
type ReadWriteMlsStore interface {
ReadMlsStore
CreateOrUpdateInstallation(ctx context.Context, installationId []byte, keyPackage []byte) error
PublishIdentityUpdate(
ctx context.Context,
req *identity.PublishIdentityUpdateRequest,
validationService mlsvalidate.MLSValidationService,
) (*identity.PublishIdentityUpdateResponse, error)
InsertGroupMessage(
ctx context.Context,
groupId []byte,
data []byte,
senderHmac []byte,
shouldPush bool,
isCommit bool,
) (*queries.GroupMessage, error)
InsertWelcomeMessage(
ctx context.Context,
installationId []byte,
data []byte,
hpkePublicKey []byte,
algorithm types.WrapperAlgorithm,
welcomeMetadata []byte,
) (*queries.WelcomeMessage, error)
InsertWelcomePointerMessage(
ctx context.Context,
installationKey []byte,
welcomePointerData []byte,
hpkePublicKey []byte,
wrapperAlgorithm types.WrapperAlgorithm,
) (*queries.WelcomeMessage, error)
InsertCommitLog(
ctx context.Context,
groupId []byte,
serialized_entry []byte,
serialized_signature []byte,
) (queries.CommitLogV2, error)
}
The ReadWriteMlsStore is a superset of the ReadMlsStore interface and is used for all writes. It must be connected to the primary DB, and will use the primary for both reads and writes.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) CreateOrUpdateInstallation ¶
func (s *Store) CreateOrUpdateInstallation( ctx context.Context, installationId []byte, keyPackage []byte, ) error
Creates the installation and last resort key package
func (*Store) FetchKeyPackages ¶
func (*Store) GetInboxIds ¶
func (s *Store) GetInboxIds( ctx context.Context, req *identity.GetInboxIdsRequest, ) (*identity.GetInboxIdsResponse, error)
func (*Store) GetInboxLogs ¶
func (s *Store) GetInboxLogs( ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest, ) (*identity.GetIdentityUpdatesResponse, error)
func (*Store) GetNewestGroupMessage ¶ added in v1.3.0
func (*Store) GetNewestGroupMessageMetadata ¶ added in v1.3.0
func (*Store) InsertCommitLog ¶ added in v1.0.1
func (*Store) InsertGroupMessage ¶
func (*Store) InsertWelcomeMessage ¶
func (*Store) InsertWelcomePointerMessage ¶ added in v1.3.0
func (*Store) PublishIdentityUpdate ¶
func (s *Store) PublishIdentityUpdate( ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService, ) (*identity.PublishIdentityUpdateResponse, error)
func (*Store) QueryCommitLog ¶ added in v1.0.1
func (s *Store) QueryCommitLog( ctx context.Context, query *mlsv1.QueryCommitLogRequest, ) (*mlsv1.QueryCommitLogResponse, error)
func (*Store) QueryGroupMessagesV1 ¶
func (s *Store) QueryGroupMessagesV1( ctx context.Context, req *mlsv1.QueryGroupMessagesRequest, ) (*mlsv1.QueryGroupMessagesResponse, error)
func (*Store) QueryWelcomeMessagesV1 ¶
func (s *Store) QueryWelcomeMessagesV1( ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest, ) (*mlsv1.QueryWelcomeMessagesResponse, error)
type StoreOptions ¶
type StoreOptions struct {
DbConnectionString string `long:"db-connection-string" description:"Connection string for MLS DB"`
DbReaderConnectionString string `` /* 141-byte string literal not displayed */
ReadTimeout time.Duration `` /* 144-byte string literal not displayed */
WriteTimeout time.Duration `` /* 144-byte string literal not displayed */
MaxOpenConns int `` /* 143-byte string literal not displayed */
}