Documentation
¶
Index ¶
- func IsAlreadyExistsError(err error) bool
- func RunInTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) error
- type AlreadyExistsError
- type BackfillGroupMessage
- type Backfiller
- type IdentityUpdate
- type IdentityUpdateKind
- type IdentityUpdateList
- type IsCommitBackfiller
- type ReadMlsStore
- type ReadStore
- func (s *ReadStore) FetchKeyPackages(ctx context.Context, installationIds [][]byte) ([]queries.FetchKeyPackagesRow, error)
- func (s *ReadStore) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error)
- func (s *ReadStore) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error)
- func (s *ReadStore) Queries() *queries.Queries
- func (s *ReadStore) QueryCommitLog(ctx context.Context, req *mlsv1.QueryCommitLogRequest) (*mlsv1.QueryCommitLogResponse, error)
- func (s *ReadStore) QueryGroupMessagesV1(ctx context.Context, req *mlsv1.QueryGroupMessagesRequest) (*mlsv1.QueryGroupMessagesResponse, error)
- func (s *ReadStore) QueryWelcomeMessagesV1(ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest) (*mlsv1.QueryWelcomeMessagesResponse, error)
- type ReadWriteMlsStore
- type Store
- func (s *Store) CreateOrUpdateInstallation(ctx context.Context, installationId []byte, keyPackage []byte) error
- func (s *Store) FetchKeyPackages(ctx context.Context, installationIds [][]byte) ([]queries.FetchKeyPackagesRow, error)
- func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error)
- func (s *Store) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error)
- func (s *Store) InsertCommitLog(ctx context.Context, groupId []byte, encrypted_entry []byte) (queries.CommitLog, error)
- func (s *Store) InsertGroupMessage(ctx context.Context, groupId []byte, data []byte, senderHmac []byte, ...) (*queries.GroupMessage, error)
- func (s *Store) InsertWelcomeMessage(ctx context.Context, installationId []byte, data []byte, hpkePublicKey []byte, ...) (*queries.WelcomeMessage, error)
- func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, ...) (*identity.PublishIdentityUpdateResponse, error)
- func (s *Store) Queries() *queries.Queries
- func (s *Store) QueryCommitLog(ctx context.Context, query *mlsv1.QueryCommitLogRequest) (*mlsv1.QueryCommitLogResponse, error)
- func (s *Store) QueryGroupMessagesV1(ctx context.Context, req *mlsv1.QueryGroupMessagesRequest) (*mlsv1.QueryGroupMessagesResponse, error)
- func (s *Store) QueryWelcomeMessagesV1(ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest) (*mlsv1.QueryWelcomeMessagesResponse, error)
- func (s *Store) RunInRepeatableReadTx(ctx context.Context, numRetries int, ...) error
- type StoreOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsAlreadyExistsError ¶
Types ¶
type AlreadyExistsError ¶
type AlreadyExistsError struct {
Err error
}
func NewAlreadyExistsError ¶
func NewAlreadyExistsError(err error) *AlreadyExistsError
func (*AlreadyExistsError) Error ¶
func (e *AlreadyExistsError) Error() string
type BackfillGroupMessage ¶ added in v1.1.0
BackfillGroupMessage represents a single row from the group_message table that requires backfilling of its is_commit field. The Data field is used for validation, and IsCommit is the inferred result populated by the classification step.
type Backfiller ¶ added in v1.1.0
type Backfiller interface {
Run()
Close()
}
Backfiller defines the interface for a background backfill process. It can be started with Run() and gracefully stopped with Close().
type IdentityUpdate ¶
type IdentityUpdate struct {
Kind IdentityUpdateKind
InstallationKey []byte
CredentialIdentity []byte
TimestampNs uint64
}
type IdentityUpdateKind ¶
type IdentityUpdateKind int
const ( Create IdentityUpdateKind = iota Revoke )
type IdentityUpdateList ¶
type IdentityUpdateList []IdentityUpdate
Add the required methods to make a valid sort.Sort interface
func (IdentityUpdateList) Len ¶
func (a IdentityUpdateList) Len() int
func (IdentityUpdateList) Less ¶
func (a IdentityUpdateList) Less(i, j int) bool
func (IdentityUpdateList) Swap ¶
func (a IdentityUpdateList) Swap(i, j int)
type IsCommitBackfiller ¶ added in v1.1.0
type IsCommitBackfiller struct {
DB *sql.DB
WG sync.WaitGroup
// contains filtered or unexported fields
}
IsCommitBackfiller is responsible for populating the `is_commit` column in the `group_message` table. This field is inferred from message data using a validation service.
In high-availability (HA) deployments where multiple instances of this service may be running concurrently, the backfiller is designed to be safe and non-conflicting. It relies on transactional locking and SKIP LOCKED semantics (or a manual locking scheme) to ensure that no two workers process the same rows. This avoids duplicate work and race conditions without requiring external coordination.
The backfill process runs in a loop and periodically:
- Starts a transaction
- Selects up to N unprocessed group messages (e.g., where is_commit IS NULL)
- Uses the MLSValidationService to classify each message
- Updates the corresponding rows with the is_commit result
If no work is available, it sleeps briefly before retrying.
func NewIsCommitBackfiller ¶ added in v1.1.0
func NewIsCommitBackfiller(ctx context.Context, db *sql.DB, log *zap.Logger, validationService mlsvalidate.MLSValidationService, ) *IsCommitBackfiller
func (*IsCommitBackfiller) Close ¶ added in v1.1.0
func (b *IsCommitBackfiller) Close()
func (*IsCommitBackfiller) Run ¶ added in v1.1.0
func (b *IsCommitBackfiller) Run()
Run orchestrates the pipeline
type ReadMlsStore ¶ added in v1.1.0
type ReadMlsStore interface {
GetInboxLogs(
ctx context.Context,
req *identity.GetIdentityUpdatesRequest,
) (*identity.GetIdentityUpdatesResponse, error)
GetInboxIds(
ctx context.Context,
req *identity.GetInboxIdsRequest,
) (*identity.GetInboxIdsResponse, error)
FetchKeyPackages(
ctx context.Context,
installationIds [][]byte,
) ([]queries.FetchKeyPackagesRow, error)
QueryGroupMessagesV1(
ctx context.Context,
query *mlsv1.QueryGroupMessagesRequest,
) (*mlsv1.QueryGroupMessagesResponse, error)
QueryWelcomeMessagesV1(
ctx context.Context,
query *mlsv1.QueryWelcomeMessagesRequest,
) (*mlsv1.QueryWelcomeMessagesResponse, error)
QueryCommitLog(
ctx context.Context,
query *mlsv1.QueryCommitLogRequest,
) (*mlsv1.QueryCommitLogResponse, error)
Queries() *queries.Queries
}
The ReadMlsStore interface is used for all queries, and will work whether connected to the primary DB or a read replica
type ReadStore ¶ added in v1.1.0
type ReadStore struct {
// contains filtered or unexported fields
}
func (*ReadStore) FetchKeyPackages ¶ added in v1.1.0
func (*ReadStore) GetInboxIds ¶ added in v1.1.0
func (s *ReadStore) GetInboxIds( ctx context.Context, req *identity.GetInboxIdsRequest, ) (*identity.GetInboxIdsResponse, error)
func (*ReadStore) GetInboxLogs ¶ added in v1.1.0
func (s *ReadStore) GetInboxLogs( ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest, ) (*identity.GetIdentityUpdatesResponse, error)
func (*ReadStore) QueryCommitLog ¶ added in v1.1.0
func (s *ReadStore) QueryCommitLog( ctx context.Context, req *mlsv1.QueryCommitLogRequest, ) (*mlsv1.QueryCommitLogResponse, error)
func (*ReadStore) QueryGroupMessagesV1 ¶ added in v1.1.0
func (s *ReadStore) QueryGroupMessagesV1( ctx context.Context, req *mlsv1.QueryGroupMessagesRequest, ) (*mlsv1.QueryGroupMessagesResponse, error)
func (*ReadStore) QueryWelcomeMessagesV1 ¶ added in v1.1.0
func (s *ReadStore) QueryWelcomeMessagesV1( ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest, ) (*mlsv1.QueryWelcomeMessagesResponse, error)
type ReadWriteMlsStore ¶ added in v1.1.0
type ReadWriteMlsStore interface {
ReadMlsStore
CreateOrUpdateInstallation(ctx context.Context, installationId []byte, keyPackage []byte) error
PublishIdentityUpdate(
ctx context.Context,
req *identity.PublishIdentityUpdateRequest,
validationService mlsvalidate.MLSValidationService,
) (*identity.PublishIdentityUpdateResponse, error)
InsertGroupMessage(
ctx context.Context,
groupId []byte,
data []byte,
senderHmac []byte,
shouldPush bool,
isCommit bool,
) (*queries.GroupMessage, error)
InsertWelcomeMessage(
ctx context.Context,
installationId []byte,
data []byte,
hpkePublicKey []byte,
algorithm types.WrapperAlgorithm,
welcomeMetadata []byte,
) (*queries.WelcomeMessage, error)
InsertCommitLog(
ctx context.Context,
groupId []byte,
encrypted_entry []byte,
) (queries.CommitLog, error)
}
The ReadWriteMlsStore is a superset of the ReadMlsStore interface and is used for all writes. It must be connected to the primary DB, and will use the primary for both reads and writes.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) CreateOrUpdateInstallation ¶
func (s *Store) CreateOrUpdateInstallation( ctx context.Context, installationId []byte, keyPackage []byte, ) error
Creates the installation and last resort key package
func (*Store) FetchKeyPackages ¶
func (*Store) GetInboxIds ¶
func (s *Store) GetInboxIds( ctx context.Context, req *identity.GetInboxIdsRequest, ) (*identity.GetInboxIdsResponse, error)
func (*Store) GetInboxLogs ¶
func (s *Store) GetInboxLogs( ctx context.Context, batched_req *identity.GetIdentityUpdatesRequest, ) (*identity.GetIdentityUpdatesResponse, error)
func (*Store) InsertCommitLog ¶ added in v1.0.1
func (*Store) InsertGroupMessage ¶
func (*Store) InsertWelcomeMessage ¶
func (*Store) PublishIdentityUpdate ¶
func (s *Store) PublishIdentityUpdate( ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService, ) (*identity.PublishIdentityUpdateResponse, error)
func (*Store) QueryCommitLog ¶ added in v1.0.1
func (s *Store) QueryCommitLog( ctx context.Context, query *mlsv1.QueryCommitLogRequest, ) (*mlsv1.QueryCommitLogResponse, error)
func (*Store) QueryGroupMessagesV1 ¶
func (s *Store) QueryGroupMessagesV1( ctx context.Context, req *mlsv1.QueryGroupMessagesRequest, ) (*mlsv1.QueryGroupMessagesResponse, error)
func (*Store) QueryWelcomeMessagesV1 ¶
func (s *Store) QueryWelcomeMessagesV1( ctx context.Context, req *mlsv1.QueryWelcomeMessagesRequest, ) (*mlsv1.QueryWelcomeMessagesResponse, error)
type StoreOptions ¶
type StoreOptions struct {
DbConnectionString string `long:"db-connection-string" description:"Connection string for MLS DB"`
DbReaderConnectionString string `` /* 141-byte string literal not displayed */
ReadTimeout time.Duration `` /* 144-byte string literal not displayed */
WriteTimeout time.Duration `` /* 144-byte string literal not displayed */
MaxOpenConns int `` /* 143-byte string literal not displayed */
}