Documentation
¶
Index ¶
- Constants
- Variables
- func GetTypedConnection[T any](conn Connection) (T, error)
- type AMQPAdapter
- func (aa *AMQPAdapter) AckMessage(ctx context.Context, queueName, consumerGroup, receiptHandle string) error
- func (aa *AMQPAdapter) ClaimPendingMessages(ctx context.Context, queueName string, consumerGroup string, ...) ([]Message, error)
- func (aa *AMQPAdapter) Consume(ctx context.Context, queueName string, config ConsumerConfig) (<-chan Message, <-chan error)
- func (aa *AMQPAdapter) ConsumeWithGroup(ctx context.Context, queueName string, consumerGroup string, ...) (<-chan Message, <-chan error)
- func (aa *AMQPAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
- func (aa *AMQPAdapter) Publish(ctx context.Context, queueName string, body []byte) error
- func (aa *AMQPAdapter) PublishWithHeaders(ctx context.Context, queueName string, body []byte, headers map[string]any) error
- func (aa *AMQPAdapter) QueueDeclare(ctx context.Context, name string) (string, error)
- func (aa *AMQPAdapter) QueueDeclareWithConfig(ctx context.Context, name string, config QueueConfig) (string, error)
- type AMQPConfig
- type AMQPConnection
- func (ac *AMQPConnection) Close(ctx context.Context) error
- func (ac *AMQPConnection) GetBehaviors() []ConnectionBehavior
- func (ac *AMQPConnection) GetCapabilities() []ConnectionCapability
- func (ac *AMQPConnection) GetProtocol() string
- func (ac *AMQPConnection) GetRawConnection() any
- func (ac *AMQPConnection) GetState() ConnectionState
- func (ac *AMQPConnection) HealthCheck(ctx context.Context) *HealthStatus
- type AMQPConnectionFactory
- type CacheRepository
- type Config
- type ConfigTarget
- type Connection
- type ConnectionBehavior
- type ConnectionCapability
- type ConnectionFactory
- type ConnectionState
- type ConsumerConfig
- type ConsumerGroupInfo
- type ExecuteResult
- type HTTPConnection
- func (c *HTTPConnection) Close(ctx context.Context) error
- func (c *HTTPConnection) GetBaseURL() string
- func (c *HTTPConnection) GetBehaviors() []ConnectionBehavior
- func (c *HTTPConnection) GetCapabilities() []ConnectionCapability
- func (c *HTTPConnection) GetClient() *http.Client
- func (c *HTTPConnection) GetHeaders() map[string]string
- func (c *HTTPConnection) GetProtocol() string
- func (c *HTTPConnection) GetRawConnection() any
- func (c *HTTPConnection) GetState() ConnectionState
- func (c *HTTPConnection) HealthCheck(ctx context.Context) *HealthStatus
- func (c *HTTPConnection) NewRequest(ctx context.Context, method string, path string, body any) (*http.Request, error)
- type HTTPConnectionFactory
- type HealthStatus
- type Message
- type QueryRepository
- type QueryResult
- type QueueConfig
- type QueueRepository
- type QueueStreamRepository
- type RedisAdapter
- func (ra *RedisAdapter) AckMessage(ctx context.Context, queueName, consumerGroup, receiptHandle string) error
- func (ra *RedisAdapter) ClaimPendingMessages(ctx context.Context, queueName string, consumerGroup string, ...) ([]Message, error)
- func (ra *RedisAdapter) Consume(ctx context.Context, queueName string, config ConsumerConfig) (<-chan Message, <-chan error)
- func (ra *RedisAdapter) ConsumeWithGroup(ctx context.Context, queueName string, consumerGroup string, ...) (<-chan Message, <-chan error)
- func (ra *RedisAdapter) ConsumerGroupInfo(ctx context.Context, streamName string) ([]ConsumerGroupInfo, error)
- func (ra *RedisAdapter) CreateConsumerGroup(ctx context.Context, streamName, consumerGroup, startID string) error
- func (ra *RedisAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
- func (ra *RedisAdapter) Exists(ctx context.Context, key string) (bool, error)
- func (ra *RedisAdapter) Expire(ctx context.Context, key string, expiration time.Duration) error
- func (ra *RedisAdapter) Get(ctx context.Context, key string) ([]byte, error)
- func (ra *RedisAdapter) GetTTL(ctx context.Context, key string) (time.Duration, error)
- func (ra *RedisAdapter) Publish(ctx context.Context, queueName string, body []byte) error
- func (ra *RedisAdapter) PublishWithHeaders(ctx context.Context, queueName string, body []byte, headers map[string]any) error
- func (ra *RedisAdapter) QueueDeclare(ctx context.Context, name string) (string, error)
- func (ra *RedisAdapter) QueueDeclareWithConfig(ctx context.Context, name string, config QueueConfig) (string, error)
- func (ra *RedisAdapter) Remove(ctx context.Context, key string) error
- func (ra *RedisAdapter) Set(ctx context.Context, key string, value []byte) error
- func (ra *RedisAdapter) SetWithExpiration(ctx context.Context, key string, value []byte, expiration time.Duration) error
- func (ra *RedisAdapter) StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)
- func (ra *RedisAdapter) TrimStream(ctx context.Context, streamName string, maxLen int64) error
- func (ra *RedisAdapter) Update(ctx context.Context, key string, value []byte) error
- type RedisConfig
- type RedisConnection
- func (rc *RedisConnection) Close(ctx context.Context) error
- func (rc *RedisConnection) GetBehaviors() []ConnectionBehavior
- func (rc *RedisConnection) GetCapabilities() []ConnectionCapability
- func (rc *RedisConnection) GetClient() *redis.Client
- func (rc *RedisConnection) GetProtocol() string
- func (rc *RedisConnection) GetRawConnection() any
- func (rc *RedisConnection) GetState() ConnectionState
- func (rc *RedisConnection) GetStats() map[string]any
- func (rc *RedisConnection) HealthCheck(ctx context.Context) *HealthStatus
- type RedisConnectionFactory
- type Registry
- func (registry *Registry) AddConnection(ctx context.Context, name string, config *ConfigTarget) (Connection, error)
- func (registry *Registry) Close(ctx context.Context) error
- func (registry *Registry) GetByBehavior(behavior ConnectionBehavior) []Connection
- func (registry *Registry) GetByCapability(capability ConnectionCapability) []Connection
- func (registry *Registry) GetByProtocol(protocol string) []Connection
- func (registry *Registry) GetDefault() Connection
- func (registry *Registry) GetNamed(name string) Connection
- func (registry *Registry) GetRepository(name string) (Repository, error)
- func (registry *Registry) HealthCheck(ctx context.Context) map[string]*HealthStatus
- func (registry *Registry) ListConnections() []string
- func (registry *Registry) ListRegisteredProtocols() []string
- func (registry *Registry) LoadFromConfig(ctx context.Context, config *Config) error
- func (registry *Registry) RegisterFactory(factory ConnectionFactory)
- func (registry *Registry) RemoveConnection(ctx context.Context, name string) error
- type Repository
- type SQLConnection
- func (c *SQLConnection) Close(ctx context.Context) error
- func (c *SQLConnection) GetBehaviors() []ConnectionBehavior
- func (c *SQLConnection) GetCapabilities() []ConnectionCapability
- func (c *SQLConnection) GetDB() *sql.DB
- func (c *SQLConnection) GetProtocol() string
- func (c *SQLConnection) GetRawConnection() any
- func (c *SQLConnection) GetState() ConnectionState
- func (c *SQLConnection) HealthCheck(ctx context.Context) *HealthStatus
- func (c *SQLConnection) Stats() sql.DBStats
- type SQLConnectionFactory
- type StreamEntry
- type StreamInfo
- type TransactionContext
- type TransactionalRepository
Constants ¶
const ( DefaultHTTPTimeout = 30 * time.Second HealthCheckTimeout = 2 * time.Second )
const ( DefaultPrefetchCount = 10 DefaultMaxRetries = 3 DefaultBlockTimeout = 5 * time.Second )
Default values for consumer configuration.
const DefaultConnection = "default"
Variables ¶
var ( ErrAMQPClientNotInitialized = errors.New("AMQP client not initialized") ErrFailedToOpenConnection = errors.New("failed to open AMQP connection") ErrFailedToOpenChannel = errors.New("failed to open AMQP channel") ErrFailedToCloseConnection = errors.New("failed to close AMQP connection") ErrFailedToCloseChannel = errors.New("failed to close AMQP channel") ErrFailedToDeclareQueue = errors.New("failed to declare queue") ErrFailedToPublishMessage = errors.New("failed to publish message") ErrFailedToStartConsuming = errors.New("failed to start consuming") ErrChannelClosed = errors.New("channel closed") ErrFailedToReconnect = errors.New("failed to reconnect") ErrDeliveryChannelClosed = errors.New("delivery channel closed") ErrNoChannelAvailable = errors.New("no channel available") ErrFailedToCloseAMQPClient = errors.New("failed to close AMQP client") ErrAMQPOperation = errors.New("AMQP operation failed") ErrAMQPConnectionFailed = errors.New("failed to connect to AMQP") ErrFailedToCreateAMQPClient = errors.New("failed to create AMQP client") ErrAMQPUnsupportedOperation = errors.New("operation not supported by AMQP") ErrIntegerOverflow = errors.New("integer overflow in conversion") )
var ( ErrFailedToCreateHTTPClient = errors.New("failed to create HTTP client") ErrFailedToHealthCheckHTTP = errors.New("failed to health check HTTP endpoint") ErrInvalidConfigTypeHTTP = errors.New("invalid config type for HTTP connection") ErrUnsupportedBodyType = errors.New("unsupported body type") ErrFailedToCreateRequest = errors.New("failed to create HTTP request") ErrFailedToLoadCertificate = errors.New("failed to load client certificate") )
var ( ErrRedisClientNotInitialized = errors.New("redis client not initialized") ErrFailedToCloseRedisClient = errors.New("failed to close Redis client") ErrRedisOperation = errors.New("redis operation failed") ErrRedisConnectionFailed = errors.New("failed to connect to Redis") ErrRedisUnexpectedPingResponse = errors.New("unexpected ping response") ErrRedisPoolTimeouts = errors.New("redis connection pool has timeouts") ErrFailedToCreateRedisClient = errors.New("failed to create Redis client") )
var ( ErrFailedToOpenSQLConnection = errors.New("failed to open SQL connection") ErrFailedToPingSQL = errors.New("failed to ping SQL database") ErrInvalidConfigTypeSQL = errors.New("invalid config type for SQL connection") ErrUnsupportedSQLProtocol = errors.New("unsupported SQL protocol") ErrFailedToCloseSQLDB = errors.New("failed to close SQL database") ErrSQLConnectionNil = errors.New("SQL connection is nil") )
var ( ErrInvalidConnectionBehavior = errors.New("invalid connection behavior") ErrInvalidConnectionProtocol = errors.New("invalid connection protocol") ErrInvalidDSN = errors.New("invalid DSN") ErrInvalidURL = errors.New("invalid URL") ErrInvalidConfigType = errors.New("invalid config type") )
var ( ErrConnectionIsNil = errors.New("connection is nil") ErrRawConnectionIsNil = errors.New("raw connection is nil") ErrInvalidType = errors.New("invalid type") )
Sentinel errors for the GetTypedConnection function.
var ( ErrConnectionNotFound = errors.New("connection not found") ErrConnectionAlreadyExists = errors.New("connection already exists") ErrFailedToCreateConnection = errors.New("failed to create connection") ErrUnsupportedProtocol = errors.New("unsupported protocol") ErrFailedToCloseConnections = errors.New("failed to close connections") ErrFailedToAddConnection = errors.New("failed to add connection") ErrConnectionNotSupported = errors.New("connection does not support required operations") ErrInterfaceNotImplemented = errors.New("connection does not implement required interface") )
Functions ¶
func GetTypedConnection ¶
func GetTypedConnection[T any](conn Connection) (T, error)
GetTypedConnection extracts a typed connection from a Connection interface. This provides type-safe access to the underlying connection without manual type assertions.
Example usage:
conn, err := connfx.GetConnection("database")
if err != nil { return err }
db, err := connfx.GetTypedConnection[*sql.DB](conn)
if err != nil { return err }
// Now db is *sql.DB and can be used safely
rows, err := db.Query("SELECT * FROM users")
Types ¶
type AMQPAdapter ¶
type AMQPAdapter struct {
// contains filtered or unexported fields
}
AMQPAdapter implements the QueueRepository interface for AMQP-based message queues.
func (*AMQPAdapter) AckMessage ¶ added in v0.7.1
func (aa *AMQPAdapter) AckMessage( ctx context.Context, queueName, consumerGroup, receiptHandle string, ) error
func (*AMQPAdapter) ClaimPendingMessages ¶ added in v0.7.1
func (*AMQPAdapter) Consume ¶
func (aa *AMQPAdapter) Consume( ctx context.Context, queueName string, config ConsumerConfig, ) (<-chan Message, <-chan error)
func (*AMQPAdapter) ConsumeWithGroup ¶ added in v0.7.1
func (aa *AMQPAdapter) ConsumeWithGroup( ctx context.Context, queueName string, consumerGroup string, consumerName string, config ConsumerConfig, ) (<-chan Message, <-chan error)
func (*AMQPAdapter) DeleteMessage ¶ added in v0.7.1
func (aa *AMQPAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
func (*AMQPAdapter) PublishWithHeaders ¶ added in v0.7.1
func (*AMQPAdapter) QueueDeclare ¶
QueueRepository interface implementation.
func (*AMQPAdapter) QueueDeclareWithConfig ¶ added in v0.7.1
func (aa *AMQPAdapter) QueueDeclareWithConfig( ctx context.Context, name string, config QueueConfig, ) (string, error)
type AMQPConfig ¶ added in v0.7.1
type AMQPConfig struct {
URL string
}
AMQPConfig holds AMQP-specific configuration options.
func NewDefaultAMQPConfig ¶ added in v0.7.1
func NewDefaultAMQPConfig() *AMQPConfig
NewDefaultAMQPConfig creates an AMQP configuration with sensible defaults.
type AMQPConnection ¶
type AMQPConnection struct {
// contains filtered or unexported fields
}
AMQPConnection implements the connfx.Connection interface for AMQP connections.
func NewAMQPConnection ¶
func NewAMQPConnection(protocol string, config *AMQPConfig) *AMQPConnection
NewAMQPConnection creates a new AMQP connection.
func (*AMQPConnection) GetBehaviors ¶
func (ac *AMQPConnection) GetBehaviors() []ConnectionBehavior
Connection interface implementation.
func (*AMQPConnection) GetCapabilities ¶
func (ac *AMQPConnection) GetCapabilities() []ConnectionCapability
func (*AMQPConnection) GetProtocol ¶
func (ac *AMQPConnection) GetProtocol() string
func (*AMQPConnection) GetRawConnection ¶
func (ac *AMQPConnection) GetRawConnection() any
func (*AMQPConnection) GetState ¶
func (ac *AMQPConnection) GetState() ConnectionState
func (*AMQPConnection) HealthCheck ¶
func (ac *AMQPConnection) HealthCheck(ctx context.Context) *HealthStatus
type AMQPConnectionFactory ¶
type AMQPConnectionFactory struct {
// contains filtered or unexported fields
}
AMQPConnectionFactory creates AMQP connections.
func NewAMQPConnectionFactory ¶
func NewAMQPConnectionFactory(protocol string) *AMQPConnectionFactory
NewAMQPConnectionFactory creates a new AMQP connection factory for a specific protocol.
func (*AMQPConnectionFactory) CreateConnection ¶
func (f *AMQPConnectionFactory) CreateConnection( ctx context.Context, config *ConfigTarget, ) (Connection, error)
func (*AMQPConnectionFactory) GetProtocol ¶
func (f *AMQPConnectionFactory) GetProtocol() string
type CacheRepository ¶
type CacheRepository interface {
Repository
// SetWithExpiration stores a value with the given key and expiration time
SetWithExpiration(ctx context.Context, key string, value []byte, expiration time.Duration) error
// GetTTL returns the time-to-live for a key
GetTTL(ctx context.Context, key string) (time.Duration, error)
// Expire sets an expiration time for an existing key
Expire(ctx context.Context, key string, expiration time.Duration) error
}
CacheRepository extends Repository with cache-specific operations.
type Config ¶
type Config struct {
Targets map[string]ConfigTarget `conf:"targets"`
}
Config represents the main configuration for connfx.
type ConfigTarget ¶
type ConfigTarget struct {
Properties map[string]any `conf:"properties"`
Protocol string `conf:"protocol"` // e.g., "postgres", "redis", "http"
DSN string `conf:"dsn"`
URL string `conf:"url"`
Host string `conf:"host"`
CertFile string `conf:"cert_file"`
KeyFile string `conf:"key_file"`
CAFile string `conf:"ca_file"`
// Network settings
Port int `conf:"port"`
Timeout time.Duration `conf:"timeout"`
// Authentication and security
TLS bool `conf:"tls"`
TLSSkipVerify bool `conf:"tls_skip_verify"`
}
ConfigTarget represents the configuration data for a connection.
type Connection ¶
type Connection interface {
// GetBehaviors returns the connection behaviors this connection supports
GetBehaviors() []ConnectionBehavior
// GetCapabilities returns the connection capabilities this connection supports
GetCapabilities() []ConnectionCapability
// GetProtocol returns the protocol/technology used (e.g., "postgres", "redis", "http")
GetProtocol() string
// GetState returns the current connection state
GetState() ConnectionState
// HealthCheck performs a health check and returns the status
HealthCheck(ctx context.Context) *HealthStatus
// Close closes the connection
Close(ctx context.Context) error
// GetRawConnection returns the underlying connection object
GetRawConnection() any
}
Connection represents a generic connection interface.
type ConnectionBehavior ¶
type ConnectionBehavior string
ConnectionBehavior represents the behavioral type of a connection.
const (
// ConnectionBehaviorStateful represents persistent connections that maintain state.
// Examples: database connections, persistent TCP connections, connection pools.
ConnectionBehaviorStateful ConnectionBehavior = "stateful"
// ConnectionBehaviorStateless represents connections that don't maintain state.
// Examples: HTTP clients, REST APIs, stateless services.
ConnectionBehaviorStateless ConnectionBehavior = "stateless"
// ConnectionBehaviorStreaming represents streaming/real-time connections.
// Examples: message queues, event streams, websockets, gRPC streams.
ConnectionBehaviorStreaming ConnectionBehavior = "streaming"
)
type ConnectionCapability ¶
type ConnectionCapability string
const (
// ConnectionCapabilityKeyValue represents key-value storage behavior.
ConnectionCapabilityKeyValue ConnectionCapability = "key-value"
// ConnectionCapabilityDocument represents document storage behavior.
ConnectionCapabilityDocument ConnectionCapability = "document"
// ConnectionCapabilityRelational represents relational database behavior.
ConnectionCapabilityRelational ConnectionCapability = "relational"
// ConnectionCapabilityTransactional represents transactional behavior.
ConnectionCapabilityTransactional ConnectionCapability = "transactional"
// ConnectionCapabilityCache represents caching behavior with expiration support.
ConnectionCapabilityCache ConnectionCapability = "cache"
// ConnectionCapabilityQueue represents message queue behavior.
ConnectionCapabilityQueue ConnectionCapability = "queue"
)
type ConnectionFactory ¶
type ConnectionFactory interface {
// CreateConnection creates a new connection from configuration
CreateConnection(ctx context.Context, config *ConfigTarget) (Connection, error)
// GetProtocol returns the protocol this factory supports (e.g., "postgres", "redis")
GetProtocol() string
}
ConnectionFactory creates connections from configuration.
type ConnectionState ¶
type ConnectionState int32
ConnectionState represents the current state of a connection.
const ( ConnectionStateNotInitialized ConnectionState = 0 ConnectionStateConnected ConnectionState = 1 ConnectionStateLive ConnectionState = 2 ConnectionStateReady ConnectionState = 3 ConnectionStateDisconnected ConnectionState = 4 ConnectionStateError ConnectionState = 5 ConnectionStateReconnecting ConnectionState = 6 )
func (ConnectionState) String ¶
func (i ConnectionState) String() string
type ConsumerConfig ¶
type ConsumerConfig struct {
// Args additional arguments for queue declaration
Args map[string]any
// AutoAck when true, the server will automatically acknowledge messages
AutoAck bool
// Exclusive when true, only this consumer can access the queue
Exclusive bool
// NoLocal when true, the server will not send messages to the connection that published them
NoLocal bool
// NoWait when true, the server will not respond to the declare
NoWait bool
// PrefetchCount sets how many messages to prefetch
PrefetchCount int
// BlockTimeout sets how long to wait for messages
BlockTimeout time.Duration
// MaxRetries sets maximum number of retries for failed messages
MaxRetries int
// RetryDelay sets delay between retries
RetryDelay time.Duration
}
ConsumerConfig holds configuration for message consumption.
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() ConsumerConfig
DefaultConsumerConfig returns a default configuration for consuming messages.
type ConsumerGroupInfo ¶ added in v0.7.1
type ConsumerGroupInfo struct {
Name string `json:"name"`
LastDeliveredID string `json:"last_delivered_id"`
Consumers int64 `json:"consumers"`
Pending int64 `json:"pending"`
EntriesRead int64 `json:"entries_read"`
Lag int64 `json:"lag"`
}
ConsumerGroupInfo provides information about a consumer group.
type ExecuteResult ¶
type ExecuteResult interface {
// RowsAffected returns the number of rows affected
RowsAffected() (int64, error)
// LastInsertId returns the last insert ID (if applicable)
LastInsertId() (int64, error)
}
ExecuteResult represents execution results.
type HTTPConnection ¶
type HTTPConnection struct {
// contains filtered or unexported fields
}
HTTPConnection represents an HTTP API connection.
func (*HTTPConnection) GetBaseURL ¶
func (c *HTTPConnection) GetBaseURL() string
GetBaseURL returns the base URL for this connection.
func (*HTTPConnection) GetBehaviors ¶
func (c *HTTPConnection) GetBehaviors() []ConnectionBehavior
func (*HTTPConnection) GetCapabilities ¶
func (c *HTTPConnection) GetCapabilities() []ConnectionCapability
func (*HTTPConnection) GetClient ¶
func (c *HTTPConnection) GetClient() *http.Client
GetClient returns the underlying HTTP client.
func (*HTTPConnection) GetHeaders ¶
func (c *HTTPConnection) GetHeaders() map[string]string
GetHeaders returns the default headers for this connection.
func (*HTTPConnection) GetProtocol ¶
func (c *HTTPConnection) GetProtocol() string
func (*HTTPConnection) GetRawConnection ¶
func (c *HTTPConnection) GetRawConnection() any
func (*HTTPConnection) GetState ¶
func (c *HTTPConnection) GetState() ConnectionState
func (*HTTPConnection) HealthCheck ¶
func (c *HTTPConnection) HealthCheck( ctx context.Context, ) *HealthStatus
type HTTPConnectionFactory ¶
type HTTPConnectionFactory struct {
// contains filtered or unexported fields
}
HTTPConnectionFactory creates HTTP connections.
func NewHTTPConnectionFactory ¶
func NewHTTPConnectionFactory(protocol string) *HTTPConnectionFactory
NewHTTPConnectionFactory creates a new HTTP connection factory.
func (*HTTPConnectionFactory) CreateConnection ¶
func (f *HTTPConnectionFactory) CreateConnection( ctx context.Context, config *ConfigTarget, ) (Connection, error)
func (*HTTPConnectionFactory) GetProtocol ¶
func (f *HTTPConnectionFactory) GetProtocol() string
type HealthStatus ¶
type HealthStatus struct {
Timestamp time.Time `json:"timestamp"`
Error error `json:"error,omitempty"`
Message string `json:"message,omitempty"`
Latency time.Duration `json:"latency,omitempty"`
State ConnectionState `json:"state"`
}
HealthStatus represents the health check result.
type Message ¶
type Message struct {
// Timestamp when the message was created
Timestamp time.Time
// Headers contains message headers
Headers map[string]any
// ReceiptHandle is a unique identifier for the message (for acknowledgment)
ReceiptHandle string
// MessageID is the message identifier
MessageID string
// ConsumerGroup indicates which consumer group this message belongs to (if applicable)
ConsumerGroup string
// StreamName indicates which stream this message came from (for stream-based systems)
StreamName string
// Body contains the message payload
Body []byte
// DeliveryCount indicates how many times this message has been delivered
DeliveryCount int
// contains filtered or unexported fields
}
Message represents a consumed message with its metadata and acknowledgment functions.
func (*Message) SetAckFunc ¶
SetAckFunc sets the acknowledgment function.
func (*Message) SetNackFunc ¶
SetNackFunc sets the negative acknowledgment function.
type QueryRepository ¶
type QueryRepository interface {
// Query executes a query and returns raw results
Query(ctx context.Context, query string, args ...any) (QueryResult, error)
// Execute runs a command (INSERT, UPDATE, DELETE)
Execute(ctx context.Context, command string, args ...any) (ExecuteResult, error)
}
QueryRepository defines the port for query operations (for SQL-like storages).
type QueryResult ¶
type QueryResult interface {
// Next advances to the next row
Next() bool
// Scan scans the current row into destinations
Scan(dest ...any) error
// Close closes the result set
Close() error
}
QueryResult represents query results.
type QueueConfig ¶ added in v0.7.1
type QueueConfig struct {
// Args contains additional queue-specific arguments
Args map[string]any
// MaxLength sets maximum number of messages in queue (0 = unlimited)
MaxLength int64
// MessageTTL sets default TTL for messages
MessageTTL time.Duration
// Durable indicates if the queue should survive server restarts
Durable bool
// AutoDelete indicates if the queue should be deleted when no longer in use
AutoDelete bool
// Exclusive indicates if the queue is exclusive to one connection
Exclusive bool
}
QueueConfig holds configuration for queue declaration.
func DefaultQueueConfig ¶ added in v0.7.1
func DefaultQueueConfig() QueueConfig
DefaultQueueConfig returns a default configuration for queue declaration.
type QueueRepository ¶
type QueueRepository interface {
// QueueDeclare declares a queue and returns its name
QueueDeclare(ctx context.Context, name string) (string, error)
// QueueDeclareWithConfig declares a queue with specific configuration
QueueDeclareWithConfig(ctx context.Context, name string, config QueueConfig) (string, error)
// Publish sends a message to a queue
Publish(ctx context.Context, queueName string, body []byte) error
// PublishWithHeaders sends a message with custom headers
PublishWithHeaders(
ctx context.Context,
queueName string,
body []byte,
headers map[string]any,
) error
// Consume starts consuming messages from a queue
Consume(
ctx context.Context,
queueName string,
config ConsumerConfig,
) (<-chan Message, <-chan error)
// ConsumeWithGroup starts consuming messages as part of a consumer group
ConsumeWithGroup(
ctx context.Context,
queueName string,
consumerGroup string,
consumerName string,
config ConsumerConfig,
) (<-chan Message, <-chan error)
// ClaimPendingMessages claims pending messages from a consumer group
ClaimPendingMessages(
ctx context.Context,
queueName string,
consumerGroup string,
consumerName string,
minIdleTime time.Duration,
count int,
) ([]Message, error)
// AckMessage acknowledges a specific message by receipt handle
AckMessage(ctx context.Context, queueName, consumerGroup, receiptHandle string) error
// DeleteMessage removes a message from the queue (for non-streaming queues)
DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
}
QueueRepository defines the port for message queue operations.
type QueueStreamRepository ¶ added in v0.7.1
type QueueStreamRepository interface {
QueueRepository
// CreateConsumerGroup creates a consumer group for a stream
CreateConsumerGroup(ctx context.Context, streamName, consumerGroup, startID string) error
// StreamInfo returns information about a stream
StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)
// ConsumerGroupInfo returns information about consumer groups
ConsumerGroupInfo(ctx context.Context, streamName string) ([]ConsumerGroupInfo, error)
// TrimStream trims a stream to a maximum length
TrimStream(ctx context.Context, streamName string, maxLen int64) error
}
QueueStreamRepository defines operations for stream-based message systems (Redis Streams, Kafka, etc.).
type RedisAdapter ¶
type RedisAdapter struct {
// contains filtered or unexported fields
}
RedisAdapter implements Redis operations and wraps the Redis client.
func (*RedisAdapter) AckMessage ¶ added in v0.7.1
func (ra *RedisAdapter) AckMessage( ctx context.Context, queueName, consumerGroup, receiptHandle string, ) error
func (*RedisAdapter) ClaimPendingMessages ¶ added in v0.7.1
func (ra *RedisAdapter) ClaimPendingMessages( ctx context.Context, queueName string, consumerGroup string, consumerName string, minIdleTime time.Duration, count int, ) ([]Message, error)
ClaimPendingMessages claims pending messages from a consumer group.
func (*RedisAdapter) Consume ¶ added in v0.7.1
func (ra *RedisAdapter) Consume( ctx context.Context, queueName string, config ConsumerConfig, ) (<-chan Message, <-chan error)
func (*RedisAdapter) ConsumeWithGroup ¶ added in v0.7.1
func (ra *RedisAdapter) ConsumeWithGroup( ctx context.Context, queueName string, consumerGroup string, consumerName string, config ConsumerConfig, ) (<-chan Message, <-chan error)
func (*RedisAdapter) ConsumerGroupInfo ¶ added in v0.7.1
func (ra *RedisAdapter) ConsumerGroupInfo( ctx context.Context, streamName string, ) ([]ConsumerGroupInfo, error)
func (*RedisAdapter) CreateConsumerGroup ¶ added in v0.7.1
func (ra *RedisAdapter) CreateConsumerGroup( ctx context.Context, streamName, consumerGroup, startID string, ) error
QueueStreamRepository interface implementation.
func (*RedisAdapter) DeleteMessage ¶ added in v0.7.1
func (ra *RedisAdapter) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
func (*RedisAdapter) PublishWithHeaders ¶ added in v0.7.1
func (*RedisAdapter) QueueDeclare ¶ added in v0.7.1
QueueRepository interface implementation for Redis Streams.
func (*RedisAdapter) QueueDeclareWithConfig ¶ added in v0.7.1
func (ra *RedisAdapter) QueueDeclareWithConfig( ctx context.Context, name string, config QueueConfig, ) (string, error)
func (*RedisAdapter) SetWithExpiration ¶
func (ra *RedisAdapter) SetWithExpiration( ctx context.Context, key string, value []byte, expiration time.Duration, ) error
CacheRepository interface implementation.
func (*RedisAdapter) StreamInfo ¶ added in v0.7.1
func (ra *RedisAdapter) StreamInfo(ctx context.Context, streamName string) (StreamInfo, error)
func (*RedisAdapter) TrimStream ¶ added in v0.7.1
type RedisConfig ¶ added in v0.7.1
type RedisConfig struct {
Address string
Password string
DB int
PoolSize int
MinIdleConns int
MaxIdleConns int
ConnMaxIdleTime time.Duration
PoolTimeout time.Duration
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
TLSEnabled bool
TLSInsecureSkipVerify bool
}
RedisConfig holds Redis-specific configuration options.
func NewDefaultRedisConfig ¶ added in v0.7.1
func NewDefaultRedisConfig() *RedisConfig
NewDefaultRedisConfig creates a Redis configuration with sensible defaults.
type RedisConnection ¶
type RedisConnection struct {
// contains filtered or unexported fields
}
RedisConnection implements the connfx.Connection interface.
func NewRedisConnection ¶
func NewRedisConnection(protocol string, config *RedisConfig) *RedisConnection
NewRedisConnection creates a new Redis connection with enhanced configuration.
func (*RedisConnection) GetBehaviors ¶
func (rc *RedisConnection) GetBehaviors() []ConnectionBehavior
Connection interface implementation.
func (*RedisConnection) GetCapabilities ¶
func (rc *RedisConnection) GetCapabilities() []ConnectionCapability
func (*RedisConnection) GetClient ¶ added in v0.7.1
func (rc *RedisConnection) GetClient() *redis.Client
GetClient returns the underlying Redis client for advanced operations.
func (*RedisConnection) GetProtocol ¶
func (rc *RedisConnection) GetProtocol() string
func (*RedisConnection) GetRawConnection ¶
func (rc *RedisConnection) GetRawConnection() any
func (*RedisConnection) GetState ¶
func (rc *RedisConnection) GetState() ConnectionState
func (*RedisConnection) GetStats ¶ added in v0.7.1
func (rc *RedisConnection) GetStats() map[string]any
GetStats returns detailed connection and pool statistics.
func (*RedisConnection) HealthCheck ¶
func (rc *RedisConnection) HealthCheck(ctx context.Context) *HealthStatus
type RedisConnectionFactory ¶
type RedisConnectionFactory struct {
// contains filtered or unexported fields
}
RedisConnectionFactory creates Redis connections with enhanced configuration.
func NewRedisConnectionFactory ¶
func NewRedisConnectionFactory(protocol string) *RedisConnectionFactory
NewRedisConnectionFactory creates a new Redis connection factory for a specific protocol.
func (*RedisConnectionFactory) CreateConnection ¶
func (f *RedisConnectionFactory) CreateConnection( ctx context.Context, config *ConfigTarget, ) (Connection, error)
func (*RedisConnectionFactory) GetProtocol ¶
func (f *RedisConnectionFactory) GetProtocol() string
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages all connections in the system.
func NewRegistry ¶
NewRegistry creates a new connection registry.
func NewRegistryWithDefaults ¶
func (*Registry) AddConnection ¶
func (registry *Registry) AddConnection( ctx context.Context, name string, config *ConfigTarget, ) (Connection, error)
AddConnection adds a new connection to the registry.
func (*Registry) GetByBehavior ¶
func (registry *Registry) GetByBehavior(behavior ConnectionBehavior) []Connection
GetByBehavior returns all connections that have the specified behavior.
func (*Registry) GetByCapability ¶
func (registry *Registry) GetByCapability(capability ConnectionCapability) []Connection
GetByCapability returns all connections that have the specified capability.
func (*Registry) GetByProtocol ¶
func (registry *Registry) GetByProtocol(protocol string) []Connection
GetByProtocol returns all connections that use the specified protocol.
func (*Registry) GetDefault ¶
func (registry *Registry) GetDefault() Connection
GetDefault returns the default connection.
func (*Registry) GetNamed ¶
func (registry *Registry) GetNamed(name string) Connection
GetNamed returns a named connection.
func (*Registry) GetRepository ¶
func (registry *Registry) GetRepository(name string) (Repository, error)
GetRepository returns a Repository from the named connection if that connection supports it.
func (*Registry) HealthCheck ¶
func (registry *Registry) HealthCheck(ctx context.Context) map[string]*HealthStatus
HealthCheck performs health checks on all connections.
func (*Registry) ListConnections ¶
ListConnections returns all connection names.
func (*Registry) ListRegisteredProtocols ¶
ListRegisteredProtocols returns all registered protocols.
func (*Registry) LoadFromConfig ¶
func (*Registry) RegisterFactory ¶
func (registry *Registry) RegisterFactory(factory ConnectionFactory)
RegisterFactory registers a connection factory for a specific protocol.
type Repository ¶
type Repository interface {
// Get retrieves a value by key
Get(ctx context.Context, key string) ([]byte, error)
// Set stores a value with the given key
Set(ctx context.Context, key string, value []byte) error
// Remove deletes a value by key
Remove(ctx context.Context, key string) error
// Update updates an existing value by key
Update(ctx context.Context, key string, value []byte) error
// Exists checks if a key exists
Exists(ctx context.Context, key string) (bool, error)
}
Repository defines the port for data access operations. This interface will be implemented by adapters in connfx for different storage technologies.
type SQLConnection ¶
type SQLConnection struct {
// contains filtered or unexported fields
}
SQLConnection represents a SQL database connection.
func (*SQLConnection) GetBehaviors ¶
func (c *SQLConnection) GetBehaviors() []ConnectionBehavior
func (*SQLConnection) GetCapabilities ¶
func (c *SQLConnection) GetCapabilities() []ConnectionCapability
func (*SQLConnection) GetDB ¶
func (c *SQLConnection) GetDB() *sql.DB
GetDB returns the underlying *sql.DB instance.
func (*SQLConnection) GetProtocol ¶
func (c *SQLConnection) GetProtocol() string
func (*SQLConnection) GetRawConnection ¶
func (c *SQLConnection) GetRawConnection() any
func (*SQLConnection) GetState ¶
func (c *SQLConnection) GetState() ConnectionState
func (*SQLConnection) HealthCheck ¶
func (c *SQLConnection) HealthCheck(ctx context.Context) *HealthStatus
func (*SQLConnection) Stats ¶
func (c *SQLConnection) Stats() sql.DBStats
Stats returns database connection statistics.
type SQLConnectionFactory ¶
type SQLConnectionFactory struct {
// contains filtered or unexported fields
}
SQLConnectionFactory creates SQL connections.
func NewSQLConnectionFactory ¶
func NewSQLConnectionFactory(protocol string) *SQLConnectionFactory
NewSQLConnectionFactory creates a new SQL connection factory for a specific protocol.
func (*SQLConnectionFactory) CreateConnection ¶
func (f *SQLConnectionFactory) CreateConnection( ctx context.Context, config *ConfigTarget, ) (Connection, error)
func (*SQLConnectionFactory) GetProtocol ¶
func (f *SQLConnectionFactory) GetProtocol() string
type StreamEntry ¶ added in v0.7.1
StreamEntry represents a single stream entry.
type StreamInfo ¶ added in v0.7.1
type StreamInfo struct {
FirstEntry *StreamEntry `json:"first_entry,omitempty"`
LastEntry *StreamEntry `json:"last_entry,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
LastGeneratedID string `json:"last_generated_id"`
MaxDeletedID string `json:"max_deleted_id"`
RecordedFirstID string `json:"recorded_first_id"`
Length int64 `json:"length"`
RadixTreeKeys int64 `json:"radix_tree_keys"`
RadixTreeNodes int64 `json:"radix_tree_nodes"`
Groups int64 `json:"groups"`
EntriesAdded int64 `json:"entries_added"`
}
StreamInfo provides information about a stream.
type TransactionContext ¶
type TransactionContext interface {
// Commit commits the transaction
Commit() error
// Rollback rolls back the transaction
Rollback() error
// GetRepository returns a repository bound to this transaction
GetRepository() Repository
}
TransactionContext represents a transaction context for data operations.
type TransactionalRepository ¶
type TransactionalRepository interface {
Repository
// BeginTransaction starts a new transaction
BeginTransaction(ctx context.Context) (TransactionContext, error)
}
TransactionalRepository extends Repository with transaction support.