messaging

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2025 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidMessageID = errors.New("invalid or empty message ID")

	ErrInvalidMessageType = errors.New("invalid message type")

	ErrInvalidAction = errors.New("invalid action")

	ErrMissingReplyTo = errors.New("response message missing replyTo field")

	ErrRequestTimeout = errors.New("request timeout")

	ErrNotConnected = errors.New("not connected to agent server")

	ErrAgentNotFound = errors.New("agent not found")
)

Functions

func GenerateMessageID

func GenerateMessageID() string

Types

type Action

type Action string

Action represents the operation to perform

const (
	// ActionApplyResource applies a resource using server-side apply
	ActionApplyResource Action = "apply-resource"

	// ActionListResources lists resources by GVK and labels
	ActionListResources Action = "list-resources"

	// ActionGetResource gets a specific resource
	ActionGetResource Action = "get-resource"

	// ActionDeleteResource deletes a resource
	ActionDeleteResource Action = "delete-resource"

	// ActionPatchResource patches a resource
	ActionPatchResource Action = "patch-resource"

	// ActionCreateNamespace creates a namespace if it doesn't exist
	ActionCreateNamespace Action = "create-namespace"

	// ActionWatchResources watches resources for changes
	ActionWatchResources Action = "watch-resources"
)

func (Action) IsValid

func (a Action) IsValid() bool

func (Action) String

func (a Action) String() string

type ClusterAgentRequest

type ClusterAgentRequest struct {
	// Type indicates whether this is a 'command' or 'query'
	Type RequestType `json:"type"`

	// Identifier specifies the operation (e.g., "apply-resource", "list-pods")
	Identifier string `json:"identifier"`

	// RequestID is a unique identifier for this request (UUID)
	RequestID string `json:"requestID"`

	// ClusterID identifies the target cluster/plane
	ClusterID string `json:"clusterId"`

	// Payload contains operation-specific data (manifests, parameters, etc.)
	Payload map[string]interface{} `json:"payload,omitempty"`

	// OverrideRequestTimeouts allows custom retry timeouts (optional)
	OverrideRequestTimeouts []int `json:"overrideRequestTimeouts,omitempty"`
}

ClusterAgentRequest represents a request sent from control plane to data plane agent Follows CQRS pattern: Commands modify state, Queries read state

func NewCommand

func NewCommand(identifier, requestID, clusterID string, payload map[string]interface{}) *ClusterAgentRequest

func NewQuery

func NewQuery(identifier, requestID, clusterID string, payload map[string]interface{}) *ClusterAgentRequest

func (*ClusterAgentRequest) IsCommand

func (r *ClusterAgentRequest) IsCommand() bool

func (*ClusterAgentRequest) IsQuery

func (r *ClusterAgentRequest) IsQuery() bool

type ClusterAgentResponse

type ClusterAgentResponse struct {
	// Type indicates whether this is a 'command' or 'query' response
	Type RequestType `json:"type"`

	// Identifier specifies the operation this is a response to
	Identifier string `json:"identifier"`

	// RequestID matches the ID from the original request
	RequestID string `json:"requestID"`

	// ClusterID identifies the responding cluster/plane
	ClusterID string `json:"clusterId"`

	// Status indicates success or failure
	Status ResponseStatus `json:"status"`

	// Payload contains the operation result data
	Payload map[string]interface{} `json:"payload,omitempty"`

	// Error contains error details if status is 'fail'
	Error *ErrorDetails `json:"error,omitempty"`
}

ClusterAgentResponse represents a response sent from data plane agent to control plane

func NewClusterAgentFailResponse

func NewClusterAgentFailResponse(req *ClusterAgentRequest, errCode int, errMsg string, errDetails map[string]interface{}) *ClusterAgentResponse

func NewClusterAgentSuccessResponse

func NewClusterAgentSuccessResponse(req *ClusterAgentRequest, payload map[string]interface{}) *ClusterAgentResponse

func (*ClusterAgentResponse) IsFail

func (r *ClusterAgentResponse) IsFail() bool

func (*ClusterAgentResponse) IsSuccess

func (r *ClusterAgentResponse) IsSuccess() bool

type ErrorDetails

type ErrorDetails struct {
	// Code is the error code (e.g., 404, 500)
	Code int `json:"code,omitempty"`

	// Message is a human-readable error message
	Message string `json:"message"`

	// Details contains additional error context
	Details map[string]interface{} `json:"details,omitempty"`
}

ErrorDetails provides structured error information

type Message

type Message struct {
	// ID is a unique identifier for the message (UUID)
	ID string `json:"id"`

	// Type indicates the message type (request, response, broadcast, heartbeat)
	Type MessageType `json:"type"`

	// Action specifies the operation to perform (for request messages)
	Action Action `json:"action,omitempty"`

	// Payload contains the message data (resource manifests, query parameters, etc.)
	Payload map[string]interface{} `json:"payload,omitempty"`

	// ReplyTo contains the ID of the message being replied to (for response messages)
	ReplyTo string `json:"replyTo,omitempty"`

	// From identifies the sender (data plane name or control plane)
	From string `json:"from,omitempty"`

	// Success indicates whether the operation succeeded (for response messages)
	Success bool `json:"success,omitempty"`

	// Error contains error information if the operation failed
	Error string `json:"error,omitempty"`

	// Timestamp is when the message was created (RFC3339 format)
	Timestamp string `json:"timestamp,omitempty"`
}

func NewBroadcastMessage

func NewBroadcastMessage(payload map[string]interface{}) *Message

func NewErrorResponse

func NewErrorResponse(replyTo string, errorMsg string) *Message

func NewHeartbeatMessage

func NewHeartbeatMessage(from string, sequence int) *Message

func NewRequestMessage

func NewRequestMessage(action Action, payload map[string]interface{}) *Message

func NewResponseMessage

func NewResponseMessage(replyTo string, success bool, payload map[string]interface{}, errorMsg string) *Message

func NewSuccessResponse

func NewSuccessResponse(replyTo string, payload map[string]interface{}) *Message

func (*Message) IsBroadcast

func (m *Message) IsBroadcast() bool

func (*Message) IsHeartbeat

func (m *Message) IsHeartbeat() bool

func (*Message) IsRequest

func (m *Message) IsRequest() bool

func (*Message) IsResponse

func (m *Message) IsResponse() bool

func (*Message) Validate

func (m *Message) Validate() error

type MessageType

type MessageType string

MessageType represents the type of message (legacy)

const (
	// TypeRequest is a request message from control plane to data plane
	TypeRequest MessageType = "request"

	// TypeResponse is a response message from data plane to control plane
	TypeResponse MessageType = "response"

	// TypeBroadcast is a broadcast message to all data planes
	TypeBroadcast MessageType = "broadcast"

	// TypeHeartbeat is a periodic heartbeat message
	TypeHeartbeat MessageType = "heartbeat"
)

func (MessageType) IsValid

func (mt MessageType) IsValid() bool

func (MessageType) String

func (mt MessageType) String() string

type RequestType

type RequestType string

RequestType represents the CQRS type

const (
	TypeCommand RequestType = "command"

	TypeQuery RequestType = "query"
)

type ResponseStatus

type ResponseStatus string

ResponseStatus represents the outcome of a request

const (
	StatusSuccess ResponseStatus = "success"

	StatusFail ResponseStatus = "fail"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL