data

package module
v0.2.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 18, 2026 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation


Constants

This section is empty.

Variables

var ProviderSet = wire.NewSet(ProvideData)

ProviderSet is the wire provider set for the data package. It provides *Data with a cleanup function that closes all connections.

Usage:

wire.Build(
    data.ProviderSet,
    // ... other providers
)

Functions

func GetTx

func GetTx(ctx context.Context) (*sql.Tx, error)

GetTx retrieves the transaction from the context.

func ListRegisteredDrivers added in v0.2.0

func ListRegisteredDrivers() map[string][]string

ListRegisteredDrivers returns a snapshot of all registered drivers. Useful for debugging and diagnostics.

func NewSearchClient added in v0.2.1

func NewSearchClient(d *Data, collector ...metrics.Collector) *search.Client

NewSearchClient creates a search client from ncore data layer. It automatically detects and creates adapters for available search engines.

Returns nil if no search engines are available. Applications should check if the returned client is nil to support optional search functionality.

func RegisterCacheDriver added in v0.2.0

func RegisterCacheDriver(driver CacheDriver)

RegisterCacheDriver makes a cache driver available by the provided name. It follows the same pattern as RegisterDatabaseDriver.

func RegisterDatabaseDriver added in v0.2.0

func RegisterDatabaseDriver(driver DatabaseDriver)

RegisterDatabaseDriver makes a database driver available by the provided name. It is intended to be called from the init function in driver packages.

Example usage in a driver package:

func init() {
    data.RegisterDatabaseDriver(&postgresDriver{})
}

If RegisterDatabaseDriver is called twice with the same name or if driver is nil, it panics.
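
The registration behaviour described above (register by name, panic on a nil driver or a duplicate name, look up by name with an error when missing) can be sketched as follows. This is a minimal illustration, not the package's implementation: the registry type, the reduced Driver interface, and the error wording are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Driver is a hypothetical stand-in that keeps only the Name method
// shared by the driver interfaces in this package.
type Driver interface {
	Name() string
}

// registry is an illustrative name-keyed driver table guarded by a mutex.
type registry struct {
	mu      sync.RWMutex
	drivers map[string]Driver
}

func newRegistry() *registry {
	return &registry{drivers: make(map[string]Driver)}
}

// Register panics on a nil driver or a duplicate name, mirroring the
// documented behaviour of RegisterDatabaseDriver.
func (r *registry) Register(d Driver) {
	if d == nil {
		panic("registry: Register driver is nil")
	}
	name := d.Name()
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.drivers[name]; dup {
		panic("registry: Register called twice for driver " + name)
	}
	r.drivers[name] = d
}

// Get returns the named driver, or an error when it is not registered.
func (r *registry) Get(name string) (Driver, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	d, ok := r.drivers[name]
	if !ok {
		return nil, fmt.Errorf("registry: unknown driver %q (is the driver package imported?)", name)
	}
	return d, nil
}

// Names returns a sorted snapshot of the registered driver names.
func (r *registry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.drivers))
	for name := range r.drivers {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// postgresDriver is a placeholder driver used only for this demonstration.
type postgresDriver struct{}

func (postgresDriver) Name() string { return "postgres" }

func main() {
	r := newRegistry()
	r.Register(postgresDriver{})
	fmt.Println(r.Names())
	if _, err := r.Get("mysql"); err != nil {
		fmt.Println(err)
	}
}
```

Panicking at registration time surfaces a misconfigured driver package during program start-up rather than at the first connection attempt, which is the same trade-off database/sql makes in its Register function.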

func RegisterMessageDriver added in v0.2.0

func RegisterMessageDriver(driver MessageDriver)

RegisterMessageDriver makes a message queue driver available by the provided name.

func RegisterSearchDriver added in v0.2.0

func RegisterSearchDriver(driver SearchDriver)

RegisterSearchDriver makes a search engine driver available by the provided name.

func RegisterStorageDriver added in v0.2.0

func RegisterStorageDriver(driver StorageDriver)

RegisterStorageDriver makes a storage driver available by the provided name.

Types

type CacheDriver added in v0.2.0

type CacheDriver interface {
	// Name returns the driver identifier (e.g., "redis", "memcached")
	Name() string

	// Connect establishes a new cache connection.
	Connect(ctx context.Context, cfg any) (any, error)

	// Close terminates the cache connection.
	Close(conn any) error

	// Ping verifies the cache connection is alive.
	Ping(ctx context.Context, conn any) error
}

CacheDriver defines the interface for cache/key-value store drivers.
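
As an illustration of what an implementation of this interface might look like, here is a minimal in-memory sketch. The CacheDriver interface is copied from the listing above so the example compiles on its own; memoryDriver, memoryConn, and errClosed are hypothetical names, and a real driver would wrap a client such as a Redis connection instead.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// CacheDriver is copied from the interface listing above.
type CacheDriver interface {
	Name() string
	Connect(ctx context.Context, cfg any) (any, error)
	Close(conn any) error
	Ping(ctx context.Context, conn any) error
}

// memoryConn is a hypothetical connection handle that only tracks
// whether it has been closed.
type memoryConn struct {
	mu     sync.Mutex
	closed bool
}

var errClosed = errors.New("memory driver: connection is closed")

// memoryDriver is an illustrative driver with no external dependencies.
type memoryDriver struct{}

func (memoryDriver) Name() string { return "memory" }

// Connect ignores cfg in this sketch and returns a fresh handle.
func (memoryDriver) Connect(ctx context.Context, cfg any) (any, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return &memoryConn{}, nil
}

// asMemoryConn recovers the concrete handle from the any-typed value
// that the interface passes around.
func asMemoryConn(conn any) (*memoryConn, error) {
	c, ok := conn.(*memoryConn)
	if !ok || c == nil {
		return nil, fmt.Errorf("memory driver: unexpected connection type %T", conn)
	}
	return c, nil
}

func (memoryDriver) Close(conn any) error {
	c, err := asMemoryConn(conn)
	if err != nil {
		return err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	return nil
}

func (memoryDriver) Ping(ctx context.Context, conn any) error {
	c, err := asMemoryConn(conn)
	if err != nil {
		return err
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	return ctx.Err()
}

// Compile-time check that memoryDriver satisfies CacheDriver.
var _ CacheDriver = memoryDriver{}

// run walks through the connection lifecycle: connect, ping, close,
// and a final ping that is expected to fail.
func run() error {
	ctx := context.Background()
	var drv CacheDriver = memoryDriver{}
	conn, err := drv.Connect(ctx, nil)
	if err != nil {
		return err
	}
	if err := drv.Ping(ctx, conn); err != nil {
		return err
	}
	if err := drv.Close(conn); err != nil {
		return err
	}
	if err := drv.Ping(ctx, conn); !errors.Is(err, errClosed) {
		return fmt.Errorf("ping after close: got %v, want %v", err, errClosed)
	}
	return nil
}

func main() {
	fmt.Println("lifecycle error:", run())
}
```

Because Connect, Close, and Ping exchange the connection as any, each method has to type-assert before use; centralising that assertion in one helper keeps the failure message consistent.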

func GetCacheDriver added in v0.2.0

func GetCacheDriver(name string) (CacheDriver, error)

GetCacheDriver retrieves a registered cache driver by name.

type ContextKey

type ContextKey string

const (
	ContextKeyTransaction ContextKey = "tx"
)

type Data

type Data struct {
	Conn *connection.Connections
	// contains filtered or unexported fields
}

func New

func New(cfg *config.Config, createNewInstance ...bool) (*Data, func(name ...string), error)

New creates a new data layer.

func ProvideData added in v0.2.0

func ProvideData(cfg *config.Config) (*Data, func(), error)

ProvideData initializes and returns the data layer with cleanup function. The cleanup function should be called when the application shuts down to properly close all database connections and release resources.

func (*Data) Close

func (d *Data) Close() []error

Close closes all data connections.
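
Since Close returns a slice of errors rather than a single error, callers may want to fold the slice into one value for logging or for returning from a shutdown routine. The sketch below shows one way to do that with errors.Join from the standard library (Go 1.20 or later). The helper name joinCloseErrors and the sample errors are hypothetical, and whether Close can return nil entries is not documented, so the helper tolerates them.

```go
package main

import (
	"errors"
	"fmt"
)

// Sample errors standing in for whatever Close might report.
var (
	errRedisClose    = errors.New("redis: close failed")
	errPostgresClose = errors.New("postgres: close failed")
)

// joinCloseErrors folds a []error, such as the one returned by Close,
// into a single error. It returns nil when the slice is empty or
// contains only nil entries.
func joinCloseErrors(errs []error) error {
	return errors.Join(errs...)
}

func main() {
	errs := []error{errRedisClose, errPostgresClose}
	if err := joinCloseErrors(errs); err != nil {
		fmt.Println("shutdown finished with errors:")
		fmt.Println(err)
	}
}
```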

func (*Data) ConsumeFromKafka

func (d *Data) ConsumeFromKafka(ctx context.Context, topic, groupID string, handler func([]byte) error) error

ConsumeFromKafka consumes messages from Kafka and records metrics.

func (*Data) ConsumeFromRabbitMQ

func (d *Data) ConsumeFromRabbitMQ(queue string, handler func([]byte) error) error

ConsumeFromRabbitMQ consumes messages from RabbitMQ and records metrics.
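
Both consume methods take a handler of type func([]byte) error. The sketch below shows one possible shape for such a handler: it decodes a JSON payload and passes the result to application code. The orderCreated payload and newOrderHandler are hypothetical, and what the data layer does when the handler returns an error (retry, requeue, or drop) is not documented here, so the example makes no assumption about it.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// orderCreated is a hypothetical message payload.
type orderCreated struct {
	OrderID string `json:"order_id"`
	Amount  int64  `json:"amount"`
}

// newOrderHandler returns a function whose type matches the handler
// parameter of ConsumeFromKafka and ConsumeFromRabbitMQ. It decodes
// the message body and hands the event to process.
func newOrderHandler(process func(orderCreated) error) func([]byte) error {
	return func(body []byte) error {
		var evt orderCreated
		if err := json.Unmarshal(body, &evt); err != nil {
			return fmt.Errorf("decode order event: %w", err)
		}
		if evt.OrderID == "" {
			return errors.New("order event has no order_id")
		}
		return process(evt)
	}
}

func main() {
	handler := newOrderHandler(func(evt orderCreated) error {
		fmt.Printf("order %s amount %d\n", evt.OrderID, evt.Amount)
		return nil
	})
	if err := handler([]byte(`{"order_id":"o-1","amount":250}`)); err != nil {
		fmt.Println("unexpected error:", err)
	}
	if err := handler([]byte(`not json`)); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

Building the handler with a constructor keeps decoding and validation separate from business logic, so the same process function can be exercised in tests without a broker.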

func (*Data) DB

func (d *Data) DB() *sql.DB

func (*Data) DBRead

func (d *Data) DBRead() (*sql.DB, error)

func (*Data) GetDBManager

func (d *Data) GetDBManager() *connection.DBManager

func (*Data) GetDatabaseNodes

func (d *Data) GetDatabaseNodes() (master *sql.DB, slaves []*sql.DB, err error)

GetDatabaseNodes returns information about all database nodes (master and slaves)

func (*Data) GetElasticsearch

func (d *Data) GetElasticsearch() any

func (*Data) GetMasterDB

func (d *Data) GetMasterDB() *sql.DB

func (*Data) GetMeilisearch

func (d *Data) GetMeilisearch() any

func (*Data) GetMetricsCollector

func (d *Data) GetMetricsCollector() metrics.Collector

GetMetricsCollector returns the metrics collector

func (*Data) GetMongoCollection

func (d *Data) GetMongoCollection(dbName, collName string, readOnly bool) (any, error)

func (*Data) GetMongoDatabase

func (d *Data) GetMongoDatabase(name string, readOnly bool) (any, error)

func (*Data) GetMongoManager

func (d *Data) GetMongoManager() any

func (*Data) GetOpenSearch

func (d *Data) GetOpenSearch() any

func (*Data) GetRedis

func (d *Data) GetRedis() any

func (*Data) GetSlaveDB

func (d *Data) GetSlaveDB() (*sql.DB, error)

func (*Data) GetStats

func (d *Data) GetStats() map[string]any

GetStats returns data layer statistics

func (*Data) Health

func (d *Data) Health(ctx context.Context) map[string]any

Health checks all components with comprehensive metrics collection

func (*Data) IsMessagingAvailable

func (d *Data) IsMessagingAvailable() bool

IsMessagingAvailable checks if any messaging (queue or memory) is available.

Deprecated: Use IsMessagingEnabled() and IsQueueAvailable() separately.

func (*Data) IsMessagingEnabled

func (d *Data) IsMessagingEnabled() bool

IsMessagingEnabled checks if messaging services are enabled.

func (*Data) IsQueueAvailable

func (d *Data) IsQueueAvailable() bool

IsQueueAvailable checks if external message queues are available

func (*Data) IsReadOnlyMode

func (d *Data) IsReadOnlyMode(ctx context.Context) bool

IsReadOnlyMode checks if the system is in read-only mode (only slaves available)

func (*Data) MongoHealthCheck

func (d *Data) MongoHealthCheck(ctx context.Context) error

func (*Data) Ping

func (d *Data) Ping(ctx context.Context) error

Ping checks all database connections

func (*Data) PublishToKafka

func (d *Data) PublishToKafka(ctx context.Context, topic string, key, value []byte) error

PublishToKafka publishes a message to Kafka and records metrics.

func (*Data) PublishToRabbitMQ

func (d *Data) PublishToRabbitMQ(exchange, routingKey string, body []byte) error

PublishToRabbitMQ publishes a message to RabbitMQ and records metrics.

func (*Data) ShouldUseMemoryFallback

func (d *Data) ShouldUseMemoryFallback() bool

ShouldUseMemoryFallback checks whether to fall back to memory when the queue is unavailable.
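
One way a caller might combine IsMessagingEnabled, IsQueueAvailable, and ShouldUseMemoryFallback is sketched below. The three method names and signatures come from the listing above; the messagingState interface, the transport type, and the order in which the checks are applied are assumptions made for illustration.

```go
package main

import "fmt"

// messagingState lists the three *Data methods this sketch relies on,
// so a fake can be substituted for the real data layer.
type messagingState interface {
	IsMessagingEnabled() bool
	IsQueueAvailable() bool
	ShouldUseMemoryFallback() bool
}

// transport is a hypothetical enumeration of delivery choices.
type transport int

const (
	transportNone transport = iota
	transportQueue
	transportMemory
)

func (t transport) String() string {
	switch t {
	case transportQueue:
		return "queue"
	case transportMemory:
		return "memory"
	default:
		return "none"
	}
}

// chooseTransport prefers the external queue, falls back to memory
// only when that is allowed, and otherwise reports no transport.
func chooseTransport(s messagingState) transport {
	switch {
	case !s.IsMessagingEnabled():
		return transportNone
	case s.IsQueueAvailable():
		return transportQueue
	case s.ShouldUseMemoryFallback():
		return transportMemory
	default:
		return transportNone
	}
}

// fakeState is a test double with fixed answers.
type fakeState struct{ enabled, queue, fallback bool }

func (f fakeState) IsMessagingEnabled() bool      { return f.enabled }
func (f fakeState) IsQueueAvailable() bool        { return f.queue }
func (f fakeState) ShouldUseMemoryFallback() bool { return f.fallback }

func main() {
	fmt.Println(chooseTransport(fakeState{enabled: true, queue: true}))
	fmt.Println(chooseTransport(fakeState{enabled: true, fallback: true}))
	fmt.Println(chooseTransport(fakeState{}))
}
```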

func (*Data) WithMongoTransaction

func (d *Data) WithMongoTransaction(ctx context.Context, fn func(any) error) error

func (*Data) WithTx

func (d *Data) WithTx(ctx context.Context, fn func(ctx context.Context) error) error

WithTx wraps the function in a transaction.

func (*Data) WithTxRead

func (d *Data) WithTxRead(ctx context.Context, fn func(ctx context.Context) error) error

WithTxRead wraps the function in a read-only transaction.
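
The WithTx callback receives a context, and GetTx reads a transaction back out of a context, which suggests the transaction travels in the context under ContextKeyTransaction. The sketch below illustrates that pattern with a stand-in transaction type so it runs without a database. Everything in it is an assumption: fakeTx replaces *sql.Tx, and the commit-on-success and rollback-on-error behaviour is the conventional choice rather than something this page confirms.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// contextKey mirrors the ContextKey type and its "tx" constant.
type contextKey string

const contextKeyTransaction contextKey = "tx"

// fakeTx stands in for *sql.Tx and only records how it ended.
type fakeTx struct {
	committed  bool
	rolledBack bool
}

var errNoTx = errors.New("no transaction in context")

// getTx retrieves the transaction stored in ctx, if any.
func getTx(ctx context.Context) (*fakeTx, error) {
	tx, ok := ctx.Value(contextKeyTransaction).(*fakeTx)
	if !ok || tx == nil {
		return nil, errNoTx
	}
	return tx, nil
}

// withTx stores tx in a derived context, runs fn with it, then marks
// the transaction committed on success or rolled back on error.
func withTx(ctx context.Context, tx *fakeTx, fn func(ctx context.Context) error) error {
	txCtx := context.WithValue(ctx, contextKeyTransaction, tx)
	if err := fn(txCtx); err != nil {
		tx.rolledBack = true
		return err
	}
	tx.committed = true
	return nil
}

// createUser is a hypothetical repository function that requires an
// ambient transaction.
func createUser(ctx context.Context, name string) error {
	if _, err := getTx(ctx); err != nil {
		return fmt.Errorf("create user %q: %w", name, err)
	}
	return nil // a real implementation would execute SQL on the transaction
}

// run executes createUser inside withTx and then returns failWith from
// the callback, so both the commit and rollback paths can be observed.
func run(failWith error) (*fakeTx, error) {
	tx := &fakeTx{}
	err := withTx(context.Background(), tx, func(ctx context.Context) error {
		if err := createUser(ctx, "alice"); err != nil {
			return err
		}
		return failWith
	})
	return tx, err
}

func main() {
	tx, err := run(nil)
	fmt.Println(tx.committed, tx.rolledBack, err)
	fmt.Println(createUser(context.Background(), "bob"))
}
```

Passing the transaction through the context lets repository functions take part in a caller's transaction without an extra parameter, at the cost of a runtime error when one is called outside a transaction.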

type DatabaseDriver added in v0.2.0

type DatabaseDriver interface {
	// Name returns the driver identifier (e.g., "postgres", "mysql", "sqlite")
	Name() string

	// Connect establishes a new database connection using the provided configuration.
	// The returned connection should be ready for use or return an error.
	Connect(ctx context.Context, cfg any) (any, error)

	// Close terminates the database connection and releases resources.
	Close(conn any) error

	// Ping verifies the connection is alive and functional.
	Ping(ctx context.Context, conn any) error
}

DatabaseDriver defines the interface for relational database drivers. Implementations should handle connection lifecycle and health checks.

func GetDatabaseDriver added in v0.2.0

func GetDatabaseDriver(name string) (DatabaseDriver, error)

GetDatabaseDriver retrieves a registered database driver by name. It returns an error with helpful instructions if the driver is not found.

type MessageDriver added in v0.2.0

type MessageDriver interface {
	// Name returns the driver identifier (e.g., "kafka", "rabbitmq")
	Name() string

	// Connect establishes a new message broker connection.
	Connect(ctx context.Context, cfg any) (any, error)

	// Close terminates the message broker connection.
	Close(conn any) error
}

MessageDriver defines the interface for message queue/broker drivers.

func GetMessageDriver added in v0.2.0

func GetMessageDriver(name string) (MessageDriver, error)

GetMessageDriver retrieves a registered message queue driver by name.

type Option

type Option func(*Data)

func WithExtensionCollector

func WithExtensionCollector(collector metrics.ExtensionCollector) Option

func WithIndexPrefix

func WithIndexPrefix(prefix string) Option

func WithMetricsCollector

func WithMetricsCollector(collector metrics.Collector) Option

func WithSearchConfig

func WithSearchConfig(searchConfig *config.Search) Option
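
Option follows Go's functional options pattern: each With... function returns a closure that mutates the value under construction. The sketch below shows the pattern in isolation. It uses a hypothetical settings struct in place of Data, whose fields are unexported, and a hypothetical constructor, because this page does not show which function accepts Option values.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the unexported fields that
// options would set on Data.
type settings struct {
	indexPrefix string
	metricsOn   bool
}

// option mirrors the shape of Option: a function that mutates the target.
type option func(*settings)

// withIndexPrefix plays the role of WithIndexPrefix in this sketch.
func withIndexPrefix(prefix string) option {
	return func(s *settings) { s.indexPrefix = prefix }
}

// withMetrics is an illustrative boolean option.
func withMetrics(on bool) option {
	return func(s *settings) { s.metricsOn = on }
}

// newSettings applies options in order over the defaults, so later
// options override earlier ones. Nil options are skipped.
func newSettings(opts ...option) *settings {
	s := &settings{indexPrefix: "default"}
	for _, opt := range opts {
		if opt != nil {
			opt(s)
		}
	}
	return s
}

func main() {
	s := newSettings(withIndexPrefix("prod"), withMetrics(true))
	fmt.Println(s.indexPrefix, s.metricsOn)
}
```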

type SearchCollectorAdapter added in v0.2.1

type SearchCollectorAdapter struct {
	// contains filtered or unexported fields
}

SearchCollectorAdapter adapts data/metrics.Collector to data/search.Collector

func (*SearchCollectorAdapter) SearchIndex added in v0.2.1

func (a *SearchCollectorAdapter) SearchIndex(engine, operation string)

SearchIndex records search index operation metrics

func (*SearchCollectorAdapter) SearchQuery added in v0.2.1

func (a *SearchCollectorAdapter) SearchQuery(engine string, err error)

SearchQuery records search query metrics

type SearchDriver added in v0.2.0

type SearchDriver interface {
	// Name returns the driver identifier (e.g., "elasticsearch", "meilisearch")
	Name() string

	// Connect establishes a new search engine connection.
	Connect(ctx context.Context, cfg any) (any, error)

	// Close terminates the search engine connection.
	Close(conn any) error
}

SearchDriver defines the interface for search engine drivers.

func GetSearchDriver added in v0.2.0

func GetSearchDriver(name string) (SearchDriver, error)

GetSearchDriver retrieves a registered search engine driver by name.

type SearchEngine added in v0.2.0

type SearchEngine interface {
	// Health checks if the search engine is available and responds
	Health(ctx context.Context) error

	// IndexDocument indexes a single document
	IndexDocument(ctx context.Context, index, docID string, document any) error

	// DeleteDocument deletes a document by ID
	DeleteDocument(ctx context.Context, index, docID string) error

	// IndexExists checks if an index exists
	IndexExists(ctx context.Context, index string) (bool, error)

	// CreateIndex creates a new index with optional settings
	CreateIndex(ctx context.Context, index, settings string) error
}

SearchEngine defines the interface that all search engine client implementations must satisfy. This allows the search.Client to work with any search backend through type assertions.
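
For tests, or as a starting point for a new backend, the interface can be satisfied by a small in-memory type. The sketch below copies SearchEngine from the listing above so it compiles on its own. memoryEngine is hypothetical, and its choices (CreateIndex fails on an existing index, IndexDocument requires the index to exist, the settings string is ignored) are assumptions rather than documented requirements.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// SearchEngine is copied from the interface listing above.
type SearchEngine interface {
	Health(ctx context.Context) error
	IndexDocument(ctx context.Context, index, docID string, document any) error
	DeleteDocument(ctx context.Context, index, docID string) error
	IndexExists(ctx context.Context, index string) (bool, error)
	CreateIndex(ctx context.Context, index, settings string) error
}

// memoryEngine keeps documents in nested maps: index name, then document ID.
type memoryEngine struct {
	mu      sync.Mutex
	indexes map[string]map[string]any
}

func newMemoryEngine() *memoryEngine {
	return &memoryEngine{indexes: make(map[string]map[string]any)}
}

func (e *memoryEngine) Health(ctx context.Context) error { return ctx.Err() }

func (e *memoryEngine) CreateIndex(ctx context.Context, index, settings string) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if _, ok := e.indexes[index]; ok {
		return fmt.Errorf("index %q already exists", index)
	}
	e.indexes[index] = make(map[string]any)
	return nil
}

func (e *memoryEngine) IndexExists(ctx context.Context, index string) (bool, error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	_, ok := e.indexes[index]
	return ok, nil
}

func (e *memoryEngine) IndexDocument(ctx context.Context, index, docID string, document any) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	docs, ok := e.indexes[index]
	if !ok {
		return fmt.Errorf("index %q does not exist", index)
	}
	docs[docID] = document
	return nil
}

func (e *memoryEngine) DeleteDocument(ctx context.Context, index, docID string) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	docs, ok := e.indexes[index]
	if !ok {
		return fmt.Errorf("index %q does not exist", index)
	}
	delete(docs, docID)
	return nil
}

// Compile-time check that *memoryEngine satisfies SearchEngine.
var _ SearchEngine = (*memoryEngine)(nil)

// run exercises each method once and returns the first unexpected result.
func run() error {
	ctx := context.Background()
	var engine SearchEngine = newMemoryEngine()
	if err := engine.Health(ctx); err != nil {
		return err
	}
	if err := engine.CreateIndex(ctx, "articles", ""); err != nil {
		return err
	}
	ok, err := engine.IndexExists(ctx, "articles")
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("index articles should exist after CreateIndex")
	}
	if err := engine.IndexDocument(ctx, "articles", "1", map[string]any{"title": "hello"}); err != nil {
		return err
	}
	if err := engine.DeleteDocument(ctx, "articles", "1"); err != nil {
		return err
	}
	if err := engine.IndexDocument(ctx, "missing", "1", nil); err == nil {
		return fmt.Errorf("indexing into a missing index should fail")
	}
	return nil
}

func main() {
	fmt.Println("walkthrough error:", run())
}
```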

type StorageDriver added in v0.2.0

type StorageDriver interface {
	// Name returns the driver identifier (e.g., "s3", "minio", "local")
	Name() string

	// Connect establishes a new storage connection.
	Connect(ctx context.Context, cfg any) (any, error)

	// Close terminates the storage connection.
	Close(conn any) error
}

StorageDriver defines the interface for object storage drivers.

func GetStorageDriver added in v0.2.0

func GetStorageDriver(name string) (StorageDriver, error)

GetStorageDriver retrieves a registered storage driver by name.

Directories

Path Synopsis
cache module
entgo module
kafka module
meilisearch module
mongodb module
mysql module
neo4j module
opensearch module
postgres module
rabbitmq module
redis module
sqlite module
