Documentation
¶
Index ¶
- Variables
- func InvalidateSubscriberCache(topicId uuid.UUID)
- type BlockQueue
- func (q *BlockQueue[V]) Ack(ctx context.Context, topic core.Topic, subscriberName, messageId string) error
- func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error
- func (q *BlockQueue[V]) AddSubscriber(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error
- func (q *BlockQueue[V]) Close()
- func (q *BlockQueue[V]) DeleteJob(topic core.Topic) error
- func (q *BlockQueue[V]) DeleteSubscriber(ctx context.Context, topic core.Topic, subcriber string) error
- func (q *BlockQueue[V]) GetSubscribersStatus(ctx context.Context, topic core.Topic) (bqio.SubscriberMessages, error)
- func (q *BlockQueue[V]) GetTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error)
- func (q *BlockQueue[V]) Publish(ctx context.Context, topic core.Topic, request bqio.Publish) error
- func (q *BlockQueue[V]) Read(ctx context.Context, topic core.Topic, subscriber string) (bqio.ResponseMessages, error)
- func (q *BlockQueue[V]) Run(ctx context.Context) error
- type BlockQueueOption
- type Driver
- type Http
- type Job
- type Listener
- type MessageCounter
- type SubscriberInfo
- type SubscriberMessage
- type SubscriberMessages
- type SubscriberQueueStats
- type WriteBuffer
- type WriteBufferConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrListenerShutdown = errors.New("listener shutdown") ErrListenerNotFound = errors.New("listener not found") ErrListenerDeleted = errors.New("listener was deleted") ErrListenerRetryMessageNotFound = errors.New("error ack message. message_id not found") )
View Source
var (
ErrJobNotFound = errors.New("job not found")
)
View Source
var (
ErrMessageNotFound = errors.New("message not found")
)
Functions ¶
func InvalidateSubscriberCache ¶
InvalidateSubscriberCache removes a topic from cache (call when subscribers change)
Types ¶
type BlockQueue ¶
type BlockQueue[V chan bqio.ResponseMessages] struct { Opt BlockQueueOption // contains filtered or unexported fields }
func New ¶
func New[V chan bqio.ResponseMessages](driver Driver, opt BlockQueueOption) *BlockQueue[V]
func (*BlockQueue[V]) AddJob ¶
func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error
func (*BlockQueue[V]) AddSubscriber ¶
func (q *BlockQueue[V]) AddSubscriber(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error
func (*BlockQueue[V]) Close ¶
func (q *BlockQueue[V]) Close()
func (*BlockQueue[V]) DeleteSubscriber ¶
func (*BlockQueue[V]) GetSubscribersStatus ¶
func (q *BlockQueue[V]) GetSubscribersStatus(ctx context.Context, topic core.Topic) (bqio.SubscriberMessages, error)
func (*BlockQueue[V]) GetTopics ¶
func (q *BlockQueue[V]) GetTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error)
type BlockQueueOption ¶
type BlockQueueOption struct {
WriteBufferConfig WriteBufferConfig
CheckpointInterval time.Duration // Default: 30s
}
type Http ¶
type Http struct {
Stream *BlockQueue[chan io.ResponseMessages]
}
type Job ¶
type Job[V chan io.ResponseMessages] struct { Id uuid.UUID Name string // contains filtered or unexported fields }
type Listener ¶
type Listener[V chan blockio.ResponseMessages] struct { Id string SubscriberId uuid.UUID JobId string PriorityQueue *pqueue.PriorityQueue[V] // contains filtered or unexported fields }
type MessageCounter ¶
MessageCounter holds message statistics for a subscriber
type SubscriberInfo ¶
type SubscriberInfo struct {
Id uuid.UUID
Name string
MaxAttempts int
VisibilityDuration time.Duration
DequeueBatchSize int
PollInterval time.Duration
MaxBackoff time.Duration
}
SubscriberInfo holds parsed subscriber information
type SubscriberMessage ¶
type SubscriberMessage struct {
Id int64 `db:"id"`
SubscriberId uuid.UUID `db:"subscriber_id"`
TopicId uuid.UUID `db:"topic_id"`
MessageId string `db:"message_id"`
Message string `db:"message"`
Status string `db:"status"`
RetryCount int `db:"retry_count"`
VisibleAt time.Time `db:"visible_at"`
CreatedAt time.Time `db:"created_at"`
}
SubscriberMessage represents a message in the subscriber queue
type SubscriberMessages ¶
type SubscriberMessages []SubscriberMessage
type SubscriberQueueStats ¶
SubscriberQueueStats holds queue statistics
type WriteBuffer ¶
type WriteBuffer struct {
// contains filtered or unexported fields
}
WriteBuffer collects messages and batch inserts them for improved throughput
func NewWriteBuffer ¶
func NewWriteBuffer(ctx context.Context, database *db, config WriteBufferConfig) *WriteBuffer
NewWriteBuffer creates a new write buffer
func (*WriteBuffer) Close ¶
func (w *WriteBuffer) Close()
Close gracefully shuts down the write buffer
type WriteBufferConfig ¶
type WriteBufferConfig struct {
BatchSize int // Max messages before flush (default: 100)
FlushInterval time.Duration // Max time before flush (default: 50ms)
BufferSize int // Channel buffer size (default: 10000)
}
WriteBufferConfig holds configuration for the write buffer
func DefaultWriteBufferConfig ¶
func DefaultWriteBufferConfig() WriteBufferConfig
DefaultWriteBufferConfig returns sensible defaults
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
blockqueue
command
|
|
|
example
|
|
|
basic
command
Package main demonstrates basic BlockQueue library usage This example shows how to create a topic, publish messages, and consume them
|
Package main demonstrates basic BlockQueue library usage This example shows how to create a topic, publish messages, and consume them |
|
http
command
|
|
|
pgsql
command
Package main demonstrates BlockQueue library usage with PostgreSQL
|
Package main demonstrates BlockQueue library usage with PostgreSQL |
|
sqlite
command
Package main demonstrates BlockQueue library usage with SQLite
|
Package main demonstrates BlockQueue library usage with SQLite |
|
pkg
|
|
Click to show internal directories.
Click to hide internal directories.

