client

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2025 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxRetries          = 5
	DefaultInitialBackoff      = 500 * time.Millisecond
	DefaultMaxBackoff          = 60 * time.Second
	DefaultMaxHeartBeatWorkers = 10
)

Variables

This section is empty.

Functions

func ParseQueueType

func ParseQueueType(queueType string) int32

Function to convert SIMPLE queue type string to enum

Types

type ChronoQueueClient

type ChronoQueueClient struct {
	// contains filtered or unexported fields
}

ChronoQueueClient is a client to call ChronoQueue RPC

func NewChronoQueueClient

func NewChronoQueueClient(address string, opts ClientOptions) (*ChronoQueueClient, error)

NewChronoQueueClient returns a new ChronoQueue client

func (*ChronoQueueClient) AcknowledgeMessage

func (client *ChronoQueueClient) AcknowledgeMessage(ctx context.Context, queue string, messageId string, state State, streamEntryID string) (*queueservice_pb.AcknowledgeMessageResponse, error)

AcknowledgeMessage updates state of a message and empty response Automatically stops heartbeat for the message if one is active. The streamEntryID is the Redis Stream entry ID returned from GetNextMessage.

func (*ChronoQueueClient) Close

func (client *ChronoQueueClient) Close()

Close closes the client

func (*ChronoQueueClient) CreateQueue

func (client *ChronoQueueClient) CreateQueue(ctx context.Context, name string, queueOptions QueueOptions) (*queueservice_pb.CreateQueueResponse, error)

CreateQueue create a queue and returns empty response

func (*ChronoQueueClient) CreateSchedule

func (client *ChronoQueueClient) CreateSchedule(ctx context.Context, scheduleId string, scheduleOptions ScheduleOptions) (*queueservice_pb.CreateScheduleResponse, error)

CreateSchedule creates a schedule and returns an empty response

func (*ChronoQueueClient) DeleteFromDLQ

func (client *ChronoQueueClient) DeleteFromDLQ(ctx context.Context, dlqName string, messageId string) (*queueservice_pb.DeleteFromDLQResponse, error)

DeleteFromDLQ permanently deletes a message from a DLQ

func (*ChronoQueueClient) DeleteQueue

DeleteQueue deletes a queue and returns empty response

func (*ChronoQueueClient) DeleteSchedule

func (client *ChronoQueueClient) DeleteSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.DeleteScheduleResponse, error)

DeleteSchedule deletes a schedule and returns an empty response

func (*ChronoQueueClient) DeleteSchema

func (client *ChronoQueueClient) DeleteSchema(ctx context.Context, schemaID string, version int32) error

DeleteSchema removes a schema version or all versions version = 0 means delete all versions

func (*ChronoQueueClient) GetDLQMessages

func (client *ChronoQueueClient) GetDLQMessages(ctx context.Context, dlqName string, limit int32) (*queueservice_pb.GetDLQMessagesResponse, error)

GetDLQMessages retrieves messages from a Dead Letter Queue

func (*ChronoQueueClient) GetDLQStats

func (client *ChronoQueueClient) GetDLQStats(ctx context.Context, dlqName string) (*queueservice_pb.GetDLQStatsResponse, error)

GetDLQStats returns statistics about a DLQ

func (*ChronoQueueClient) GetNextMessage

func (client *ChronoQueueClient) GetNextMessage(ctx context.Context, queue string, leaseDuration string, enableHeartbeat bool) (*queueservice_pb.GetNextMessageResponse, error)

GetNextMessage returns next message on a queue

func (*ChronoQueueClient) GetQueueState

func (client *ChronoQueueClient) GetQueueState(ctx context.Context, queue string) (*queueservice_pb.GetQueueStateResponse, error)

GetQueueState returns state of a queue

func (*ChronoQueueClient) GetSchedule

func (client *ChronoQueueClient) GetSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.GetScheduleResponse, error)

GetSchedule returns a schedule

func (*ChronoQueueClient) GetScheduleHistory

func (client *ChronoQueueClient) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*queueservice_pb.GetScheduleHistoryResponse, error)

GetScheduleHistory returns the history of a schedule

func (*ChronoQueueClient) GetSchema

func (client *ChronoQueueClient) GetSchema(ctx context.Context, schemaID string, version int32) (map[string]interface{}, error)

GetSchema retrieves a schema by ID and optional version version = 0 means get the latest version

