etl

Version: v1.3.0
Warning

This package is not in the latest version of its module.

Published: May 20, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

ETL Package

Standalone, importable Go package for indexing OpenAudio blockchain data into PostgreSQL.

Usage

package main

import (
    "log"
    "net/http"

    corev1connect "github.com/OpenAudio/go-openaudio/pkg/api/core/v1/v1connect"
    "github.com/OpenAudio/go-openaudio/pkg/etl"
    "go.uber.org/zap"
)

func main() {
    logger, err := zap.NewProduction()
    if err != nil {
        log.Fatal(err)
    }
    defer logger.Sync()

    // Connect to rpc.audius.co (HTTP/Connect) or grpc.audius.co
    client := corev1connect.NewCoreServiceClient(
        http.DefaultClient,
        "https://rpc.audius.co",
    )

    indexer := etl.New(client, logger)
    indexer.SetDBURL("postgres://user:pass@localhost:5432/etl?sslmode=disable")
    if err := indexer.Run(); err != nil { // blocks until done or error
        logger.Fatal("etl indexer stopped", zap.Error(err))
    }
}

Configuration

  • SetDBURL(url) - PostgreSQL connection string
  • SetStartingBlockHeight(n) - Start from block n (default: 1)
  • SetEndingBlockHeight(n) - Stop after block n (0 = run forever)
  • SetRunDownMigrations(true) - Run down migrations before up
  • SetCheckReadiness(true) - Wait until the core service is ready before starting
  • SetConfig(c) - Enable/disable optional components (materialized view refresh, pg notify listener, scheduled releases)

Tests

cd pkg/etl && go test ./...
cd pkg/etl && go test ./processors/... -v

Documentation


Constants

const (
	BlockTopic = "block-subscriber"
	PlayTopic  = "play-subscriber"
)

Variables

var (
	TxTypePlay                        = processors.TxTypePlay
	TxTypeManageEntity                = processors.TxTypeManageEntity
	TxTypeValidatorRegistration       = processors.TxTypeValidatorRegistration
	TxTypeValidatorDeregistration     = processors.TxTypeValidatorDeregistration
	TxTypeValidatorRegistrationLegacy = processors.TxTypeValidatorRegistrationLegacy
	TxTypeSlaRollup                   = processors.TxTypeSlaRollup
	TxTypeValidatorMisbehaviorDereg   = processors.TxTypeValidatorMisbehaviorDereg
	TxTypeStorageProof                = processors.TxTypeStorageProof
	TxTypeStorageProofVerification    = processors.TxTypeStorageProofVerification
	TxTypeRelease                     = processors.TxTypeRelease
)

Functions

This section is empty.

Types

type BlockPubsub

type BlockPubsub = Pubsub[*db.EtlBlock]

type ChallengeStats

type ChallengeStats struct {
	ChallengesReceived int32
	ChallengesFailed   int32
}

ChallengeStats represents storage proof challenge statistics for a validator.

type Config

type Config struct {
	EnableMaterializedViewRefresh bool
	EnablePgNotifyListener        bool
	EnableScheduledReleases       bool

	// DataTypes controls which entity types the entity manager will index.
	// If nil (default), all entity types are enabled.
	// If non-nil (even if empty), only listed types are enabled.
	// Populated from OPENAUDIO_ETL_ENTITY_MANAGER_DATA_TYPES env var (comma-separated).
	DataTypes *[]string
}

Config holds optional ETL component flags. All are enabled by default (see DefaultConfig) for full indexing behavior. Note that the zero value Config{} leaves every flag false.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns config with all optional components enabled.

func (*Config) DisableMaterializedViewRefresh

func (c *Config) DisableMaterializedViewRefresh()

DisableMaterializedViewRefresh disables the periodic MV refresh (for minimal indexing).

func (*Config) DisablePgNotifyListener

func (c *Config) DisablePgNotifyListener()

DisablePgNotifyListener disables the PostgreSQL LISTEN-based pubsub (for minimal indexing).

func (*Config) DisableScheduledReleases

func (c *Config) DisableScheduledReleases()

DisableScheduledReleases disables the periodic publish-scheduled-releases task.

func (*Config) IsDataTypeEnabled

func (c *Config) IsDataTypeEnabled(entityType string) bool

IsDataTypeEnabled returns true if the given entity type should be indexed.

func (*Config) ReadDataTypesEnv

func (c *Config) ReadDataTypesEnv()

ReadDataTypesEnv reads OPENAUDIO_ETL_ENTITY_MANAGER_DATA_TYPES and sets DataTypes accordingly. If the env var is not set, DataTypes remains nil (all types enabled). If set to empty string, DataTypes is an empty slice (no entity types enabled). If set to a comma-separated list, only those types are enabled.
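The three cases above (unset, empty, list) form a tri-state that is easy to get wrong, because strings.Split("", ",") returns a one-element slice containing "". The following is a self-contained sketch of those rules, not the package's code: `parseDataTypes` and `isEnabled` are hypothetical names, trimming whitespace and skipping empty entries are assumptions, and the entity type names used are illustrative only.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseDataTypes is a hypothetical re-creation of the documented rules:
//
//	unset      -> nil            (every entity type enabled)
//	set to ""  -> empty slice    (no entity type enabled)
//	"a,b"      -> []string{a, b} (only those types enabled)
//
// Trimming spaces and dropping empty entries are assumptions.
func parseDataTypes(value string, set bool) *[]string {
	if !set {
		return nil
	}
	types := []string{} // non-nil even when no entries survive
	for _, t := range strings.Split(value, ",") {
		if t = strings.TrimSpace(t); t != "" {
			types = append(types, t)
		}
	}
	return &types
}

// isEnabled mirrors the documented IsDataTypeEnabled behavior.
func isEnabled(dataTypes *[]string, entityType string) bool {
	if dataTypes == nil {
		return true // nil means "no filter": index everything
	}
	for _, t := range *dataTypes {
		if t == entityType {
			return true
		}
	}
	return false
}

func main() {
	value, set := os.LookupEnv("OPENAUDIO_ETL_ENTITY_MANAGER_DATA_TYPES")
	dt := parseDataTypes(value, set)
	fmt.Println(isEnabled(dt, "Track")) // "Track" is a hypothetical type name
}
```

os.LookupEnv is used rather than os.Getenv because only the former distinguishes an unset variable from one set to the empty string, which is exactly the difference between "all types" and "no types".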

type Indexer

type Indexer struct {
	ChainID string
	// contains filtered or unexported fields
}

Indexer extracts blockchain data from a Core RPC and indexes it into PostgreSQL.

func New

func New(core corev1connect.CoreServiceClient, logger *zap.Logger) *Indexer

New creates a new ETL indexer.

func (*Indexer) GetBlockPubsub

func (e *Indexer) GetBlockPubsub() *BlockPubsub

GetBlockPubsub returns the block pubsub instance.

func (*Indexer) GetDB

func (e *Indexer) GetDB() *db.Queries

func (*Indexer) GetPlayPubsub

func (e *Indexer) GetPlayPubsub() *PlayPubsub

GetPlayPubsub returns the play pubsub instance.

func (*Indexer) InitializeChainID

func (e *Indexer) InitializeChainID(ctx context.Context) error

InitializeChainID fetches and caches the chain ID from the core service.

func (*Indexer) Run

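func (e *Indexer) Run() error

Run starts indexing and blocks until it is done (after the ending block height, if one is set) or an error occurs.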

func (*Indexer) SetCheckReadiness

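func (e *Indexer) SetCheckReadiness(checkReadiness bool)

SetCheckReadiness controls whether the indexer waits for the core service to be ready before starting.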

func (*Indexer) SetConfig

func (e *Indexer) SetConfig(c Config)

SetConfig sets optional component flags.

func (*Indexer) SetDBURL

func (e *Indexer) SetDBURL(dbURL string)

SetDBURL sets the PostgreSQL connection string.

func (*Indexer) SetEndingBlockHeight

func (e *Indexer) SetEndingBlockHeight(endingBlockHeight int64)

SetEndingBlockHeight sets the last block to index. A value of 0 means run forever.

func (*Indexer) SetRunDownMigrations

func (e *Indexer) SetRunDownMigrations(runDownMigrations bool)

SetRunDownMigrations controls whether down migrations are run before up migrations.

func (*Indexer) SetSkipMigrations

func (e *Indexer) SetSkipMigrations(skip bool)

SetSkipMigrations controls whether database migrations are skipped.

func (*Indexer) SetStartingBlockHeight

func (e *Indexer) SetStartingBlockHeight(startingBlockHeight int64)

SetStartingBlockHeight sets the first block to index (default: 1).

type MaterializedViewRefresher

type MaterializedViewRefresher struct {
	// contains filtered or unexported fields
}

MaterializedViewRefresher periodically refreshes the dashboard materialized views.

func NewMaterializedViewRefresher

func NewMaterializedViewRefresher(database db.DBTX, logger *zap.Logger) *MaterializedViewRefresher

NewMaterializedViewRefresher creates a new refresher service.

func (*MaterializedViewRefresher) Start

Start begins the periodic refresh cycle (every 2 minutes). This method blocks and should be run in its own goroutine (e.g., via errgroup).

func (*MaterializedViewRefresher) Stop

func (r *MaterializedViewRefresher) Stop()

Stop stops the refresher.

type PlayPubsub

type PlayPubsub = Pubsub[*db.EtlPlay]

type Pubsub

type Pubsub[Message any] struct {
	// contains filtered or unexported fields
}

func NewPubsub

func NewPubsub[Message any]() *Pubsub[Message]

func (*Pubsub[Message]) HasSubscribers

func (ps *Pubsub[Message]) HasSubscribers(topic string) bool

HasSubscribers reports whether a topic has any active subscribers.

func (*Pubsub[Message]) Publish

func (ps *Pubsub[Message]) Publish(ctx context.Context, topic string, msg Message)

Publish sends a message to all subscribers of the specified topic.

func (*Pubsub[Message]) Subscribe

func (ps *Pubsub[Message]) Subscribe(topic string, bufferSizes ...int) chan Message

Subscribe subscribes to a specific topic and returns a channel to receive messages.

func (*Pubsub[Message]) Unsubscribe

func (ps *Pubsub[Message]) Unsubscribe(topic string, ch chan Message)

Unsubscribe removes a subscriber from a topic and closes the channel.

type ScheduledReleasePublisher

type ScheduledReleasePublisher struct {
	// contains filtered or unexported fields
}

ScheduledReleasePublisher periodically publishes scheduled tracks and albums whose release_date has passed.

  • tracks: is_unlisted=true → false when is_scheduled_release and release_date < now().
  • playlists: is_private=true → false when is_album AND is_scheduled_release AND release_date < now(). Non-album playlists are not auto-published.

func NewScheduledReleasePublisher

func NewScheduledReleasePublisher(database db.DBTX, logger *zap.Logger) *ScheduledReleasePublisher

NewScheduledReleasePublisher creates a publisher that runs every minute.

func (*ScheduledReleasePublisher) Start

Start runs the publish loop. It blocks and is intended to run in its own goroutine.

func (*ScheduledReleasePublisher) Stop

func (p *ScheduledReleasePublisher) Stop()

Stop signals the loop to exit.

type StorageProofEntry

type StorageProofEntry struct {
	Address         string
	ProverAddresses []string
	ProofSignature  []byte
	Cid             string
	SignatureValid  bool // determined during verification
}

type StorageProofState

type StorageProofState struct {
	Height          int64
	Proofs          map[string]*StorageProofEntry // address -> proof entry
	ProverAddresses map[string]int                // address -> vote count for who should be provers
	Resolved        bool
}

StorageProofState tracks storage proof challenges and their resolution.
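The comments on StorageProofState describe ProverAddresses as a vote count of who should be provers. The sketch below shows one way such a tally could be filled from the submitted StorageProofEntry values. Both the tally rule (each proof votes once for every address it lists) and the strict-majority selection are hypothetical and may differ from how the package actually resolves a challenge; the two types are copied locally so the block compiles on its own.

```go
package main

import (
	"fmt"
	"sort"
)

// Local copies of the documented types so the sketch is self-contained.
type StorageProofEntry struct {
	Address         string
	ProverAddresses []string
	ProofSignature  []byte
	Cid             string
	SignatureValid  bool
}

type StorageProofState struct {
	Height          int64
	Proofs          map[string]*StorageProofEntry // address -> proof entry
	ProverAddresses map[string]int                // address -> vote count
	Resolved        bool
}

// tallyProverVotes is hypothetical: each submitted proof counts as one vote
// for every address listed in its ProverAddresses.
func tallyProverVotes(s *StorageProofState) {
	s.ProverAddresses = make(map[string]int)
	for _, p := range s.Proofs {
		for _, addr := range p.ProverAddresses {
			s.ProverAddresses[addr]++
		}
	}
}

// majorityProvers is hypothetical: it keeps addresses named by more than half
// of the submitted proofs.
func majorityProvers(s *StorageProofState) []string {
	var out []string
	for addr, votes := range s.ProverAddresses {
		if votes*2 > len(s.Proofs) {
			out = append(out, addr)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	s := &StorageProofState{Height: 100, Proofs: map[string]*StorageProofEntry{
		"val1": {Address: "val1", ProverAddresses: []string{"val1", "val2"}},
		"val2": {Address: "val2", ProverAddresses: []string{"val1", "val2"}},
		"val3": {Address: "val3", ProverAddresses: []string{"val1", "val3"}},
	}}
	tallyProverVotes(s)
	fmt.Println(s.ProverAddresses["val1"], majorityProvers(s)) // 3 [val1 val2]
}
```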

Directories

Path Synopsis
Parity compare tool: field-by-field comparison of Go ETL output against production data.
