Documentation
¶
Index ¶
- Constants
- Variables
- func CreateMessages(ctx context.Context, message core.Message) error
- func CreateTxBucket(tx *nutsdb.Tx, structure uint16, bucketName string) error
- func CreateTxSubscribers(ctx context.Context, tx *sqlx.Tx, subscribers core.Subscribers) error
- func CreateTxTopic(ctx context.Context, tx *sqlx.Tx, topic core.Topic) error
- func DeleteTxSubscribers(ctx context.Context, tx *sqlx.Tx, topic core.Subscriber) error
- func DeleteTxTopic(ctx context.Context, tx *sqlx.Tx, topic core.Topic) error
- func GetMessages(ctx context.Context, filter core.FilterMessage) (core.Messages, error)
- func GetSubscribers(ctx context.Context, filter core.FilterSubscriber) (core.Subscribers, error)
- func GetTopics(ctx context.Context, filter core.FilterTopic) (core.Topics, error)
- func ReadBucketTx(fn func(tx *nutsdb.Tx) error) error
- func Tx(ctx context.Context, fn func(ctx context.Context, tx *sqlx.Tx) error) error
- func UpdateBucketTx(fn func(tx *nutsdb.Tx) error) error
- func UpdateStatusMessage(ctx context.Context, status core.MessageStatus, ids ...uuid.UUID) error
- type BlockQueue
- func (q *BlockQueue[V]) AckMessage(ctx context.Context, topic core.Topic, subscriberName, messageId string) error
- func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error
- func (q *BlockQueue[V]) AddSubscribers(ctx context.Context, topic core.Topic) error
- func (q *BlockQueue[V]) DeleteJob(topic core.Topic) error
- func (q *BlockQueue[V]) DeleteSubscriber(ctx context.Context, topic core.Topic, subcriber string) error
- func (q *BlockQueue[V]) GetSubscribers(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error)
- func (q *BlockQueue[V]) Publish(ctx context.Context, topic core.Topic, request io.Publish) error
- func (q *BlockQueue[V]) ReadSubscriber(ctx context.Context, topic core.Topic, subscriber string) (io.ResponseMessages, error)
- func (q *BlockQueue[V]) Run(ctx context.Context) error
- type EventListener
- type Http
- func (h *Http) AckMessage(w http.ResponseWriter, r *http.Request)
- func (h *Http) CreateSubscriber(w http.ResponseWriter, r *http.Request)
- func (h *Http) CreateTopic(w http.ResponseWriter, r *http.Request)
- func (h *Http) DeleteSubscriber(w http.ResponseWriter, r *http.Request)
- func (h *Http) DeleteTopic(w http.ResponseWriter, r *http.Request)
- func (h *Http) GetSubscribers(w http.ResponseWriter, r *http.Request)
- func (h *Http) Publish(w http.ResponseWriter, r *http.Request)
- func (h *Http) ReadSubscriber(w http.ResponseWriter, r *http.Request)
- func (h *Http) Router() http.Handler
- type Job
- func (job *Job[V]) AckMessage(ctx context.Context, topic core.Topic, subscriberName, messageId string) error
- func (job *Job[V]) AddListener(ctx context.Context, topic core.Topic) error
- func (job *Job[V]) Close()
- func (job *Job[V]) DeleteListener(ctx context.Context, topic core.Topic, subscriberName string) error
- func (job *Job[V]) Enqueue(ctx context.Context, topic core.Topic, subscriberName string) (io.ResponseMessages, error)
- func (job *Job[V]) GetListeners(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error)
- func (job *Job[V]) Remove()
- func (job *Job[V]) Trigger()
- type KindEventListener
- type Listener
- func (listener Listener[V]) Bucket() []byte
- func (listener *Listener[V]) DeleteRetryMessage(id string) error
- func (listener *Listener[V]) Dequeue(id string)
- func (listener *Listener[V]) Enqueue(messages chan blockio.ResponseMessages) string
- func (listener *Listener[V]) GetMessages() (bucket.MessageCounter, error)
- func (listener *Listener[V]) Remove()
- func (listener Listener[V]) RetryBucket() []byte
- func (listener *Listener[V]) Shutdown()
Constants ¶
View Source
const ( LogPrefixBucket string = "bucket" LogPrefixConsumer string = "consumer" LogPrefixErr string = "err" LogPrefixTopic string = "topic" LogPrefixMessageStatus string = "message_status" LogPrefixMessage string = "message" )
View Source
const (
BufferSizeJob = 5000
)
View Source
const (
BufferSizeRetryListener = 100
)
Variables ¶
View Source
var ( ErrListenerShutdown = errors.New("listener shutdown") ErrListenerNotFound = errors.New("listener not found") ErrListenerDeleted = errors.New("listener was deleted") ErrListenerRetryMessageNotFound = errors.New("error ack message. message_id not found") )
View Source
var Conn *sqlite.SQLite
View Source
var (
ErrJobNotFound = errors.New("job not found")
)
View Source
var Etcd *etcd.Etcd
Functions ¶
func CreateTxSubscribers ¶
func DeleteTxSubscribers ¶
func GetMessages ¶
func GetSubscribers ¶
func GetSubscribers(ctx context.Context, filter core.FilterSubscriber) (core.Subscribers, error)
func UpdateStatusMessage ¶
Types ¶
type BlockQueue ¶
type BlockQueue[V chan io.ResponseMessages] struct { // contains filtered or unexported fields }
func New ¶
func New[V chan io.ResponseMessages]() *BlockQueue[V]
func (*BlockQueue[V]) AckMessage ¶
func (*BlockQueue[V]) AddJob ¶
func (q *BlockQueue[V]) AddJob(ctx context.Context, topic core.Topic, subscribers core.Subscribers) error
func (*BlockQueue[V]) AddSubscribers ¶
func (*BlockQueue[V]) DeleteSubscriber ¶
func (*BlockQueue[V]) GetSubscribers ¶
func (q *BlockQueue[V]) GetSubscribers(ctx context.Context, topic core.Topic) (io.SubscriberMessages, error)
func (*BlockQueue[V]) ReadSubscriber ¶
func (q *BlockQueue[V]) ReadSubscriber(ctx context.Context, topic core.Topic, subscriber string) (io.ResponseMessages, error)
type EventListener ¶
type EventListener struct {
Kind KindEventListener
Error error
}
type Http ¶
type Http struct {
Stream *BlockQueue[chan io.ResponseMessages]
}
func (*Http) AckMessage ¶
func (h *Http) AckMessage(w http.ResponseWriter, r *http.Request)
func (*Http) CreateSubscriber ¶
func (h *Http) CreateSubscriber(w http.ResponseWriter, r *http.Request)
func (*Http) CreateTopic ¶
func (h *Http) CreateTopic(w http.ResponseWriter, r *http.Request)
func (*Http) DeleteSubscriber ¶
func (h *Http) DeleteSubscriber(w http.ResponseWriter, r *http.Request)
func (*Http) DeleteTopic ¶
func (h *Http) DeleteTopic(w http.ResponseWriter, r *http.Request)
func (*Http) GetSubscribers ¶
func (h *Http) GetSubscribers(w http.ResponseWriter, r *http.Request)
GetSubscribers is endpoint to get metadata of subscribers before it claimed to consumer bucket
func (*Http) ReadSubscriber ¶
func (h *Http) ReadSubscriber(w http.ResponseWriter, r *http.Request)
type Job ¶
type Job[V chan io.ResponseMessages] struct { Id uuid.UUID Name string ServerCtx context.Context // contains filtered or unexported fields }
func (*Job[V]) AckMessage ¶
func (*Job[V]) AddListener ¶
func (*Job[V]) DeleteListener ¶
func (*Job[V]) GetListeners ¶
type KindEventListener ¶
type KindEventListener string
const ( ShutdownKindEventListener KindEventListener = "shutdown" RemovedKindEventListener KindEventListener = "remove" FailedDeliveryKindEventListener KindEventListener = "failed_delivery" DeliveryKindEventListener KindEventListener = "delivery" )
type Listener ¶
type Listener[V chan blockio.ResponseMessages] struct { Id string JobId string PriorityQueue *pqueue.PriorityQueue[V] // contains filtered or unexported fields }
func NewListener ¶
func NewListener[V chan blockio.ResponseMessages](serverCtx context.Context, jobId string, subscriber core.Subscriber) (*Listener[V], error)
func (*Listener[V]) DeleteRetryMessage ¶
func (*Listener[V]) Enqueue ¶
func (listener *Listener[V]) Enqueue(messages chan blockio.ResponseMessages) string
func (*Listener[V]) GetMessages ¶
func (listener *Listener[V]) GetMessages() (bucket.MessageCounter, error)
func (Listener[V]) RetryBucket ¶
Click to show internal directories.
Click to hide internal directories.


