queuefs

package
v0.0.0-...-e948ea5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

README

QueueFS Plugin - Message Queue Service

This plugin provides a message queue service through a file system interface.

DYNAMIC MOUNTING WITH PFS SHELL:

Interactive shell:

  pfs:/> mount queuefs /queue
  pfs:/> mount queuefs /tasks
  pfs:/> mount queuefs /messages

Direct command:

  uv run pfs mount queuefs /queue
  uv run pfs mount queuefs /jobs

CONFIGURATION PARAMETERS:

None required - QueueFS works with default settings (the in-memory backend).

USAGE:

Enqueue a message: echo "your message" > /enqueue

Dequeue a message: cat /dequeue

Peek at next message (without removing): cat /peek

Get queue size: cat /size

Clear the queue: echo "" > /clear

FILES:

  /enqueue - Write-only file to enqueue messages
  /dequeue - Read-only file to dequeue messages
  /peek    - Read-only file to peek at next message
  /size    - Read-only file showing queue size
  /clear   - Write-only file to clear all messages
  /README  - This file

EXAMPLES:

Enqueue a message

pfs:/> echo "task-123" > /queuefs/enqueue

Check queue size

pfs:/> cat /queuefs/size
1

Dequeue a message

pfs:/> cat /queuefs/dequeue
{"id":"...","data":"task-123","timestamp":"..."}

License

Apache License 2.0

Documentation

Index

Constants

View Source
const (
	// MetaValueQueueControl is the meta value for queue control files
	// (enqueue, dequeue, peek, clear).
	MetaValueQueueControl = "control"

	// MetaValueQueueStatus is the meta value for queue status files (size).
	MetaValueQueueStatus = "status"
)

Meta values for QueueFS plugin

View Source
// PluginName is the name of this plugin.
const PluginName = "queuefs"

Variables

This section is empty.

Functions

This section is empty.

Types

type DBBackend

// DBBackend defines the interface for database operations.
type DBBackend interface {
	// Open opens a connection to the database described by cfg and returns
	// the resulting *sql.DB handle, or an error.
	Open(cfg map[string]interface{}) (*sql.DB, error)

	// GetInitSQL returns the SQL statements to initialize the schema.
	GetInitSQL() []string

	// GetDriverName returns the driver name.
	GetDriverName() string
}

DBBackend defines the interface for database operations

func CreateBackend

func CreateBackend(cfg map[string]interface{}) (DBBackend, error)

CreateBackend creates the appropriate database backend

type MemoryBackend

type MemoryBackend struct {
	// contains filtered or unexported fields
}

MemoryBackend implements QueueBackend using in-memory storage

func NewMemoryBackend

func NewMemoryBackend() *MemoryBackend

func (*MemoryBackend) Clear

func (b *MemoryBackend) Clear(queueName string) error

func (*MemoryBackend) Close

func (b *MemoryBackend) Close() error

func (*MemoryBackend) CreateQueue

func (b *MemoryBackend) CreateQueue(queueName string) error

func (*MemoryBackend) Dequeue

func (b *MemoryBackend) Dequeue(queueName string) (QueueMessage, bool, error)

func (*MemoryBackend) Enqueue

func (b *MemoryBackend) Enqueue(queueName string, msg QueueMessage) error

func (*MemoryBackend) GetLastEnqueueTime

func (b *MemoryBackend) GetLastEnqueueTime(queueName string) (time.Time, error)

func (*MemoryBackend) GetType

func (b *MemoryBackend) GetType() string

func (*MemoryBackend) Initialize

func (b *MemoryBackend) Initialize(config map[string]interface{}) error

func (*MemoryBackend) ListQueues

func (b *MemoryBackend) ListQueues(prefix string) ([]string, error)

func (*MemoryBackend) Peek

func (b *MemoryBackend) Peek(queueName string) (QueueMessage, bool, error)

func (*MemoryBackend) QueueExists

func (b *MemoryBackend) QueueExists(queueName string) (bool, error)

func (*MemoryBackend) RemoveQueue

func (b *MemoryBackend) RemoveQueue(queueName string) error

func (*MemoryBackend) Size

func (b *MemoryBackend) Size(queueName string) (int, error)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a single message queue (for memory backend)

type QueueBackend

// QueueBackend defines the interface for queue storage backends.
// Every operation on a specific queue identifies it by queueName.
type QueueBackend interface {
	// Initialize initializes the backend with configuration.
	Initialize(config map[string]interface{}) error

	// Close closes the backend connection.
	Close() error

	// GetType returns the backend type name.
	GetType() string

	// Enqueue adds a message to a queue.
	Enqueue(queueName string, msg QueueMessage) error

	// Dequeue removes and returns the first message from a queue.
	// NOTE(review): the bool result presumably reports whether a message
	// was available - confirm against the implementations.
	Dequeue(queueName string) (QueueMessage, bool, error)

	// Peek returns the first message without removing it.
	// NOTE(review): the bool result presumably reports whether a message
	// was available - confirm against the implementations.
	Peek(queueName string) (QueueMessage, bool, error)

	// Size returns the number of messages in a queue.
	Size(queueName string) (int, error)

	// Clear removes all messages from a queue.
	Clear(queueName string) error

	// ListQueues returns all queue names (for directory listing).
	// NOTE(review): prefix presumably restricts the result to queues under
	// that prefix - confirm against the implementations.
	ListQueues(prefix string) ([]string, error)

	// GetLastEnqueueTime returns the timestamp of the last enqueued message.
	GetLastEnqueueTime(queueName string) (time.Time, error)

	// RemoveQueue removes all messages for a queue and its nested queues.
	RemoveQueue(queueName string) error

	// CreateQueue creates an empty queue (for mkdir support).
	CreateQueue(queueName string) error

	// QueueExists checks if a queue exists (even if empty).
	QueueExists(queueName string) (bool, error)
}

QueueBackend defines the interface for queue storage backends

type QueueFSPlugin

type QueueFSPlugin struct {
	// contains filtered or unexported fields
}

QueueFSPlugin provides a message queue service through a file system interface. Each queue is a directory containing control files:

/queue_name/enqueue - write to this file to enqueue a message
/queue_name/dequeue - read from this file to dequeue a message
/queue_name/peek    - read to peek at the next message without removing it
                      The peek file's modTime reflects the timestamp of the most recently enqueued message.
                      This can be used to implement poll-offset logic.
/queue_name/size    - read to get queue size
/queue_name/clear   - write to this file to clear the queue

Supports multiple backends:

  • memory (default): In-memory storage
  • tidb: TiDB database storage with TLS support
  • sqlite: SQLite database storage

func NewQueueFSPlugin

func NewQueueFSPlugin() *QueueFSPlugin

NewQueueFSPlugin creates a new queue plugin

func (*QueueFSPlugin) GetFileSystem

func (q *QueueFSPlugin) GetFileSystem() filesystem.FileSystem

func (*QueueFSPlugin) GetReadme

func (q *QueueFSPlugin) GetReadme() string

func (*QueueFSPlugin) Initialize

func (q *QueueFSPlugin) Initialize(cfg map[string]interface{}) error

func (*QueueFSPlugin) Name

func (q *QueueFSPlugin) Name() string

func (*QueueFSPlugin) Shutdown

func (q *QueueFSPlugin) Shutdown() error

func (*QueueFSPlugin) Validate

func (q *QueueFSPlugin) Validate(cfg map[string]interface{}) error

type QueueMessage

// QueueMessage is the message value accepted by QueueBackend.Enqueue and
// returned by QueueBackend.Dequeue and QueueBackend.Peek. Its fields are
// serialized to JSON under the keys "id", "data" and "timestamp".
type QueueMessage struct {
	// ID is serialized as "id". NOTE(review): presumably a unique message
	// identifier - confirm where it is generated.
	ID        string    `json:"id"`
	// Data is the message payload, serialized as "data".
	Data      string    `json:"data"`
	// Timestamp is serialized as "timestamp". NOTE(review): presumably the
	// enqueue time - confirm against the code that sets it.
	Timestamp time.Time `json:"timestamp"`
}

type SQLiteDBBackend

// SQLiteDBBackend implements DBBackend for SQLite. It declares no fields.
type SQLiteDBBackend struct{}

SQLiteDBBackend implements DBBackend for SQLite

func NewSQLiteDBBackend

func NewSQLiteDBBackend() *SQLiteDBBackend

func (*SQLiteDBBackend) GetDriverName

func (b *SQLiteDBBackend) GetDriverName() string

func (*SQLiteDBBackend) GetInitSQL

func (b *SQLiteDBBackend) GetInitSQL() []string

func (*SQLiteDBBackend) Open

func (b *SQLiteDBBackend) Open(cfg map[string]interface{}) (*sql.DB, error)

type TiDBBackend

type TiDBBackend struct {
	// contains filtered or unexported fields
}

TiDBBackend implements QueueBackend using TiDB database

func NewTiDBBackend

func NewTiDBBackend() *TiDBBackend

func (*TiDBBackend) Clear

func (b *TiDBBackend) Clear(queueName string) error

func (*TiDBBackend) Close

func (b *TiDBBackend) Close() error

func (*TiDBBackend) CreateQueue

func (b *TiDBBackend) CreateQueue(queueName string) error

func (*TiDBBackend) Dequeue

func (b *TiDBBackend) Dequeue(queueName string) (QueueMessage, bool, error)

func (*TiDBBackend) Enqueue

func (b *TiDBBackend) Enqueue(queueName string, msg QueueMessage) error

func (*TiDBBackend) GetLastEnqueueTime

func (b *TiDBBackend) GetLastEnqueueTime(queueName string) (time.Time, error)

func (*TiDBBackend) GetType

func (b *TiDBBackend) GetType() string

func (*TiDBBackend) Initialize

func (b *TiDBBackend) Initialize(config map[string]interface{}) error

func (*TiDBBackend) ListQueues

func (b *TiDBBackend) ListQueues(prefix string) ([]string, error)

func (*TiDBBackend) Peek

func (b *TiDBBackend) Peek(queueName string) (QueueMessage, bool, error)

func (*TiDBBackend) QueueExists

func (b *TiDBBackend) QueueExists(queueName string) (bool, error)

func (*TiDBBackend) RemoveQueue

func (b *TiDBBackend) RemoveQueue(queueName string) error

func (*TiDBBackend) Size

func (b *TiDBBackend) Size(queueName string) (int, error)

type TiDBDBBackend

// TiDBDBBackend implements DBBackend for TiDB. It declares no fields.
type TiDBDBBackend struct{}

TiDBDBBackend implements DBBackend for TiDB

func NewTiDBDBBackend

func NewTiDBDBBackend() *TiDBDBBackend

func (*TiDBDBBackend) GetDriverName

func (b *TiDBDBBackend) GetDriverName() string

func (*TiDBDBBackend) GetInitSQL

func (b *TiDBDBBackend) GetInitSQL() []string

func (*TiDBDBBackend) Open

func (b *TiDBDBBackend) Open(cfg map[string]interface{}) (*sql.DB, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL