Documentation
¶
Index ¶
- Constants
- type DBBackend
- type MemoryBackend
- func (b *MemoryBackend) Clear(queueName string) error
- func (b *MemoryBackend) Close() error
- func (b *MemoryBackend) CreateQueue(queueName string) error
- func (b *MemoryBackend) Dequeue(queueName string) (QueueMessage, bool, error)
- func (b *MemoryBackend) Enqueue(queueName string, msg QueueMessage) error
- func (b *MemoryBackend) GetLastEnqueueTime(queueName string) (time.Time, error)
- func (b *MemoryBackend) GetType() string
- func (b *MemoryBackend) Initialize(config map[string]interface{}) error
- func (b *MemoryBackend) ListQueues(prefix string) ([]string, error)
- func (b *MemoryBackend) Peek(queueName string) (QueueMessage, bool, error)
- func (b *MemoryBackend) QueueExists(queueName string) (bool, error)
- func (b *MemoryBackend) RemoveQueue(queueName string) error
- func (b *MemoryBackend) Size(queueName string) (int, error)
- type Queue
- type QueueBackend
- type QueueFSPlugin
- func (q *QueueFSPlugin) GetConfigParams() []plugin.ConfigParameter
- func (q *QueueFSPlugin) GetFileSystem() filesystem.FileSystem
- func (q *QueueFSPlugin) GetReadme() string
- func (q *QueueFSPlugin) Initialize(cfg map[string]interface{}) error
- func (q *QueueFSPlugin) Name() string
- func (q *QueueFSPlugin) Shutdown() error
- func (q *QueueFSPlugin) Validate(cfg map[string]interface{}) error
- type QueueMessage
- type SQLiteDBBackend
- type TiDBBackend
- func (b *TiDBBackend) Clear(queueName string) error
- func (b *TiDBBackend) Close() error
- func (b *TiDBBackend) CreateQueue(queueName string) error
- func (b *TiDBBackend) Dequeue(queueName string) (QueueMessage, bool, error)
- func (b *TiDBBackend) Enqueue(queueName string, msg QueueMessage) error
- func (b *TiDBBackend) GetLastEnqueueTime(queueName string) (time.Time, error)
- func (b *TiDBBackend) GetType() string
- func (b *TiDBBackend) Initialize(config map[string]interface{}) error
- func (b *TiDBBackend) ListQueues(prefix string) ([]string, error)
- func (b *TiDBBackend) Peek(queueName string) (QueueMessage, bool, error)
- func (b *TiDBBackend) QueueExists(queueName string) (bool, error)
- func (b *TiDBBackend) RemoveQueue(queueName string) error
- func (b *TiDBBackend) Size(queueName string) (int, error)
- type TiDBDBBackend
Constants ¶
const ( MetaValueQueueControl = "control" // Queue control files (enqueue, dequeue, peek, clear) MetaValueQueueStatus = "status" // Queue status files (size) )
Meta values for QueueFS plugin
const (
PluginName = "queuefs" // Name of this plugin
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DBBackend ¶
type DBBackend interface {
// Open opens a connection to the database
Open(cfg map[string]interface{}) (*sql.DB, error)
// GetInitSQL returns the SQL statements to initialize the schema
GetInitSQL() []string
// GetDriverName returns the driver name
GetDriverName() string
}
DBBackend defines the interface for database operations
func CreateBackend ¶
CreateBackend creates the appropriate database backend
type MemoryBackend ¶
type MemoryBackend struct {
// contains filtered or unexported fields
}
MemoryBackend implements QueueBackend using in-memory storage
func NewMemoryBackend ¶
func NewMemoryBackend() *MemoryBackend
func (*MemoryBackend) Clear ¶
func (b *MemoryBackend) Clear(queueName string) error
func (*MemoryBackend) Close ¶
func (b *MemoryBackend) Close() error
func (*MemoryBackend) CreateQueue ¶
func (b *MemoryBackend) CreateQueue(queueName string) error
func (*MemoryBackend) Dequeue ¶
func (b *MemoryBackend) Dequeue(queueName string) (QueueMessage, bool, error)
func (*MemoryBackend) Enqueue ¶
func (b *MemoryBackend) Enqueue(queueName string, msg QueueMessage) error
func (*MemoryBackend) GetLastEnqueueTime ¶
func (b *MemoryBackend) GetLastEnqueueTime(queueName string) (time.Time, error)
func (*MemoryBackend) GetType ¶
func (b *MemoryBackend) GetType() string
func (*MemoryBackend) Initialize ¶
func (b *MemoryBackend) Initialize(config map[string]interface{}) error
func (*MemoryBackend) ListQueues ¶
func (b *MemoryBackend) ListQueues(prefix string) ([]string, error)
func (*MemoryBackend) Peek ¶
func (b *MemoryBackend) Peek(queueName string) (QueueMessage, bool, error)
func (*MemoryBackend) QueueExists ¶
func (b *MemoryBackend) QueueExists(queueName string) (bool, error)
func (*MemoryBackend) RemoveQueue ¶
func (b *MemoryBackend) RemoveQueue(queueName string) error
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a single message queue (for memory backend)
type QueueBackend ¶
type QueueBackend interface {
// Initialize initializes the backend with configuration
Initialize(config map[string]interface{}) error
// Close closes the backend connection
Close() error
// GetType returns the backend type name
GetType() string
// Enqueue adds a message to a queue
Enqueue(queueName string, msg QueueMessage) error
// Dequeue removes and returns the first message from a queue
Dequeue(queueName string) (QueueMessage, bool, error)
// Peek returns the first message without removing it
Peek(queueName string) (QueueMessage, bool, error)
// Size returns the number of messages in a queue
Size(queueName string) (int, error)
// Clear removes all messages from a queue
Clear(queueName string) error
// ListQueues returns all queue names (for directory listing)
ListQueues(prefix string) ([]string, error)
// GetLastEnqueueTime returns the timestamp of the last enqueued message
GetLastEnqueueTime(queueName string) (time.Time, error)
// RemoveQueue removes all messages for a queue and its nested queues
RemoveQueue(queueName string) error
// CreateQueue creates an empty queue (for mkdir support)
CreateQueue(queueName string) error
// QueueExists checks if a queue exists (even if empty)
QueueExists(queueName string) (bool, error)
}
QueueBackend defines the interface for queue storage backends
type QueueFSPlugin ¶
type QueueFSPlugin struct {
// contains filtered or unexported fields
}
QueueFSPlugin provides a message queue service through a file system interface. Each queue is a directory containing control files:
/queue_name/enqueue - write to this file to enqueue a message
/queue_name/dequeue - read from this file to dequeue a message
/queue_name/peek - read to peek at the next message without removing it
The peek file's modTime reflects the latest enqueued message timestamp
This can be used for implementing poll offset logic
/queue_name/size - read to get queue size
/queue_name/clear - write to this file to clear the queue
Supports multiple backends:
- memory (default): In-memory storage
- tidb: TiDB database storage with TLS support
- sqlite: SQLite database storage
func NewQueueFSPlugin ¶
func NewQueueFSPlugin() *QueueFSPlugin
NewQueueFSPlugin creates a new queue plugin
func (*QueueFSPlugin) GetConfigParams ¶
func (q *QueueFSPlugin) GetConfigParams() []plugin.ConfigParameter
func (*QueueFSPlugin) GetFileSystem ¶
func (q *QueueFSPlugin) GetFileSystem() filesystem.FileSystem
func (*QueueFSPlugin) GetReadme ¶
func (q *QueueFSPlugin) GetReadme() string
func (*QueueFSPlugin) Initialize ¶
func (q *QueueFSPlugin) Initialize(cfg map[string]interface{}) error
func (*QueueFSPlugin) Name ¶
func (q *QueueFSPlugin) Name() string
func (*QueueFSPlugin) Shutdown ¶
func (q *QueueFSPlugin) Shutdown() error
func (*QueueFSPlugin) Validate ¶
func (q *QueueFSPlugin) Validate(cfg map[string]interface{}) error
type QueueMessage ¶
type SQLiteDBBackend ¶
type SQLiteDBBackend struct{}
SQLiteDBBackend implements DBBackend for SQLite
func NewSQLiteDBBackend ¶
func NewSQLiteDBBackend() *SQLiteDBBackend
func (*SQLiteDBBackend) GetDriverName ¶
func (b *SQLiteDBBackend) GetDriverName() string
func (*SQLiteDBBackend) GetInitSQL ¶
func (b *SQLiteDBBackend) GetInitSQL() []string
type TiDBBackend ¶
type TiDBBackend struct {
// contains filtered or unexported fields
}
TiDBBackend implements QueueBackend using TiDB database
func NewTiDBBackend ¶
func NewTiDBBackend() *TiDBBackend
func (*TiDBBackend) Clear ¶
func (b *TiDBBackend) Clear(queueName string) error
func (*TiDBBackend) Close ¶
func (b *TiDBBackend) Close() error
func (*TiDBBackend) CreateQueue ¶
func (b *TiDBBackend) CreateQueue(queueName string) error
func (*TiDBBackend) Dequeue ¶
func (b *TiDBBackend) Dequeue(queueName string) (QueueMessage, bool, error)
func (*TiDBBackend) Enqueue ¶
func (b *TiDBBackend) Enqueue(queueName string, msg QueueMessage) error
func (*TiDBBackend) GetLastEnqueueTime ¶
func (b *TiDBBackend) GetLastEnqueueTime(queueName string) (time.Time, error)
func (*TiDBBackend) GetType ¶
func (b *TiDBBackend) GetType() string
func (*TiDBBackend) Initialize ¶
func (b *TiDBBackend) Initialize(config map[string]interface{}) error
func (*TiDBBackend) ListQueues ¶
func (b *TiDBBackend) ListQueues(prefix string) ([]string, error)
func (*TiDBBackend) Peek ¶
func (b *TiDBBackend) Peek(queueName string) (QueueMessage, bool, error)
func (*TiDBBackend) QueueExists ¶
func (b *TiDBBackend) QueueExists(queueName string) (bool, error)
func (*TiDBBackend) RemoveQueue ¶
func (b *TiDBBackend) RemoveQueue(queueName string) error
type TiDBDBBackend ¶
type TiDBDBBackend struct{}
TiDBDBBackend implements DBBackend for TiDB
func NewTiDBDBBackend ¶
func NewTiDBDBBackend() *TiDBDBBackend
func (*TiDBDBBackend) GetDriverName ¶
func (b *TiDBDBBackend) GetDriverName() string
func (*TiDBDBBackend) GetInitSQL ¶
func (b *TiDBDBBackend) GetInitSQL() []string