blockqueue

package module
v0.0.1-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

README

Block Queue

Block Queue is a lightweight and cost-effective queue messaging system with a pub/sub mechanism, designed to be cheap, robust, reliable, and durable.

Crafted atop the robust foundations of SQLite3 and NutsDB, Block Queue prioritizes efficiency by minimizing network latency and ensuring cost-effectiveness.

Why BlockQueue

While Kafka, Redis, and SQS are excellent products, they are quite complex and require a lot of resources. The purpose of BlockQueue is to provide a queue that is simple, light on resources, and cheap to run.

Features

  • 💸 Cost-Effective: Designed with affordability in mind, Block Queue provides a budget-friendly solution for messaging needs.
  • 📢 Pub/Sub Mechanism: The inclusion of a publish/subscribe mechanism allows for easy communication and real-time updates.
  • 📶 Less Network Latency: Prioritizing efficiency, Block Queue minimizes network latency to the persistence layer to enhance overall performance.

Architecture

Publish Architecture

Consumer Architecture

Failed Redelivery Architecture

How it works

Create Topic
curl --location 'http://your-host/topics' \
--header 'Content-Type: application/json' \
--data '{
    "name": "cart",
    "subscribers": [
        {
            "name": "counter",
            "option": {
                "max_attempts": 5,
                "visibility_duration": "5m"
            }
        },
        {
            "name": "created",
            "option": {
                "max_attempts": 5,
                "visibility_duration": "5m"
            }
        }
    ]
}'
Subscriber Options
Key Value Description
max_attempts 1, 2, 3 maximum number of times a message is redelivered
visibility_duration 5m, 6m, 1h if a message has not been acked yet, it will be sent again at now() + visibility_duration
Create New Subscribers
curl --location 'http://your-host/topics/cart/subscribers' \
--header 'Content-Type: application/json' \
--data '[
    {
        "name": "counter",
        "option": {
            "max_attempts": 5,
            "visibility_duration": "5m"
        }
    }
]
'
Delete Subscriber
curl --location --request DELETE 'http://your-host/topics/{topic_name}/subscribers/{subscriber_name}'
Publish Message
curl --location 'http://your-host/topics/{topic_name}/messages' \
--header 'Content-Type: application/json' \
--data '{
    "message": "hi message from topic {topic_name}"
}'
Read Message

To read a message, you just need to pass the subscriber name in the URL path along with a timeout. This ensures horizontal scalability and guarantees that the message is sent once.

curl --location 'http://your-host/topics/{topic_name}/subscribers/{subscriber_name}?timeout=5s'

Note: Messages are delivered with at-least-once semantics.

Delete Message

After reading and successfully processing a message, you must delete it; otherwise the message will persist according to the queue retry policy set in the subscriber options.

curl --location --request DELETE 'http://your-host/topics/{topic_name}/subscribers/{subscriber_name}/messages/{message_id}'
Subscriber Message Status

If you want to check how many unpublished or unacked messages there are, you can hit this endpoint:

curl --location 'localhost:8080/topics/{your_topic}/subscribers'

Roadmap

  • Protocol
    • HTTP
    • TCP
  • Metrics
  • SDK
    • Go
    • PHP
  • Performance Test

Acknowledgment

This package is inspired by the following:

License

The BlockQueue is open-sourced software licensed under the Apache 2.0 license.

Documentation

Index

Constants

View Source
const (
	LogPrefixBucket        string = "bucket"
	LogPrefixConsumer      string = "consumer"
	LogPrefixErr           string = "err"
	LogPrefixTopic         string = "topic"
	LogPrefixMessageStatus string = "message_status"
	LogPrefixMessage       string = "message"
)
View Source
const (
	BufferSizeJob = 5000
)
View Source
const (
	BufferSizeRetryListener = 100
)

Variables

View Source
var (
	ErrListenerShutdown             = errors.New("listener shutdown")
	ErrListenerNotFound             = errors.New("listener not found")
	ErrListenerDeleted              = errors.New("listener was deleted")
	ErrListenerRetryMessageNotFound = errors.New("error ack message. message_id not found")
)
View Source
var (
	ErrJobNotFound = errors.New("job not found")
)
View Source
var Etcd *etcd.Etcd

Functions

func CreateMessages

func CreateMessages(ctx context.Context, message core.Message) error

func CreateTxBucket

func CreateTxBucket(tx *nutsdb.Tx, structure uint16, bucketName string) error

func CreateTxSubscribers

func CreateTxSubscribers(ctx context.Context, tx *sqlx.Tx, subscribers core.Subscribers) error

func CreateTxTopic

func CreateTxTopic(ctx context.Context, tx *sqlx.Tx, topic core.Topic) error

func DeleteTxSubscribers

func DeleteTxSubscribers(ctx context.Context, tx *sqlx.Tx, topic core.Subscriber) error

func DeleteTxTopic

func DeleteTxTopic(ctx context.Context, tx *sqlx.Tx, topic core.Topic) error

func GetMessages

func GetMessages(ctx context.Context, filter core.FilterMessage) (core.Messages, error)

func GetSubscribers

func GetSubscribers(ctx context.Context, filter core.FilterSubscriber) (core.Subscribers, error)

func GetTopics

func GetTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error)

func ReadBucketTx

func ReadBucketTx(fn func(tx *nutsdb.Tx) error) error

func Tx

func Tx(ctx context.Context, fn func(ctx context.Context, tx *sqlx.Tx) error) error

func UpdateBucketTx

func UpdateBucketTx(fn func(tx *nutsdb.Tx) error) error

func UpdateStatusMessage

func UpdateStatusMessage(ctx context.Context, status core.MessageStatus, ids ...uuid.UUID) error

Types

type BlockQueue

type BlockQueue[V chan io.ResponseMessages] struct {
	// contains filtered or unexported fields
}

func New

func New[V chan io.ResponseMessages]() *BlockQueue[V]

func (*BlockQueue[V]) AckMessage

func (q *BlockQueue[V]) AckMessage(ctx context.Context, topic core.Topic, subscriberName, messageId string) error

func (*BlockQueue[V]) AddJob

func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error

func (*BlockQueue[V]) AddSubscribers

func (q *BlockQueue[V]) AddSubscribers(ctx context.Context, topic core.Topic) error

func (*BlockQueue[V]) DeleteJob

func (q *BlockQueue[V]) DeleteJob(topic core.Topic) error

func (*BlockQueue[V]) DeleteSubscriber

func (q *BlockQueue[V]) DeleteSubscriber(ctx context.Context, topic core.Topic, subcriber string) error

func (*BlockQueue[V]) GetSubscribers

func (q *BlockQueue[V]) GetSubscribers(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error)

func (*BlockQueue[V]) Publish

func (q *BlockQueue[V]) Publish(ctx context.Context, topic core.Topic, request io.Publish) error

func (*BlockQueue[V]) ReadSubscriber

func (q *BlockQueue[V]) ReadSubscriber(ctx context.Context, topic core.Topic, subscriber string) (io.ResponseMessages, error)

func (*BlockQueue[V]) Run

func (q *BlockQueue[V]) Run(ctx context.Context) error

type EventListener

type EventListener struct {
	Kind  KindEventListener
	Error error
}

type Http

type Http struct {
	Stream *BlockQueue[chan io.ResponseMessages]
}

func (*Http) AckMessage

func (h *Http) AckMessage(w http.ResponseWriter, r *http.Request)

func (*Http) CreateSubscriber

func (h *Http) CreateSubscriber(w http.ResponseWriter, r *http.Request)

func (*Http) CreateTopic

func (h *Http) CreateTopic(w http.ResponseWriter, r *http.Request)

func (*Http) DeleteSubscriber

func (h *Http) DeleteSubscriber(w http.ResponseWriter, r *http.Request)

func (*Http) DeleteTopic

func (h *Http) DeleteTopic(w http.ResponseWriter, r *http.Request)

func (*Http) GetSubscribers

func (h *Http) GetSubscribers(w http.ResponseWriter, r *http.Request)

GetSubscribers is the endpoint to get the metadata of subscribers before it is claimed to the consumer bucket.

func (*Http) Publish

func (h *Http) Publish(w http.ResponseWriter, r *http.Request)

func (*Http) ReadSubscriber

func (h *Http) ReadSubscriber(w http.ResponseWriter, r *http.Request)

func (*Http) Router

func (h *Http) Router() http.Handler

type Job

type Job[V chan io.ResponseMessages] struct {
	Id        uuid.UUID
	Name      string
	ServerCtx context.Context
	// contains filtered or unexported fields
}

func NewJob

func NewJob[V chan io.ResponseMessages](serverCtx context.Context, topic core.Topic) (*Job[V], error)

func (*Job[V]) AckMessage

func (job *Job[V]) AckMessage(ctx context.Context, topic core.Topic, subscriberName, messageId string) error

func (*Job[V]) AddListener

func (job *Job[V]) AddListener(ctx context.Context, topic core.Topic) error

func (*Job[V]) Close

func (job *Job[V]) Close()

func (*Job[V]) DeleteListener

func (job *Job[V]) DeleteListener(ctx context.Context, topic core.Topic, subscriberName string) error

func (*Job[V]) Enqueue

func (job *Job[V]) Enqueue(ctx context.Context, topic core.Topic, subscriberName string) (io.ResponseMessages, error)

func (*Job[V]) GetListeners

func (job *Job[V]) GetListeners(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error)

func (*Job[V]) Remove

func (job *Job[V]) Remove()

func (*Job[V]) Trigger

func (job *Job[V]) Trigger()

type KindEventListener

type KindEventListener string
const (
	ShutdownKindEventListener       KindEventListener = "shutdown"
	RemovedKindEventListener        KindEventListener = "remove"
	FailedDeliveryKindEventListener KindEventListener = "failed_delivery"
	DeliveryKindEventListener       KindEventListener = "delivery"
)

type Listener

type Listener[V chan blockio.ResponseMessages] struct {
	Id            string
	JobId         string
	PriorityQueue *pqueue.PriorityQueue[V]
	// contains filtered or unexported fields
}

func NewListener

func NewListener[V chan blockio.ResponseMessages](serverCtx context.Context, jobId string, subscriber core.Subscriber) (*Listener[V], error)

func (Listener[V]) Bucket

func (listener Listener[V]) Bucket() []byte

func (*Listener[V]) DeleteRetryMessage

func (listener *Listener[V]) DeleteRetryMessage(id string) error

func (*Listener[V]) Dequeue

func (listener *Listener[V]) Dequeue(id string)

func (*Listener[V]) Enqueue

func (listener *Listener[V]) Enqueue(messages chan blockio.ResponseMessages) string

func (*Listener[V]) GetMessages

func (listener *Listener[V]) GetMessages() (bucket.MessageCounter, error)

func (*Listener[V]) Remove

func (listener *Listener[V]) Remove()

func (Listener[V]) RetryBucket

func (listener Listener[V]) RetryBucket() []byte

func (*Listener[V]) Shutdown

func (listener *Listener[V]) Shutdown()

Directories

Path Synopsis
cmd
blockqueue command
pkg
io

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL