migrator

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2025 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package migrator implements a service that migrates data from a source database to a destination database.

The data to migrate (source) is expected to be of the types defined in xmtp/proto MLS V1. https://github.com/xmtp/proto/blob/main/proto/mls/api/v1/mls.proto

Upon migration, the data is transformed and written into the xmtpd database, in the originator_envelopes table. - The OriginatorEnvelope will have a hardcoded originator ID, based on the type of data. See types.go. - Original sequence IDs are preserved. - Expiry (retentionDays) are set based on the type of data. - Congestion fee and base fee are calculated and set, based on retentionDays. - Payer and originator envelopes are signed with the payer and node signing keys, respectively.

Index

Constants

View Source
const (
	GroupMessageOriginatorID   uint32 = 10
	WelcomeMessageOriginatorID uint32 = 11
	InboxLogOriginatorID       uint32 = 12
	KeyPackagesOriginatorID    uint32 = 13
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DBMigratorConfig

type DBMigratorConfig struct {
	// contains filtered or unexported fields
}

type DBMigratorOption

type DBMigratorOption func(*DBMigratorConfig)

func WithContext

func WithContext(ctx context.Context) DBMigratorOption

func WithContractsOptions added in v0.5.1

func WithContractsOptions(contracts *config.ContractsOptions) DBMigratorOption

func WithDestinationDB

func WithDestinationDB(db *sql.DB) DBMigratorOption

func WithLogger

func WithLogger(logger *zap.Logger) DBMigratorOption

func WithMigratorConfig

func WithMigratorConfig(options *config.MigrationServerOptions) DBMigratorOption

type DBReader

type DBReader[T ISourceRecord] struct {
	// contains filtered or unexported fields
}

DBReader provides a generic implementation for fetching records from database tables.

func NewDBReader

func NewDBReader[T ISourceRecord](
	db *sql.DB,
	query string,
	factory func() T,
) *DBReader[T]

NewDBReader creates a new reader for the specified table and type.

func (*DBReader[T]) Fetch

func (r *DBReader[T]) Fetch(
	ctx context.Context,
	lastID int64,
	limit int32,
) ([]ISourceRecord, error)

Fetch rows from the database and return a slice of records.

type GroupMessage

type GroupMessage struct {
	ID              int64        `db:"id"`
	CreatedAt       time.Time    `db:"created_at"`
	GroupID         []byte       `db:"group_id"`
	Data            []byte       `db:"data"`
	GroupIDDataHash []byte       `db:"group_id_data_hash"`
	IsCommit        sql.NullBool `db:"is_commit"`
	SenderHmac      []byte       `db:"sender_hmac"`
	ShouldPush      sql.NullBool `db:"should_push"`
}

GroupMessage represents the group_messages table from the source database. Order by ID ASC.

func (GroupMessage) GetID

func (g GroupMessage) GetID() int64

func (*GroupMessage) Scan

func (g *GroupMessage) Scan(rows *sql.Rows) error

func (GroupMessage) TableName

func (g GroupMessage) TableName() string

type GroupMessageReader

type GroupMessageReader struct {
	*DBReader[*GroupMessage]
}

func NewGroupMessageReader

func NewGroupMessageReader(db *sql.DB) *GroupMessageReader

type IDataTransformer

type IDataTransformer interface {
	Transform(record ISourceRecord) (*envelopes.OriginatorEnvelope, error)
}

IDataTransformer defines the interface for transforming external data to xmtpd OriginatorEnvelope format.

type IDestinationWriter

type IDestinationWriter interface {
	Write(ctx context.Context, env *envelopes.OriginatorEnvelope) error
}

type ISourceReader

type ISourceReader interface {
	Fetch(ctx context.Context, lastID int64, limit int32) ([]ISourceRecord, error)
}

ISourceReader defines the interface for reading records from the source database.

type ISourceRecord

type ISourceRecord interface {
	GetID() int64
	TableName() string
	Scan(rows *sql.Rows) error
}

ISourceRecord defines a record from the source database, that can scanned and ordered by some ID.

type InboxLog

type InboxLog struct {
	SequenceID          int64  `db:"sequence_id"`
	InboxID             []byte `db:"inbox_id"`
	ServerTimestampNs   int64  `db:"server_timestamp_ns"`
	IdentityUpdateProto []byte `db:"identity_update_proto"`
}

InboxLog represents the inbox_log table from the source database. Order by SequenceID ASC.

func (InboxLog) GetID

func (i InboxLog) GetID() int64

func (*InboxLog) Scan

func (i *InboxLog) Scan(rows *sql.Rows) error

func (InboxLog) TableName

func (i InboxLog) TableName() string

type InboxLogReader

type InboxLogReader struct {
	*DBReader[*InboxLog]
}

func NewInboxLogReader

func NewInboxLogReader(db *sql.DB) *InboxLogReader

type KeyPackage

type KeyPackage struct {
	SequenceID     int64  `db:"sequence_id"`
	InstallationID []byte `db:"installation_id"`
	KeyPackage     []byte `db:"key_package"`
}

KeyPackage represents the key_packages table from the source database. Order by CreatedAt ASC.

func (KeyPackage) GetID

func (i KeyPackage) GetID() int64

func (*KeyPackage) Scan

func (i *KeyPackage) Scan(rows *sql.Rows) error

func (KeyPackage) TableName

func (i KeyPackage) TableName() string

type KeyPackageReader

type KeyPackageReader struct {
	*DBReader[*KeyPackage]
}

func NewKeyPackageReader

func NewKeyPackageReader(db *sql.DB) *KeyPackageReader

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

func NewMigrationService

func NewMigrationService(opts ...DBMigratorOption) (*Migrator, error)

func (*Migrator) Start

func (m *Migrator) Start() error

func (*Migrator) Stop

func (m *Migrator) Stop() error

type Transformer

type Transformer struct {
	// contains filtered or unexported fields
}

func NewTransformer

func NewTransformer(
	payerPrivateKey *ecdsa.PrivateKey,
	nodeSigningKey *ecdsa.PrivateKey,
) *Transformer

func (*Transformer) Transform

func (t *Transformer) Transform(record ISourceRecord) (*envelopes.OriginatorEnvelope, error)

func (*Transformer) TransformGroupMessage

func (t *Transformer) TransformGroupMessage(
	groupMessage *GroupMessage,
) (*envelopes.OriginatorEnvelope, error)

TransformGroupMessage converts GroupMessage to appropriate XMTPD envelope format.

func (*Transformer) TransformInboxLog

func (t *Transformer) TransformInboxLog(
	inboxLog *InboxLog,
) (*envelopes.OriginatorEnvelope, error)

TransformInboxLog converts InboxLog to appropriate XMTPD IdentityUpdate envelope format.

func (*Transformer) TransformKeyPackage

func (t *Transformer) TransformKeyPackage(
	keyPackage *KeyPackage,
) (*envelopes.OriginatorEnvelope, error)

TransformKeyPackage converts a V3 KeyPackage to appropriate XMTPD KeyPackage envelope format.

func (*Transformer) TransformWelcomeMessage

func (t *Transformer) TransformWelcomeMessage(
	welcomeMessage *WelcomeMessage,
) (*envelopes.OriginatorEnvelope, error)

TransformWelcomeMessage converts WelcomeMessage to appropriate XMTPD envelope format.

type WelcomeMessage

type WelcomeMessage struct {
	ID                      int64     `db:"id"`
	CreatedAt               time.Time `db:"created_at"`
	InstallationKey         []byte    `db:"installation_key"`
	Data                    []byte    `db:"data"`
	HpkePublicKey           []byte    `db:"hpke_public_key"`
	InstallationKeyDataHash []byte    `db:"installation_key_data_hash"`
	WrapperAlgorithm        int16     `db:"wrapper_algorithm"`
	WelcomeMetadata         []byte    `db:"welcome_metadata"`
}

WelcomeMessage represents the welcome_messages table from the source database. Order by ID ASC.

func (WelcomeMessage) GetID

func (w WelcomeMessage) GetID() int64

func (*WelcomeMessage) Scan

func (w *WelcomeMessage) Scan(rows *sql.Rows) error

func (WelcomeMessage) TableName

func (w WelcomeMessage) TableName() string

type WelcomeMessageReader

type WelcomeMessageReader struct {
	*DBReader[*WelcomeMessage]
}

func NewWelcomeMessageReader

func NewWelcomeMessageReader(db *sql.DB) *WelcomeMessageReader

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL