Documentation
¶
Overview ¶
Package migrator implements a service that migrates data from a source database to a destination database.
The data to migrate (source) is expected to be of the types defined in xmtp/proto MLS V1: https://github.com/xmtp/proto/blob/main/proto/mls/api/v1/mls.proto
Upon migration, the data is transformed and written into the xmtpd database, in the originator_envelopes table: - The OriginatorEnvelope will have a hardcoded originator ID, based on the type of data. See types.go. - Original sequence IDs are preserved. - Expiry (retentionDays) is set based on the type of data. - Congestion fee and base fee are calculated and set, based on retentionDays. - Payer and originator envelopes are signed with the payer and node signing keys, respectively.
Index ¶
- Constants
- Variables
- type BroadcasterBatch
- func (c *BroadcasterBatch) Add(identifier []byte, payload []byte, sequenceID uint64)
- func (c *BroadcasterBatch) All() iter.Seq[BroadcasterMessage]
- func (c *BroadcasterBatch) LastSequenceID() uint64
- func (c *BroadcasterBatch) Len() int
- func (c *BroadcasterBatch) Reset()
- func (c *BroadcasterBatch) Size() int64
- type BroadcasterMessage
- type CommitMessage
- type CommitMessageReader
- type DBMigratorConfig
- type DBMigratorOption
- func WithContext(ctx context.Context) DBMigratorOption
- func WithContractsOptions(contracts *config.ContractsOptions) DBMigratorOption
- func WithDestinationDB(db *db.Handler) DBMigratorOption
- func WithFeeCalculator(calc fees.IFeeCalculator) DBMigratorOption
- func WithLogger(logger *zap.Logger) DBMigratorOption
- func WithMigratorConfig(options *config.MigrationServerOptions) DBMigratorOption
- type DBReader
- type FailureReason
- type GroupMessage
- type GroupMessageReader
- type IDataTransformer
- type IDestinationWriter
- type ISourceReader
- type ISourceRecord
- type InboxLog
- type InboxLogReader
- type KeyPackage
- type KeyPackageReader
- type Migrator
- type Transformer
- func (t *Transformer) Transform(record ISourceRecord) (*envelopes.OriginatorEnvelope, error)
- func (t *Transformer) TransformCommitMessage(commitMessage *CommitMessage) (*envelopes.OriginatorEnvelope, error)
- func (t *Transformer) TransformGroupMessage(groupMessage *GroupMessage) (*envelopes.OriginatorEnvelope, error)
- func (t *Transformer) TransformInboxLog(inboxLog *InboxLog) (*envelopes.OriginatorEnvelope, error)
- func (t *Transformer) TransformKeyPackage(keyPackage *KeyPackage) (*envelopes.OriginatorEnvelope, error)
- func (t *Transformer) TransformWelcomeMessage(welcomeMessage *WelcomeMessage) (*envelopes.OriginatorEnvelope, error)
- type WelcomeMessage
- type WelcomeMessageReader
- type Worker
- func (w *Worker) StartBlockchainWriterBatch(ctx context.Context) error
- func (w *Worker) StartDatabaseWriter(ctx context.Context) error
- func (w *Worker) StartReader(ctx context.Context, reader ISourceReader) error
- func (w *Worker) StartTransformer(ctx context.Context, transformer IDataTransformer) error
Constants ¶
Variables ¶
var ( ErrDeadLetterBox = errors.New("skipped and added to dead letter box") ErrMigrationProgressUpdateFailed = errors.New("migration progress update failed") )
Functions ¶
This section is empty.
Types ¶
type BroadcasterBatch ¶ added in v1.1.0
type BroadcasterBatch struct {
// contains filtered or unexported fields
}
BroadcasterBatch defines a batch of messages to be published to the blockchain.
func (*BroadcasterBatch) Add ¶ added in v1.1.0
func (c *BroadcasterBatch) Add(identifier []byte, payload []byte, sequenceID uint64)
func (*BroadcasterBatch) All ¶ added in v1.1.0
func (c *BroadcasterBatch) All() iter.Seq[BroadcasterMessage]
All returns an iterator over all items in the batch.
func (*BroadcasterBatch) LastSequenceID ¶ added in v1.1.0
func (c *BroadcasterBatch) LastSequenceID() uint64
func (*BroadcasterBatch) Len ¶ added in v1.1.0
func (c *BroadcasterBatch) Len() int
func (*BroadcasterBatch) Reset ¶ added in v1.1.0
func (c *BroadcasterBatch) Reset()
func (*BroadcasterBatch) Size ¶ added in v1.1.0
func (c *BroadcasterBatch) Size() int64
type BroadcasterMessage ¶ added in v1.1.0
BroadcasterMessage represents a single element of the BroadcasterBatch.
type CommitMessage ¶ added in v1.1.0
type CommitMessage struct {
GroupMessage
}
CommitMessage represents rows of the group_messages table (with is_commit = true) from the source database. Order by ID ASC.
func (CommitMessage) GetID ¶ added in v1.1.0
func (c CommitMessage) GetID() int64
func (CommitMessage) TableName ¶ added in v1.1.0
func (c CommitMessage) TableName() string
type CommitMessageReader ¶ added in v1.1.0
type CommitMessageReader struct {
*DBReader[*CommitMessage]
}
func NewCommitMessageReader ¶ added in v1.1.0
func NewCommitMessageReader(db *sql.DB, startDate int64) *CommitMessageReader
type DBMigratorConfig ¶
type DBMigratorConfig struct {
// contains filtered or unexported fields
}
type DBMigratorOption ¶
type DBMigratorOption func(*DBMigratorConfig)
func WithContext ¶
func WithContext(ctx context.Context) DBMigratorOption
func WithContractsOptions ¶ added in v0.5.1
func WithContractsOptions(contracts *config.ContractsOptions) DBMigratorOption
func WithDestinationDB ¶
func WithDestinationDB(db *db.Handler) DBMigratorOption
func WithFeeCalculator ¶ added in v1.1.0
func WithFeeCalculator(calc fees.IFeeCalculator) DBMigratorOption
func WithLogger ¶
func WithLogger(logger *zap.Logger) DBMigratorOption
func WithMigratorConfig ¶
func WithMigratorConfig(options *config.MigrationServerOptions) DBMigratorOption
type DBReader ¶
type DBReader[T ISourceRecord] struct { // contains filtered or unexported fields }
DBReader provides a generic implementation for fetching records from database tables.
func NewDBReader ¶
func NewDBReader[T ISourceRecord]( db *sql.DB, query string, queryHeight string, factory func() T, startDate int64, ) *DBReader[T]
NewDBReader creates a new reader for the specified table and type.
type FailureReason ¶ added in v1.1.0
type FailureReason string
FailureReason defines the reason for inserting a record into the dead letter box.
const ( FailureTransformerError FailureReason = "transformer error" FailureOversizedChainMessage FailureReason = "oversized chain message" FailureBlockchainUndetermined FailureReason = "blockchain undetermined error" )
func (FailureReason) ShouldRetry ¶ added in v1.1.0
func (f FailureReason) ShouldRetry() bool
func (FailureReason) String ¶ added in v1.1.0
func (f FailureReason) String() string
type GroupMessage ¶
type GroupMessage struct {
ID int64 `db:"id"`
CreatedAt time.Time `db:"created_at"`
GroupID []byte `db:"group_id"`
Data []byte `db:"data"`
GroupIDDataHash []byte `db:"group_id_data_hash"`
IsCommit sql.NullBool `db:"is_commit"`
SenderHmac []byte `db:"sender_hmac"`
ShouldPush sql.NullBool `db:"should_push"`
}
GroupMessage represents rows of the group_messages table (with is_commit = false) from the source database. Order by ID ASC.
func (GroupMessage) GetID ¶
func (g GroupMessage) GetID() int64
func (GroupMessage) TableName ¶
func (g GroupMessage) TableName() string
type GroupMessageReader ¶
type GroupMessageReader struct {
*DBReader[*GroupMessage]
}
func NewGroupMessageReader ¶
func NewGroupMessageReader(db *sql.DB, startDate int64) *GroupMessageReader
type IDataTransformer ¶
type IDataTransformer interface {
Transform(record ISourceRecord) (*envelopes.OriginatorEnvelope, error)
}
IDataTransformer defines the interface for transforming external data to xmtpd OriginatorEnvelope format.
type IDestinationWriter ¶
type IDestinationWriter interface {
Write(ctx context.Context, env *envelopes.OriginatorEnvelope) error
}
type ISourceReader ¶
type ISourceReader interface {
Fetch(ctx context.Context, lastID int64, limit int32) ([]ISourceRecord, error)
}
ISourceReader defines the interface for reading records from the source database.
type ISourceRecord ¶
ISourceRecord defines a record from the source database that can be scanned and ordered by some ID.
type InboxLog ¶
type InboxLog struct {
SequenceID int64 `db:"sequence_id"`
InboxID []byte `db:"inbox_id"`
ServerTimestampNs int64 `db:"server_timestamp_ns"`
IdentityUpdateProto []byte `db:"identity_update_proto"`
}
InboxLog represents the inbox_log table from the source database. Order by SequenceID ASC.
type InboxLogReader ¶
func NewInboxLogReader ¶
func NewInboxLogReader(db *sql.DB, startDate int64) *InboxLogReader
type KeyPackage ¶
type KeyPackage struct {
SequenceID int64 `db:"sequence_id"`
InstallationID []byte `db:"installation_id"`
KeyPackage []byte `db:"key_package"`
}
KeyPackage represents the key_packages table from the source database. Order by SequenceID ASC.
func (KeyPackage) GetID ¶
func (i KeyPackage) GetID() int64
func (KeyPackage) TableName ¶
func (i KeyPackage) TableName() string
type KeyPackageReader ¶
type KeyPackageReader struct {
*DBReader[*KeyPackage]
}
func NewKeyPackageReader ¶
func NewKeyPackageReader(db *sql.DB) *KeyPackageReader
type Migrator ¶
type Migrator struct {
// contains filtered or unexported fields
}
func NewMigrationService ¶
func NewMigrationService(opts ...DBMigratorOption) (*Migrator, error)
type Transformer ¶
type Transformer struct {
// contains filtered or unexported fields
}
func NewTransformer ¶
func NewTransformer( feeCalculator fees.IFeeCalculator, payerPrivateKey *ecdsa.PrivateKey, nodeSigningKey *ecdsa.PrivateKey, ) *Transformer
func (*Transformer) Transform ¶
func (t *Transformer) Transform(record ISourceRecord) (*envelopes.OriginatorEnvelope, error)
func (*Transformer) TransformCommitMessage ¶ added in v1.1.0
func (t *Transformer) TransformCommitMessage( commitMessage *CommitMessage, ) (*envelopes.OriginatorEnvelope, error)
TransformCommitMessage converts a CommitMessage to the appropriate XMTPD envelope format.
func (*Transformer) TransformGroupMessage ¶
func (t *Transformer) TransformGroupMessage( groupMessage *GroupMessage, ) (*envelopes.OriginatorEnvelope, error)
TransformGroupMessage converts a GroupMessage to the appropriate XMTPD envelope format.
func (*Transformer) TransformInboxLog ¶
func (t *Transformer) TransformInboxLog( inboxLog *InboxLog, ) (*envelopes.OriginatorEnvelope, error)
TransformInboxLog converts an InboxLog to the appropriate XMTPD IdentityUpdate envelope format.
func (*Transformer) TransformKeyPackage ¶
func (t *Transformer) TransformKeyPackage( keyPackage *KeyPackage, ) (*envelopes.OriginatorEnvelope, error)
TransformKeyPackage converts a V3 KeyPackage to the appropriate XMTPD KeyPackage envelope format.
func (*Transformer) TransformWelcomeMessage ¶
func (t *Transformer) TransformWelcomeMessage( welcomeMessage *WelcomeMessage, ) (*envelopes.OriginatorEnvelope, error)
TransformWelcomeMessage converts a WelcomeMessage to the appropriate XMTPD envelope format.
type WelcomeMessage ¶
type WelcomeMessage struct {
ID int64 `db:"id"`
CreatedAt time.Time `db:"created_at"`
InstallationKey []byte `db:"installation_key"`
Data []byte `db:"data"`
HpkePublicKey []byte `db:"hpke_public_key"`
InstallationKeyDataHash []byte `db:"installation_key_data_hash"`
WrapperAlgorithm int16 `db:"wrapper_algorithm"`
WelcomeMetadata []byte `db:"welcome_metadata"`
}
WelcomeMessage represents the welcome_messages table from the source database. Order by ID ASC.
func (WelcomeMessage) GetID ¶
func (w WelcomeMessage) GetID() int64
func (WelcomeMessage) TableName ¶
func (w WelcomeMessage) TableName() string
type WelcomeMessageReader ¶
type WelcomeMessageReader struct {
*DBReader[*WelcomeMessage]
}
func NewWelcomeMessageReader ¶
func NewWelcomeMessageReader(db *sql.DB, startDate int64) *WelcomeMessageReader
type Worker ¶ added in v1.1.0
type Worker struct {
// contains filtered or unexported fields
}
func (*Worker) StartBlockchainWriterBatch ¶ added in v1.1.0
func (*Worker) StartDatabaseWriter ¶ added in v1.1.0
func (*Worker) StartReader ¶ added in v1.1.0
func (w *Worker) StartReader(ctx context.Context, reader ISourceReader) error
func (*Worker) StartTransformer ¶ added in v1.1.0
func (w *Worker) StartTransformer(ctx context.Context, transformer IDataTransformer) error