func (*ChronoQueueClient) ListQueues

func (client *ChronoQueueClient) ListQueues(ctx context.Context, prefix string) (*queueservice_pb.ListQueuesResponse, error)

ListQueues returns list of available queues.

func (*ChronoQueueClient) ListSchedules

func (client *ChronoQueueClient) ListSchedules(ctx context.Context, prefix string) (*queueservice_pb.ListSchedulesResponse, error)

ListSchedules returns list of schedules

func (*ChronoQueueClient) ListSchemas

func (client *ChronoQueueClient) ListSchemas(ctx context.Context, prefix string, limit int32, activeOnly bool) ([]map[string]interface{}, error)

ListSchemas returns all schemas matching the criteria

func (*ChronoQueueClient) PauseSchedule

func (client *ChronoQueueClient) PauseSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.PauseScheduleResponse, error)

PauseSchedule pauses a schedule

func (*ChronoQueueClient) PeekQueueMessages

func (client *ChronoQueueClient) PeekQueueMessages(ctx context.Context, queue string, limit int32, timeRange TimeRangeOption) (*queueservice_pb.PeekQueueMessagesResponse, error)

PeekQueueMessages returns messages on a queue that are in pending state

func (*ChronoQueueClient) PostMessage

func (client *ChronoQueueClient) PostMessage(ctx context.Context, queue string, messageId string, messageOptions MessageOptions) (*queueservice_pb.PostMessageResponse, error)

PostMessage create adds a message to the queue and returns empty response

func (*ChronoQueueClient) PreviewCalendarSchedule

func (client *ChronoQueueClient) PreviewCalendarSchedule(ctx context.Context, calendarScheduleJSON string, count int32) (*queueservice_pb.PreviewCalendarScheduleResponse, error)

PreviewCalendarSchedule previews execution times for a calendar schedule

func (*ChronoQueueClient) PurgeDLQ

func (client *ChronoQueueClient) PurgeDLQ(ctx context.Context, dlqName string) (*queueservice_pb.PurgeDLQResponse, error)

PurgeDLQ removes all messages from a DLQ

func (*ChronoQueueClient) RegisterSchema

func (client *ChronoQueueClient) RegisterSchema(ctx context.Context, schemaID string, options SchemaOptions) error

RegisterSchema registers a new schema or creates a new version of an existing schema This is a client-side implementation that will work once server-side methods are added

func (*ChronoQueueClient) RenewMessageLease

func (client *ChronoQueueClient) RenewMessageLease(ctx context.Context, queue string, messageId string, leaseDuration string) (*queueservice_pb.RenewMessageLeaseResponse, error)

RenewMessageLease updates a message's lease duration and returns empty response

func (*ChronoQueueClient) RequeueFromDLQ

func (client *ChronoQueueClient) RequeueFromDLQ(ctx context.Context, dlqName string, messageId string, targetQueue string) (*queueservice_pb.RequeueFromDLQResponse, error)

RequeueFromDLQ moves a message from DLQ back to its original queue or specified target queue

func (*ChronoQueueClient) ResumeSchedule

func (client *ChronoQueueClient) ResumeSchedule(ctx context.Context, scheduleId string) (*queueservice_pb.ResumeScheduleResponse, error)

ResumeSchedule resumes a schedule

func (*ChronoQueueClient) SendMessageHeartbeat

func (client *ChronoQueueClient) SendMessageHeartbeat(ctx context.Context, queueName string, messageId string, streamEntryID string) (*queueservice_pb.SendMessageHeartBeatResponse, error)

SendMessageHeartbeat sends a heartbeat for an in-flight message. The streamEntryID is the Redis Stream entry ID returned from GetNextMessage.

func (*ChronoQueueClient) StopHeartbeat

func (client *ChronoQueueClient) StopHeartbeat(messageID string)

StopHeartbeat explicitly stops the heartbeat for a specific message. This should be called when message processing completes (success or failure) to ensure the heartbeat goroutine terminates cleanly.

func (*ChronoQueueClient) ValidateCalendarSchedule

func (client *ChronoQueueClient) ValidateCalendarSchedule(ctx context.Context, calendarScheduleJSON string) (*queueservice_pb.ValidateCalendarScheduleResponse, error)

ValidateCalendarSchedule validates a calendar schedule configuration

func (*ChronoQueueClient) ValidatePayload

func (client *ChronoQueueClient) ValidatePayload(ctx context.Context, schemaID string, version int32, payloadJSON string) error

ValidatePayload validates a payload against a schema

type ClientOptions

type ClientOptions struct {
	MaxRetries               int
	InitialBackoff           time.Duration
	MaxBackoff               time.Duration
	MaxHeartBeatWorkers      int
	DefaultRPCTimeout        time.Duration
	TLSCredentials           credentials.TransportCredentials // Define as per your gRPC setup
	Connector                Connector                        // User-provided Connector
	MaxHeartbeatRetryCount   int
	SendMessageHeartbeatFunc func(context.Context, string, string) (*queueservice_pb.SendMessageHeartBeatResponse, error)
}

type Connector

type Connector func(address string, opts ClientOptions) (queueservice_pb.QueueServiceClient, *grpc.ClientConn, error)

type DLQStats

type DLQStats struct {
	Name         string `json:"name"`
	MessageCount int64  `json:"message_count"`
	CreatedAt    int64  `json:"created_at"`
	UpdatedAt    int64  `json:"updated_at"`
}

DLQStats represents statistics about a Dead Letter Queue

type MessageOptions

type MessageOptions struct {
	Payload            Payload    `json:"payload,omitempty"`
	AttemptsLeft       int32      `json:"attemptsLeft,omitempty"`
	MaxAttempts        int32      `json:"maxAttempts,omitempty"`
	ScheduledTime      *time.Time `json:"scheduledTime,omitempty"`
	LeaseDuration      string     `json:"leaseDuration"`
	LeaseExpiry        int64      `json:"leaseExpiry,omitempty"`
	State              State      `json:"state,omitempty"`
	InvisibilityExpiry int64      `json:"invisibilityExpiry,omitempty"`
	Priority           int64      `json:"Priority,omitempty"`
}

type Payload

type Payload struct {
	Metadata      map[string]*structpb.Value `json:"metadata,omitempty"`
	Data          *structpb.Struct           `json:"data,omitempty"`
	ContentType   string                     `json:"contentType,omitempty"`   // NEW: MIME type
	SchemaID      string                     `json:"schemaId,omitempty"`      // NEW: Schema reference
	SchemaVersion int32                      `json:"schemaVersion,omitempty"` // NEW: Schema version
}

type QueueOptions

type QueueOptions struct {
	DequeueAttempts     int32  `json:"dequeueAttempts,omitempty"`
	ExclusivityKey      string `json:"exclusivityKey,omitempty"`
	LeaseDuration       string `json:"leaseDuration"`
	Type                int32  `json:"type,omitempty"`
	DeadLetterQueueName string `json:"deadLetterQueueName,omitempty"`
	AutoCreateDLQ       bool   `json:"autoCreateDLQ,omitempty"`
}

type ScheduleOptions

type ScheduleOptions struct {
	Payload          Payload                       `json:"payload,omitempty"`
	State            State                         `json:"state,omitempty"`
	CronSchedule     string                        `json:"cronSchedule,omitempty"`
	CalendarSchedule *schedule_pb.CalendarSchedule `json:"calendarSchedule,omitempty"` // New: for calendar-based scheduling
	QueueName        string                        `json:"queueName,omitempty"`
	ExclusivityKey   string                        `json:"exclusivityKey,omitempty"`
	MaxMessages      int64                         `json:"maxMessages,omitempty"`
	LeaseDuration    string                        `json:"leaseDuration,omitempty"`
}

type SchemaOptions

type SchemaOptions struct {
	Name        string            // Human-readable schema name
	Description string            // Schema description
	Content     string            // JSON Schema content
	ContentType string            // Schema type (default: "json-schema")
	Metadata    map[string]string // Additional metadata
}

SchemaOptions contains options for schema registration

type State

type State int32
const (
	// Message states match the proto enum values
	MESSAGE_INVISIBLE State = 0
	MESSAGE_PENDING   State = 1
	MESSAGE_RUNNING   State = 2
	MESSAGE_COMPLETED State = 3
	MESSAGE_CANCELED  State = 4
	MESSAGE_ERRORED   State = 5
)

func ParseMessageState

func ParseMessageState(state string) (State, error)

type TimeRangeOption

type TimeRangeOption struct {
	Min int64 `json:"min,omitempty"`
	Max int64 `json:"max,omitempty"`
}

type WorkItem

type WorkItem struct {
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL