blockqueue

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

README

Block Queue

Block Queue is a lightweight message queue with pub/sub mechanism for a cheap, robust, reliable, and durable messaging system.

Built on SQLite3 with support for Turso Database and PostgreSQL.

Why BlockQueue

While Kafka, Redis, or SQS are excellent products, they are complex and require significant resources. BlockQueue is built for simplicity, low resource usage, and cost-effectiveness.

Features

  • Cost-Effective: Budget-friendly solution for messaging needs
  • Pub/Sub Mechanism: Easy communication and real-time updates
  • Low Latency: Minimized network latency with SQLite as default storage
  • Multiple Drivers: SQLite, Turso, and PostgreSQL support

Installation

Binary

Download from releases or build from source:

go build -o blockqueue ./cmd/blockqueue
As Library
go get -u github.com/yudhasubki/blockqueue

Usage

BlockQueue can be used in two ways:

1. HTTP Server Mode

Start the server:

./blockqueue http -config=config.yaml

Example config.yaml:

http:
  port: 8080
  shutdown: "30s"
  driver: "sqlite"
sqlite:
  db_name: "blockqueue"
  busy_timeout: 5000
write_buffer:
  batch_size: 100
  flush_interval: "100ms"
  buffer_size: 10000

Then use HTTP API:

# Create topic with subscriber
curl -X POST http://localhost:8080/topics \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders",
    "subscribers": [{"name": "processor", "option": {"max_attempts": 5, "visibility_duration": "5m"}}]
  }'

# Publish message
curl -X POST http://localhost:8080/topics/orders/messages \
  -H "Content-Type: application/json" \
  -d '{"message": "order created"}'

# Read message (long-polling)
curl http://localhost:8080/topics/orders/subscribers/processor?timeout=5s

# Acknowledge message
curl -X DELETE http://localhost:8080/topics/orders/subscribers/processor/messages/{message_id}
2. Library Mode (Embedded)
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/yudhasubki/blockqueue"
    "github.com/yudhasubki/blockqueue/pkg/io"
    "github.com/yudhasubki/blockqueue/pkg/sqlite"
)

func main() {
    // Initialize SQLite driver
    db, err := sqlite.New("queue.db", sqlite.Config{BusyTimeout: 5000})
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    // Create BlockQueue instance
    stream := blockqueue.New(db, blockqueue.BlockQueueOption{
        WriteBufferConfig: blockqueue.WriteBufferConfig{
            BatchSize:     100,
            FlushInterval: 100 * time.Millisecond,
            BufferSize:    1000,
        },
    })

    if err := stream.Run(ctx); err != nil {
        log.Fatal(err)
    }
    defer stream.Close()

    // Create topic and subscriber
    request := io.Topic{
        Name: "orders",
        Subscribers: io.Subscribers{
            {
                Name: "processor",
                Option: io.SubscriberOpt{
                    MaxAttempts:        3,
                    VisibilityDuration: "1m",
                },
            },
        },
    }
    topic := request.Topic()
    stream.AddJob(ctx, topic, request.Subscriber(topic.Id))

    // Start consumer goroutine
    go func() {
        for {
            messages, err := stream.Read(ctx, topic, "processor")
            if err != nil {
                log.Printf("read error: %v", err)
                continue
            }
            for _, msg := range messages {
                log.Printf("received: %s", msg.Message)
                stream.Ack(ctx, topic, "processor", msg.Id)
            }
        }
    }()

    // Publish messages
    for i := 0; i < 10; i++ {
        stream.Publish(ctx, topic, io.Publish{
            Message: fmt.Sprintf("order-%d", i),
        })
    }
}

Drivers

Best for single-node deployments. Highest throughput with minimal latency.

db, _ := sqlite.New("queue.db", sqlite.Config{
    BusyTimeout: 5000,
    CacheSize:   -4000,  // 4MB cache
    MmapSize:    0,      // Disable mmap for minimal memory
})
PostgreSQL

For multi-client scenarios or when you already have PostgreSQL infrastructure.

db, _ := postgre.New(postgre.Config{
    Host:     "localhost",
    Username: "user",
    Password: "pass",
    Name:     "blockqueue",
    Port:     5432,
})
Turso

For edge deployments with LibSQL.

db, _ := turso.New("libsql://your-db.turso.io?authToken=TOKEN")

API Reference

Endpoint Method Description
/topics POST Create topic with subscribers
/topics/{topic}/messages POST Publish message
/topics/{topic}/subscribers/{sub} GET Read messages (long-polling)
/topics/{topic}/subscribers/{sub}/messages/{id} DELETE Acknowledge message
/topics/{topic}/subscribers POST Add subscribers
/topics/{topic}/subscribers/{sub} DELETE Remove subscriber
Subscriber Options
Option Example Description
max_attempts 5 Maximum redelivery attempts
visibility_duration 5m Time before unacked message is redelivered

Benchmark

MacBook Pro M1, 8GB RAM

SQLite (100 VUs, 10s)
http_reqs..................: 388908  38885/s
http_req_duration..........: med=1.19ms p(95)=7.02ms p(99.9)=30.47ms
PostgreSQL (100 VUs, 10s)
http_reqs..................: 113626  11340/s
http_req_duration..........: med=4.87ms p(95)=18.26ms p(99.9)=275.74ms

Architecture

Publish Architecture

Consumer Architecture

Roadmap

  • HTTP Protocol
  • Metrics (Prometheus)
  • SQLite WAL Mode
  • PostgreSQL Support
  • TCP Protocol
  • Go SDK
  • PHP SDK

Acknowledgment

Inspired by Redis, Kafka, and Amazon SQS.

License

Apache 2.0 License

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrListenerShutdown             = errors.New("listener shutdown")
	ErrListenerNotFound             = errors.New("listener not found")
	ErrListenerDeleted              = errors.New("listener was deleted")
	ErrListenerRetryMessageNotFound = errors.New("error ack message. message_id not found")
)
View Source
var (
	ErrJobNotFound = errors.New("job not found")
)
View Source
var (
	ErrMessageNotFound = errors.New("message not found")
)

Functions

func InvalidateSubscriberCache

func InvalidateSubscriberCache(topicId uuid.UUID)

InvalidateSubscriberCache removes a topic from cache (call when subscribers change)

Types

type BlockQueue

type BlockQueue[V chan bqio.ResponseMessages] struct {
	Opt BlockQueueOption
	// contains filtered or unexported fields
}

func New

func New[V chan bqio.ResponseMessages](driver Driver, opt BlockQueueOption) *BlockQueue[V]

func (*BlockQueue[V]) Ack

func (q *BlockQueue[V]) Ack(ctx context.Context, topic core.Topic, subscriberName, messageId string) error

func (*BlockQueue[V]) AddJob

func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error

func (*BlockQueue[V]) AddSubscriber

func (q *BlockQueue[V]) AddSubscriber(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error

func (*BlockQueue[V]) Close

func (q *BlockQueue[V]) Close()

func (*BlockQueue[V]) DeleteJob

func (q *BlockQueue[V]) DeleteJob(topic core.Topic) error

func (*BlockQueue[V]) DeleteSubscriber

func (q *BlockQueue[V]) DeleteSubscriber(ctx context.Context, topic core.Topic, subcriber string) error

func (*BlockQueue[V]) GetSubscribersStatus

func (q *BlockQueue[V]) GetSubscribersStatus(ctx context.Context, topic core.Topic) (bqio.SubscriberMessages, error)

func (*BlockQueue[V]) GetTopics

func (q *BlockQueue[V]) GetTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error)

func (*BlockQueue[V]) Publish

func (q *BlockQueue[V]) Publish(ctx context.Context, topic core.Topic, request bqio.Publish) error

func (*BlockQueue[V]) Read

func (q *BlockQueue[V]) Read(ctx context.Context, topic core.Topic, subscriber string) (bqio.ResponseMessages, error)

func (*BlockQueue[V]) Run

func (q *BlockQueue[V]) Run(ctx context.Context) error

type BlockQueueOption

type BlockQueueOption struct {
	WriteBufferConfig  WriteBufferConfig
	CheckpointInterval time.Duration // Default: 30s
}

type Driver

type Driver interface {
	Conn() *sqlx.DB
	Close() error
}

type Http

type Http struct {
	Stream *BlockQueue[chan io.ResponseMessages]
}

func (*Http) Router

func (h *Http) Router() http.Handler

type Job

type Job[V chan io.ResponseMessages] struct {
	Id   uuid.UUID
	Name string
	// contains filtered or unexported fields
}

type Listener

type Listener[V chan blockio.ResponseMessages] struct {
	Id            string
	SubscriberId  uuid.UUID
	JobId         string
	PriorityQueue *pqueue.PriorityQueue[V]
	// contains filtered or unexported fields
}

type MessageCounter

type MessageCounter struct {
	Name             string
	UnpublishMessage int
	UnackMessage     int
}

MessageCounter holds message statistics for a subscriber

type SubscriberInfo

type SubscriberInfo struct {
	Id                 uuid.UUID
	Name               string
	MaxAttempts        int
	VisibilityDuration time.Duration
	DequeueBatchSize   int
	PollInterval       time.Duration
	MaxBackoff         time.Duration
}

SubscriberInfo holds parsed subscriber information

type SubscriberMessage

type SubscriberMessage struct {
	Id           int64     `db:"id"`
	SubscriberId uuid.UUID `db:"subscriber_id"`
	TopicId      uuid.UUID `db:"topic_id"`
	MessageId    string    `db:"message_id"`
	Message      string    `db:"message"`
	Status       string    `db:"status"`
	RetryCount   int       `db:"retry_count"`
	VisibleAt    time.Time `db:"visible_at"`
	CreatedAt    time.Time `db:"created_at"`
}

SubscriberMessage represents a message in the subscriber queue

type SubscriberMessages

type SubscriberMessages []SubscriberMessage

type SubscriberQueueStats

type SubscriberQueueStats struct {
	Pending   int
	Delivered int
}

SubscriberQueueStats holds queue statistics

type WriteBuffer

type WriteBuffer struct {
	// contains filtered or unexported fields
}

WriteBuffer collects messages and batch inserts them for improved throughput

func NewWriteBuffer

func NewWriteBuffer(ctx context.Context, database *db, config WriteBufferConfig) *WriteBuffer

NewWriteBuffer creates a new write buffer

func (*WriteBuffer) Close

func (w *WriteBuffer) Close()

Close gracefully shuts down the write buffer

func (*WriteBuffer) Enqueue

func (w *WriteBuffer) Enqueue(topicId uuid.UUID, messageId, message string)

Enqueue adds a message to the buffer for batch insertion

type WriteBufferConfig

type WriteBufferConfig struct {
	BatchSize     int           // Max messages before flush (default: 100)
	FlushInterval time.Duration // Max time before flush (default: 50ms)
	BufferSize    int           // Channel buffer size (default: 10000)
}

WriteBufferConfig holds configuration for the write buffer

func DefaultWriteBufferConfig

func DefaultWriteBufferConfig() WriteBufferConfig

DefaultWriteBufferConfig returns sensible defaults

Directories

Path Synopsis
cmd
blockqueue command
example
basic command
Package main demonstrates basic BlockQueue library usage This example shows how to create a topic, publish messages, and consume them
Package main demonstrates basic BlockQueue library usage This example shows how to create a topic, publish messages, and consume them
http command
pgsql command
Package main demonstrates BlockQueue library usage with PostgreSQL
Package main demonstrates BlockQueue library usage with PostgreSQL
sqlite command
Package main demonstrates BlockQueue library usage with SQLite
Package main demonstrates BlockQueue library usage with SQLite
pkg
cas
io

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL