store

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BadgerStore

type BadgerStore struct {
	// contains filtered or unexported fields
}

BadgerStore implements the Store interface using BadgerDB.

func NewBadgerStore

func NewBadgerStore() *BadgerStore

NewBadgerStore creates a new BadgerDB store

func (*BadgerStore) AddSubscription

func (s *BadgerStore) AddSubscription(ctx context.Context, clientID, topic string) error

AddSubscription adds a subscription for a client to a topic

func (*BadgerStore) AddToIndex

func (s *BadgerStore) AddToIndex(ctx context.Context, indexName, key, value string) error

AddToIndex adds a value to an index

func (*BadgerStore) Close

func (s *BadgerStore) Close() error

Close closes the BadgerDB connection

func (*BadgerStore) DeleteClient

func (s *BadgerStore) DeleteClient(ctx context.Context, id string) error

DeleteClient deletes a client

func (*BadgerStore) DeleteMessage

func (s *BadgerStore) DeleteMessage(ctx context.Context, id string) error

DeleteMessage deletes a message

func (*BadgerStore) DeleteQueue

func (s *BadgerStore) DeleteQueue(ctx context.Context, name string) error

DeleteQueue deletes a queue and all its messages

func (*BadgerStore) DequeueMessage

func (s *BadgerStore) DequeueMessage(ctx context.Context, queueName string) (string, error)

DequeueMessage removes and returns the first message ID from a queue with O(1) complexity

func (*BadgerStore) EnqueueMessage

func (s *BadgerStore) EnqueueMessage(ctx context.Context, queueName string, msgID string) error

EnqueueMessage adds a message ID to a queue with O(1) complexity

func (*BadgerStore) GetClient

func (s *BadgerStore) GetClient(ctx context.Context, id string) (*types.Client, error)

GetClient retrieves a client by ID

func (*BadgerStore) GetFromIndex

func (s *BadgerStore) GetFromIndex(ctx context.Context, indexName, key string) ([]string, error)

GetFromIndex retrieves values from an index

func (*BadgerStore) GetMessage

func (s *BadgerStore) GetMessage(ctx context.Context, id string) (*types.Message, error)

GetMessage retrieves a message by ID

func (*BadgerStore) GetQueue

func (s *BadgerStore) GetQueue(ctx context.Context, name string) (*types.Queue, error)

GetQueue retrieves queue metadata

func (*BadgerStore) GetQueueSize

func (s *BadgerStore) GetQueueSize(ctx context.Context, queueName string) (int64, error)

GetQueueSize returns the number of messages in a queue with O(1) complexity

func (*BadgerStore) GetSubscribers

func (s *BadgerStore) GetSubscribers(ctx context.Context, topic string) ([]string, error)

GetSubscribers gets all clients subscribed to a topic

func (*BadgerStore) GetSubscriptions

func (s *BadgerStore) GetSubscriptions(ctx context.Context, clientID string) ([]string, error)

GetSubscriptions gets all topics a client is subscribed to

func (*BadgerStore) ListClients

func (s *BadgerStore) ListClients(ctx context.Context, filter ClientFilter) ([]*types.Client, error)

ListClients lists clients based on filter

func (*BadgerStore) ListMessages

func (s *BadgerStore) ListMessages(ctx context.Context, filter MessageFilter) ([]*types.Message, error)

ListMessages lists messages based on filter

func (*BadgerStore) ListQueues

func (s *BadgerStore) ListQueues(ctx context.Context) ([]*types.Queue, error)

ListQueues lists all queues

func (*BadgerStore) MigrateLegacyQueues

func (s *BadgerStore) MigrateLegacyQueues(ctx context.Context) error

MigrateLegacyQueues migrates existing queues from the legacy O(n) format to the O(1) format. This should be called after opening the database to ensure backward compatibility.

func (*BadgerStore) Open

func (s *BadgerStore) Open(path string) error

Open initializes the BadgerDB connection

func (*BadgerStore) PeekQueue

func (s *BadgerStore) PeekQueue(ctx context.Context, queueName string, limit int) ([]string, error)

PeekQueue returns message IDs from a queue without removing them; the size check is O(1) and reading the IDs is O(limit).

func (*BadgerStore) RemoveFromIndex

func (s *BadgerStore) RemoveFromIndex(ctx context.Context, indexName, key, value string) error

RemoveFromIndex removes a value from an index

func (*BadgerStore) RemoveSubscription

func (s *BadgerStore) RemoveSubscription(ctx context.Context, clientID, topic string) error

RemoveSubscription removes a subscription

func (*BadgerStore) RunInTransaction

func (s *BadgerStore) RunInTransaction(ctx context.Context, fn func(tx Transaction) error) error

RunInTransaction runs a function within a transaction

func (*BadgerStore) SaveClient

func (s *BadgerStore) SaveClient(ctx context.Context, client *types.Client) error

SaveClient saves a client to the store

func (*BadgerStore) SaveMessage

func (s *BadgerStore) SaveMessage(ctx context.Context, msg *types.Message) error

SaveMessage saves a message to the store

func (*BadgerStore) SaveQueue

func (s *BadgerStore) SaveQueue(ctx context.Context, queue *types.Queue) error

SaveQueue saves queue metadata

func (*BadgerStore) UpdateClient

func (s *BadgerStore) UpdateClient(ctx context.Context, client *types.Client) error

UpdateClient updates an existing client

func (*BadgerStore) UpdateMessage

func (s *BadgerStore) UpdateMessage(ctx context.Context, msg *types.Message) error

UpdateMessage updates an existing message

func (*BadgerStore) UpdateQueue

func (s *BadgerStore) UpdateQueue(ctx context.Context, queue *types.Queue) error

UpdateQueue updates queue metadata

type ClientFilter

type ClientFilter struct {
	MetadataKey   string
	MetadataValue string
	Limit         int
}

ClientFilter defines filters for listing clients

type MessageFilter

type MessageFilter struct {
	QueueName string
	Status    *types.MessageStatus
	ClientID  string
	Topic     string
	Since     *time.Time
	Until     *time.Time
	Limit     int
}

MessageFilter defines filters for listing messages

type Store

type Store interface {
	// Initialize the store
	Open(path string) error
	Close() error

	// Message operations
	SaveMessage(ctx context.Context, msg *types.Message) error
	GetMessage(ctx context.Context, id string) (*types.Message, error)
	UpdateMessage(ctx context.Context, msg *types.Message) error
	DeleteMessage(ctx context.Context, id string) error
	ListMessages(ctx context.Context, filter MessageFilter) ([]*types.Message, error)

	// Queue operations
	EnqueueMessage(ctx context.Context, queueName string, msgID string) error
	DequeueMessage(ctx context.Context, queueName string) (string, error)
	PeekQueue(ctx context.Context, queueName string, limit int) ([]string, error)
	GetQueueSize(ctx context.Context, queueName string) (int64, error)

	// Client operations
	SaveClient(ctx context.Context, client *types.Client) error
	GetClient(ctx context.Context, id string) (*types.Client, error)
	UpdateClient(ctx context.Context, client *types.Client) error
	DeleteClient(ctx context.Context, id string) error
	ListClients(ctx context.Context, filter ClientFilter) ([]*types.Client, error)

	// Subscription operations
	AddSubscription(ctx context.Context, clientID, topic string) error
	RemoveSubscription(ctx context.Context, clientID, topic string) error
	GetSubscriptions(ctx context.Context, clientID string) ([]string, error)
	GetSubscribers(ctx context.Context, topic string) ([]string, error)

	// Queue metadata operations
	SaveQueue(ctx context.Context, queue *types.Queue) error
	GetQueue(ctx context.Context, name string) (*types.Queue, error)
	UpdateQueue(ctx context.Context, queue *types.Queue) error
	DeleteQueue(ctx context.Context, name string) error
	ListQueues(ctx context.Context) ([]*types.Queue, error)

	// Index operations
	AddToIndex(ctx context.Context, indexName, key, value string) error
	RemoveFromIndex(ctx context.Context, indexName, key, value string) error
	GetFromIndex(ctx context.Context, indexName, key string) ([]string, error)

	// Transaction support
	RunInTransaction(ctx context.Context, fn func(tx Transaction) error) error
}

Store defines the interface for message queue storage

type Transaction

type Transaction interface {
	// Message operations
	SaveMessage(msg *types.Message) error
	UpdateMessage(msg *types.Message) error
	DeleteMessage(id string) error

	// Queue operations
	EnqueueMessage(queueName string, msgID string) error
	DequeueMessage(queueName string) (string, error)

	// Client operations
	SaveClient(client *types.Client) error
	UpdateClient(client *types.Client) error

	// Index operations
	AddToIndex(indexName, key, value string) error
	RemoveFromIndex(indexName, key, value string) error
}

Transaction defines transaction operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL