Documentation
¶
Index ¶
- func ConsumeMessage(t *testing.T, amqpURL, queueName string, timeout time.Duration) *amqp091.Delivery
- func DecodeTaskMessage(t *testing.T, delivery *amqp091.Delivery) map[string]interface{}
- func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)
- func VerifyTaskMessageArgs(t *testing.T, taskMsg map[string]interface{}, expectedArgs []interface{})
- func VerifyTaskMessageKwargs(t *testing.T, taskMsg map[string]interface{}, ...)
- type AMQPBroker
- type Broker
- type BrokerType
- type CeleryDeliveryInfo
- type CeleryMessage
- type CeleryProperties
- type Config
- type PublishMode
- type PublishRequest
- type Publisher
- type RabbitMQTestContainer
- type TaskMessage
- type TaskMessageV1
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumeMessage ¶
func ConsumeMessage(t *testing.T, amqpURL, queueName string, timeout time.Duration) *amqp091.Delivery
ConsumeMessage consumes a single message from the specified queue
func DecodeTaskMessage ¶
DecodeTaskMessage decodes the task message directly from AMQP delivery body The delivery body contains plain JSON TaskMessage (Celery Protocol v1 with utf-8 encoding)
func NewAMQPConnection ¶
func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel)
NewAMQPConnection creates new AMQP channel
func VerifyTaskMessageArgs ¶
func VerifyTaskMessageArgs(t *testing.T, taskMsg map[string]interface{}, expectedArgs []interface{})
VerifyTaskMessageArgs verifies that the task message contains expected args
func VerifyTaskMessageKwargs ¶
func VerifyTaskMessageKwargs(t *testing.T, taskMsg map[string]interface{}, expectedKwargs map[string]interface{})
VerifyTaskMessageKwargs verifies that the task message contains expected kwargs Note: This function checks that all expectedKwargs are present, but allows additional kwargs
Types ¶
type AMQPBroker ¶
type AMQPBroker struct {
Channel *amqp.Channel
Connection *amqp.Connection
}
func NewAMQPBroker ¶
func NewAMQPBroker(host string) *AMQPBroker
NewAMQPBroker creates new AMQPBroker
func (*AMQPBroker) CanPublish ¶
func (b *AMQPBroker) CanPublish() bool
func (*AMQPBroker) Reconnect ¶
func (b *AMQPBroker) Reconnect(host string) error
func (*AMQPBroker) SendCeleryMessage ¶
func (b *AMQPBroker) SendCeleryMessage(msg *CeleryMessage) error
type Broker ¶
type Broker interface {
CanPublish() bool
Reconnect(host string) error
SendCeleryMessage(msg *CeleryMessage) error
}
type BrokerType ¶
type BrokerType string
BrokerType represents the type of message broker to use.
var (
AMQP BrokerType = "amqp" // RabbitMQ/AMQP broker
)
Supported broker types
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct {
Priority int `json:"priority"`
RoutingKey string `json:"routing_key"`
Exchange string `json:"exchange"`
}
CeleryDeliveryInfo represents deliveryinfo json
type CeleryMessage ¶
type CeleryMessage struct {
Body string `json:"body"`
Headers map[string]interface{} `json:"headers,omitempty"`
ContentType string `json:"content-type"`
Properties CeleryProperties `json:"properties"`
ContentEncoding string `json:"content-encoding"`
}
CeleryMessage is actual message to be sent to the broker https://docs.celeryq.dev/projects/kombu/en/stable/_modules/kombu/message.html
type CeleryProperties ¶
type CeleryProperties struct {
BodyEncoding string `json:"body_encoding"`
CorrelationID string `json:"correlation_id"`
ReplyTo string `json:"reply_to"`
DeliveryInfo CeleryDeliveryInfo `json:"delivery_info"`
DeliveryMode int `json:"delivery_mode"`
DeliveryTag string `json:"delivery_tag"`
}
CeleryProperties represents properties json
type Config ¶
type Config struct {
BrokerType BrokerType // Type of broker to use (e.g., AMQP)
HostURL string // Broker connection URL (e.g., "amqp://guest:guest@localhost:5672/")
PublishMode PublishMode // Publishing mode (DirectMode or ChannelMode)
// contains filtered or unexported fields
}
Config holds the configuration for a Publisher.
type PublishMode ¶
type PublishMode string
PublishMode represents the publishing mode for the Publisher.
var ( // DirectMode publishes messages directly without goroutine safety. // Use this mode for simple, single-threaded applications. // WARNING: Not safe for concurrent use from multiple goroutines. DirectMode PublishMode = "direct" // ChannelMode publishes messages through an internal task channel. // This mode is goroutine-safe and suitable for high-performance concurrent applications. // Uses a single dedicated goroutine to handle all AMQP operations sequentially. ChannelMode PublishMode = "channel" )
Supported publish modes
type PublishRequest ¶
type PublishRequest struct {
Queue string // Target queue name
Task string // Task name (e.g., "tasks.add")
Args []interface{} // Positional arguments for the task
Kwargs map[string]interface{} // Keyword arguments for the task
}
PublishRequest represents a request to publish a Celery task.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher publishes Celery tasks to a message broker. It maintains a connection to the broker and handles message serialization.
func New ¶
New creates a new Publisher with the given configuration. It initializes the appropriate broker based on the BrokerType specified in the config. If PublishMode is not specified, it defaults to DirectMode. For ChannelMode, it starts a dedicated goroutine to handle publishing operations. Returns an error if the broker type is not supported.
func (*Publisher) CanPublish ¶ added in v0.1.0
func (*Publisher) Close ¶
Close gracefully shuts down the Publisher. For ChannelMode, it closes the task channel and waits for the run() goroutine to finish. For both modes, it closes the broker connection. This method should be called when the Publisher is no longer needed to prevent resource leaks.
func (*Publisher) Publish ¶
func (p *Publisher) Publish(req *PublishRequest) error
Publish publishes a Celery task to the specified queue. It creates a Celery-compatible message and sends it via the configured broker. The message uses the default exchange ("") and routes directly to the queue by name.
func (*Publisher) Send ¶
func (p *Publisher) Send(req *PublishRequest) error
Send publishes a Celery task using channel-based mode. This method is goroutine-safe and should be used when the Publisher is configured with ChannelMode. It sends the publish request to an internal channel where a dedicated goroutine handles the actual publishing.
The error channel is buffered to prevent blocking in the run() goroutine. Even if this method returns before the caller reads from errCh, the run() goroutine won't block.
Returns an error if publishing fails or if the Publisher is not in ChannelMode.
type RabbitMQTestContainer ¶
type RabbitMQTestContainer struct {
Container *rabbitmq.RabbitMQContainer
AmqpURL string
}
RabbitMQTestContainer represents a RabbitMQ test container
func SetupRabbitMQContainer ¶
func SetupRabbitMQContainer(t *testing.T) (*RabbitMQTestContainer, func())
SetupRabbitMQContainer starts a RabbitMQ container for testing
type TaskMessage ¶
type TaskMessage interface {
ToCeleryMessage(deliveryInfo CeleryDeliveryInfo) *CeleryMessage
Encode() (string, error)
GetID() string
}
TaskMessage is interface for celery task messages TaskMessage composes the body of CeleryMessage
type TaskMessageV1 ¶
type TaskMessageV1 struct {
ID string `json:"id"`
Task string `json:"task"`
Args []interface{} `json:"args"`
Kwargs map[string]interface{} `json:"kwargs"`
Retries int `json:"retries,omitempty"`
ETA *string `json:"eta,omitempty"`
Expires *string `json:"expires,omitempty"`
Taskset string `json:"taskset,omitempty"` // Group ID (also called group)
UTC bool `json:"utc,omitempty"` // Whether to use UTC timezone
TimeLimit []float64 `json:"timelimit,omitempty"` // [soft, hard] time limits in seconds
}
TaskMessageV1 is celery-compatible message (protocol v1) https://celery-safwan.readthedocs.io/en/latest/internals/protocol.html#version-1
func (*TaskMessageV1) Encode ¶
func (tm *TaskMessageV1) Encode() (string, error)
Encode returns json encoded string (without base64)
func (*TaskMessageV1) GetID ¶
func (tm *TaskMessageV1) GetID() string
func (*TaskMessageV1) ToCeleryMessage ¶
func (tm *TaskMessageV1) ToCeleryMessage(deliveryInfo CeleryDeliveryInfo) *CeleryMessage