Documentation
¶
Index ¶
- Variables
- type Message
- type Message_Metadata
- func (*Message_Metadata) Descriptor() ([]byte, []int)deprecated
- func (x *Message_Metadata) GetAttemptsLeft() int32
- func (x *Message_Metadata) GetLeaseDuration() *durationpb.Duration
- func (x *Message_Metadata) GetLeaseExpiry() int64
- func (x *Message_Metadata) GetLeaseRenewalCount() int32
- func (x *Message_Metadata) GetMaxAttempts() int32
- func (x *Message_Metadata) GetPayload() *v1.Payload
- func (x *Message_Metadata) GetPriority() int64
- func (x *Message_Metadata) GetPriorityLevel() int32
- func (x *Message_Metadata) GetScheduledTime() *timestamppb.Timestamp
- func (x *Message_Metadata) GetState() Message_Metadata_State
- func (*Message_Metadata) ProtoMessage()
- func (x *Message_Metadata) ProtoReflect() protoreflect.Message
- func (x *Message_Metadata) Reset()
- func (x *Message_Metadata) String() string
- type Message_Metadata_State
- func (Message_Metadata_State) Descriptor() protoreflect.EnumDescriptor
- func (x Message_Metadata_State) Enum() *Message_Metadata_State
- func (Message_Metadata_State) EnumDescriptor() ([]byte, []int)deprecated
- func (x Message_Metadata_State) Number() protoreflect.EnumNumber
- func (x Message_Metadata_State) String() string
- func (Message_Metadata_State) Type() protoreflect.EnumType
Constants ¶
This section is empty.
Variables ¶
var (
	// Message_Metadata_State_name maps each Message_Metadata_State number
	// to its enum value name.
	Message_Metadata_State_name = map[int32]string{
		0: "INVISIBLE",
		1: "PENDING",
		2: "RUNNING",
		3: "COMPLETED",
		4: "CANCELED",
		5: "ERRORED",
	}
	// Message_Metadata_State_value is the inverse lookup: enum value name
	// to its number.
	Message_Metadata_State_value = map[string]int32{
		"INVISIBLE": 0,
		"PENDING":   1,
		"RUNNING":   2,
		"COMPLETED": 3,
		"CANCELED":  4,
		"ERRORED":   5,
	}
)
Enum value maps for Message_Metadata_State.
// File_proto_message_v1_message_proto is a package-level protoreflect.FileDescriptor.
// NOTE(review): presumably the descriptor for proto/message/v1/message.proto,
// populated by the generated init code — confirm against the generator output.
var File_proto_message_v1_message_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type Message ¶
// Message pairs a caller-chosen identifier with a Message_Metadata value
// that carries the payload and all execution/lifecycle information.
type Message struct {
// message_id: Unique identifier for this message within its queue.
// Must be unique per queue. Used for deduplication and idempotency:
// if you retry posting the same message_id, ChronoQueue returns the existing message.
// Recommended format: UUID, task ID, or "{entity_type}:{entity_id}".
MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
// metadata: All message execution and lifecycle information
// (payload, state, attempts, lease, priority, and schedule).
// NOTE(review): this is protobuf field 3; field number 2 is not used by any
// visible field — presumably reserved or removed in the .proto. Confirm before reusing it.
Metadata *Message_Metadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"`
// contains filtered or unexported fields
}
Message represents a unit of work in ChronoQueue.
A message encapsulates:
- Your business data (payload)
- Execution metadata (state, priority, retries)
- Scheduling information (when to execute)
- Processing guarantees (leases, acknowledgments)
Lifecycle:
- INVISIBLE: Message is scheduled for future execution or waiting to become visible
- PENDING: Message is ready to be consumed by a worker
- RUNNING: Message is actively being processed (worker has acquired a lease)
- COMPLETED: Message was successfully processed and acknowledged
- ERRORED: Message failed all retry attempts and was moved to the DLQ
- CANCELED: Message was manually canceled before processing
Example usage:
// Post a message
msg := &message.Message{
MessageId: "task-123",
Metadata: &message.Message_Metadata{
Payload: yourPayload,
Priority: 100, // Higher = processed first
MaxAttempts: 3, // Retry up to 3 times
LeaseDuration: durationpb.New(30*time.Second), // 30s to process
ScheduledTime: timestamppb.New(executeAt), // When to execute
},
}
// Process a message
msg, err := client.GetNextMessage(ctx, queueName)
// ... do work ...
client.AcknowledgeMessage(ctx, queueName, msg.MessageId, streamEntryID)
func (*Message) Descriptor
deprecated
func (*Message) GetMessageId ¶
func (*Message) GetMetadata ¶
func (x *Message) GetMetadata() *Message_Metadata
func (*Message) ProtoMessage ¶
func (*Message) ProtoMessage()
func (*Message) ProtoReflect ¶
func (x *Message) ProtoReflect() protoreflect.Message
type Message_Metadata ¶
// Message_Metadata carries a message's payload together with its state,
// retry budget, lease, priority, and schedule.
type Message_Metadata struct {
// payload: The business data and metadata to be processed.
// This is what your workers receive and act upon.
Payload *v1.Payload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
// state: Lifecycle stage of the message, one of the Message_Metadata_State values
// (INVISIBLE, PENDING, RUNNING, COMPLETED, CANCELED, ERRORED).
State Message_Metadata_State `protobuf:"varint,2,opt,name=state,proto3,enum=chronoqueue.api.message.v1.Message_Metadata_State" json:"state,omitempty"`
// attempts_left: Number of retry attempts remaining before moving to DLQ.
// Decrements with each processing failure. When it reaches 0 and processing fails again,
// the message moves to the ERRORED state.
// Set max_attempts when posting to control retry behavior.
AttemptsLeft int32 `protobuf:"varint,4,opt,name=attempts_left,json=attemptsLeft,proto3" json:"attempts_left,omitempty"`
// lease_duration: How long a worker can process this message before the lease expires.
// After this duration, if not acknowledged or renewed, the message returns to PENDING state.
// Workers should call RenewLease or SendHeartbeat if processing takes longer.
// Typical values: 30s for quick tasks, 5m for longer processing.
LeaseDuration *durationpb.Duration `protobuf:"bytes,5,opt,name=lease_duration,json=leaseDuration,proto3" json:"lease_duration,omitempty"`
// lease_expiry: Unix timestamp (milliseconds) when the current lease expires.
// Managed automatically by ChronoQueue. Workers can monitor this to know when to renew.
// If current time > lease_expiry, the message will be reclaimed and made PENDING again.
LeaseExpiry int64 `protobuf:"varint,6,opt,name=lease_expiry,json=leaseExpiry,proto3" json:"lease_expiry,omitempty"`
// lease_renewal_count: Number of times the lease has been renewed for this processing attempt.
// Useful for detecting stuck/zombie workers that keep renewing without completing.
// Consider implementing a max renewal limit in your worker logic.
LeaseRenewalCount int32 `protobuf:"varint,7,opt,name=lease_renewal_count,json=leaseRenewalCount,proto3" json:"lease_renewal_count,omitempty"`
// priority: Message priority score (0-2147483647, higher = processed first).
// ChronoQueue uses priority streams (high: ≥70, medium: 30-69, low: <30).
// Messages with the same priority are processed FIFO.
// Use for: urgent tasks (100), normal (50), background (10).
// NOTE(review): the field is int64 but the documented upper bound is the int32
// maximum — confirm which limit the server actually enforces.
Priority int64 `protobuf:"varint,9,opt,name=priority,proto3" json:"priority,omitempty"`
// max_attempts: Maximum number of processing attempts before moving to DLQ.
// If omitted, uses the queue's default_max_attempts.
// Set to -1 for infinite retries (use with caution).
// Common values: 1 (no retry), 3 (standard), 5 (persistent), -1 (infinite).
MaxAttempts int32 `protobuf:"varint,10,opt,name=max_attempts,json=maxAttempts,proto3" json:"max_attempts,omitempty"`
// scheduled_time: When this message should become visible for processing.
// If set to a future time, the message enters INVISIBLE state until that time.
// If omitted or in the past, the message immediately becomes PENDING.
// Use for: scheduled tasks, delayed execution, rate limiting, backoff strategies.
// Example: Schedule order fulfillment for tomorrow at 9 AM.
ScheduledTime *timestamppb.Timestamp `protobuf:"bytes,20,opt,name=scheduled_time,json=scheduledTime,proto3" json:"scheduled_time,omitempty"`
// priority_level: Internal priority level for stream routing (auto-calculated).
// Managed by ChronoQueue based on priority score. Read-only from client perspective.
// Used internally for routing to high/medium/low priority streams.
PriorityLevel int32 `protobuf:"varint,21,opt,name=priority_level,json=priorityLevel,proto3" json:"priority_level,omitempty"`
// contains filtered or unexported fields
}
Metadata contains all message execution and lifecycle information.
func (*Message_Metadata) Descriptor
deprecated
func (*Message_Metadata) Descriptor() ([]byte, []int)
Deprecated: Use Message_Metadata.ProtoReflect.Descriptor instead.
func (*Message_Metadata) GetAttemptsLeft ¶
func (x *Message_Metadata) GetAttemptsLeft() int32
func (*Message_Metadata) GetLeaseDuration ¶
func (x *Message_Metadata) GetLeaseDuration() *durationpb.Duration
func (*Message_Metadata) GetLeaseExpiry ¶
func (x *Message_Metadata) GetLeaseExpiry() int64
func (*Message_Metadata) GetLeaseRenewalCount ¶
func (x *Message_Metadata) GetLeaseRenewalCount() int32
func (*Message_Metadata) GetMaxAttempts ¶
func (x *Message_Metadata) GetMaxAttempts() int32
func (*Message_Metadata) GetPayload ¶
func (x *Message_Metadata) GetPayload() *v1.Payload
func (*Message_Metadata) GetPriority ¶
func (x *Message_Metadata) GetPriority() int64
func (*Message_Metadata) GetPriorityLevel ¶
func (x *Message_Metadata) GetPriorityLevel() int32
func (*Message_Metadata) GetScheduledTime ¶
func (x *Message_Metadata) GetScheduledTime() *timestamppb.Timestamp
func (*Message_Metadata) GetState ¶
func (x *Message_Metadata) GetState() Message_Metadata_State
func (*Message_Metadata) ProtoMessage ¶
func (*Message_Metadata) ProtoMessage()
func (*Message_Metadata) ProtoReflect ¶
func (x *Message_Metadata) ProtoReflect() protoreflect.Message
func (*Message_Metadata) Reset ¶
func (x *Message_Metadata) Reset()
func (*Message_Metadata) String ¶
func (x *Message_Metadata) String() string
type Message_Metadata_State ¶
// Message_Metadata_State is the int32-backed enum type used by the State
// field of Message_Metadata.
type Message_Metadata_State int32
State represents the message lifecycle stage.
State transitions:
POST -> INVISIBLE (if scheduled_time is in the future)
POST -> PENDING (if no scheduled_time or scheduled_time is past)
INVISIBLE -> PENDING (when scheduled_time arrives)
PENDING -> RUNNING (when worker calls GetNextMessage)
RUNNING -> COMPLETED (when worker calls AcknowledgeMessage)
RUNNING -> PENDING (if lease expires without acknowledgment)
RUNNING -> ERRORED (if max_attempts exhausted)
* -> CANCELED (when CancelMessage is called)
Important: Messages in COMPLETED, CANCELED, and ERRORED states are terminal and won't be processed again (unless explicitly requeued from DLQ).
const ( Message_Metadata_INVISIBLE Message_Metadata_State = 0 // Scheduled for future, not yet visible to workers Message_Metadata_PENDING Message_Metadata_State = 1 // Ready to be consumed by workers (visible in queue) Message_Metadata_RUNNING Message_Metadata_State = 2 // Currently being processed (lease acquired) Message_Metadata_COMPLETED Message_Metadata_State = 3 // Successfully processed and acknowledged (terminal) Message_Metadata_CANCELED Message_Metadata_State = 4 // Manually canceled (terminal) Message_Metadata_ERRORED Message_Metadata_State = 5 // Failed after all retries, moved to DLQ (terminal) )
func (Message_Metadata_State) Descriptor ¶
func (Message_Metadata_State) Descriptor() protoreflect.EnumDescriptor
func (Message_Metadata_State) Enum ¶
func (x Message_Metadata_State) Enum() *Message_Metadata_State
func (Message_Metadata_State) EnumDescriptor
deprecated
func (Message_Metadata_State) EnumDescriptor() ([]byte, []int)
Deprecated: Use Message_Metadata_State.Descriptor instead.
func (Message_Metadata_State) Number ¶
func (x Message_Metadata_State) Number() protoreflect.EnumNumber
func (Message_Metadata_State) String ¶
func (x Message_Metadata_State) String() string
func (Message_Metadata_State) Type ¶
func (Message_Metadata_State) Type() protoreflect.EnumType