Documentation ¶
Index ¶
- Variables
- type Cache
- func (c *Cache) Delete(ctx context.Context, key string) error
- func (c *Cache) Exists(ctx context.Context, key string) (bool, error)
- func (c *Cache) Expire(ctx context.Context, key string, expiration time.Duration) error
- func (c *Cache) Get(ctx context.Context, key string, dest any) error
- func (c *Cache) GetConnection() connfx.Connection
- func (c *Cache) GetRaw(ctx context.Context, key string) ([]byte, error)
- func (c *Cache) GetRepository() connfx.CacheRepository
- func (c *Cache) GetTTL(ctx context.Context, key string) (time.Duration, error)
- func (c *Cache) Set(ctx context.Context, key string, value any, expiration time.Duration) error
- func (c *Cache) SetRaw(ctx context.Context, key string, value []byte, expiration time.Duration) error
- type Queue
- func (q *Queue) AckMessage(ctx context.Context, queueName, consumerGroup, receiptHandle string) error
- func (q *Queue) ClaimPendingMessages(ctx context.Context, queueName string, consumerGroup string, ...) ([]connfx.Message, error)
- func (q *Queue) Consume(ctx context.Context, queueName string, config connfx.ConsumerConfig) (<-chan connfx.Message, <-chan error)
- func (q *Queue) ConsumeWithDefaults(ctx context.Context, queueName string) (<-chan connfx.Message, <-chan error)
- func (q *Queue) ConsumeWithGroup(ctx context.Context, queueName string, consumerGroup string, ...) (<-chan connfx.Message, <-chan error)
- func (q *Queue) DeclareQueue(ctx context.Context, name string) (string, error)
- func (q *Queue) DeclareQueueWithConfig(ctx context.Context, name string, config connfx.QueueConfig) (string, error)
- func (q *Queue) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
- func (q *Queue) GetConnection() connfx.Connection
- func (q *Queue) GetRepository() connfx.QueueRepository
- func (q *Queue) GetStreamRepository() (connfx.QueueStreamRepository, error)
- func (q *Queue) IsStreamSupported() bool
- func (q *Queue) ProcessMessages(ctx context.Context, queueName string, config connfx.ConsumerConfig, ...) error
- func (q *Queue) ProcessMessagesWithDefaults(ctx context.Context, queueName string, ...) error
- func (q *Queue) ProcessMessagesWithGroup(ctx context.Context, queueName string, consumerGroup string, ...) error
- func (q *Queue) Publish(ctx context.Context, queueName string, message any) error
- func (q *Queue) PublishRaw(ctx context.Context, queueName string, data []byte) error
- func (q *Queue) PublishRawWithHeaders(ctx context.Context, queueName string, data []byte, headers map[string]any) error
- func (q *Queue) PublishWithHeaders(ctx context.Context, queueName string, message any, headers map[string]any) error
- type QueueStream
- func (s *QueueStream) AckMessage(ctx context.Context, streamName string, consumerGroup string, messageID string) error
- func (s *QueueStream) ClaimPendingMessages(ctx context.Context, streamName string, consumerGroup string, ...) ([]connfx.Message, error)
- func (s *QueueStream) ConsumeFromGroup(ctx context.Context, streamName string, consumerGroup string, ...) (<-chan connfx.Message, <-chan error)
- func (s *QueueStream) CreateConsumerGroup(ctx context.Context, streamName string, consumerGroup string, startFrom string) error
- func (s *QueueStream) CreateStream(ctx context.Context, streamName string) error
- func (s *QueueStream) GetConnection() connfx.Connection
- func (s *QueueStream) GetConsumerGroupInfo(ctx context.Context, streamName string) ([]connfx.ConsumerGroupInfo, error)
- func (s *QueueStream) GetRepository() connfx.QueueStreamRepository
- func (s *QueueStream) GetStreamInfo(ctx context.Context, streamName string) (connfx.StreamInfo, error)
- func (s *QueueStream) ProcessMessagesFromGroup(ctx context.Context, streamName string, consumerGroup string, ...) error
- func (s *QueueStream) SendMessage(ctx context.Context, streamName string, message any) (string, error)
- func (s *QueueStream) SendMessageWithHeaders(ctx context.Context, streamName string, message any, headers map[string]any) (string, error)
- func (s *QueueStream) TrimStream(ctx context.Context, streamName string, maxLength int64) error
- type Store
- func (s *Store) Exists(ctx context.Context, key string) (bool, error)
- func (s *Store) Get(ctx context.Context, key string, dest any) error
- func (s *Store) GetConnection() connfx.Connection
- func (s *Store) GetRaw(ctx context.Context, key string) ([]byte, error)
- func (s *Store) GetRepository() connfx.Repository
- func (s *Store) Remove(ctx context.Context, key string) error
- func (s *Store) Set(ctx context.Context, key string, value any) error
- func (s *Store) SetRaw(ctx context.Context, key string, value []byte) error
- func (s *Store) Update(ctx context.Context, key string, value any) error
- func (s *Store) UpdateRaw(ctx context.Context, key string, value []byte) error
- type TransactionStore
- func (ts *TransactionStore) Exists(ctx context.Context, key string) (bool, error)
- func (ts *TransactionStore) Get(ctx context.Context, key string, dest any) error
- func (ts *TransactionStore) GetRaw(ctx context.Context, key string) ([]byte, error)
- func (ts *TransactionStore) Remove(ctx context.Context, key string) error
- func (ts *TransactionStore) Set(ctx context.Context, key string, value any) error
- func (ts *TransactionStore) SetRaw(ctx context.Context, key string, value []byte) error
- func (ts *TransactionStore) Update(ctx context.Context, key string, value any) error
- func (ts *TransactionStore) UpdateRaw(ctx context.Context, key string, value []byte) error
- type TransactionalStore
Constants ¶
This section is empty.
Variables ¶
var (
	ErrCacheNotSupported = errors.New("connection does not support cache operations")
	ErrKeyExpired        = errors.New("key has expired")
	ErrCacheOperation    = errors.New("cache operation failed")
)
var (
	ErrQueueNotSupported = errors.New("connection does not support queue operations")
	ErrMessageProcessing = errors.New("message processing failed")
	ErrContextCanceled   = errors.New("context canceled")
	ErrQueueOperation    = errors.New("queue operation failed")
)
var (
	ErrQueueStreamNotSupported = errors.New("connection does not support queue stream operations")
	ErrQueueStreamOperation    = errors.New("queue stream operation failed")
)
var (
	ErrConnectionNotSupported = errors.New("connection does not support required operations")
	ErrKeyNotFound            = errors.New("key not found")
	ErrFailedToMarshal        = errors.New("failed to marshal data")
	ErrFailedToUnmarshal      = errors.New("failed to unmarshal data")
	ErrInvalidData            = errors.New("invalid data")
	ErrRepositoryOperation    = errors.New("repository operation failed")
)
Functions ¶
This section is empty.
Types ¶
type Cache ¶ added in v0.7.0
type Cache struct {
// contains filtered or unexported fields
}
Cache provides high-level cache operations with expiration support.
func NewCache ¶ added in v0.7.0
func NewCache(conn connfx.Connection) (*Cache, error)
NewCache creates a new Cache instance from a connfx connection. The connection must support cache operations.
func (*Cache) Get ¶ added in v0.7.0
func (c *Cache) Get(ctx context.Context, key string, dest any) error
Get retrieves a value by key and unmarshals it into the provided destination.
func (*Cache) GetConnection ¶ added in v0.7.0
func (c *Cache) GetConnection() connfx.Connection
GetConnection returns the underlying connfx connection.
func (*Cache) GetRepository ¶ added in v0.7.0
func (c *Cache) GetRepository() connfx.CacheRepository
GetRepository returns the underlying cache repository.
type Queue ¶ added in v0.7.0
type Queue struct {
// contains filtered or unexported fields
}
Queue provides high-level message queue operations.
func NewQueue ¶ added in v0.7.0
func NewQueue(conn connfx.Connection) (*Queue, error)
NewQueue creates a new Queue instance from a connfx connection. The connection must support queue operations.
func (*Queue) AckMessage ¶ added in v0.7.1
func (q *Queue) AckMessage( ctx context.Context, queueName, consumerGroup, receiptHandle string, ) error
AckMessage acknowledges a specific message by receipt handle.
func (*Queue) ClaimPendingMessages ¶ added in v0.7.1
func (q *Queue) ClaimPendingMessages( ctx context.Context, queueName string, consumerGroup string, consumerName string, minIdleTime time.Duration, count int, ) ([]connfx.Message, error)
ClaimPendingMessages attempts to claim pending messages from a consumer group.
func (*Queue) Consume ¶ added in v0.7.0
func (q *Queue) Consume( ctx context.Context, queueName string, config connfx.ConsumerConfig, ) (<-chan connfx.Message, <-chan error)
Consume starts consuming messages from a queue with the given configuration. Returns channels for messages and errors.
func (*Queue) ConsumeWithDefaults ¶ added in v0.7.0
func (q *Queue) ConsumeWithDefaults( ctx context.Context, queueName string, ) (<-chan connfx.Message, <-chan error)
ConsumeWithDefaults starts consuming messages from a queue with default configuration.
func (*Queue) ConsumeWithGroup ¶ added in v0.7.1
func (q *Queue) ConsumeWithGroup( ctx context.Context, queueName string, consumerGroup string, consumerName string, config connfx.ConsumerConfig, ) (<-chan connfx.Message, <-chan error)
ConsumeWithGroup starts consuming messages as part of a consumer group.
func (*Queue) DeclareQueue ¶ added in v0.7.0
func (q *Queue) DeclareQueue(ctx context.Context, name string) (string, error)
DeclareQueue declares a queue and returns its name.
func (*Queue) DeclareQueueWithConfig ¶ added in v0.7.1
func (q *Queue) DeclareQueueWithConfig( ctx context.Context, name string, config connfx.QueueConfig, ) (string, error)
DeclareQueueWithConfig declares a queue with specific configuration.
func (*Queue) DeleteMessage ¶ added in v0.7.1
func (q *Queue) DeleteMessage(ctx context.Context, queueName, receiptHandle string) error
DeleteMessage removes a message from the queue.
func (*Queue) GetConnection ¶ added in v0.7.0
func (q *Queue) GetConnection() connfx.Connection
GetConnection returns the underlying connfx connection.
func (*Queue) GetRepository ¶ added in v0.7.0
func (q *Queue) GetRepository() connfx.QueueRepository
GetRepository returns the underlying queue repository.
func (*Queue) GetStreamRepository ¶ added in v0.7.1
func (q *Queue) GetStreamRepository() (connfx.QueueStreamRepository, error)
GetStreamRepository returns the underlying stream repository if supported.
func (*Queue) IsStreamSupported ¶ added in v0.7.1
func (q *Queue) IsStreamSupported() bool
IsStreamSupported checks if the underlying repository supports stream operations.
func (*Queue) ProcessMessages ¶ added in v0.7.0
func (q *Queue) ProcessMessages( ctx context.Context, queueName string, config connfx.ConsumerConfig, messageHandler func(ctx context.Context, message any) bool, messageType any, ) error
ProcessMessages provides a convenient way to process messages with automatic unmarshaling. The messageHandler function receives the unmarshaled message and should return true to acknowledge the message, or false to negatively acknowledge it.
func (*Queue) ProcessMessagesWithDefaults ¶ added in v0.7.0
func (q *Queue) ProcessMessagesWithDefaults( ctx context.Context, queueName string, messageHandler func(ctx context.Context, message any) bool, messageType any, ) error
ProcessMessagesWithDefaults processes messages with default consumer configuration.
func (*Queue) ProcessMessagesWithGroup ¶ added in v0.7.1
func (q *Queue) ProcessMessagesWithGroup( ctx context.Context, queueName string, consumerGroup string, consumerName string, config connfx.ConsumerConfig, messageHandler func(ctx context.Context, message any) bool, messageType any, ) error
ProcessMessagesWithGroup processes messages as part of a consumer group.
func (*Queue) Publish ¶ added in v0.7.0
func (q *Queue) Publish(ctx context.Context, queueName string, message any) error
Publish sends a message to a queue after marshaling it to JSON.
func (*Queue) PublishRaw ¶ added in v0.7.0
func (q *Queue) PublishRaw(ctx context.Context, queueName string, data []byte) error
PublishRaw sends raw bytes to a queue.
type QueueStream ¶ added in v0.7.1
type QueueStream struct {
// contains filtered or unexported fields
}
QueueStream provides high-level stream operations for message streaming systems.
func NewQueueStream ¶ added in v0.7.1
func NewQueueStream(conn connfx.Connection) (*QueueStream, error)
NewQueueStream creates a new QueueStream instance from a connfx connection. The connection must support stream operations.
func (*QueueStream) AckMessage ¶ added in v0.7.1
func (s *QueueStream) AckMessage( ctx context.Context, streamName string, consumerGroup string, messageID string, ) error
AckMessage acknowledges a message in a consumer group.
func (*QueueStream) ClaimPendingMessages ¶ added in v0.7.1
func (s *QueueStream) ClaimPendingMessages( ctx context.Context, streamName string, consumerGroup string, consumerName string, minIdleTime time.Duration, count int, ) ([]connfx.Message, error)
ClaimPendingMessages claims pending messages from a consumer group that have been idle.
func (*QueueStream) ConsumeFromGroup ¶ added in v0.7.1
func (s *QueueStream) ConsumeFromGroup( ctx context.Context, streamName string, consumerGroup string, consumerName string, config connfx.ConsumerConfig, ) (<-chan connfx.Message, <-chan error)
ConsumeFromGroup consumes messages from a stream as part of a consumer group.
func (*QueueStream) CreateConsumerGroup ¶ added in v0.7.1
func (s *QueueStream) CreateConsumerGroup( ctx context.Context, streamName string, consumerGroup string, startFrom string, ) error
CreateConsumerGroup creates a consumer group for a stream.
func (*QueueStream) CreateStream ¶ added in v0.7.1
func (s *QueueStream) CreateStream(ctx context.Context, streamName string) error
CreateStream creates a stream by sending the first message. For most stream systems, streams are created automatically when first written to.
func (*QueueStream) GetConnection ¶ added in v0.7.1
func (s *QueueStream) GetConnection() connfx.Connection
GetConnection returns the underlying connfx connection.
func (*QueueStream) GetConsumerGroupInfo ¶ added in v0.7.1
func (s *QueueStream) GetConsumerGroupInfo( ctx context.Context, streamName string, ) ([]connfx.ConsumerGroupInfo, error)
GetConsumerGroupInfo returns information about consumer groups for a stream.
func (*QueueStream) GetRepository ¶ added in v0.7.1
func (s *QueueStream) GetRepository() connfx.QueueStreamRepository
GetRepository returns the underlying stream repository.
func (*QueueStream) GetStreamInfo ¶ added in v0.7.1
func (s *QueueStream) GetStreamInfo( ctx context.Context, streamName string, ) (connfx.StreamInfo, error)
GetStreamInfo returns information about a stream.
func (*QueueStream) ProcessMessagesFromGroup ¶ added in v0.7.1
func (s *QueueStream) ProcessMessagesFromGroup( ctx context.Context, streamName string, consumerGroup string, consumerName string, config connfx.ConsumerConfig, messageHandler func(ctx context.Context, message any) bool, messageType any, ) error
ProcessMessagesFromGroup processes messages from a consumer group with automatic unmarshaling.
func (*QueueStream) SendMessage ¶ added in v0.7.1
func (s *QueueStream) SendMessage( ctx context.Context, streamName string, message any, ) (string, error)
SendMessage sends a message to a stream after marshaling it to JSON.
func (*QueueStream) SendMessageWithHeaders ¶ added in v0.7.1
func (s *QueueStream) SendMessageWithHeaders( ctx context.Context, streamName string, message any, headers map[string]any, ) (string, error)
SendMessageWithHeaders sends a message with headers to a stream.
func (*QueueStream) TrimStream ¶ added in v0.7.1
func (s *QueueStream) TrimStream(ctx context.Context, streamName string, maxLength int64) error
TrimStream trims a stream to a maximum length.
type Store ¶ added in v0.7.0
type Store struct {
// contains filtered or unexported fields
}
Store provides high-level data persistence operations.
func NewStore ¶ added in v0.7.0
func NewStore(conn connfx.Connection) (*Store, error)
NewStore creates a new Store instance from a connfx connection. The connection must support data repository operations.
func (*Store) Get ¶ added in v0.7.0
func (s *Store) Get(ctx context.Context, key string, dest any) error
Get retrieves a value by key and unmarshals it into the provided destination.
func (*Store) GetConnection ¶ added in v0.7.0
func (s *Store) GetConnection() connfx.Connection
GetConnection returns the underlying connfx connection.
func (*Store) GetRepository ¶ added in v0.7.0
func (s *Store) GetRepository() connfx.Repository
GetRepository returns the underlying data repository.
func (*Store) Set ¶ added in v0.7.0
func (s *Store) Set(ctx context.Context, key string, value any) error
Set stores a value with the given key after marshaling it to JSON.
type TransactionStore ¶ added in v0.7.0
type TransactionStore struct {
Repository connfx.Repository
}
TransactionStore provides data operations within a transaction context.
func (*TransactionStore) Get ¶ added in v0.7.0
func (ts *TransactionStore) Get(ctx context.Context, key string, dest any) error
Get retrieves a value by key and unmarshals it into the provided destination.
func (*TransactionStore) Remove ¶ added in v0.7.0
func (ts *TransactionStore) Remove(ctx context.Context, key string) error
Remove deletes a value by key.
func (*TransactionStore) Set ¶ added in v0.7.0
func (ts *TransactionStore) Set(ctx context.Context, key string, value any) error
Set stores a value with the given key after marshaling it to JSON.
type TransactionalStore ¶ added in v0.7.0
type TransactionalStore struct {
*Store
// contains filtered or unexported fields
}
TransactionalStore provides transactional store operations.
func NewTransactionalStore ¶ added in v0.7.0
func NewTransactionalStore(conn connfx.Connection) (*TransactionalStore, error)
NewTransactionalStore creates a new TransactionalStore instance from a connfx connection. The connection must support transactional operations.
func (*TransactionalStore) ExecuteTransaction ¶ added in v0.7.0
func (ts *TransactionalStore) ExecuteTransaction( ctx context.Context, fn func(*TransactionStore) error, ) error
ExecuteTransaction executes a function within a transaction context. If the function returns an error, the transaction is rolled back. If the function succeeds, the transaction is committed.