Documentation ¶
Index ¶
- func ConsumeNonBlock(f Consumer)
- type BatchSendOption
- type Consumer
- type Message
- type MessageBatch
- type MessageSendRequest
- func NewBytesMessageSendRequest(content []byte, opts ...SendOption) *MessageSendRequest
- func NewJSONMessageSendRequest(content any, opts ...SendOption) *MessageSendRequest
- func NewTextMessageSendRequest(content string, opts ...SendOption) *MessageSendRequest
- func NewV8MessageSendRequest(content js.Value, opts ...SendOption) *MessageSendRequest
- type Producer
- func (p *Producer) SendBatch(messages []*MessageSendRequest, opts ...BatchSendOption) error
- func (p *Producer) SendBytes(body []byte, opts ...SendOption) error
- func (p *Producer) SendJSON(body any, opts ...SendOption) error
- func (p *Producer) SendText(body string, opts ...SendOption) error
- func (p *Producer) SendV8(body js.Value, opts ...SendOption) error
- type RetryOption
- type SendOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConsumeNonBlock ¶
func ConsumeNonBlock(f Consumer)
ConsumeNonBlock sets the Consumer function that receives batches of messages from Cloudflare Queues. This function is intended for workers that also serve other purposes (e.g. handling HTTP requests): the worker does not block while waiting for messages and continues to execute other tasks. ConsumeNonBlock should be called before setting other blocking handlers (e.g. workers.Serve).
Types ¶
type BatchSendOption ¶
type BatchSendOption func(*batchSendOptions)
func WithBatchDelaySeconds ¶
func WithBatchDelaySeconds(d time.Duration) BatchSendOption
WithBatchDelaySeconds sets the delay, in seconds, applied to the messages of the batch. The delay is passed as a time.Duration.
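The option types in this package follow Go's functional options pattern: each option is a function that mutates an unexported options struct. The following is a minimal sketch of how such an option might be defined and applied. The `delaySeconds` field, the truncation to whole seconds, and the `applyBatchOptions` helper are assumptions made for illustration and are not taken from the package's internals.

```go
package main

import "time"

// batchSendOptions stands in for the package's unexported struct.
// The delaySeconds field is an assumption made for this sketch.
type batchSendOptions struct {
	delaySeconds int
}

// BatchSendOption mirrors the documented type: a function that mutates the options.
type BatchSendOption func(*batchSendOptions)

// WithBatchDelaySeconds mirrors the documented signature.
// Truncating the duration to whole seconds is an assumption.
func WithBatchDelaySeconds(d time.Duration) BatchSendOption {
	return func(o *batchSendOptions) {
		o.delaySeconds = int(d / time.Second)
	}
}

// applyBatchOptions is a hypothetical helper. It starts from zero values and
// applies each option in order, so a later option overrides an earlier one.
func applyBatchOptions(opts ...BatchSendOption) batchSendOptions {
	var o batchSendOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}
```

Under these assumptions a caller writes `WithBatchDelaySeconds(90 * time.Second)` rather than passing a bare `90`, because the parameter is a `time.Duration` even though the name mentions seconds.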
type Consumer ¶
type Consumer func(batch *MessageBatch) error
Consumer is a function that receives a batch of messages from Cloudflare Queues. The function should be set using Consume or ConsumeNonBlock. A returned error will cause the batch to be retried (unless the batch or individual messages are acked). NOTE: to run a long-running message processing task within the Consumer, use cloudflare.WaitUntil. This postpones the message acknowledgment until the task is completed without blocking the queue consumption.
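A Consumer can settle each message individually instead of relying on the returned error. The sketch below illustrates that flow with local stand-in types that mirror the documented signatures, so that it compiles outside a Worker. The string `Body`, the `acked`, `retried` and `delay` fields, and the `process` function are assumptions for illustration only.

```go
package main

import (
	"errors"
	"time"
)

// retryOptions and RetryOption mirror the documented option type.
// The delay field is an assumption.
type retryOptions struct{ delay time.Duration }

type RetryOption func(*retryOptions)

func WithRetryDelay(d time.Duration) RetryOption {
	return func(o *retryOptions) { o.delay = d }
}

// Message is a trimmed stand-in for the documented type. Body is a string
// here (the real field is a js.Value); the unexported fields only record
// what the handler decided.
type Message struct {
	ID      string
	Body    string
	acked   bool
	retried bool
	delay   time.Duration
}

func (m *Message) Ack() { m.acked = true }

func (m *Message) Retry(opts ...RetryOption) {
	var o retryOptions
	for _, opt := range opts {
		opt(&o)
	}
	m.retried, m.delay = true, o.delay
}

// MessageBatch and Consumer mirror the documented declarations.
type MessageBatch struct {
	Queue    string
	Messages []*Message
}

type Consumer func(batch *MessageBatch) error

// process is a hypothetical business function that rejects empty bodies.
func process(body string) error {
	if body == "" {
		return errors.New("empty body")
	}
	return nil
}

// handleBatch acks every message it processed and marks failed ones for
// retry, then returns nil because each message has already been settled.
func handleBatch(batch *MessageBatch) error {
	for _, msg := range batch.Messages {
		if err := process(msg.Body); err != nil {
			msg.Retry(WithRetryDelay(30 * time.Second))
			continue
		}
		msg.Ack()
	}
	return nil
}

// Compile-time check that handleBatch has the Consumer signature.
var _ Consumer = handleBatch
```

In a real Worker, a function shaped like `handleBatch` would be passed to Consume or ConsumeNonBlock, using the package's own Message and MessageBatch types.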
type Message ¶
type Message struct {
// ID - The unique Cloudflare-generated identifier of the message
ID string
// Timestamp - The time when the message was enqueued
Timestamp time.Time
// Body - The message body. It can be accessed directly or through conversion helpers such as StringBody, BytesBody, IntBody, FloatBody.
Body js.Value
// Attempts - The number of times the message delivery has been retried.
Attempts int
// contains filtered or unexported fields
}
Message represents a message of the batch received by the consumer.
func (*Message) Ack ¶
func (m *Message) Ack()
Ack acknowledges the message as successfully delivered, regardless of the result returned from the consuming function.
func (*Message) Retry ¶
func (m *Message) Retry(opts ...RetryOption)
Retry marks the message to be re-delivered. The message will be retried after the optional delay configured with RetryOption.
func (*Message) StringBody ¶
type MessageBatch ¶
type MessageBatch struct {
// Queue - The name of the queue from which the messages were received
Queue string
// Messages - The messages in the batch
Messages []*Message
// contains filtered or unexported fields
}
MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the worker configuration.
- https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer
- https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (*MessageBatch) AckAll ¶
func (b *MessageBatch) AckAll()
AckAll acknowledges all messages in the batch as successfully delivered, regardless of the result returned from the consuming function.
func (*MessageBatch) RetryAll ¶
func (b *MessageBatch) RetryAll(opts ...RetryOption)
RetryAll marks all messages in the batch to be re-delivered. The messages will be retried after the optional delay configured with RetryOption.
type MessageSendRequest ¶
type MessageSendRequest struct {
// contains filtered or unexported fields
}
MessageSendRequest is a wrapper type used for sending message batches. See: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest
func NewBytesMessageSendRequest ¶
func NewBytesMessageSendRequest(content []byte, opts ...SendOption) *MessageSendRequest
NewBytesMessageSendRequest creates a single byte array message to be batched before sending to a queue.
func NewJSONMessageSendRequest ¶
func NewJSONMessageSendRequest(content any, opts ...SendOption) *MessageSendRequest
NewJSONMessageSendRequest creates a single JSON message to be batched before sending to a queue.
func NewTextMessageSendRequest ¶
func NewTextMessageSendRequest(content string, opts ...SendOption) *MessageSendRequest
NewTextMessageSendRequest creates a single text message to be batched before sending to a queue.
func NewV8MessageSendRequest ¶
func NewV8MessageSendRequest(content js.Value, opts ...SendOption) *MessageSendRequest
NewV8MessageSendRequest creates a single raw JS value message to be batched before sending to a queue.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
NewProducer creates a new Producer object to send messages to a queue. queueName is the name of the environment variable for the queue to send messages to. In the Cloudflare API documentation, this object corresponds to the Queue.
func (*Producer) SendBatch ¶
func (p *Producer) SendBatch(messages []*MessageSendRequest, opts ...BatchSendOption) error
SendBatch sends multiple messages to a queue. This function allows setting options for each message.
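Because every MessageSendRequest accepts its own SendOption values, a batch can mix messages with different settings. The sketch below builds such a batch using local stand-ins that mirror the documented constructor and option signatures. The `body` and `opts` fields, the `delaySeconds` field, and the `buildReminders` helper are assumptions for illustration; the real types keep their fields unexported.

```go
package main

import "time"

// sendOptions and its delaySeconds field are assumptions standing in for
// the package's unexported option state.
type sendOptions struct{ delaySeconds int }

type SendOption func(*sendOptions)

func WithDelaySeconds(d time.Duration) SendOption {
	return func(o *sendOptions) { o.delaySeconds = int(d / time.Second) }
}

// MessageSendRequest is a stand-in with illustrative fields.
type MessageSendRequest struct {
	body string
	opts sendOptions
}

func NewTextMessageSendRequest(content string, opts ...SendOption) *MessageSendRequest {
	req := &MessageSendRequest{body: content}
	for _, opt := range opts {
		opt(&req.opts)
	}
	return req
}

// buildReminders is a hypothetical helper: it creates one request per text
// and staggers the delays so that message i is delayed by i*step.
func buildReminders(texts []string, step time.Duration) []*MessageSendRequest {
	reqs := make([]*MessageSendRequest, 0, len(texts))
	for i, t := range texts {
		delay := time.Duration(i) * step
		reqs = append(reqs, NewTextMessageSendRequest(t, WithDelaySeconds(delay)))
	}
	return reqs
}
```

With the real package, a slice built this way would be handed to `p.SendBatch(reqs)`, optionally followed by BatchSendOption values such as WithBatchDelaySeconds.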
func (*Producer) SendBytes ¶
func (p *Producer) SendBytes(body []byte, opts ...SendOption) error
SendBytes sends a single byte array message to a queue.
func (*Producer) SendJSON ¶
func (p *Producer) SendJSON(body any, opts ...SendOption) error
SendJSON sends a single JSON message to a queue.
type RetryOption ¶
type RetryOption func(*retryOptions)
func WithRetryDelay ¶
func WithRetryDelay(d time.Duration) RetryOption
WithRetryDelay sets the delay in seconds before the message delivery is retried. Note that the delay should not be less than one second, and its precision is no finer than one second.
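Since WithRetryDelay takes a time.Duration but works at one-second granularity with a one-second minimum, a caller may want to normalize a computed delay before passing it. The helper below is a hypothetical, caller-side sketch (`normalizeRetryDelay` is not part of the package); truncating instead of rounding is an arbitrary choice made here.

```go
package main

import "time"

// normalizeRetryDelay clamps d to at least one second and drops any
// sub-second remainder, matching the granularity described above.
func normalizeRetryDelay(d time.Duration) time.Duration {
	if d < time.Second {
		return time.Second
	}
	return d.Truncate(time.Second)
}
```

A call such as `msg.Retry(WithRetryDelay(normalizeRetryDelay(backoff)))` would then always pass a whole number of seconds, where `backoff` is whatever delay the caller computed.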
type SendOption ¶
type SendOption func(*sendOptions)
func WithDelaySeconds ¶
func WithDelaySeconds(d time.Duration) SendOption
WithDelaySeconds sets the delay, in seconds, before the message is delivered. The delay is passed as a time.Duration.