Documentation
¶
Index ¶
- type ConnectionInfo
- type CreateTopicRequest
- type DeleteTopicRequest
- type GetTopicInfoRequest
- type ManagerConfig
- type Message
- func (m *Message) AddHeader(key string, value interface{})
- func (m *Message) AddMetadata(key string, value interface{})
- func (m *Message) GetHeader(key string) (interface{}, bool)
- func (m *Message) GetMetadata(key string) (interface{}, bool)
- func (m *Message) SetCorrelationID(correlationID string)
- func (m *Message) SetExpiration(duration time.Duration)
- func (m *Message) SetPriority(priority int)
- func (m *Message) SetReplyTo(replyTo string)
- func (m *Message) SetScheduledTime(scheduledAt time.Time)
- func (m *Message) SetTTL(ttl time.Duration)
- type MessageHandler
- type MessagingFeature
- type MessagingManager
- func (mm *MessagingManager) Close() error
- func (mm *MessagingManager) Connect(ctx context.Context, providerName string) error
- func (mm *MessagingManager) CreateTopic(ctx context.Context, providerName string, request *CreateTopicRequest) error
- func (mm *MessagingManager) DeleteTopic(ctx context.Context, providerName string, request *DeleteTopicRequest) error
- func (mm *MessagingManager) Disconnect(ctx context.Context, providerName string) error
- func (mm *MessagingManager) GetConnectedProviders() []string
- func (mm *MessagingManager) GetDefaultProvider() (MessagingProvider, error)
- func (mm *MessagingManager) GetProvider(name string) (MessagingProvider, error)
- func (mm *MessagingManager) GetProviderCapabilities(providerName string) ([]MessagingFeature, *ConnectionInfo, error)
- func (mm *MessagingManager) GetStats(ctx context.Context, providerName string) (*MessagingStats, error)
- func (mm *MessagingManager) GetSupportedProviders() []string
- func (mm *MessagingManager) GetTopicInfo(ctx context.Context, providerName string, request *GetTopicInfoRequest) (*TopicInfo, error)
- func (mm *MessagingManager) HealthCheck(ctx context.Context) map[string]error
- func (mm *MessagingManager) IsProviderConnected(providerName string) bool
- func (mm *MessagingManager) ListTopics(ctx context.Context, providerName string) ([]TopicInfo, error)
- func (mm *MessagingManager) Ping(ctx context.Context, providerName string) error
- func (mm *MessagingManager) PublishBatch(ctx context.Context, providerName string, request *PublishBatchRequest) (*PublishBatchResponse, error)
- func (mm *MessagingManager) PublishMessage(ctx context.Context, providerName string, request *PublishRequest) (*PublishResponse, error)
- func (mm *MessagingManager) RegisterProvider(provider MessagingProvider) error
- func (mm *MessagingManager) SubscribeToTopic(ctx context.Context, providerName string, request *SubscribeRequest, ...) error
- func (mm *MessagingManager) TopicExists(ctx context.Context, providerName string, request *TopicExistsRequest) (bool, error)
- func (mm *MessagingManager) UnsubscribeFromTopic(ctx context.Context, providerName string, request *UnsubscribeRequest) error
- type MessagingProvider
- type MessagingStats
- type PublishBatchRequest
- type PublishBatchResponse
- type PublishRequest
- type PublishResponse
- type SubscribeRequest
- type TopicExistsRequest
- type TopicInfo
- type UnsubscribeRequest
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConnectionInfo ¶
type ConnectionInfo struct {
Host string `json:"host"`
Port int `json:"port"`
Protocol string `json:"protocol"`
Version string `json:"version"`
}
ConnectionInfo represents messaging connection information
type CreateTopicRequest ¶
type CreateTopicRequest struct {
Topic string `json:"topic"`
Partitions int `json:"partitions,omitempty"`
ReplicationFactor int `json:"replication_factor,omitempty"`
RetentionPeriod *time.Duration `json:"retention_period,omitempty"`
Config map[string]interface{} `json:"config,omitempty"`
}
CreateTopicRequest represents a create topic request
type DeleteTopicRequest ¶
type DeleteTopicRequest struct {
Topic string `json:"topic"`
}
DeleteTopicRequest represents a delete topic request
type GetTopicInfoRequest ¶
type GetTopicInfoRequest struct {
Topic string `json:"topic"`
}
GetTopicInfoRequest represents a get topic info request
type ManagerConfig ¶
type ManagerConfig struct {
DefaultProvider string `json:"default_provider"`
RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"`
Timeout time.Duration `json:"timeout"`
MaxMessageSize int64 `json:"max_message_size"`
Metadata map[string]string `json:"metadata"`
}
ManagerConfig holds messaging manager configuration
func DefaultManagerConfig ¶
func DefaultManagerConfig() *ManagerConfig
DefaultManagerConfig returns default messaging manager configuration
type Message ¶
type Message struct {
ID uuid.UUID `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
Target string `json:"target"`
Topic string `json:"topic"`
RoutingKey string `json:"routing_key,omitempty"`
Payload map[string]interface{} `json:"payload"`
Headers map[string]interface{} `json:"headers,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
ExpiresAt *time.Time `json:"expires_at,omitempty"`
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
Priority int `json:"priority,omitempty"`
TTL *time.Duration `json:"ttl,omitempty"`
CorrelationID string `json:"correlation_id,omitempty"`
ReplyTo string `json:"reply_to,omitempty"`
ProviderData map[string]interface{} `json:"provider_data,omitempty"`
}
Message represents a unified message structure
func CreateMessage ¶
func CreateMessage(messageType, source, target, topic string, payload map[string]interface{}) *Message
CreateMessage creates a new message with default values
func (*Message) AddMetadata ¶
AddMetadata adds metadata to the message
func (*Message) GetMetadata ¶
GetMetadata retrieves metadata from the message
func (*Message) SetCorrelationID ¶
SetCorrelationID sets message correlation ID
func (*Message) SetExpiration ¶
SetExpiration sets message expiration
func (*Message) SetPriority ¶
SetPriority sets message priority
func (*Message) SetReplyTo ¶
SetReplyTo sets message reply-to address
func (*Message) SetScheduledTime ¶
SetScheduledTime sets message scheduled time
type MessageHandler ¶
MessageHandler handles incoming messages
type MessagingFeature ¶
type MessagingFeature string
MessagingFeature represents a messaging feature
const ( FeaturePublishSubscribe MessagingFeature = "pub_sub" FeatureRequestReply MessagingFeature = "request_reply" FeatureMessageRouting MessagingFeature = "message_routing" FeatureMessageFiltering MessagingFeature = "message_filtering" FeatureMessageOrdering MessagingFeature = "message_ordering" FeatureMessageDeduplication MessagingFeature = "message_deduplication" FeatureMessageRetention MessagingFeature = "message_retention" FeatureMessageCompression MessagingFeature = "message_compression" FeatureMessageEncryption MessagingFeature = "message_encryption" FeatureMessageBatching MessagingFeature = "message_batching" FeatureMessagePartitioning MessagingFeature = "message_partitioning" FeatureMessageReplay MessagingFeature = "message_replay" FeatureMessageDeadLetter MessagingFeature = "message_dead_letter" FeatureMessageScheduling MessagingFeature = "message_scheduling" FeatureMessagePriority MessagingFeature = "message_priority" FeatureMessageTTL MessagingFeature = "message_ttl" FeatureMessageHeaders MessagingFeature = "message_headers" FeatureMessageCorrelation MessagingFeature = "message_correlation" FeatureMessageGrouping MessagingFeature = "message_grouping" FeatureMessageStreaming MessagingFeature = "message_streaming" )
type MessagingManager ¶
type MessagingManager struct {
// contains filtered or unexported fields
}
MessagingManager manages multiple messaging providers
func NewMessagingManager ¶
func NewMessagingManager(config *ManagerConfig, logger *logrus.Logger) *MessagingManager
NewMessagingManager creates a new messaging manager
func (*MessagingManager) Close ¶
func (mm *MessagingManager) Close() error
Close closes all messaging connections
func (*MessagingManager) Connect ¶
func (mm *MessagingManager) Connect(ctx context.Context, providerName string) error
Connect connects to a messaging system using the specified provider
func (*MessagingManager) CreateTopic ¶
func (mm *MessagingManager) CreateTopic(ctx context.Context, providerName string, request *CreateTopicRequest) error
CreateTopic creates a topic using the specified provider
func (*MessagingManager) DeleteTopic ¶
func (mm *MessagingManager) DeleteTopic(ctx context.Context, providerName string, request *DeleteTopicRequest) error
DeleteTopic deletes a topic using the specified provider
func (*MessagingManager) Disconnect ¶
func (mm *MessagingManager) Disconnect(ctx context.Context, providerName string) error
Disconnect disconnects from a messaging system using the specified provider
func (*MessagingManager) GetConnectedProviders ¶
func (mm *MessagingManager) GetConnectedProviders() []string
GetConnectedProviders returns a list of connected providers
func (*MessagingManager) GetDefaultProvider ¶
func (mm *MessagingManager) GetDefaultProvider() (MessagingProvider, error)
GetDefaultProvider returns the default messaging provider
func (*MessagingManager) GetProvider ¶
func (mm *MessagingManager) GetProvider(name string) (MessagingProvider, error)
GetProvider returns a messaging provider by name
func (*MessagingManager) GetProviderCapabilities ¶
func (mm *MessagingManager) GetProviderCapabilities(providerName string) ([]MessagingFeature, *ConnectionInfo, error)
GetProviderCapabilities returns capabilities of a provider
func (*MessagingManager) GetStats ¶
func (mm *MessagingManager) GetStats(ctx context.Context, providerName string) (*MessagingStats, error)
GetStats gets statistics from a provider
func (*MessagingManager) GetSupportedProviders ¶
func (mm *MessagingManager) GetSupportedProviders() []string
GetSupportedProviders returns a list of registered providers
func (*MessagingManager) GetTopicInfo ¶
func (mm *MessagingManager) GetTopicInfo(ctx context.Context, providerName string, request *GetTopicInfoRequest) (*TopicInfo, error)
GetTopicInfo gets topic information using the specified provider
func (*MessagingManager) HealthCheck ¶
func (mm *MessagingManager) HealthCheck(ctx context.Context) map[string]error
HealthCheck performs health check on all providers
func (*MessagingManager) IsProviderConnected ¶
func (mm *MessagingManager) IsProviderConnected(providerName string) bool
IsProviderConnected checks if a provider is connected
func (*MessagingManager) ListTopics ¶
func (mm *MessagingManager) ListTopics(ctx context.Context, providerName string) ([]TopicInfo, error)
ListTopics lists topics using the specified provider
func (*MessagingManager) Ping ¶
func (mm *MessagingManager) Ping(ctx context.Context, providerName string) error
Ping pings a messaging system using the specified provider
func (*MessagingManager) PublishBatch ¶
func (mm *MessagingManager) PublishBatch(ctx context.Context, providerName string, request *PublishBatchRequest) (*PublishBatchResponse, error)
PublishBatch publishes multiple messages using the specified provider
func (*MessagingManager) PublishMessage ¶
func (mm *MessagingManager) PublishMessage(ctx context.Context, providerName string, request *PublishRequest) (*PublishResponse, error)
PublishMessage publishes a message using the specified provider
func (*MessagingManager) RegisterProvider ¶
func (mm *MessagingManager) RegisterProvider(provider MessagingProvider) error
RegisterProvider registers a messaging provider
func (*MessagingManager) SubscribeToTopic ¶
func (mm *MessagingManager) SubscribeToTopic(ctx context.Context, providerName string, request *SubscribeRequest, handler MessageHandler) error
SubscribeToTopic subscribes to a topic using the specified provider
func (*MessagingManager) TopicExists ¶
func (mm *MessagingManager) TopicExists(ctx context.Context, providerName string, request *TopicExistsRequest) (bool, error)
TopicExists checks if a topic exists using the specified provider
func (*MessagingManager) UnsubscribeFromTopic ¶
func (mm *MessagingManager) UnsubscribeFromTopic(ctx context.Context, providerName string, request *UnsubscribeRequest) error
UnsubscribeFromTopic unsubscribes from a topic using the specified provider
type MessagingProvider ¶
type MessagingProvider interface {
// Provider information
GetName() string
GetSupportedFeatures() []MessagingFeature
GetConnectionInfo() *ConnectionInfo
// Connection management
Connect(ctx context.Context) error
Disconnect(ctx context.Context) error
Ping(ctx context.Context) error
IsConnected() bool
// Message operations
PublishMessage(ctx context.Context, request *PublishRequest) (*PublishResponse, error)
SubscribeToTopic(ctx context.Context, request *SubscribeRequest, handler MessageHandler) error
UnsubscribeFromTopic(ctx context.Context, request *UnsubscribeRequest) error
// Topic/Queue management
CreateTopic(ctx context.Context, request *CreateTopicRequest) error
DeleteTopic(ctx context.Context, request *DeleteTopicRequest) error
TopicExists(ctx context.Context, request *TopicExistsRequest) (bool, error)
ListTopics(ctx context.Context) ([]TopicInfo, error)
// Advanced operations
PublishBatch(ctx context.Context, request *PublishBatchRequest) (*PublishBatchResponse, error)
GetTopicInfo(ctx context.Context, request *GetTopicInfoRequest) (*TopicInfo, error)
GetStats(ctx context.Context) (*MessagingStats, error)
// Health and monitoring
HealthCheck(ctx context.Context) error
// Configuration
Configure(config map[string]interface{}) error
IsConfigured() bool
Close() error
}
MessagingProvider interface for messaging backends
type MessagingStats ¶
type MessagingStats struct {
PublishedMessages int64 `json:"published_messages"`
ConsumedMessages int64 `json:"consumed_messages"`
FailedMessages int64 `json:"failed_messages"`
ActiveConnections int `json:"active_connections"`
ActiveSubscriptions int `json:"active_subscriptions"`
ProviderData map[string]interface{} `json:"provider_data"`
}
MessagingStats represents messaging statistics
type PublishBatchRequest ¶
type PublishBatchRequest struct {
Topic string `json:"topic"`
Messages []*Message `json:"messages"`
RoutingKey string `json:"routing_key,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
}
PublishBatchRequest represents a batch publish request
type PublishBatchResponse ¶
type PublishBatchResponse struct {
PublishedCount int `json:"published_count"`
FailedCount int `json:"failed_count"`
FailedMessages []*Message `json:"failed_messages,omitempty"`
ProviderData map[string]interface{} `json:"provider_data,omitempty"`
}
PublishBatchResponse represents a batch publish response
type PublishRequest ¶
type PublishRequest struct {
Topic string `json:"topic"`
Message *Message `json:"message"`
RoutingKey string `json:"routing_key,omitempty"`
Headers map[string]interface{} `json:"headers,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
}
PublishRequest represents a publish message request
type PublishResponse ¶
type PublishResponse struct {
MessageID string `json:"message_id"`
Topic string `json:"topic"`
Partition int `json:"partition,omitempty"`
Offset int64 `json:"offset,omitempty"`
Timestamp time.Time `json:"timestamp"`
ProviderData map[string]interface{} `json:"provider_data,omitempty"`
}
PublishResponse represents a publish message response
type SubscribeRequest ¶
type SubscribeRequest struct {
Topic string `json:"topic"`
GroupID string `json:"group_id,omitempty"`
ConsumerID string `json:"consumer_id,omitempty"`
AutoAck bool `json:"auto_ack"`
PrefetchCount int `json:"prefetch_count,omitempty"`
StartOffset string `json:"start_offset,omitempty"`
Filter map[string]interface{} `json:"filter,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
}
SubscribeRequest represents a subscribe request
type TopicExistsRequest ¶
type TopicExistsRequest struct {
Topic string `json:"topic"`
}
TopicExistsRequest represents a topic exists request
type TopicInfo ¶
type TopicInfo struct {
Name string `json:"name"`
Partitions int `json:"partitions,omitempty"`
ReplicationFactor int `json:"replication_factor,omitempty"`
RetentionPeriod *time.Duration `json:"retention_period,omitempty"`
MessageCount int64 `json:"message_count,omitempty"`
Size int64 `json:"size,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
ProviderData map[string]interface{} `json:"provider_data,omitempty"`
}
TopicInfo represents topic information
type UnsubscribeRequest ¶
type UnsubscribeRequest struct {
Topic string `json:"topic"`
GroupID string `json:"group_id,omitempty"`
ConsumerID string `json:"consumer_id,omitempty"`
}
UnsubscribeRequest represents an unsubscribe request