Documentation
¶
Index ¶
- type BadgerStore
- func (s *BadgerStore) AddSubscription(ctx context.Context, clientID, topic string) error
- func (s *BadgerStore) AddToIndex(ctx context.Context, indexName, key, value string) error
- func (s *BadgerStore) Close() error
- func (s *BadgerStore) DeleteClient(ctx context.Context, id string) error
- func (s *BadgerStore) DeleteMessage(ctx context.Context, id string) error
- func (s *BadgerStore) DeleteQueue(ctx context.Context, name string) error
- func (s *BadgerStore) DequeueMessage(ctx context.Context, queueName string) (string, error)
- func (s *BadgerStore) EnqueueMessage(ctx context.Context, queueName string, msgID string) error
- func (s *BadgerStore) GetClient(ctx context.Context, id string) (*types.Client, error)
- func (s *BadgerStore) GetFromIndex(ctx context.Context, indexName, key string) ([]string, error)
- func (s *BadgerStore) GetMessage(ctx context.Context, id string) (*types.Message, error)
- func (s *BadgerStore) GetQueue(ctx context.Context, name string) (*types.Queue, error)
- func (s *BadgerStore) GetQueueSize(ctx context.Context, queueName string) (int64, error)
- func (s *BadgerStore) GetSubscribers(ctx context.Context, topic string) ([]string, error)
- func (s *BadgerStore) GetSubscriptions(ctx context.Context, clientID string) ([]string, error)
- func (s *BadgerStore) ListClients(ctx context.Context, filter ClientFilter) ([]*types.Client, error)
- func (s *BadgerStore) ListMessages(ctx context.Context, filter MessageFilter) ([]*types.Message, error)
- func (s *BadgerStore) ListQueues(ctx context.Context) ([]*types.Queue, error)
- func (s *BadgerStore) MigrateLegacyQueues(ctx context.Context) error
- func (s *BadgerStore) Open(path string) error
- func (s *BadgerStore) PeekQueue(ctx context.Context, queueName string, limit int) ([]string, error)
- func (s *BadgerStore) RemoveFromIndex(ctx context.Context, indexName, key, value string) error
- func (s *BadgerStore) RemoveSubscription(ctx context.Context, clientID, topic string) error
- func (s *BadgerStore) RunInTransaction(ctx context.Context, fn func(tx Transaction) error) error
- func (s *BadgerStore) SaveClient(ctx context.Context, client *types.Client) error
- func (s *BadgerStore) SaveMessage(ctx context.Context, msg *types.Message) error
- func (s *BadgerStore) SaveQueue(ctx context.Context, queue *types.Queue) error
- func (s *BadgerStore) UpdateClient(ctx context.Context, client *types.Client) error
- func (s *BadgerStore) UpdateMessage(ctx context.Context, msg *types.Message) error
- func (s *BadgerStore) UpdateQueue(ctx context.Context, queue *types.Queue) error
- type ClientFilter
- type MessageFilter
- type Store
- type Transaction
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BadgerStore ¶
type BadgerStore struct {
// contains filtered or unexported fields
}
BadgerStore implements Store interface using BadgerDB
func NewBadgerStore ¶
func NewBadgerStore() *BadgerStore
NewBadgerStore creates a new BadgerDB store
func (*BadgerStore) AddSubscription ¶
func (s *BadgerStore) AddSubscription(ctx context.Context, clientID, topic string) error
AddSubscription adds a subscription for a client to a topic
func (*BadgerStore) AddToIndex ¶
func (s *BadgerStore) AddToIndex(ctx context.Context, indexName, key, value string) error
AddToIndex adds a value to an index
func (*BadgerStore) Close ¶
func (s *BadgerStore) Close() error
Close closes the BadgerDB connection
func (*BadgerStore) DeleteClient ¶
func (s *BadgerStore) DeleteClient(ctx context.Context, id string) error
DeleteClient deletes a client
func (*BadgerStore) DeleteMessage ¶
func (s *BadgerStore) DeleteMessage(ctx context.Context, id string) error
DeleteMessage deletes a message
func (*BadgerStore) DeleteQueue ¶
func (s *BadgerStore) DeleteQueue(ctx context.Context, name string) error
DeleteQueue deletes a queue and all its messages
func (*BadgerStore) DequeueMessage ¶
DequeueMessage removes and returns the first message ID from a queue with O(1) complexity
func (*BadgerStore) EnqueueMessage ¶
EnqueueMessage adds a message ID to a queue with O(1) complexity
func (*BadgerStore) GetFromIndex ¶
GetFromIndex retrieves values from an index
func (*BadgerStore) GetMessage ¶
GetMessage retrieves a message by ID
func (*BadgerStore) GetQueueSize ¶
GetQueueSize returns the number of messages in a queue with O(1) complexity
func (*BadgerStore) GetSubscribers ¶
GetSubscribers gets all clients subscribed to a topic
func (*BadgerStore) GetSubscriptions ¶
GetSubscriptions gets all topics a client is subscribed to
func (*BadgerStore) ListClients ¶
func (s *BadgerStore) ListClients(ctx context.Context, filter ClientFilter) ([]*types.Client, error)
ListClients lists clients based on filter
func (*BadgerStore) ListMessages ¶
func (s *BadgerStore) ListMessages(ctx context.Context, filter MessageFilter) ([]*types.Message, error)
ListMessages lists messages based on filter
func (*BadgerStore) ListQueues ¶
ListQueues lists all queues
func (*BadgerStore) MigrateLegacyQueues ¶
func (s *BadgerStore) MigrateLegacyQueues(ctx context.Context) error
MigrateLegacyQueues migrates existing queues from O(n) to O(1) format This should be called after opening the database to ensure backward compatibility
func (*BadgerStore) Open ¶
func (s *BadgerStore) Open(path string) error
Open initializes the BadgerDB connection
func (*BadgerStore) PeekQueue ¶
PeekQueue returns message IDs from queue without removing them with O(1) for size check + O(limit) for reading
func (*BadgerStore) RemoveFromIndex ¶
func (s *BadgerStore) RemoveFromIndex(ctx context.Context, indexName, key, value string) error
RemoveFromIndex removes a value from an index
func (*BadgerStore) RemoveSubscription ¶
func (s *BadgerStore) RemoveSubscription(ctx context.Context, clientID, topic string) error
RemoveSubscription removes a subscription
func (*BadgerStore) RunInTransaction ¶
func (s *BadgerStore) RunInTransaction(ctx context.Context, fn func(tx Transaction) error) error
RunInTransaction runs a function within a transaction
func (*BadgerStore) SaveClient ¶
SaveClient saves a client to the store
func (*BadgerStore) SaveMessage ¶
SaveMessage saves a message to the store
func (*BadgerStore) UpdateClient ¶
UpdateClient updates an existing client
func (*BadgerStore) UpdateMessage ¶
UpdateMessage updates an existing message
func (*BadgerStore) UpdateQueue ¶
UpdateQueue updates queue metadata
type ClientFilter ¶
ClientFilter defines filters for listing clients
type MessageFilter ¶
type MessageFilter struct {
QueueName string
Status *types.MessageStatus
ClientID string
Topic string
Since *time.Time
Until *time.Time
Limit int
}
MessageFilter defines filters for listing messages
type Store ¶
type Store interface {
// Initialize the store
Open(path string) error
Close() error
// Message operations
SaveMessage(ctx context.Context, msg *types.Message) error
GetMessage(ctx context.Context, id string) (*types.Message, error)
UpdateMessage(ctx context.Context, msg *types.Message) error
DeleteMessage(ctx context.Context, id string) error
ListMessages(ctx context.Context, filter MessageFilter) ([]*types.Message, error)
// Queue operations
EnqueueMessage(ctx context.Context, queueName string, msgID string) error
DequeueMessage(ctx context.Context, queueName string) (string, error)
PeekQueue(ctx context.Context, queueName string, limit int) ([]string, error)
GetQueueSize(ctx context.Context, queueName string) (int64, error)
// Client operations
SaveClient(ctx context.Context, client *types.Client) error
GetClient(ctx context.Context, id string) (*types.Client, error)
UpdateClient(ctx context.Context, client *types.Client) error
DeleteClient(ctx context.Context, id string) error
ListClients(ctx context.Context, filter ClientFilter) ([]*types.Client, error)
// Subscription operations
AddSubscription(ctx context.Context, clientID, topic string) error
RemoveSubscription(ctx context.Context, clientID, topic string) error
GetSubscriptions(ctx context.Context, clientID string) ([]string, error)
GetSubscribers(ctx context.Context, topic string) ([]string, error)
// Queue metadata operations
SaveQueue(ctx context.Context, queue *types.Queue) error
GetQueue(ctx context.Context, name string) (*types.Queue, error)
UpdateQueue(ctx context.Context, queue *types.Queue) error
DeleteQueue(ctx context.Context, name string) error
ListQueues(ctx context.Context) ([]*types.Queue, error)
// Index operations
AddToIndex(ctx context.Context, indexName, key, value string) error
RemoveFromIndex(ctx context.Context, indexName, key, value string) error
GetFromIndex(ctx context.Context, indexName, key string) ([]string, error)
// Transaction support
RunInTransaction(ctx context.Context, fn func(tx Transaction) error) error
}
Store defines the interface for message queue storage
type Transaction ¶
type Transaction interface {
// Message operations
SaveMessage(msg *types.Message) error
UpdateMessage(msg *types.Message) error
DeleteMessage(id string) error
// Queue operations
EnqueueMessage(queueName string, msgID string) error
DequeueMessage(queueName string) (string, error)
// Client operations
SaveClient(client *types.Client) error
UpdateClient(client *types.Client) error
// Index operations
AddToIndex(indexName, key, value string) error
RemoveFromIndex(indexName, key, value string) error
}
Transaction defines transaction operations