msgqueue

package
v0.74.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
// Message ID constants for tenant messages. Each one is an untyped string
// constant whose value is the kebab-case form of its name without the MsgID
// prefix (for example MsgIDTaskCompleted is "task-completed").
// NOTE(review): presumably these are the values passed as the id argument of
// NewTenantMessage and received as msgId by a DstFunc — confirm against callers.
const (
	MsgIDCancelTasks                  = "cancel-tasks"
	MsgIDCELEvaluationFailure         = "cel-evaluation-failure"
	MsgIDCheckTenantQueue             = "check-tenant-queue"
	MsgIDCreateMonitoringEvent        = "create-monitoring-event"
	MsgIDCreatedDAG                   = "created-dag"
	MsgIDCreatedEventTrigger          = "created-event-trigger"
	MsgIDCreatedTask                  = "created-task"
	MsgIDFailedWebhookValidation      = "failed-webhook-validation"
	MsgIDInternalEvent                = "internal-event"
	MsgIDOffloadPayload               = "offload-payload"
	MsgIDReplayTasks                  = "replay-tasks"
	MsgIDTaskAssignedBulk             = "task-assigned-bulk"
	MsgIDTaskCancelled                = "task-cancelled"
	MsgIDTaskCompleted                = "task-completed"
	MsgIDTaskFailed                   = "task-failed"
	MsgIDTaskStreamEvent              = "task-stream-event"
	MsgIDTaskTrigger                  = "task-trigger"
	MsgIDUserEvent                    = "user-event"
	MsgIDWorkflowRunFinished          = "workflow-run-finished"
	MsgIDWorkflowRunFinishedCandidate = "workflow-run-finished-candidate"
)

Message ID constants for tenant messages

View Source
// Predefined queues of the unexported staticQueue type.
// NOTE(review): the string values all carry a _v2 suffix and look like the
// broker-side queue names — confirm against staticQueue's Name method.
const (
	TASK_PROCESSING_QUEUE        staticQueue = "task_processing_queue_v2"
	OLAP_QUEUE                   staticQueue = "olap_queue_v2"
	DISPATCHER_DEAD_LETTER_QUEUE staticQueue = "dispatcher_dlq_v2"
)
View Source
// Scheduler is the untyped string constant "scheduler".
// NOTE(review): presumably a controller name intended for the controller
// argument of QueueTypeFromPartitionIDAndController — confirm against callers.
const (
	Scheduler = "scheduler"
)

Variables

View Source
// Publish-side tuning values. They are declared as package-level variables,
// not constants, so they can be reassigned by other code.
// NOTE(review): presumably the defaults used by MQPubBuffer — confirm.
var (
	// PUB_FLUSH_INTERVAL is a time.Duration of 10 milliseconds.
	PUB_FLUSH_INTERVAL  = 10 * time.Millisecond
	// PUB_BUFFER_SIZE is 1000.
	PUB_BUFFER_SIZE     = 1000
	// PUB_MAX_CONCURRENCY is 1.
	PUB_MAX_CONCURRENCY = 1
	// PUB_TIMEOUT is a time.Duration of 10 seconds.
	PUB_TIMEOUT         = 10 * time.Second
)

nolint: staticcheck

View Source
// Subscribe-side tuning values. They are declared as package-level variables,
// not constants, so they can be reassigned by other code.
// NOTE(review): presumably the defaults used by MQSubBuffer when the matching
// WithFlushInterval / WithBufferSize / WithMaxConcurrency option is not
// supplied — confirm.
var (
	// SUB_FLUSH_INTERVAL is a time.Duration of 10 milliseconds.
	SUB_FLUSH_INTERVAL  = 10 * time.Millisecond
	// SUB_BUFFER_SIZE is 1000.
	SUB_BUFFER_SIZE     = 1000
	// SUB_MAX_CONCURRENCY is 10.
	SUB_MAX_CONCURRENCY = 10
)

nolint: staticcheck

Functions

func DecodeAndValidateSingleton added in v0.74.3

func DecodeAndValidateSingleton(dv datautils.DataDecoderValidator, payloads [][]byte, target interface{}) error

func GetTenantExchangeName added in v0.74.3

func GetTenantExchangeName(t string) string

func JSONConvert added in v0.74.3

func JSONConvert[T any](payloads [][]byte) []*T

func NewRandomStaticQueue added in v0.74.3

func NewRandomStaticQueue() staticQueue

func NoOpHook

func NoOpHook(task *Message) error

func QueueTypeFromDispatcherID

func QueueTypeFromDispatcherID(d string) dispatcherQueue

func QueueTypeFromPartitionIDAndController

func QueueTypeFromPartitionIDAndController(p, controller string) consumerQueue

func TenantEventConsumerQueue

func TenantEventConsumerQueue(t string) fanoutQueue

func WithBufferSize added in v0.74.3

func WithBufferSize(bufferSize int) mqSubBufferOptFunc

func WithDisableImmediateFlush added in v0.74.3

func WithDisableImmediateFlush(disableImmediateFlush bool) mqSubBufferOptFunc

"Immediate flush" means that if we haven't flushed yet, we can flush immediately without waiting on the flush interval timer.

func WithFlushInterval added in v0.74.3

func WithFlushInterval(flushInterval time.Duration) mqSubBufferOptFunc

func WithKind added in v0.74.3

func WithKind(kind SubBufferKind) mqSubBufferOptFunc

func WithMaxConcurrency added in v0.74.3

func WithMaxConcurrency(maxConcurrency int) mqSubBufferOptFunc

Types

type AckHook

// AckHook is a callback that receives a *Message and returns an error.
// MessageQueue.Subscribe accepts two of them, named preAck and postAck.
type AckHook func(task *Message) error

type DstFunc added in v0.74.3

// DstFunc is a callback that receives a tenant ID, a message ID and a slice of
// raw payloads, and returns an error. NewMQSubBuffer takes one as its dst
// argument and SharedBufferedTenantReader.Subscribe takes one as f.
type DstFunc func(tenantId, msgId string, payloads [][]byte) error

type MQPubBuffer added in v0.74.3

type MQPubBuffer struct {
	// contains filtered or unexported fields
}

MQPubBuffer buffers messages on their way into the message queue (see Pub), groups them by tenantId and msgId, and then flushes them to the underlying MessageQueue as necessary.

func NewMQPubBuffer added in v0.74.3

func NewMQPubBuffer(mq MessageQueue) *MQPubBuffer

func (*MQPubBuffer) Pub added in v0.74.3

func (m *MQPubBuffer) Pub(ctx context.Context, queue Queue, msg *Message, wait bool) error

func (*MQPubBuffer) Stop added in v0.74.3

func (m *MQPubBuffer) Stop()

type MQSubBuffer added in v0.74.3

type MQSubBuffer struct {
	// contains filtered or unexported fields
}

MQSubBuffer buffers messages coming out of the task queue, groups them by tenantId and msgId, and then flushes them to the task handler as necessary.

func NewMQSubBuffer added in v0.74.3

func NewMQSubBuffer(queue Queue, mq MessageQueue, dst DstFunc, fs ...mqSubBufferOptFunc) *MQSubBuffer

func (*MQSubBuffer) Start added in v0.74.3

func (m *MQSubBuffer) Start() (func() error, error)

type Message

// Message is the envelope passed to MessageQueue.SendMessage and handed to
// AckHook callbacks. The struct tags define its JSON field names.
type Message struct {
	// ID is the ID of the task.
	ID string `json:"id"`

	// Payloads is the list of payloads. Note that its JSON key is "messages",
	// not "payloads".
	Payloads [][]byte `json:"messages"`

	// TenantID is the tenant ID.
	TenantID string `json:"tenant_id"`

	// ImmediatelyExpire is whether the message should immediately expire if it
	// reaches the queue without an active consumer.
	ImmediatelyExpire bool `json:"immediately_expire"`

	// Persistent is whether the message should be persisted to disk.
	Persistent bool `json:"persistent"`

	// OtelCarrier is the OpenTelemetry carrier for the task.
	OtelCarrier map[string]string `json:"otel_carrier"`

	// Retries is the number of retries for the task.
	//
	// Deprecated: retries are set globally at the moment.
	Retries int `json:"retries"`

	// Compressed indicates whether the payloads are gzip compressed. Because
	// of omitempty, the key is left out of the JSON when the value is false.
	Compressed bool `json:"compressed,omitempty"`
}

func NewTenantMessage added in v0.74.3

func NewTenantMessage[T any](tenantId, id string, immediatelyExpire, persistent bool, payloads ...T) (*Message, error)

func (*Message) Serialize added in v0.74.3

func (t *Message) Serialize() ([]byte, error)

func (*Message) SetOtelCarrier added in v0.74.3

func (t *Message) SetOtelCarrier(otelCarrier map[string]string)

type MessageQueue

// MessageQueue is the interface for sending Messages to a Queue and for
// subscribing to a Queue.
type MessageQueue interface {
	// Clone copies the message queue with a new instance. It returns a
	// func() error together with the new MessageQueue.
	// NOTE(review): the returned func is presumably a cleanup function, as
	// with Subscribe — confirm against an implementation.
	Clone() (func() error, MessageQueue, error)

	// SetQOS sets the quality of service for the message queue.
	// NOTE(review): prefetchCount presumably maps to the broker's consumer
	// prefetch limit — confirm against an implementation.
	SetQOS(prefetchCount int)

	// SendMessage sends a message to the given queue on the message queue.
	SendMessage(ctx context.Context, queue Queue, msg *Message) error

	// Subscribe subscribes to the task queue, taking a preAck and a postAck
	// hook. It returns a cleanup function that should be called when the
	// subscription is no longer needed.
	Subscribe(queue Queue, preAck AckHook, postAck AckHook) (func() error, error)

	// RegisterTenant registers a new pub/sub mechanism for a tenant. This should be called when a
	// new tenant is created. If this is not called, implementors should ensure that there's a check
	// on the first message to a tenant to ensure that the tenant is registered, and store the tenant
	// in an LRU cache which lives in-memory.
	RegisterTenant(ctx context.Context, tenantId string) error

	// IsReady returns true if the task queue is ready to accept tasks.
	IsReady() bool
}

type PubFunc added in v0.74.3

// PubFunc is a callback that receives a *Message and returns an error.
// NOTE(review): no use of this type is visible in the exported API shown
// here; presumably it is the publish callback used internally — confirm.
type PubFunc func(m *Message) error

type Queue

// Queue describes a queue: its name, its declaration properties, and its
// dead-letter-queue configuration. Values of this interface are passed to
// MessageQueue.SendMessage and MessageQueue.Subscribe.
type Queue interface {
	// Name returns the name of the queue.
	Name() string

	// Durable returns true if this queue should survive task queue restarts.
	Durable() bool

	// AutoDeleted returns true if this queue should be deleted when the last consumer unsubscribes.
	AutoDeleted() bool

	// Exclusive returns true if this queue should only be accessed by the current connection.
	Exclusive() bool

	// FanoutExchangeKey returns which exchange the queue should be subscribed to. This is only currently relevant
	// to tenant pub/sub queues.
	//
	// In RabbitMQ terminology, the existence of a subscriber key means that the queue is bound to a fanout
	// exchange, and a new random queue is generated for each connection when connections are retried.
	FanoutExchangeKey() string

	// DLQ returns the queue's dead letter queue, if it exists.
	DLQ() Queue

	// IsDLQ returns true if the queue is a dead letter queue.
	IsDLQ() bool

	// IsAutoDLQ returns true if the dead letter queue is automatic rather than static. We distinguish
	// between a static DLQ and an automatic DLQ: an automatic DLQ will automatically retry messages
	// in a loop with a 5-second backoff, and each subscription to the regular Queue will include a
	// subscription to the DLQ.
	IsAutoDLQ() bool

	// IsExpirable refers to whether the queue itself is expirable.
	IsExpirable() bool
}

type SharedBufferedTenantReader added in v0.74.3

type SharedBufferedTenantReader struct {
	// contains filtered or unexported fields
}

func NewSharedBufferedTenantReader added in v0.74.3

func NewSharedBufferedTenantReader(mq MessageQueue) *SharedBufferedTenantReader

func (*SharedBufferedTenantReader) Subscribe added in v0.74.3

func (s *SharedBufferedTenantReader) Subscribe(tenantId string, f DstFunc) (func() error, error)

type SharedTenantReader

type SharedTenantReader struct {
	// contains filtered or unexported fields
}

func NewSharedTenantReader

func NewSharedTenantReader(mq MessageQueue) *SharedTenantReader

func (*SharedTenantReader) Subscribe

func (s *SharedTenantReader) Subscribe(tenantId string, postAck AckHook) (func() error, error)

type SubBufferKind added in v0.74.3

// SubBufferKind is a string-typed option value accepted by WithKind.
type SubBufferKind string
// PostAck and PreAck are the SubBufferKind values declared by this package.
// NOTE(review): the names mirror the preAck/postAck hooks of
// MessageQueue.Subscribe; presumably the kind selects which of those hooks an
// MQSubBuffer registers its handler as — confirm against MQSubBuffer.Start.
const (
	PostAck SubBufferKind = "postAck"
	PreAck  SubBufferKind = "preAck"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL