message

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 16, 2025 License: MIT Imports: 8 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	Message_Metadata_State_name = map[int32]string{
		0: "INVISIBLE",
		1: "PENDING",
		2: "RUNNING",
		3: "COMPLETED",
		4: "CANCELED",
		5: "ERRORED",
	}
	Message_Metadata_State_value = map[string]int32{
		"INVISIBLE": 0,
		"PENDING":   1,
		"RUNNING":   2,
		"COMPLETED": 3,
		"CANCELED":  4,
		"ERRORED":   5,
	}
)

Enum value maps for Message_Metadata_State.
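
The two maps can be used to convert between state names and numbers. The sketch below is a minimal illustration that uses local copies of the maps so it runs without the module; `stateName`, `stateValue`, and `parseState` are hypothetical names, not part of this package.

```go
package main

import "fmt"

// Local copies of the generated maps, so the sketch runs on its own.
var stateName = map[int32]string{
	0: "INVISIBLE", 1: "PENDING", 2: "RUNNING",
	3: "COMPLETED", 4: "CANCELED", 5: "ERRORED",
}

var stateValue = map[string]int32{
	"INVISIBLE": 0, "PENDING": 1, "RUNNING": 2,
	"COMPLETED": 3, "CANCELED": 4, "ERRORED": 5,
}

// parseState converts a state name to its number and rejects unknown names.
func parseState(name string) (int32, error) {
	v, ok := stateValue[name]
	if !ok {
		return 0, fmt.Errorf("unknown state %q", name)
	}
	return v, nil
}

func main() {
	v, err := parseState("RUNNING")
	fmt.Println(v, err)       // 2 <nil>
	fmt.Println(stateName[4]) // CANCELED
	_, err = parseState("DONE")
	fmt.Println(err) // unknown state "DONE"
}
```

The comma-ok lookup matters here: a plain `stateValue[name]` returns 0 for an unknown name, which is indistinguishable from INVISIBLE.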

var File_proto_message_v1_message_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Message

type Message struct {

	// message_id: Unique identifier for this message within its queue.
	// Must be unique per queue. Used for deduplication and idempotency.
	// If you retry posting the same message_id, ChronoQueue returns the existing message.
	// Recommended format: UUID, task ID, or "{entity_type}:{entity_id}".
	MessageId string `protobuf:"bytes,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"`
	// metadata: All message execution and lifecycle information.
	Metadata *Message_Metadata `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Message represents a unit of work in ChronoQueue.

A message encapsulates:

  - Your business data (payload)
  - Execution metadata (state, priority, retries)
  - Scheduling information (when to execute)
  - Processing guarantees (leases, acknowledgments)

Lifecycle:

  1. INVISIBLE: Message is scheduled for future execution or waiting to become visible
  2. PENDING: Message is ready to be consumed by a worker
  3. RUNNING: Message is actively being processed (worker has acquired a lease)
  4. COMPLETED: Message was successfully processed and acknowledged
  5. ERRORED: Message failed all retry attempts and moved to DLQ
  6. CANCELED: Message was manually canceled before processing

Example usage:

// Post a message
msg := &message.Message{
    MessageId: "task-123",
    Metadata: &message.Message_Metadata{
        Payload:       yourPayload,
        Priority:      100,                              // Higher = processed first
        MaxAttempts:   3,                                // At most 3 processing attempts
        LeaseDuration: durationpb.New(30 * time.Second), // 30s to process
        ScheduledTime: timestamppb.New(executeAt),       // When to execute
    },
}

// Process a message
next, err := client.GetNextMessage(ctx, queueName)
if err != nil {
    return err
}
// ... do work ...
client.AcknowledgeMessage(ctx, queueName, next.MessageId, streamEntryID)
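
The message_id comment on the struct describes deduplication: posting the same ID again returns the existing message. The sketch below illustrates that behavior with a hypothetical in-memory `queue` type; it is not the ChronoQueue client, and `messageID` and `post` are names invented for this example.

```go
package main

import "fmt"

// messageID builds an ID in the recommended "{entity_type}:{entity_id}" format.
func messageID(entityType, entityID string) string {
	return entityType + ":" + entityID
}

// queue is a hypothetical in-memory stand-in used only to show deduplication.
type queue struct {
	byID map[string]string // message_id -> payload
}

// post stores payload under id unless the id already exists. It returns the
// stored payload and whether a new entry was created.
func (q *queue) post(id, payload string) (stored string, created bool) {
	if existing, ok := q.byID[id]; ok {
		return existing, false // retry: hand back the existing message
	}
	q.byID[id] = payload
	return payload, true
}

func main() {
	q := &queue{byID: map[string]string{}}
	id := messageID("order", "42")

	_, created := q.post(id, "ship order 42")
	fmt.Println(id, created) // order:42 true

	stored, created := q.post(id, "retry with different payload")
	fmt.Println(stored, created) // ship order 42 false
}
```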

func (*Message) Descriptor deprecated

func (*Message) Descriptor() ([]byte, []int)

Deprecated: Use Message.ProtoReflect.Descriptor instead.

func (*Message) GetMessageId

func (x *Message) GetMessageId() string

func (*Message) GetMetadata

func (x *Message) GetMetadata() *Message_Metadata

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) ProtoReflect

func (x *Message) ProtoReflect() protoreflect.Message

func (*Message) Reset

func (x *Message) Reset()

func (*Message) String

func (x *Message) String() string

type Message_Metadata

type Message_Metadata struct {

	// payload: The business data and metadata to be processed.
	// This is what your workers receive and act upon.
	Payload *v1.Payload            `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
	State   Message_Metadata_State `protobuf:"varint,2,opt,name=state,proto3,enum=chronoqueue.api.message.v1.Message_Metadata_State" json:"state,omitempty"`
	// attempts_left: Number of retry attempts remaining before moving to DLQ.
	// Decrements with each processing failure. When it reaches 0 and processing
	// fails again, the message moves to the ERRORED state.
	// Set max_attempts when posting to control retry behavior.
	AttemptsLeft int32 `protobuf:"varint,4,opt,name=attempts_left,json=attemptsLeft,proto3" json:"attempts_left,omitempty"`
	// lease_duration: How long a worker can process this message before the lease expires.
	// After this duration, if not acknowledged or renewed, message returns to PENDING state.
	// Workers should call RenewLease or SendHeartbeat if processing takes longer.
	// Typical values: 30s for quick tasks, 5m for longer processing.
	LeaseDuration *durationpb.Duration `protobuf:"bytes,5,opt,name=lease_duration,json=leaseDuration,proto3" json:"lease_duration,omitempty"`
	// lease_expiry: Unix timestamp (milliseconds) when the current lease expires.
	// Managed automatically by ChronoQueue. Workers can monitor this to know when to renew.
	// If current time > lease_expiry, message will be reclaimed and made PENDING again.
	LeaseExpiry int64 `protobuf:"varint,6,opt,name=lease_expiry,json=leaseExpiry,proto3" json:"lease_expiry,omitempty"`
	// lease_renewal_count: Number of times the lease has been renewed for this processing attempt.
	// Useful for detecting stuck/zombie workers that keep renewing without completing.
	// Consider implementing a max renewal limit in your worker logic.
	LeaseRenewalCount int32 `protobuf:"varint,7,opt,name=lease_renewal_count,json=leaseRenewalCount,proto3" json:"lease_renewal_count,omitempty"`
	// priority: Message priority score (0-2147483647, higher = processed first).
	// ChronoQueue uses priority streams (high: ≥70, medium: ≥30, low: <30).
	// Messages with the same priority are processed FIFO.
	// Use for: urgent tasks (100), normal (50), background (10).
	Priority int64 `protobuf:"varint,9,opt,name=priority,proto3" json:"priority,omitempty"`
	// max_attempts: Maximum number of processing attempts before moving to DLQ.
	// If omitted, uses queue's default_max_attempts.
	// Set to -1 for infinite retries (use with caution).
	// Common values: 1 (no retry), 3 (standard), 5 (persistent), -1 (infinite).
	MaxAttempts int32 `protobuf:"varint,10,opt,name=max_attempts,json=maxAttempts,proto3" json:"max_attempts,omitempty"`
	// scheduled_time: When this message should become visible for processing.
	// If set to a future time, message enters INVISIBLE state until that time.
	// If omitted or past, message immediately becomes PENDING.
	// Use for: scheduled tasks, delayed execution, rate limiting, backoff strategies.
	// Example: Schedule order fulfillment for tomorrow at 9 AM.
	ScheduledTime *timestamppb.Timestamp `protobuf:"bytes,20,opt,name=scheduled_time,json=scheduledTime,proto3" json:"scheduled_time,omitempty"`
	// priority_level: Internal priority level for stream routing (auto-calculated).
	// Managed by ChronoQueue based on priority score. Read-only from client perspective.
	// Used internally for routing to high/medium/low priority streams.
	PriorityLevel int32 `protobuf:"varint,21,opt,name=priority_level,json=priorityLevel,proto3" json:"priority_level,omitempty"`
	// contains filtered or unexported fields
}

Metadata contains all message execution and lifecycle information.

func (*Message_Metadata) Descriptor deprecated

func (*Message_Metadata) Descriptor() ([]byte, []int)

Deprecated: Use Message_Metadata.ProtoReflect.Descriptor instead.

func (*Message_Metadata) GetAttemptsLeft

func (x *Message_Metadata) GetAttemptsLeft() int32
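
The attempts_left rule in the struct comment (decrement on each failure, ERRORED once it is 0 and processing fails again) can be sketched as a small function. This is an illustration only: `onFailure` is a hypothetical name, returning the message to PENDING for a retry is an assumption, and infinite retries (max_attempts = -1) are not modeled.

```go
package main

import "fmt"

// onFailure applies the documented rule to a message whose processing failed.
// It returns the new attempts_left and the resulting state name.
func onFailure(attemptsLeft int32) (int32, string) {
	if attemptsLeft <= 0 {
		return 0, "ERRORED" // no attempts remain: message goes to the DLQ
	}
	// Assumption: a failed message with attempts remaining is retried from PENDING.
	return attemptsLeft - 1, "PENDING"
}

func main() {
	left := int32(2)
	for i := 1; i <= 3; i++ {
		var state string
		left, state = onFailure(left)
		fmt.Println(i, left, state)
	}
	// Output:
	// 1 1 PENDING
	// 2 0 PENDING
	// 3 0 ERRORED
}
```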

func (*Message_Metadata) GetLeaseDuration

func (x *Message_Metadata) GetLeaseDuration() *durationpb.Duration

func (*Message_Metadata) GetLeaseExpiry

func (x *Message_Metadata) GetLeaseExpiry() int64

func (*Message_Metadata) GetLeaseRenewalCount

func (x *Message_Metadata) GetLeaseRenewalCount() int32
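
The field comment suggests capping renewals in worker logic. One possible shape for that check is sketched below; `maxRenewals` and `mayRenew` are hypothetical, and the cap of 5 is an arbitrary value chosen for illustration.

```go
package main

import "fmt"

// maxRenewals is a hypothetical worker-side cap; ChronoQueue does not define it.
const maxRenewals int32 = 5

// mayRenew reports whether a worker should renew again, given the
// lease_renewal_count observed so far for this processing attempt.
func mayRenew(renewalCount int32) bool {
	return renewalCount < maxRenewals
}

func main() {
	for _, n := range []int32{0, 4, 5} {
		fmt.Println(n, mayRenew(n))
	}
	// Output:
	// 0 true
	// 4 true
	// 5 false
}
```

A worker that hits the cap would typically stop renewing and let the lease expire, so the message returns to PENDING for another worker.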

func (*Message_Metadata) GetMaxAttempts

func (x *Message_Metadata) GetMaxAttempts() int32
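
The max_attempts comment describes three cases: an explicit limit, the queue default when omitted, and -1 for infinite retries. The sketch below shows one way to resolve them. It assumes that "omitted" corresponds to the proto3 zero value (0); `effectiveMaxAttempts` and the `queueDefault` parameter are hypothetical names.

```go
package main

import "fmt"

// effectiveMaxAttempts resolves the attempt limit for a message.
// Assumption: requested == 0 means the field was omitted.
// Values below -1 are passed through unvalidated in this sketch.
func effectiveMaxAttempts(requested, queueDefault int32) (limit int32, infinite bool) {
	switch {
	case requested == -1:
		return 0, true // infinite retries; limit is unused
	case requested == 0:
		return queueDefault, false // fall back to the queue's default_max_attempts
	default:
		return requested, false
	}
}

func main() {
	fmt.Println(effectiveMaxAttempts(3, 5))  // 3 false
	fmt.Println(effectiveMaxAttempts(0, 5))  // 5 false
	fmt.Println(effectiveMaxAttempts(-1, 5)) // 0 true
}
```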

func (*Message_Metadata) GetPayload

func (x *Message_Metadata) GetPayload() *v1.Payload

func (*Message_Metadata) GetPriority

func (x *Message_Metadata) GetPriority() int64

func (*Message_Metadata) GetPriorityLevel

func (x *Message_Metadata) GetPriorityLevel() int32
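
The priority comment gives the thresholds used for stream routing (high: ≥70, medium: ≥30, low: <30). The sketch below maps a priority score to a stream name using those thresholds. `priorityStream` is a hypothetical helper; the numeric values ChronoQueue stores in priority_level are not documented here, so the sketch returns names only.

```go
package main

import "fmt"

// priorityStream maps a priority score to the documented stream tier.
func priorityStream(score int64) string {
	switch {
	case score >= 70:
		return "high"
	case score >= 30:
		return "medium"
	default:
		return "low"
	}
}

func main() {
	fmt.Println(priorityStream(100)) // high   (urgent)
	fmt.Println(priorityStream(50))  // medium (normal)
	fmt.Println(priorityStream(10))  // low    (background)
}
```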

func (*Message_Metadata) GetScheduledTime

func (x *Message_Metadata) GetScheduledTime() *timestamppb.Timestamp

func (*Message_Metadata) GetState

func (x *Message_Metadata) GetState() Message_Metadata_State

func (*Message_Metadata) ProtoMessage

func (*Message_Metadata) ProtoMessage()

func (*Message_Metadata) ProtoReflect

func (x *Message_Metadata) ProtoReflect() protoreflect.Message

func (*Message_Metadata) Reset

func (x *Message_Metadata) Reset()

func (*Message_Metadata) String

func (x *Message_Metadata) String() string

type Message_Metadata_State

type Message_Metadata_State int32

State represents the message lifecycle stage.

State transitions:

POST -> INVISIBLE (if scheduled_time is set to a future time)
POST -> PENDING (if no scheduled_time or scheduled_time is past)
INVISIBLE -> PENDING (when scheduled_time arrives)
PENDING -> RUNNING (when worker calls GetNextMessage)
RUNNING -> COMPLETED (when worker calls AcknowledgeMessage)
RUNNING -> PENDING (if lease expires without acknowledgment)
RUNNING -> ERRORED (if max_attempts exhausted)
* -> CANCELED (when CancelMessage is called)

Important: Messages in COMPLETED, CANCELED, and ERRORED states are terminal and won't be processed again (unless explicitly requeued from DLQ).
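
The transitions listed above can be encoded as a lookup table, which is handy for validating state changes in tests or tooling. The sketch below uses a local `State` type with the same numeric values as the enum; the type, `transitions`, `canTransition`, and `isTerminal` are hypothetical names. It assumes that "* -> CANCELED" applies only to non-terminal states and leaves out requeueing from the DLQ.

```go
package main

import "fmt"

// State mirrors the numeric values of Message_Metadata_State.
type State int32

const (
	Invisible State = iota // 0
	Pending                // 1
	Running                // 2
	Completed              // 3
	Canceled               // 4
	Errored                // 5
)

// transitions lists the documented moves out of each non-terminal state.
// Assumption: cancellation is allowed from any non-terminal state.
var transitions = map[State][]State{
	Invisible: {Pending, Canceled},
	Pending:   {Running, Canceled},
	Running:   {Completed, Pending, Errored, Canceled},
}

// canTransition reports whether moving from one state to another is listed.
func canTransition(from, to State) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

// isTerminal reports whether a state has no outgoing transitions.
func isTerminal(s State) bool {
	return len(transitions[s]) == 0
}

func main() {
	fmt.Println(canTransition(Pending, Running))   // true
	fmt.Println(canTransition(Running, Pending))   // true (lease expired)
	fmt.Println(canTransition(Completed, Pending)) // false
	fmt.Println(isTerminal(Errored))               // true
}
```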

const (
	Message_Metadata_INVISIBLE Message_Metadata_State = 0 // Scheduled for future, not yet visible to workers
	Message_Metadata_PENDING   Message_Metadata_State = 1 // Ready to be consumed by workers (visible in queue)
	Message_Metadata_RUNNING   Message_Metadata_State = 2 // Currently being processed (lease acquired)
	Message_Metadata_COMPLETED Message_Metadata_State = 3 // Successfully processed and acknowledged (terminal)
	Message_Metadata_CANCELED  Message_Metadata_State = 4 // Manually canceled (terminal)
	Message_Metadata_ERRORED   Message_Metadata_State = 5 // Failed after all retries, moved to DLQ (terminal)
)

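func (Message_Metadata_State) Descriptor

func (Message_Metadata_State) Descriptor() protoreflect.EnumDescriptor

func (Message_Metadata_State) Enum

func (x Message_Metadata_State) Enum() *Message_Metadata_State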

func (Message_Metadata_State) EnumDescriptor deprecated

func (Message_Metadata_State) EnumDescriptor() ([]byte, []int)

Deprecated: Use Message_Metadata_State.Descriptor instead.

func (Message_Metadata_State) Number

func (x Message_Metadata_State) Number() protoreflect.EnumNumber

func (Message_Metadata_State) String

func (x Message_Metadata_State) String() string

func (Message_Metadata_State) Type

func (Message_Metadata_State) Type() protoreflect.EnumType
