store

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChainStateDao

type ChainStateDao struct {
	bun.BaseModel `bun:"table:chain_state"`
	ChainID       string    `bun:",pk,type:varchar(100)"`
	LastBlock     uint64    `bun:",notnull"`
	LastBlockHash string    `bun:",notnull,type:varchar(255)"` // stores the string offset
	UpdatedAt     time.Time `bun:",notnull,default:current_timestamp"`
}

ChainStateDao maps to the 'chain_state' table.

type InstrumentedStore

type InstrumentedStore struct {
	// contains filtered or unexported fields
}

InstrumentedStore wraps a PGStore and records Prometheus metrics for every operation.

func NewInstrumentedStore

func NewInstrumentedStore(inner *PGStore, metrics *StoreMetrics) *InstrumentedStore

NewInstrumentedStore returns a metrics-instrumented wrapper around the given PGStore.

func (*InstrumentedStore) CreateTransfer

func (s *InstrumentedStore) CreateTransfer(ctx context.Context, t *relayer.Transfer) (bool, error)

func (*InstrumentedStore) GetChainState

func (s *InstrumentedStore) GetChainState(ctx context.Context, chainID string) (*relayer.ChainState, error)

func (*InstrumentedStore) GetPendingTransfers

func (s *InstrumentedStore) GetPendingTransfers(ctx context.Context, direction relayer.TransferDirection) ([]*relayer.Transfer, error)

func (*InstrumentedStore) GetTransfer

func (s *InstrumentedStore) GetTransfer(ctx context.Context, id string) (*relayer.Transfer, error)

func (*InstrumentedStore) IncrementRetryCount

func (s *InstrumentedStore) IncrementRetryCount(ctx context.Context, id string) error

func (*InstrumentedStore) ListTransfers

func (s *InstrumentedStore) ListTransfers(ctx context.Context, limit int) ([]*relayer.Transfer, error)

func (*InstrumentedStore) SetChainState

func (s *InstrumentedStore) SetChainState(ctx context.Context, chainID string, blockNumber uint64, offset string) error

func (*InstrumentedStore) UpdateTransferStatus

func (s *InstrumentedStore) UpdateTransferStatus(
	ctx context.Context,
	id string,
	status relayer.TransferStatus,
	destTxHash *string,
	errMsg *string,
) error

type PGStore

type PGStore struct {
	// contains filtered or unexported fields
}

PGStore is a PostgreSQL-backed store for the relayer, backed by Bun ORM.

func NewStore

func NewStore(db *bun.DB) *PGStore

NewStore creates a new Bun-backed relayer store.

func (*PGStore) CreateTransfer

func (s *PGStore) CreateTransfer(ctx context.Context, t *relayer.Transfer) (bool, error)

CreateTransfer inserts a new transfer record. Returns true if newly inserted, false if it already existed (ON CONFLICT DO NOTHING).

func (*PGStore) GetChainState

func (s *PGStore) GetChainState(ctx context.Context, chainID string) (*relayer.ChainState, error)

GetChainState retrieves the last processed offset for a chain. Returns nil, nil when not found.

func (*PGStore) GetPendingTransfers

func (s *PGStore) GetPendingTransfers(
	ctx context.Context,
	direction relayer.TransferDirection,
) ([]*relayer.Transfer, error)

GetPendingTransfers returns all pending transfers for a given direction.

func (*PGStore) GetTransfer

func (s *PGStore) GetTransfer(ctx context.Context, id string) (*relayer.Transfer, error)

GetTransfer retrieves a transfer by ID. Returns nil, nil when not found.

func (*PGStore) IncrementRetryCount

func (s *PGStore) IncrementRetryCount(ctx context.Context, id string) error

IncrementRetryCount atomically increments the retry count for a transfer.

func (*PGStore) ListTransfers

func (s *PGStore) ListTransfers(ctx context.Context, limit int) ([]*relayer.Transfer, error)

ListTransfers returns the most recently created transfers up to limit.

func (*PGStore) SetChainState

func (s *PGStore) SetChainState(ctx context.Context, chainID string, blockNumber uint64, offset string) error

SetChainState upserts the last processed offset for a chain.

func (*PGStore) UpdateTransferStatus

func (s *PGStore) UpdateTransferStatus(
	ctx context.Context,
	id string,
	status relayer.TransferStatus,
	destTxHash *string,
	errMsg *string,
) error

UpdateTransferStatus updates the status, optional destination tx hash, and optional error message.

type StoreMetrics

type StoreMetrics struct {
	// QueryDuration tracks database query latency by operation.
	QueryDuration *prometheus.HistogramVec

	// Errors counts database errors by operation.
	Errors *prometheus.CounterVec
}

StoreMetrics holds Prometheus collectors for the relayer database layer.

func NewNopStoreMetrics

func NewNopStoreMetrics() *StoreMetrics

NewNopStoreMetrics returns a StoreMetrics instance backed by a throwaway registry.

func NewStoreMetrics

func NewStoreMetrics(reg sharedmetrics.NamespacedRegisterer) *StoreMetrics

NewStoreMetrics registers relayer store metrics against the given registerer.

func (*StoreMetrics) IncErrors

func (m *StoreMetrics) IncErrors(op StoreOperation)

IncErrors increments the error counter for the given operation.

func (*StoreMetrics) ObserveQueryDuration

func (m *StoreMetrics) ObserveQueryDuration(op StoreOperation) prometheus.Observer

ObserveQueryDuration returns the observer for the given operation's query duration.

type StoreOperation

type StoreOperation string

StoreOperation identifies a database operation for metrics labeling.

const (
	OpCreateTransfer       StoreOperation = "create_transfer"
	OpGetTransfer          StoreOperation = "get_transfer"
	OpUpdateTransferStatus StoreOperation = "update_transfer_status"
	OpIncrementRetryCount  StoreOperation = "increment_retry_count"
	OpGetChainState        StoreOperation = "get_chain_state"
	OpSetChainState        StoreOperation = "set_chain_state"
	OpGetPendingTransfers  StoreOperation = "get_pending_transfers"
	OpListTransfers        StoreOperation = "list_transfers"
)

type TransferDao

type TransferDao struct {
	bun.BaseModel     `bun:"table:transfers"`
	ID                string     `bun:",pk,type:varchar(255)"`
	Direction         string     `bun:",notnull,type:varchar(50)"`
	Status            string     `bun:",notnull,type:varchar(50)"`
	SourceChain       string     `bun:",notnull,type:varchar(100)"`
	DestinationChain  string     `bun:",notnull,type:varchar(100)"`
	SourceTxHash      string     `bun:",notnull,type:varchar(255)"`
	DestinationTxHash *string    `bun:",type:varchar(255)"`
	TokenAddress      string     `bun:",notnull,type:varchar(255)"`
	Amount            string     `bun:",notnull,type:varchar(255)"`
	Sender            string     `bun:",notnull,type:varchar(255)"`
	Recipient         string     `bun:",notnull,type:varchar(255)"`
	Nonce             int64      `bun:",notnull"`
	SourceBlockNumber uint64     `bun:",notnull"`
	RetryCount        int        `bun:",notnull,default:0"`
	CreatedAt         time.Time  `bun:",notnull,default:current_timestamp"`
	UpdatedAt         time.Time  `bun:",notnull,default:current_timestamp"`
	CompletedAt       *time.Time `bun:"completed_at"`
	ErrorMessage      *string    `bun:",type:text"`
}

TransferDao maps to the 'transfers' table.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL