Documentation
¶
Index ¶
- Constants
- func DefaultServerConnector(address string, opts ClientOptions) (queueservice_pb.QueueServiceClient, *grpc.ClientConn, error)
- func ParseQueueType(queueType string) int32
- type ChronoQueueClient
- func (client *ChronoQueueClient) AcknowledgeMessage(ctx context.Context, queue string, messageId string, state State, ...) (*queueservice_pb.AcknowledgeMessageResponse, error)
- func (client *ChronoQueueClient) Close()
- func (client *ChronoQueueClient) CreateQueue(ctx context.Context, name string, queueOptions QueueOptions) (*queueservice_pb.CreateQueueResponse, error)
- func (client *ChronoQueueClient) CreateSchedule(ctx context.Context, scheduleId string, scheduleOptions ScheduleOptions) (*queueservice_pb.CreateScheduleResponse, error)
- func (client *ChronoQueueClient) DeleteFromDLQ(ctx context.Context, dlqName string, messageId string) (*queueservice_pb.DeleteFromDLQResponse, error)
- func (client *ChronoQueueClient) DeleteQueue(ctx context.Context, name string) (*queueservice_pb.DeleteQueueResponse, error)
- func (client *ChronoQueueClient) DeleteSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.DeleteScheduleResponse, error)
- func (client *ChronoQueueClient) DeleteSchema(ctx context.Context, schemaID string, version int32) error
- func (client *ChronoQueueClient) GetDLQMessages(ctx context.Context, dlqName string, limit int32) (*queueservice_pb.GetDLQMessagesResponse, error)
- func (client *ChronoQueueClient) GetDLQStats(ctx context.Context, dlqName string) (*queueservice_pb.GetDLQStatsResponse, error)
- func (client *ChronoQueueClient) GetNextMessage(ctx context.Context, queue string, leaseDuration string, enableHeartbeat bool) (*queueservice_pb.GetNextMessageResponse, error)
- func (client *ChronoQueueClient) GetQueueState(ctx context.Context, queue string) (*queueservice_pb.GetQueueStateResponse, error)
- func (client *ChronoQueueClient) GetSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.GetScheduleResponse, error)
- func (client *ChronoQueueClient) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*queueservice_pb.GetScheduleHistoryResponse, error)
- func (client *ChronoQueueClient) GetSchema(ctx context.Context, schemaID string, version int32) (map[string]interface{}, error)
- func (client *ChronoQueueClient) ListQueues(ctx context.Context, prefix string) (*queueservice_pb.ListQueuesResponse, error)
- func (client *ChronoQueueClient) ListSchedules(ctx context.Context, prefix string) (*queueservice_pb.ListSchedulesResponse, error)
- func (client *ChronoQueueClient) ListSchemas(ctx context.Context, prefix string, limit int32, activeOnly bool) ([]map[string]interface{}, error)
- func (client *ChronoQueueClient) PauseSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.PauseScheduleResponse, error)
- func (client *ChronoQueueClient) PeekQueueMessages(ctx context.Context, queue string, limit int32, timeRange TimeRangeOption) (*queueservice_pb.PeekQueueMessagesResponse, error)
- func (client *ChronoQueueClient) PostMessage(ctx context.Context, queue string, messageId string, ...) (*queueservice_pb.PostMessageResponse, error)
- func (client *ChronoQueueClient) PreviewCalendarSchedule(ctx context.Context, calendarScheduleJSON string, count int32) (*queueservice_pb.PreviewCalendarScheduleResponse, error)
- func (client *ChronoQueueClient) PurgeDLQ(ctx context.Context, dlqName string) (*queueservice_pb.PurgeDLQResponse, error)
- func (client *ChronoQueueClient) RegisterSchema(ctx context.Context, schemaID string, options SchemaOptions) error
- func (client *ChronoQueueClient) RenewMessageLease(ctx context.Context, queue string, messageId string, leaseDuration string) (*queueservice_pb.RenewMessageLeaseResponse, error)
- func (client *ChronoQueueClient) RequeueFromDLQ(ctx context.Context, dlqName string, messageId string, targetQueue string) (*queueservice_pb.RequeueFromDLQResponse, error)
- func (client *ChronoQueueClient) ResumeSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.ResumeScheduleResponse, error)
- func (client *ChronoQueueClient) SendMessageHeartbeat(ctx context.Context, queueName string, messageId string, streamEntryID string) (*queueservice_pb.SendMessageHeartBeatResponse, error)
- func (client *ChronoQueueClient) StopHeartbeat(messageID string)
- func (client *ChronoQueueClient) ValidateCalendarSchedule(ctx context.Context, calendarScheduleJSON string) (*queueservice_pb.ValidateCalendarScheduleResponse, error)
- func (client *ChronoQueueClient) ValidatePayload(ctx context.Context, schemaID string, version int32, payloadJSON string) error
- type ClientOptions
- type Connector
- type DLQStats
- type MessageOptions
- type Payload
- type QueueOptions
- type ScheduleOptions
- type SchemaOptions
- type State
- type TimeRangeOption
- type WorkItem
Constants ¶
const ( DefaultMaxRetries = 5 DefaultInitialBackoff = 500 * time.Millisecond DefaultMaxBackoff = 60 * time.Second DefaultMaxHeartBeatWorkers = 10 )
Variables ¶
This section is empty.
Functions ¶
func DefaultServerConnector ¶
func DefaultServerConnector(address string, opts ClientOptions) (queueservice_pb.QueueServiceClient, *grpc.ClientConn, error)
func ParseQueueType ¶
Function to convert SIMPLE queue type string to enum
Types ¶
type ChronoQueueClient ¶
type ChronoQueueClient struct {
// contains filtered or unexported fields
}
ChronoQueueClient is a client to call ChronoQueue RPC
func NewChronoQueueClient ¶
func NewChronoQueueClient(address string, opts ClientOptions) (*ChronoQueueClient, error)
NewChronoQueueClient returns a new ChronoQueue client
func (*ChronoQueueClient) AcknowledgeMessage ¶
func (client *ChronoQueueClient) AcknowledgeMessage(ctx context.Context, queue string, messageId string, state State, streamEntryID string) (*queueservice_pb.AcknowledgeMessageResponse, error)
AcknowledgeMessage updates state of a message and empty response Automatically stops heartbeat for the message if one is active. The streamEntryID is the Redis Stream entry ID returned from GetNextMessage.
func (*ChronoQueueClient) CreateQueue ¶
func (client *ChronoQueueClient) CreateQueue(ctx context.Context, name string, queueOptions QueueOptions) (*queueservice_pb.CreateQueueResponse, error)
CreateQueue create a queue and returns empty response
func (*ChronoQueueClient) CreateSchedule ¶
func (client *ChronoQueueClient) CreateSchedule(ctx context.Context, scheduleId string, scheduleOptions ScheduleOptions) (*queueservice_pb.CreateScheduleResponse, error)
CreateSchedule creates a schedule and returns an empty response
func (*ChronoQueueClient) DeleteFromDLQ ¶
func (client *ChronoQueueClient) DeleteFromDLQ(ctx context.Context, dlqName string, messageId string) (*queueservice_pb.DeleteFromDLQResponse, error)
DeleteFromDLQ permanently deletes a message from a DLQ
func (*ChronoQueueClient) DeleteQueue ¶
func (client *ChronoQueueClient) DeleteQueue(ctx context.Context, name string) (*queueservice_pb.DeleteQueueResponse, error)
DeleteQueue deletes a queue and returns empty response
func (*ChronoQueueClient) DeleteSchedule ¶
func (client *ChronoQueueClient) DeleteSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.DeleteScheduleResponse, error)
DeleteSchedule deletes a schedule and returns an empty response
func (*ChronoQueueClient) DeleteSchema ¶
func (client *ChronoQueueClient) DeleteSchema(ctx context.Context, schemaID string, version int32) error
DeleteSchema removes a schema version or all versions version = 0 means delete all versions
func (*ChronoQueueClient) GetDLQMessages ¶
func (client *ChronoQueueClient) GetDLQMessages(ctx context.Context, dlqName string, limit int32) (*queueservice_pb.GetDLQMessagesResponse, error)
GetDLQMessages retrieves messages from a Dead Letter Queue
func (*ChronoQueueClient) GetDLQStats ¶
func (client *ChronoQueueClient) GetDLQStats(ctx context.Context, dlqName string) (*queueservice_pb.GetDLQStatsResponse, error)
GetDLQStats returns statistics about a DLQ
func (*ChronoQueueClient) GetNextMessage ¶
func (client *ChronoQueueClient) GetNextMessage(ctx context.Context, queue string, leaseDuration string, enableHeartbeat bool) (*queueservice_pb.GetNextMessageResponse, error)
GetNextMessage returns next message on a queue
func (*ChronoQueueClient) GetQueueState ¶
func (client *ChronoQueueClient) GetQueueState(ctx context.Context, queue string) (*queueservice_pb.GetQueueStateResponse, error)
GetQueueState returns state of a queue
func (*ChronoQueueClient) GetSchedule ¶
func (client *ChronoQueueClient) GetSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.GetScheduleResponse, error)
GetSchedule returns a schedule
func (*ChronoQueueClient) GetScheduleHistory ¶
func (client *ChronoQueueClient) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*queueservice_pb.GetScheduleHistoryResponse, error)
GetScheduleHistory returns the history of a schedule
func (*ChronoQueueClient) GetSchema ¶
func (client *ChronoQueueClient) GetSchema(ctx context.Context, schemaID string, version int32) (map[string]interface{}, error)
GetSchema retrieves a schema by ID and optional version version = 0 means get the latest version
func (*ChronoQueueClient) ListQueues ¶
func (client *ChronoQueueClient) ListQueues(ctx context.Context, prefix string) (*queueservice_pb.ListQueuesResponse, error)
ListQueues returns list of available queues.
func (*ChronoQueueClient) ListSchedules ¶
func (client *ChronoQueueClient) ListSchedules(ctx context.Context, prefix string) (*queueservice_pb.ListSchedulesResponse, error)
ListSchedules returns list of schedules
func (*ChronoQueueClient) ListSchemas ¶
func (client *ChronoQueueClient) ListSchemas(ctx context.Context, prefix string, limit int32, activeOnly bool) ([]map[string]interface{}, error)
ListSchemas returns all schemas matching the criteria
func (*ChronoQueueClient) PauseSchedule ¶
func (client *ChronoQueueClient) PauseSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.PauseScheduleResponse, error)
PauseSchedule pauses a schedule
func (*ChronoQueueClient) PeekQueueMessages ¶
func (client *ChronoQueueClient) PeekQueueMessages(ctx context.Context, queue string, limit int32, timeRange TimeRangeOption) (*queueservice_pb.PeekQueueMessagesResponse, error)
PeekQueueMessages returns messages on a queue that are in pending state
func (*ChronoQueueClient) PostMessage ¶
func (client *ChronoQueueClient) PostMessage(ctx context.Context, queue string, messageId string, messageOptions MessageOptions) (*queueservice_pb.PostMessageResponse, error)
PostMessage create adds a message to the queue and returns empty response
func (*ChronoQueueClient) PreviewCalendarSchedule ¶
func (client *ChronoQueueClient) PreviewCalendarSchedule(ctx context.Context, calendarScheduleJSON string, count int32) (*queueservice_pb.PreviewCalendarScheduleResponse, error)
PreviewCalendarSchedule previews execution times for a calendar schedule
func (*ChronoQueueClient) PurgeDLQ ¶
func (client *ChronoQueueClient) PurgeDLQ(ctx context.Context, dlqName string) (*queueservice_pb.PurgeDLQResponse, error)
PurgeDLQ removes all messages from a DLQ
func (*ChronoQueueClient) RegisterSchema ¶
func (client *ChronoQueueClient) RegisterSchema(ctx context.Context, schemaID string, options SchemaOptions) error
RegisterSchema registers a new schema or creates a new version of an existing schema This is a client-side implementation that will work once server-side methods are added
func (*ChronoQueueClient) RenewMessageLease ¶
func (client *ChronoQueueClient) RenewMessageLease(ctx context.Context, queue string, messageId string, leaseDuration string) (*queueservice_pb.RenewMessageLeaseResponse, error)
RenewMessageLease updates a message's lease duration and returns empty response
func (*ChronoQueueClient) RequeueFromDLQ ¶
func (client *ChronoQueueClient) RequeueFromDLQ(ctx context.Context, dlqName string, messageId string, targetQueue string) (*queueservice_pb.RequeueFromDLQResponse, error)
RequeueFromDLQ moves a message from DLQ back to its original queue or specified target queue
func (*ChronoQueueClient) ResumeSchedule ¶
func (client *ChronoQueueClient) ResumeSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.ResumeScheduleResponse, error)
ResumeSchedule resumes a schedule
func (*ChronoQueueClient) SendMessageHeartbeat ¶
func (client *ChronoQueueClient) SendMessageHeartbeat(ctx context.Context, queueName string, messageId string, streamEntryID string) (*queueservice_pb.SendMessageHeartBeatResponse, error)
SendMessageHeartbeat sends a heartbeat for an in-flight message. The streamEntryID is the Redis Stream entry ID returned from GetNextMessage.
func (*ChronoQueueClient) StopHeartbeat ¶
func (client *ChronoQueueClient) StopHeartbeat(messageID string)
StopHeartbeat explicitly stops the heartbeat for a specific message. This should be called when message processing completes (success or failure) to ensure the heartbeat goroutine terminates cleanly.
func (*ChronoQueueClient) ValidateCalendarSchedule ¶
func (client *ChronoQueueClient) ValidateCalendarSchedule(ctx context.Context, calendarScheduleJSON string) (*queueservice_pb.ValidateCalendarScheduleResponse, error)
ValidateCalendarSchedule validates a calendar schedule configuration
func (*ChronoQueueClient) ValidatePayload ¶
func (client *ChronoQueueClient) ValidatePayload(ctx context.Context, schemaID string, version int32, payloadJSON string) error
ValidatePayload validates a payload against a schema
type ClientOptions ¶
type ClientOptions struct {
MaxRetries int
InitialBackoff time.Duration
MaxBackoff time.Duration
MaxHeartBeatWorkers int
DefaultRPCTimeout time.Duration
TLSCredentials credentials.TransportCredentials // Define as per your gRPC setup
Connector Connector // User-provided Connector
MaxHeartbeatRetryCount int
SendMessageHeartbeatFunc func(context.Context, string, string) (*queueservice_pb.SendMessageHeartBeatResponse, error)
}
type Connector ¶
type Connector func(address string, opts ClientOptions) (queueservice_pb.QueueServiceClient, *grpc.ClientConn, error)
type DLQStats ¶
type DLQStats struct {
Name string `json:"name"`
MessageCount int64 `json:"message_count"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
DLQStats represents statistics about a Dead Letter Queue
type MessageOptions ¶
type MessageOptions struct {
Payload Payload `json:"payload,omitempty"`
AttemptsLeft int32 `json:"attemptsLeft,omitempty"`
MaxAttempts int32 `json:"maxAttempts,omitempty"`
ScheduledTime *time.Time `json:"scheduledTime,omitempty"`
LeaseDuration string `json:"leaseDuration"`
LeaseExpiry int64 `json:"leaseExpiry,omitempty"`
State State `json:"state,omitempty"`
InvisibilityExpiry int64 `json:"invisibilityExpiry,omitempty"`
Priority int64 `json:"Priority,omitempty"`
}
type Payload ¶
type Payload struct {
Metadata map[string]*structpb.Value `json:"metadata,omitempty"`
Data *structpb.Struct `json:"data,omitempty"`
ContentType string `json:"contentType,omitempty"` // NEW: MIME type
SchemaID string `json:"schemaId,omitempty"` // NEW: Schema reference
SchemaVersion int32 `json:"schemaVersion,omitempty"` // NEW: Schema version
}
type QueueOptions ¶
type QueueOptions struct {
DequeueAttempts int32 `json:"dequeueAttempts,omitempty"`
ExclusivityKey string `json:"exclusivityKey,omitempty"`
LeaseDuration string `json:"leaseDuration"`
Type int32 `json:"type,omitempty"`
DeadLetterQueueName string `json:"deadLetterQueueName,omitempty"`
AutoCreateDLQ bool `json:"autoCreateDLQ,omitempty"`
}
type ScheduleOptions ¶
type ScheduleOptions struct {
Payload Payload `json:"payload,omitempty"`
State State `json:"state,omitempty"`
CronSchedule string `json:"cronSchedule,omitempty"`
CalendarSchedule *schedule_pb.CalendarSchedule `json:"calendarSchedule,omitempty"` // New: for calendar-based scheduling
QueueName string `json:"queueName,omitempty"`
ExclusivityKey string `json:"exclusivityKey,omitempty"`
MaxMessages int64 `json:"maxMessages,omitempty"`
LeaseDuration string `json:"leaseDuration,omitempty"`
}
type SchemaOptions ¶
type SchemaOptions struct {
Name string // Human-readable schema name
Description string // Schema description
Content string // JSON Schema content
ContentType string // Schema type (default: "json-schema")
Metadata map[string]string // Additional metadata
}
SchemaOptions contains options for schema registration