queue

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// QueueType_name maps each QueueType numeric value to its enum name.
	QueueType_name = map[int32]string{
		0: "SIMPLE",
		1: "EXCLUSIVE",
	}
	// QueueType_value is the inverse lookup: enum name to numeric value.
	QueueType_value = map[string]int32{
		"SIMPLE":    0,
		"EXCLUSIVE": 1,
	}
)

Enum value maps for QueueType.

View Source
var (
	// FairnessPolicy_name maps each FairnessPolicy numeric value to its enum name.
	FairnessPolicy_name = map[int32]string{
		0: "STRICT",
		1: "WEIGHTED",
		2: "AGING",
		3: "HYBRID",
	}
	// FairnessPolicy_value is the inverse lookup: enum name to numeric value.
	FairnessPolicy_value = map[string]int32{
		"STRICT":   0,
		"WEIGHTED": 1,
		"AGING":    2,
		"HYBRID":   3,
	}
)

Enum value maps for FairnessPolicy.

View Source
var File_proto_queue_v1_queue_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type FairnessPolicy

type FairnessPolicy int32

FairnessPolicy determines how messages with different priorities compete for processing.

Priority fairness is critical in systems with both urgent and background tasks. Without fairness policies, low-priority messages could starve indefinitely.

// Values of FairnessPolicy.
//
// FairnessPolicy_STRICT is 0, the Go zero value of the type, so a
// FairnessPolicy variable or struct field that is never assigned compares
// equal to FairnessPolicy_STRICT.
const (
	FairnessPolicy_STRICT   FairnessPolicy = 0 // Process strictly by priority (high before medium before low)
	FairnessPolicy_WEIGHTED FairnessPolicy = 1 // Balance priority with fairness using weights
	FairnessPolicy_AGING    FairnessPolicy = 2 // Boost priority of messages waiting too long
	FairnessPolicy_HYBRID   FairnessPolicy = 3 // Combines weighted and aging strategies
)

func (FairnessPolicy) Descriptor

func (FairnessPolicy) Enum

func (x FairnessPolicy) Enum() *FairnessPolicy

func (FairnessPolicy) EnumDescriptor deprecated

func (FairnessPolicy) EnumDescriptor() ([]byte, []int)

Deprecated: Use FairnessPolicy.Descriptor instead.

func (FairnessPolicy) Number

func (FairnessPolicy) String

func (x FairnessPolicy) String() string

func (FairnessPolicy) Type

type PriorityConfig

type PriorityConfig struct {

	// policy: The fairness strategy to use.
	// See FairnessPolicy enum for detailed descriptions.
	Policy FairnessPolicy `protobuf:"varint,1,opt,name=policy,proto3,enum=chronoqueue.api.queue.v1.FairnessPolicy" json:"policy,omitempty"`
	// priority_weights: For WEIGHTED/HYBRID policies, maps priority levels to weights.
	// Key: priority level (e.g., 100, 50, 10)
	// Value: relative weight (e.g., 70, 20, 10 = 70% high, 20% medium, 10% low)
	// Weights are relative - they're normalized to percentages.
	PriorityWeights map[int32]int32 `` /* 182-byte string literal not displayed */
	// age_boost_threshold: For AGING/HYBRID policies, how long before boosting priority.
	// Messages waiting longer than this duration get their priority boosted.
	// Prevents indefinite starvation of low-priority messages.
	// Typical values: 15m (aggressive), 30m (standard), 1h (relaxed).
	AgeBoostThreshold *durationpb.Duration `protobuf:"bytes,3,opt,name=age_boost_threshold,json=ageBoostThreshold,proto3" json:"age_boost_threshold,omitempty"`
	// age_boost_multiplier: For AGING/HYBRID policies, how much to boost priority.
	// Priority is multiplied by this factor every age_boost_threshold period.
	// Example: multiplier=2, threshold=30m means priority doubles every 30 minutes.
	// Typical values: 2 (standard), 3 (aggressive).
	// NOTE(review): this comment previously also listed 1.5 (gentle), but the
	// field is an int32 and cannot hold a fractional multiplier - confirm
	// whether the field type or the documented value is what was intended.
	AgeBoostMultiplier int32 `protobuf:"varint,4,opt,name=age_boost_multiplier,json=ageBoostMultiplier,proto3" json:"age_boost_multiplier,omitempty"`
	// contains filtered or unexported fields
}

PriorityConfig provides fine-grained control over priority-based processing.

Example - Weighted fairness with aging:

config := &queue.PriorityConfig{
    Policy: queue.FairnessPolicy_HYBRID,
    PriorityWeights: map[int32]int32{
        100: 70,  // High priority: 70% of processing capacity
        50:  20,  // Medium priority: 20%
        10:  10,  // Low priority: 10%
    },
    AgeBoostThreshold: durationpb.New(30*time.Minute),  // Boost after 30min wait
    AgeBoostMultiplier: 2,  // Double the priority every 30min
}

func (*PriorityConfig) Descriptor deprecated

func (*PriorityConfig) Descriptor() ([]byte, []int)

Deprecated: Use PriorityConfig.ProtoReflect.Descriptor instead.

func (*PriorityConfig) GetAgeBoostMultiplier

func (x *PriorityConfig) GetAgeBoostMultiplier() int32

func (*PriorityConfig) GetAgeBoostThreshold

func (x *PriorityConfig) GetAgeBoostThreshold() *durationpb.Duration

func (*PriorityConfig) GetPolicy

func (x *PriorityConfig) GetPolicy() FairnessPolicy

func (*PriorityConfig) GetPriorityWeights

func (x *PriorityConfig) GetPriorityWeights() map[int32]int32

func (*PriorityConfig) ProtoMessage

func (*PriorityConfig) ProtoMessage()

func (*PriorityConfig) ProtoReflect

func (x *PriorityConfig) ProtoReflect() protoreflect.Message

func (*PriorityConfig) Reset

func (x *PriorityConfig) Reset()

func (*PriorityConfig) String

func (x *PriorityConfig) String() string

type Queue

type Queue struct {

	// name: Unique identifier for this queue.
	// Must be unique across your ChronoQueue instance.
	// Naming convention: lowercase, hyphens, descriptive
	// Examples: "order-processing", "email-sender", "webhook-delivery"
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// metadata: Queue configuration and behavior settings.
	// See QueueMetadata for the individual fields.
	Metadata *QueueMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Queue represents a named message container with specific processing behavior.

Queues are the primary organizational unit in ChronoQueue. Create separate queues for different types of work to:

  - Isolate failures (one queue's DLQ doesn't affect others)
  - Apply different processing rules (retries, priorities, leases)
  - Scale consumers independently
  - Organize by domain (orders, notifications, analytics)

Example queue names:

  - "order-processing" // Core business logic
  - "email-notifications" // User communications
  - "webhook-deliveries" // External API calls
  - "data-analytics-high" // Time-sensitive analytics
  - "cleanup-tasks-low" // Background maintenance

func (*Queue) Descriptor deprecated

func (*Queue) Descriptor() ([]byte, []int)

Deprecated: Use Queue.ProtoReflect.Descriptor instead.

func (*Queue) GetMetadata

func (x *Queue) GetMetadata() *QueueMetadata

func (*Queue) GetName

func (x *Queue) GetName() string

func (*Queue) ProtoMessage

func (*Queue) ProtoMessage()

func (*Queue) ProtoReflect

func (x *Queue) ProtoReflect() protoreflect.Message

func (*Queue) Reset

func (x *Queue) Reset()

func (*Queue) String

func (x *Queue) String() string

type QueueMetadata

type QueueMetadata struct {

	// type: Determines message consumption pattern.
	// SIMPLE: Multiple workers process concurrently (most common).
	// EXCLUSIVE: Single worker processes sequentially (ensures order).
	Type QueueType `protobuf:"varint,1,opt,name=type,proto3,enum=chronoqueue.api.queue.v1.QueueType" json:"type,omitempty"`
	// default_max_attempts: Default retry count for messages without explicit max_attempts.
	// Messages failing this many times move to the dead letter queue.
	// Common values: 1 (no retry), 3 (standard), 5 (aggressive retry).
	// Set to -1 for infinite retries (use with extreme caution).
	DefaultMaxAttempts int32 `protobuf:"varint,2,opt,name=default_max_attempts,json=defaultMaxAttempts,proto3" json:"default_max_attempts,omitempty"`
	// lease_duration: Default time workers have to process a message.
	// If a worker doesn't acknowledge within this time, the message returns to the queue.
	// Workers can renew leases for longer operations via RenewLease/SendHeartbeat.
	// Common values: 30s (quick tasks), 5m (standard), 30m (long-running).
	// Trade-off: too short causes unnecessary retries, too long slows failure detection.
	LeaseDuration *durationpb.Duration `protobuf:"bytes,3,opt,name=lease_duration,json=leaseDuration,proto3" json:"lease_duration,omitempty"`
	// exclusivity_key: For EXCLUSIVE queues, this key ensures single-consumer access.
	// Only one worker with a matching exclusivity key can consume from this queue.
	// Leave empty for non-exclusive queues.
	// Use cases: ordered processing, singleton workers, distributed locks.
	ExclusivityKey string `protobuf:"bytes,4,opt,name=exclusivity_key,json=exclusivityKey,proto3" json:"exclusivity_key,omitempty"`
	// dead_letter_queue_name: Name of the DLQ for messages that exhaust retries.
	// Messages that fail after max_attempts are moved here for investigation.
	// Naming convention: "{original-queue}-dlq" or "{original-queue}-errors"
	// Access DLQ messages via DLQ APIs: GetDLQMessage, ListDLQMessages, RequeueDLQMessage.
	DeadLetterQueueName string `protobuf:"bytes,6,opt,name=dead_letter_queue_name,json=deadLetterQueueName,proto3" json:"dead_letter_queue_name,omitempty"`
	// auto_create_dlq: If true, ChronoQueue automatically creates the DLQ if it doesn't exist.
	// Recommended: true for production (ensures no messages are lost).
	// Set to false if you want to manually control DLQ creation and configuration.
	AutoCreateDlq bool `protobuf:"varint,7,opt,name=auto_create_dlq,json=autoCreateDlq,proto3" json:"auto_create_dlq,omitempty"`
	// schema_id: Default schema for validating messages posted to this queue.
	// All messages must conform to this schema (if schema_required is true).
	// Helps ensure data quality and catch errors before processing.
	// Example: "order.created.v1", "user.profile.v2"
	// Register schemas via RegisterSchema API.
	SchemaId string `protobuf:"bytes,8,opt,name=schema_id,json=schemaId,proto3" json:"schema_id,omitempty"`
	// schema_required: If true, all messages must have a valid schema and pass validation.
	// Messages without a schema or failing validation are rejected.
	// Use in production for critical data pipelines to prevent bad data.
	// Set to false for development or when schema validation isn't needed.
	SchemaRequired bool `protobuf:"varint,9,opt,name=schema_required,json=schemaRequired,proto3" json:"schema_required,omitempty"`
	// max_payload_size: Maximum size of message payload in bytes.
	// Messages exceeding this limit are rejected.
	// If not set or 0, uses system default (typically 256KB).
	// Increase for large payloads (e.g., 1MB = 1048576).
	// Note: Very large messages impact performance - consider using references instead.
	MaxPayloadSize int32 `protobuf:"varint,10,opt,name=max_payload_size,json=maxPayloadSize,proto3" json:"max_payload_size,omitempty"`
	// allowed_content_types: List of permitted MIME types for message payloads.
	// Empty list = all content types allowed (default).
	// Examples: ["application/json"], ["application/json", "application/xml"]
	// Use to enforce data format standards across your organization.
	AllowedContentTypes []string `protobuf:"bytes,11,rep,name=allowed_content_types,json=allowedContentTypes,proto3" json:"allowed_content_types,omitempty"`
	// priority_config: Advanced priority scheduling configuration.
	// Controls how messages with different priorities are processed.
	// Most users can omit this - defaults work well for common cases.
	// Customize for: complex fairness requirements, starvation prevention
	// through aging, weighted priorities.
	PriorityConfig *PriorityConfig `protobuf:"bytes,12,opt,name=priority_config,json=priorityConfig,proto3" json:"priority_config,omitempty"`
	// lease_policy: Queue-level defaults for the lease model.
	// Message-level policies can selectively override these fields.
	// NOTE(review): how this interacts with lease_duration above is not
	// stated here - confirm which one takes precedence when both are set.
	LeasePolicy *v1.LeasePolicy `protobuf:"bytes,13,opt,name=lease_policy,json=leasePolicy,proto3" json:"lease_policy,omitempty"`
	// contains filtered or unexported fields
}

QueueMetadata defines the configuration and behavior of a queue.

This controls:

  - Message processing rules (retries, leases, priorities)
  - Data validation (schemas, content types)
  - Error handling (dead letter queues)
  - Consumer patterns (exclusivity, fairness)

Example - Create a queue for order processing:

metadata := &queue.QueueMetadata{
    Type: queue.QueueType_SIMPLE,
    DefaultMaxAttempts: 3,
    LeaseDuration: durationpb.New(5*time.Minute),
    DeadLetterQueueName: "order-processing-dlq",
    AutoCreateDlq: true,
    SchemaId: "order.v1",
    SchemaRequired: true,
    MaxPayloadSize: 1024 * 512,  // 512KB
    AllowedContentTypes: []string{"application/json"},
}

func (*QueueMetadata) Descriptor deprecated

func (*QueueMetadata) Descriptor() ([]byte, []int)

Deprecated: Use QueueMetadata.ProtoReflect.Descriptor instead.

func (*QueueMetadata) GetAllowedContentTypes

func (x *QueueMetadata) GetAllowedContentTypes() []string

func (*QueueMetadata) GetAutoCreateDlq

func (x *QueueMetadata) GetAutoCreateDlq() bool

func (*QueueMetadata) GetDeadLetterQueueName

func (x *QueueMetadata) GetDeadLetterQueueName() string

func (*QueueMetadata) GetDefaultMaxAttempts

func (x *QueueMetadata) GetDefaultMaxAttempts() int32

func (*QueueMetadata) GetExclusivityKey

func (x *QueueMetadata) GetExclusivityKey() string

func (*QueueMetadata) GetLeaseDuration

func (x *QueueMetadata) GetLeaseDuration() *durationpb.Duration

func (*QueueMetadata) GetLeasePolicy added in v1.1.1

func (x *QueueMetadata) GetLeasePolicy() *v1.LeasePolicy

func (*QueueMetadata) GetMaxPayloadSize

func (x *QueueMetadata) GetMaxPayloadSize() int32

func (*QueueMetadata) GetPriorityConfig

func (x *QueueMetadata) GetPriorityConfig() *PriorityConfig

func (*QueueMetadata) GetSchemaId

func (x *QueueMetadata) GetSchemaId() string

func (*QueueMetadata) GetSchemaRequired

func (x *QueueMetadata) GetSchemaRequired() bool

func (*QueueMetadata) GetType

func (x *QueueMetadata) GetType() QueueType

func (*QueueMetadata) ProtoMessage

func (*QueueMetadata) ProtoMessage()

func (*QueueMetadata) ProtoReflect

func (x *QueueMetadata) ProtoReflect() protoreflect.Message

func (*QueueMetadata) Reset

func (x *QueueMetadata) Reset()

func (*QueueMetadata) String

func (x *QueueMetadata) String() string

type QueueType

type QueueType int32

QueueType defines the message consumption pattern.

Choose based on your use case:

  - SIMPLE: Most common. Multiple workers can consume in parallel.
  - EXCLUSIVE: Single-consumer queues for ordered processing or exclusive access.

// Values of QueueType.
//
// QueueType_SIMPLE is 0, the Go zero value of the type, so a QueueType
// variable or struct field that is never assigned compares equal to
// QueueType_SIMPLE.
const (
	QueueType_SIMPLE    QueueType = 0 // Multiple workers can consume messages concurrently (default)
	QueueType_EXCLUSIVE QueueType = 1 // Only one worker can consume from this queue at a time
)

func (QueueType) Descriptor

func (QueueType) Descriptor() protoreflect.EnumDescriptor

func (QueueType) Enum

func (x QueueType) Enum() *QueueType

func (QueueType) EnumDescriptor deprecated

func (QueueType) EnumDescriptor() ([]byte, []int)

Deprecated: Use QueueType.Descriptor instead.

func (QueueType) Number

func (x QueueType) Number() protoreflect.EnumNumber

func (QueueType) String

func (x QueueType) String() string

func (QueueType) Type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL