types

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2025 License: MIT Imports: 4 Imported by: 1

Documentation

Overview

Package types contains the shared types for AMQ.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateAgentID

func GenerateAgentID() string

GenerateAgentID generates a unique agent ID

func GenerateMessageID

func GenerateMessageID() string

GenerateMessageID generates a unique message ID

func GenerateQueueID

func GenerateQueueID(name string) string

GenerateQueueID generates a unique queue ID

Types

type Client

// Client represents a generic client in the AMQ system.
//
// Metadata has no omitempty option, so with the standard encoding/json rules
// a nil map is written as JSON null rather than being dropped.
type Client struct {
	ID        string            `json:"id"`         // Unique client identifier
	Metadata  map[string]string `json:"metadata"`   // Optional client metadata
	CreatedAt time.Time         `json:"created_at"` // When client was created
	LastSeen  time.Time         `json:"last_seen"`  // Last activity timestamp
}

Client represents a generic client in the AMQ system

func NewClient

func NewClient(id string) *Client

NewClient creates a new client

func (*Client) GetMetadata

func (c *Client) GetMetadata(key string) (string, bool)

GetMetadata gets a metadata value by key

func (*Client) IsActive

func (c *Client) IsActive(timeout time.Duration) bool

IsActive reports whether the client has been active within the given timeout

func (*Client) SetMetadata

func (c *Client) SetMetadata(key, value string)

SetMetadata sets a metadata key-value pair

func (*Client) UpdateActivity

func (c *Client) UpdateActivity()

UpdateActivity updates the last seen timestamp

type Message

// Message represents a message in the queue.
//
// Type selects the routing fields: To is documented for Direct messages and
// Topic for Task messages. Both carry omitempty, so an empty one is dropped
// from the JSON encoding.
//
// With the standard encoding/json rules (assuming no custom marshaler is
// defined elsewhere): Payload ([]byte) is written as a base64 string, TTL is
// written as an integer number of nanoseconds, and Type/Status are written as
// their integer values rather than their String() names. The *time.Time
// fields are omitted by omitempty while they are nil.
//
// NOTE(review): the 0-9 range for Priority is documented only; any
// enforcement is not visible in this declaration.
type Message struct {
	ID          string            `json:"id"`                     // Unique message identifier
	Type        MessageType       `json:"type"`                   // Task or Direct
	From        string            `json:"from"`                   // Sender client ID
	To          string            `json:"to,omitempty"`           // Target client ID (for Direct)
	Topic       string            `json:"topic,omitempty"`        // Topic name (for Task)
	Payload     []byte            `json:"payload"`                // Message content
	Priority    int               `json:"priority"`               // Message priority (0-9, higher is more important)
	Status      MessageStatus     `json:"status"`                 // Current message status
	CreatedAt   time.Time         `json:"created_at"`             // Message creation timestamp
	ProcessedAt *time.Time        `json:"processed_at,omitempty"` // When message was processed
	CompletedAt *time.Time        `json:"completed_at,omitempty"` // When message was completed
	RetryCount  int               `json:"retry_count"`            // Number of retry attempts
	MaxRetries  int               `json:"max_retries"`            // Maximum retry attempts
	TTL         time.Duration     `json:"ttl,omitempty"`          // Time to live
	ExpiresAt   *time.Time        `json:"expires_at,omitempty"`   // Expiration timestamp
	Error       string            `json:"error,omitempty"`        // Error message if failed
	Metadata    map[string]string `json:"metadata,omitempty"`     // Additional metadata
}

Message represents a message in the queue

func NewDirectMessage

func NewDirectMessage(from, to string, payload []byte) *Message

NewDirectMessage creates a new direct message

func NewTaskMessage

func NewTaskMessage(from, topic string, payload []byte) *Message

NewTaskMessage creates a new task message

func (*Message) CanRetry

func (m *Message) CanRetry() bool

CanRetry checks if the message can be retried

func (*Message) IncrementRetry

func (m *Message) IncrementRetry()

IncrementRetry increments the retry count

func (*Message) IsExpired

func (m *Message) IsExpired() bool

IsExpired checks if the message has expired

func (*Message) SetCompleted

func (m *Message) SetCompleted()

SetCompleted marks the message as completed

func (*Message) SetFailed

func (m *Message) SetFailed(err string)

SetFailed marks the message as failed with an error

func (*Message) SetProcessing

func (m *Message) SetProcessing()

SetProcessing marks the message as processing

type MessageStatus

// MessageStatus represents the status of a message. Its underlying type is
// int, so the zero value is the first declared status constant.
type MessageStatus int

MessageStatus represents the status of a message

// Message status values. They are assigned by iota in declaration order
// (Pending = 0 through Dead = 5), so inserting or reordering entries changes
// the numeric value of every constant that follows; append new statuses at
// the end to keep previously serialized values meaningful.
const (
	// MessageStatusPending - message is waiting to be processed (value 0, the zero value)
	MessageStatusPending MessageStatus = iota
	// MessageStatusProcessing - message is currently being processed (value 1)
	MessageStatusProcessing
	// MessageStatusCompleted - message has been successfully processed (value 2)
	MessageStatusCompleted
	// MessageStatusFailed - message processing failed (value 3)
	MessageStatusFailed
	// MessageStatusExpired - message TTL expired (value 4)
	MessageStatusExpired
	// MessageStatusDead - message moved to dead letter queue (value 5)
	MessageStatusDead
)

func (MessageStatus) String

func (s MessageStatus) String() string

String returns string representation of MessageStatus

type MessageType

// MessageType represents the type of message. Its underlying type is int, so
// the zero value is the first declared type constant.
type MessageType int

MessageType represents the type of message

// Message type values, assigned by iota in declaration order. Because Task is
// 0, a Message whose Type was never set reads as a task message.
const (
	// MessageTypeTask represents a task message routed via topics (value 0, the zero value)
	MessageTypeTask MessageType = iota
	// MessageTypeDirect represents a direct P2P message (value 1)
	MessageTypeDirect
)

func (MessageType) String

func (t MessageType) String() string

String returns string representation of MessageType

type Queue

// Queue represents a message queue: its identity and settings, running
// counters, and the list of subscribed client IDs.
//
// NOTE(review): the counters are plain int64 fields; whether the Increment*
// methods are safe for concurrent use is not visible from this declaration.
type Queue struct {
	Name     string        `json:"name"`     // Queue/Topic name
	Type     QueueType     `json:"type"`     // Queue type
	Created  time.Time     `json:"created"`  // Queue creation time
	Settings QueueSettings `json:"settings"` // Queue-specific settings

	// Statistics
	MessageCount  int64 `json:"message_count"`  // Current message count
	TotalEnqueued int64 `json:"total_enqueued"` // Total messages enqueued
	TotalDequeued int64 `json:"total_dequeued"` // Total messages dequeued
	TotalFailed   int64 `json:"total_failed"`   // Total failed messages

	// Runtime state (not persisted)
	// NOTE(review): the json tag below still serializes this field whenever
	// it is non-empty. If it must never be written out, the tag would need to
	// be `json:"-"` — confirm the intended behavior before changing it.
	Subscribers []string `json:"subscribers,omitempty"` // Client IDs subscribed (for task queues)
}

Queue represents a message queue

func NewDeadLetterQueue

func NewDeadLetterQueue() *Queue

NewDeadLetterQueue creates a new dead letter queue

func NewDirectQueue

func NewDirectQueue(clientID string) *Queue

NewDirectQueue creates a new direct message queue for a client

func NewQueue

func NewQueue(name string, queueType QueueType) *Queue

NewQueue creates a new queue

func NewTaskQueue

func NewTaskQueue(topic string) *Queue

NewTaskQueue creates a new task queue with a topic

func (*Queue) AddSubscriber

func (q *Queue) AddSubscriber(clientID string)

AddSubscriber adds a client to the subscriber list

func (*Queue) HasSubscriber

func (q *Queue) HasSubscriber(clientID string) bool

HasSubscriber checks if a client is subscribed

func (*Queue) IncrementDequeued

func (q *Queue) IncrementDequeued()

IncrementDequeued increments the dequeue counter

func (*Queue) IncrementEnqueued

func (q *Queue) IncrementEnqueued()

IncrementEnqueued increments the enqueue counter

func (*Queue) IncrementFailed

func (q *Queue) IncrementFailed()

IncrementFailed increments the failed counter

func (*Queue) IsFull

func (q *Queue) IsFull() bool

IsFull checks if the queue is full

func (*Queue) RemoveSubscriber

func (q *Queue) RemoveSubscriber(clientID string)

RemoveSubscriber removes a client from the subscriber list

type QueueSettings

// QueueSettings contains queue-specific settings.
//
// The time.Duration fields are written as integer nanoseconds by the standard
// encoding/json rules (assuming no custom marshaler is defined elsewhere).
//
// NOTE(review): only MaxSize documents what a zero value means; the meaning
// of zero for MaxMessageSize, DefaultTTL, RetentionPeriod and BatchSize is
// not stated here — confirm against the code that reads them.
type QueueSettings struct {
	MaxSize         int           `json:"max_size"`         // Maximum queue size (0 = unlimited)
	MaxMessageSize  int           `json:"max_message_size"` // Maximum message size in bytes
	DefaultTTL      time.Duration `json:"default_ttl"`      // Default message TTL
	RetentionPeriod time.Duration `json:"retention_period"` // How long to keep completed messages
	EnablePriority  bool          `json:"enable_priority"`  // Enable priority processing
	BatchSize       int           `json:"batch_size"`       // Batch size for processing
}

QueueSettings contains queue-specific settings

func DefaultQueueSettings

func DefaultQueueSettings() QueueSettings

DefaultQueueSettings returns default queue settings

type QueueStats

// QueueStats represents queue statistics.
//
// Subscribers here is an int, unlike the []string of client IDs on Queue;
// presumably it is the subscriber count — confirm against the producer of
// this struct. AverageWait is a time.Duration and is written as integer
// nanoseconds by the standard encoding/json rules.
type QueueStats struct {
	Name         string        `json:"name"`
	Type         QueueType     `json:"type"`
	MessageCount int64         `json:"message_count"`
	Subscribers  int           `json:"subscribers"`
	EnqueueRate  float64       `json:"enqueue_rate"` // Messages per second
	DequeueRate  float64       `json:"dequeue_rate"` // Messages per second
	ErrorRate    float64       `json:"error_rate"`   // Errors per second
	AverageWait  time.Duration `json:"average_wait"` // Average time in queue
}

QueueStats represents queue statistics

type QueueType

// QueueType represents the type of queue. Its underlying type is int, so the
// zero value is the first declared queue type constant.
type QueueType int

QueueType represents the type of queue

// Queue type values, assigned by iota in declaration order. Inserting or
// reordering entries changes the numeric value of every later constant.
const (
	// QueueTypeTask - queue for task messages (value 0, the zero value)
	QueueTypeTask QueueType = iota
	// QueueTypeDirect - queue for direct messages (value 1)
	QueueTypeDirect
	// QueueTypeDead - dead letter queue (value 2)
	QueueTypeDead
)

func (QueueType) String

func (t QueueType) String() string

String returns string representation of QueueType

type Response

// Response represents a generic API response.
type Response struct {
	Success bool        `json:"success"`           // Always present in the JSON output
	Message string      `json:"message,omitempty"` // Omitted from JSON when empty
	Data    interface{} `json:"data,omitempty"`    // Arbitrary payload; omitted from JSON when nil
}

Response represents a generic API response

type Subscription

// Subscription represents a client's subscription to a topic.
type Subscription struct {
	ClientID     string    `json:"client_id"`     // ID of the subscribing client
	Topic        string    `json:"topic"`         // Topic the client is subscribed to
	SubscribedAt time.Time `json:"subscribed_at"` // Subscription timestamp; NOTE(review): presumably set by NewSubscription — confirm
}

Subscription represents a client's subscription to a topic

func NewSubscription

func NewSubscription(clientID, topic string) *Subscription

NewSubscription creates a new subscription

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL