Documentation
¶
Index ¶
- Variables
- type FairnessPolicy
- func (FairnessPolicy) Descriptor() protoreflect.EnumDescriptor
- func (x FairnessPolicy) Enum() *FairnessPolicy
- func (FairnessPolicy) EnumDescriptor() ([]byte, []int)deprecated
- func (x FairnessPolicy) Number() protoreflect.EnumNumber
- func (x FairnessPolicy) String() string
- func (FairnessPolicy) Type() protoreflect.EnumType
- type PriorityConfig
- func (*PriorityConfig) Descriptor() ([]byte, []int)deprecated
- func (x *PriorityConfig) GetAgeBoostMultiplier() int32
- func (x *PriorityConfig) GetAgeBoostThreshold() *durationpb.Duration
- func (x *PriorityConfig) GetPolicy() FairnessPolicy
- func (x *PriorityConfig) GetPriorityWeights() map[int32]int32
- func (*PriorityConfig) ProtoMessage()
- func (x *PriorityConfig) ProtoReflect() protoreflect.Message
- func (x *PriorityConfig) Reset()
- func (x *PriorityConfig) String() string
- type Queue
- type QueueMetadata
- func (*QueueMetadata) Descriptor() ([]byte, []int)deprecated
- func (x *QueueMetadata) GetAllowedContentTypes() []string
- func (x *QueueMetadata) GetAutoCreateDlq() bool
- func (x *QueueMetadata) GetDeadLetterQueueName() string
- func (x *QueueMetadata) GetDefaultMaxAttempts() int32
- func (x *QueueMetadata) GetExclusivityKey() string
- func (x *QueueMetadata) GetLeaseDuration() *durationpb.Duration
- func (x *QueueMetadata) GetLeasePolicy() *v1.LeasePolicy
- func (x *QueueMetadata) GetMaxPayloadSize() int32
- func (x *QueueMetadata) GetPriorityConfig() *PriorityConfig
- func (x *QueueMetadata) GetSchemaId() string
- func (x *QueueMetadata) GetSchemaRequired() bool
- func (x *QueueMetadata) GetType() QueueType
- func (*QueueMetadata) ProtoMessage()
- func (x *QueueMetadata) ProtoReflect() protoreflect.Message
- func (x *QueueMetadata) Reset()
- func (x *QueueMetadata) String() string
- type QueueType
Constants ¶
This section is empty.
Variables ¶
// Enum value maps for QueueType.
// QueueType_name maps each enum number to its name; QueueType_value is the
// inverse mapping from name to enum number.
var (
	QueueType_name = map[int32]string{
		0: "SIMPLE",
		1: "EXCLUSIVE",
	}
	QueueType_value = map[string]int32{
		"SIMPLE":    0,
		"EXCLUSIVE": 1,
	}
)
Enum value maps for QueueType.
// Enum value maps for FairnessPolicy.
// FairnessPolicy_name maps each enum number to its name; FairnessPolicy_value
// is the inverse mapping from name to enum number.
var (
	FairnessPolicy_name = map[int32]string{
		0: "STRICT",
		1: "WEIGHTED",
		2: "AGING",
		3: "HYBRID",
	}
	FairnessPolicy_value = map[string]int32{
		"STRICT":   0,
		"WEIGHTED": 1,
		"AGING":    2,
		"HYBRID":   3,
	}
)
Enum value maps for FairnessPolicy.
var File_proto_queue_v1_queue_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type FairnessPolicy ¶
type FairnessPolicy int32
FairnessPolicy determines how messages with different priorities compete for processing.
Priority fairness is critical in systems with both urgent and background tasks. Without fairness policies, low-priority messages could starve indefinitely.
const ( FairnessPolicy_STRICT FairnessPolicy = 0 // Process strictly by priority (high before medium before low) FairnessPolicy_WEIGHTED FairnessPolicy = 1 // Balance priority with fairness using weights FairnessPolicy_AGING FairnessPolicy = 2 // Boost priority of messages waiting too long FairnessPolicy_HYBRID FairnessPolicy = 3 // Combines weighted and aging strategies )
func (FairnessPolicy) Descriptor ¶
func (FairnessPolicy) Descriptor() protoreflect.EnumDescriptor
func (FairnessPolicy) Enum ¶
func (x FairnessPolicy) Enum() *FairnessPolicy
func (FairnessPolicy) EnumDescriptor
deprecated
func (FairnessPolicy) EnumDescriptor() ([]byte, []int)
Deprecated: Use FairnessPolicy.Descriptor instead.
func (FairnessPolicy) Number ¶
func (x FairnessPolicy) Number() protoreflect.EnumNumber
func (FairnessPolicy) String ¶
func (x FairnessPolicy) String() string
func (FairnessPolicy) Type ¶
func (FairnessPolicy) Type() protoreflect.EnumType
type PriorityConfig ¶
type PriorityConfig struct {
// policy: The fairness strategy to use.
// See FairnessPolicy enum for detailed descriptions.
Policy FairnessPolicy `protobuf:"varint,1,opt,name=policy,proto3,enum=chronoqueue.api.queue.v1.FairnessPolicy" json:"policy,omitempty"`
// priority_weights: For WEIGHTED/HYBRID policies, maps priority levels to weights.
// Key: priority level (e.g., 100, 50, 10)
// Value: relative weight (e.g., 70, 20, 10 = 70% high, 20% medium, 10% low)
// Weights are relative - they're normalized to percentages.
PriorityWeights map[int32]int32 `` /* 182-byte string literal not displayed */
// age_boost_threshold: For AGING/HYBRID policies, how long before boosting priority.
// Messages waiting longer than this duration get their priority boosted.
// Prevents indefinite starvation of low-priority messages.
// Typical values: 15m (aggressive), 30m (standard), 1h (relaxed).
AgeBoostThreshold *durationpb.Duration `protobuf:"bytes,3,opt,name=age_boost_threshold,json=ageBoostThreshold,proto3" json:"age_boost_threshold,omitempty"`
// age_boost_multiplier: For AGING/HYBRID policies, how much to boost priority.
// Priority is multiplied by this factor every age_boost_threshold period.
// Example: multiplier=2, threshold=30m means priority doubles every 30 minutes.
// Typical values: 2 (standard), 3 (aggressive).
// The field is an int32, so fractional multipliers (e.g., 1.5) cannot be expressed.
AgeBoostMultiplier int32 `protobuf:"varint,4,opt,name=age_boost_multiplier,json=ageBoostMultiplier,proto3" json:"age_boost_multiplier,omitempty"`
// contains filtered or unexported fields
}
PriorityConfig provides fine-grained control over priority-based processing.
Example - Weighted fairness with aging:
config := &queue.PriorityConfig{
Policy: queue.FairnessPolicy_HYBRID,
PriorityWeights: map[int32]int32{
100: 70, // High priority: 70% of processing capacity
50: 20, // Medium priority: 20%
10: 10, // Low priority: 10%
},
AgeBoostThreshold: durationpb.New(30*time.Minute), // Boost after 30min wait
AgeBoostMultiplier: 2, // Double the priority every 30min
}
func (*PriorityConfig) Descriptor
deprecated
func (*PriorityConfig) Descriptor() ([]byte, []int)
Deprecated: Use PriorityConfig.ProtoReflect.Descriptor instead.
func (*PriorityConfig) GetAgeBoostMultiplier ¶
func (x *PriorityConfig) GetAgeBoostMultiplier() int32
func (*PriorityConfig) GetAgeBoostThreshold ¶
func (x *PriorityConfig) GetAgeBoostThreshold() *durationpb.Duration
func (*PriorityConfig) GetPolicy ¶
func (x *PriorityConfig) GetPolicy() FairnessPolicy
func (*PriorityConfig) GetPriorityWeights ¶
func (x *PriorityConfig) GetPriorityWeights() map[int32]int32
func (*PriorityConfig) ProtoMessage ¶
func (*PriorityConfig) ProtoMessage()
func (*PriorityConfig) ProtoReflect ¶
func (x *PriorityConfig) ProtoReflect() protoreflect.Message
func (*PriorityConfig) Reset ¶
func (x *PriorityConfig) Reset()
func (*PriorityConfig) String ¶
func (x *PriorityConfig) String() string
type Queue ¶
type Queue struct {
// name: Unique identifier for this queue.
// Must be unique across your ChronoQueue instance.
// Naming convention: lowercase words separated by hyphens, descriptive of the work.
// Examples: "order-processing", "email-sender", "webhook-delivery"
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
// metadata: Queue configuration and behavior settings.
// See QueueMetadata for the individual settings.
Metadata *QueueMetadata `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
// contains filtered or unexported fields
}
Queue represents a named message container with specific processing behavior.
Queues are the primary organizational unit in ChronoQueue. Create separate queues for different types of work to:
- Isolate failures (one queue's DLQ doesn't affect others)
- Apply different processing rules (retries, priorities, leases)
- Scale consumers independently
- Organize by domain (orders, notifications, analytics)
Example queue names:
- "order-processing" // Core business logic
- "email-notifications" // User communications
- "webhook-deliveries" // External API calls
- "data-analytics-high" // Time-sensitive analytics
- "cleanup-tasks-low" // Background maintenance
func (*Queue) Descriptor
deprecated
func (*Queue) GetMetadata ¶
func (x *Queue) GetMetadata() *QueueMetadata
func (*Queue) ProtoMessage ¶
func (*Queue) ProtoMessage()
func (*Queue) ProtoReflect ¶
func (x *Queue) ProtoReflect() protoreflect.Message
type QueueMetadata ¶
type QueueMetadata struct {
// type: Determines message consumption pattern.
// SIMPLE: Multiple workers process concurrently (most common).
// EXCLUSIVE: Single worker processes sequentially (ensures order).
Type QueueType `protobuf:"varint,1,opt,name=type,proto3,enum=chronoqueue.api.queue.v1.QueueType" json:"type,omitempty"`
// default_max_attempts: Default maximum number of attempts for messages without
// an explicit max_attempts. The count includes the first attempt, so 1 means no retry.
// Messages failing this many times move to the dead letter queue.
// Common values: 1 (no retry), 3 (standard), 5 (aggressive retry).
// Set to -1 for infinite retries (use with extreme caution).
DefaultMaxAttempts int32 `protobuf:"varint,2,opt,name=default_max_attempts,json=defaultMaxAttempts,proto3" json:"default_max_attempts,omitempty"`
// lease_duration: Default time workers have to process a message.
// If a worker doesn't acknowledge within this time, the message returns to the queue.
// Workers can renew leases for longer operations via RenewLease/SendHeartbeat.
// Common values: 30s (quick tasks), 5m (standard), 30m (long-running).
// Balance between: too short = unnecessary retries, too long = slow failure detection.
LeaseDuration *durationpb.Duration `protobuf:"bytes,3,opt,name=lease_duration,json=leaseDuration,proto3" json:"lease_duration,omitempty"`
// exclusivity_key: For EXCLUSIVE queues, this key ensures single-consumer access.
// Only one worker with a matching exclusivity key can consume from this queue.
// Leave empty for non-exclusive queues.
// Use cases: ordered processing, singleton workers, distributed locks.
ExclusivityKey string `protobuf:"bytes,4,opt,name=exclusivity_key,json=exclusivityKey,proto3" json:"exclusivity_key,omitempty"`
// dead_letter_queue_name: Name of the DLQ for messages that exhaust retries.
// Messages that fail after max_attempts are moved here for investigation.
// Naming convention: "{original-queue}-dlq" or "{original-queue}-errors"
// Access DLQ messages via DLQ APIs: GetDLQMessage, ListDLQMessages, RequeueDLQMessage.
// NOTE(review): the field numbers jump from 4 to 6 here; field 5 is presumably
// removed or reserved in the .proto - confirm before reusing that number.
DeadLetterQueueName string `protobuf:"bytes,6,opt,name=dead_letter_queue_name,json=deadLetterQueueName,proto3" json:"dead_letter_queue_name,omitempty"`
// auto_create_dlq: If true, ChronoQueue automatically creates the DLQ if it doesn't exist.
// Recommended: true for production (ensures no messages are lost).
// Set to false if you want to manually control DLQ creation and configuration.
AutoCreateDlq bool `protobuf:"varint,7,opt,name=auto_create_dlq,json=autoCreateDlq,proto3" json:"auto_create_dlq,omitempty"`
// schema_id: Default schema for validating messages posted to this queue.
// All messages must conform to this schema (if schema_required is true).
// Helps ensure data quality and catch errors before processing.
// Example: "order.created.v1", "user.profile.v2"
// Register schemas via RegisterSchema API.
SchemaId string `protobuf:"bytes,8,opt,name=schema_id,json=schemaId,proto3" json:"schema_id,omitempty"`
// schema_required: If true, all messages must have a valid schema and pass validation.
// Messages without a schema or failing validation are rejected.
// Use in production for critical data pipelines to prevent bad data.
// Set to false for development or when schema validation isn't needed.
SchemaRequired bool `protobuf:"varint,9,opt,name=schema_required,json=schemaRequired,proto3" json:"schema_required,omitempty"`
// max_payload_size: Maximum size of message payload in bytes.
// Messages exceeding this limit are rejected.
// If not set or 0, uses system default (typically 256KB).
// Increase for large payloads (e.g., 1MB = 1048576).
// Note: Very large messages impact performance - consider using references instead.
MaxPayloadSize int32 `protobuf:"varint,10,opt,name=max_payload_size,json=maxPayloadSize,proto3" json:"max_payload_size,omitempty"`
// allowed_content_types: List of permitted MIME types for message payloads.
// Empty list = all content types allowed (default).
// Examples: ["application/json"], ["application/json", "application/xml"]
// Use to enforce data format standards across your organization.
AllowedContentTypes []string `protobuf:"bytes,11,rep,name=allowed_content_types,json=allowedContentTypes,proto3" json:"allowed_content_types,omitempty"`
// priority_config: Advanced priority scheduling configuration.
// Controls how messages with different priorities are processed.
// Most users can omit this - defaults work well for common cases.
// Customize for: complex fairness requirements, starvation prevention through
// aging, weighted priorities. See PriorityConfig.
PriorityConfig *PriorityConfig `protobuf:"bytes,12,opt,name=priority_config,json=priorityConfig,proto3" json:"priority_config,omitempty"`
// ---------------------------------------------------------------------------
// Lease model defaults (queue-level)
// ---------------------------------------------------------------------------
// lease_policy: Queue-level defaults for the lease model.
// Message-level policies can selectively override these fields.
// NOTE(review): how this interacts with lease_duration above is not visible
// from this message definition - confirm against the LeasePolicy documentation.
LeasePolicy *v1.LeasePolicy `protobuf:"bytes,13,opt,name=lease_policy,json=leasePolicy,proto3" json:"lease_policy,omitempty"`
// contains filtered or unexported fields
}
QueueMetadata defines the configuration and behavior of a queue.
This controls:
- Message processing rules (retries, leases, priorities)
- Data validation (schemas, content types)
- Error handling (dead letter queues)
- Consumer patterns (exclusivity, fairness)
Example - Create a queue for order processing:
metadata := &queue.QueueMetadata{
Type: queue.QueueType_SIMPLE,
DefaultMaxAttempts: 3,
LeaseDuration: durationpb.New(5*time.Minute),
DeadLetterQueueName: "order-processing-dlq",
AutoCreateDlq: true,
SchemaId: "order.v1",
SchemaRequired: true,
MaxPayloadSize: 1024 * 512, // 512KB
AllowedContentTypes: []string{"application/json"},
}
func (*QueueMetadata) Descriptor
deprecated
func (*QueueMetadata) Descriptor() ([]byte, []int)
Deprecated: Use QueueMetadata.ProtoReflect.Descriptor instead.
func (*QueueMetadata) GetAllowedContentTypes ¶
func (x *QueueMetadata) GetAllowedContentTypes() []string
func (*QueueMetadata) GetAutoCreateDlq ¶
func (x *QueueMetadata) GetAutoCreateDlq() bool
func (*QueueMetadata) GetDeadLetterQueueName ¶
func (x *QueueMetadata) GetDeadLetterQueueName() string
func (*QueueMetadata) GetDefaultMaxAttempts ¶
func (x *QueueMetadata) GetDefaultMaxAttempts() int32
func (*QueueMetadata) GetExclusivityKey ¶
func (x *QueueMetadata) GetExclusivityKey() string
func (*QueueMetadata) GetLeaseDuration ¶
func (x *QueueMetadata) GetLeaseDuration() *durationpb.Duration
func (*QueueMetadata) GetLeasePolicy ¶ added in v1.1.1
func (x *QueueMetadata) GetLeasePolicy() *v1.LeasePolicy
func (*QueueMetadata) GetMaxPayloadSize ¶
func (x *QueueMetadata) GetMaxPayloadSize() int32
func (*QueueMetadata) GetPriorityConfig ¶
func (x *QueueMetadata) GetPriorityConfig() *PriorityConfig
func (*QueueMetadata) GetSchemaId ¶
func (x *QueueMetadata) GetSchemaId() string
func (*QueueMetadata) GetSchemaRequired ¶
func (x *QueueMetadata) GetSchemaRequired() bool
func (*QueueMetadata) GetType ¶
func (x *QueueMetadata) GetType() QueueType
func (*QueueMetadata) ProtoMessage ¶
func (*QueueMetadata) ProtoMessage()
func (*QueueMetadata) ProtoReflect ¶
func (x *QueueMetadata) ProtoReflect() protoreflect.Message
func (*QueueMetadata) Reset ¶
func (x *QueueMetadata) Reset()
func (*QueueMetadata) String ¶
func (x *QueueMetadata) String() string
type QueueType ¶
type QueueType int32
QueueType defines the message consumption pattern.
Choose based on your use case:
- SIMPLE: Most common. Multiple workers can consume in parallel.
- EXCLUSIVE: Single-consumer queues for ordered processing or exclusive access.
func (QueueType) Descriptor ¶
func (QueueType) Descriptor() protoreflect.EnumDescriptor
func (QueueType) EnumDescriptor
deprecated
func (QueueType) Number ¶
func (x QueueType) Number() protoreflect.EnumNumber
func (QueueType) Type ¶
func (QueueType) Type() protoreflect.EnumType