ccmessage

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2025 License: MIT Imports: 14 Imported by: 7

README

ClusterCockpit messages

As described in the ClusterCockpit specifications, the whole ClusterCockpit stack uses metrics, events and control in the InfluxDB line protocol format. This is also the input and output format for the ClusterCockpit Metric Collector but internally it uses an extended format while processing, named CCMessage.

It is basically a copy of the InfluxDB line protocol MutableMetric interface with one extension. Besides the tags and fields, it contains a list of meta information (re-using the Tag structure of the original protocol):

type ccMessage struct {
    name   string                 // Measurement name
    meta   map[string]string      // map of meta data tags
    tags   map[string]string      // map of of tags
    fields map[string]interface{} // map of of fields
    tm     time.Time              // timestamp
}

type CCMessage interface {
    ToPoint(metaAsTags map[string]bool) *write.Point  // Generate influxDB point for data type ccMessage
    ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMessage
    String() string                                   // Return line-protocol like string

    Name() string        // Get metric name
    SetName(name string) // Set metric name

    Time() time.Time     // Get timestamp
    SetTime(t time.Time) // Set timestamp

    Tags() map[string]string                   // Map of tags
    AddTag(key, value string)                  // Add a tag
    GetTag(key string) (value string, ok bool) // Get a tag by its key
    HasTag(key string) (ok bool)               // Check if a tag key is present
    RemoveTag(key string)                      // Remove a tag by its key

    Meta() map[string]string                    // Map of meta data tags
    AddMeta(key, value string)                  // Add a meta data tag
    GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key
    HasMeta(key string) (ok bool)               // Check if a meta data key is present
    RemoveMeta(key string)                      // Remove a meta data tag by its key

    Fields() map[string]interface{}                   // Map of fields
    AddField(key string, value interface{})           // Add a field
    GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
    HasField(key string) (ok bool)                    // Check if a field key is present
    RemoveField(key string)                           // Remove a field addressed by its key
}

func NewMessage(name string, tags map[string]string, meta map[string]string, fields map[string]interface{}, tm time.Time) (CCMessage, error)
func FromMessage(other CCMessage) CCMessage
func FromInfluxMetric(other lp.Metric) CCMessage

The CCMessage interface provides the same functions as the MutableMetric like {Add, Get, Remove, Has}{Tag, Field} and additionally provides {Add, Get, Remove, Has}Meta.

The InfluxDB protocol creates a new metric with influx.New(name, tags, fields, time) while CCMessage uses ccMessage.New(name, tags, meta, fields, time) where tags and meta are both of type map[string]string.

You can copy a CCMessage with FromFromMessage(other CCMessage) CCMessage. If you get an influx.Metric from a function, like the line protocol parser, you can use FromInfluxMetric(other influx.Metric) CCMessage to get a CCMessage out of it (see NatsReceiver for an example).

Although the cc-specifications defines that there is only a value field for the metric value, the CCMessage still can have multiple values similar to the InfluxDB line protocol.

Message Type Helpers

The ccMessage package provides convenient helper functions to create different types of messages:

Metrics

Metrics are numerical measurements from the monitored system.

msg, err := ccMessage.NewMetric(
    "cpu_usage",                           // metric name
    map[string]string{"type": "node"},     // tags
    map[string]string{"unit": "percent"},  // meta
    75.5,                                  // value (numeric)
    time.Now(),                            // timestamp
)
Events

Events represent significant occurrences in the system.

msg, err := ccMessage.NewEvent(
    "node_down",                          // event name
    map[string]string{"severity": "critical"}, // tags
    nil,                                  // meta
    "Node node001 is unreachable",        // event payload
    time.Now(),                           // timestamp
)

For job-related events:

// Job start event
startMsg, err := ccMessage.NewJobStartEvent(job)

// Job stop event
stopMsg, err := ccMessage.NewJobStopEvent(job)

// Check if message is a job event
if eventName, ok := msg.IsJobEvent(); ok {
    job, err := msg.GetJob()  // Deserialize job data
}
Logs

Log messages transmit textual log data through the system.

msg, err := ccMessage.NewLog(
    "application_log",                       // log category
    map[string]string{"level": "error"},     // tags
    map[string]string{"source": "backend"}, // meta
    "Database connection failed: timeout",   // log message
    time.Now(),                              // timestamp
)
Control Messages

Control messages request or set configuration values.

// GET control - request current value
getMsg, err := ccMessage.NewGetControl(
    "sampling_rate",  // parameter name
    nil,              // tags
    nil,              // meta
    time.Now(),       // timestamp
)

// PUT control - set new value
putMsg, err := ccMessage.NewPutControl(
    "sampling_rate",  // parameter name
    nil,              // tags
    nil,              // meta
    "10",             // new value
    time.Now(),       // timestamp
)
Query Messages

Query messages contain database queries or search requests.

msg, err := ccMessage.NewQuery(
    "metrics_query",  // query name
    nil,              // tags
    nil,              // meta
    "SELECT * FROM metrics WHERE timestamp > NOW() - INTERVAL '1h'", // query string
    time.Now(),       // timestamp
)

Type Detection

CCMessage provides methods to detect the message type:

// Check message type
switch msg.MessageType() {
case ccMessage.CCMSG_TYPE_METRIC:
    value := msg.GetMetricValue()
case ccMessage.CCMSG_TYPE_EVENT:
    event := msg.GetEventValue()
case ccMessage.CCMSG_TYPE_LOG:
    log := msg.GetLogValue()
case ccMessage.CCMSG_TYPE_CONTROL:
    value := msg.GetControlValue()
    method := msg.GetControlMethod()  // "GET" or "PUT"
}

// Or use individual type checks
if msg.IsMetric() {
    value := msg.GetMetricValue()
}
if msg.IsEvent() {
    payload := msg.GetEventValue()
}
if msg.IsLog() {
    logText := msg.GetLogValue()
}
if msg.IsControl() {
    value := msg.GetControlValue()
    method := msg.GetControlMethod()
}
if msg.IsQuery() {
    query := msg.GetQueryValue()
}

Best Practices

  1. Use appropriate message types: Choose the correct message type for your data. Use metrics for numerical measurements, events for significant occurrences, logs for textual output, and control messages for configuration.

  2. Leverage tags for categorization: Use tags to categorize messages for efficient filtering and querying. Common tags include type, hostname, cluster, severity.

  3. Store metadata in meta fields: Use meta fields for information that describes the data but shouldn't be used for filtering, such as unit, scope, source.

  4. Handle timestamps consistently: Always use appropriate timestamps for your messages. For job events, use the job's actual start time.

  5. Validate before use: Check error returns from message creation functions and type assertion operations.

  6. Deep copy when needed: Use FromMessage() to create independent copies of messages when you need to modify them without affecting the original.

  7. Type checking: Use the type detection methods (IsMetric(), IsEvent(), etc.) before accessing type-specific values to avoid runtime errors.

Documentation

Overview

Package ccmessage provides a message format and interface for ClusterCockpit. It extends the InfluxDB line protocol with additional meta information.

Index

Constants

View Source
const (
	MIN_CCMSG_TYPE     = CCMSG_TYPE_METRIC
	MAX_CCMSG_TYPE     = CCMSG_TYPE_CONTROL
	CCMSG_TYPE_INVALID = MAX_CCMSG_TYPE + 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CCMessage

type CCMessage interface {
	ToPoint(metaAsTags map[string]bool) *write.Point            // Generate influxDB point for data type ccMessage
	ToLineProtocol(metaAsTags map[string]bool) string           // Generate influxDB line protocol for data type ccMessage
	ToJSON(metaAsTags map[string]bool) (json.RawMessage, error) // Generate JSON representation

	Name() string        // Get metric name
	SetName(name string) // Set metric name

	Time() time.Time     // Get timestamp
	SetTime(t time.Time) // Set timestamp

	Tags() map[string]string                   // Map of tags
	AddTag(key, value string)                  // Add a tag
	GetTag(key string) (value string, ok bool) // Get a tag by its key
	HasTag(key string) (ok bool)               // Check if a tag key is present
	RemoveTag(key string)                      // Remove a tag by its key

	Meta() map[string]string                    // Map of meta data tags
	AddMeta(key, value string)                  // Add a meta data tag
	GetMeta(key string) (value string, ok bool) // Get a meta data tab addressed by its key
	HasMeta(key string) (ok bool)               // Check if a meta data key is present
	RemoveMeta(key string)                      // Remove a meta data tag by its key

	Fields() map[string]any                   // Map of fields
	AddField(key string, value any)           // Add a field
	GetField(key string) (value any, ok bool) // Get a field addressed by its key
	HasField(key string) (ok bool)            // Check if a field key is present
	RemoveField(key string)                   // Remove a field addressed by its key
	String() string                           // Return line-protocol like string

	MessageType() CCMessageType // Return message type
	IsMetric() bool             // Check if message is a metric
	GetMetricValue() any
	IsLog() bool // Check if message is a log
	GetLogValue() string
	IsEvent() bool // Check if message is an event
	GetEventValue() string
	IsControl() bool // Check if message is a control message
	GetControlValue() string
	GetControlMethod() string
	IsQuery() bool // Check if message is a query
	GetQueryValue() string
	IsJobEvent() (string, bool) // Check if message is a job event (returns event name and bool)
	GetJob() (*schema.Job, error)
}

CCMessage is the interface for accessing and manipulating ClusterCockpit messages. It provides methods for converting to other formats (InfluxDB point, Line Protocol, JSON), accessing metadata (Name, Time, Tags, Meta, Fields), and checking message type.

func EmptyMessage

func EmptyMessage() CCMessage

EmptyMessage creates a new empty CCMessage.

func FromBytes

func FromBytes(data []byte) ([]CCMessage, error)

FromBytes creates a list of CCMessages from a byte slice containing InfluxDB line protocol data.

func FromInfluxMetric

func FromInfluxMetric(other lp1.Metric) CCMessage

FromInfluxMetric creates a CCMessage from an InfluxDB line protocol metric.

func FromJSON

func FromJSON(input json.RawMessage) (CCMessage, error)

FromJSON creates a CCMessage from a JSON representation.

func FromMessage

func FromMessage(other CCMessage) CCMessage

FromMessage creates a deep copy of the given CCMessage.

func NewEvent

func NewEvent(name string,
	tags map[string]string,
	meta map[string]string,
	event string,
	tm time.Time,
) (CCMessage, error)

NewEvent creates a new event message. Events represent significant occurrences in the ClusterCockpit system, such as job starts/stops, system state changes, or other notable incidents.

Parameters:

  • name: The name/type of the event (e.g., "start_job", "stop_job", "node_down")
  • tags: Optional tags for categorizing the event
  • meta: Optional metadata information
  • event: The event payload as a string (can be JSON, plain text, or any other format)
  • tm: Timestamp when the event occurred

Returns a CCMessage with the "event" field set to the provided event payload.

func NewGetControl

func NewGetControl(name string,
	tags map[string]string,
	meta map[string]string,
	tm time.Time,
) (CCMessage, error)

NewGetControl creates a new control message with a GET method. Control messages are used to request or set configuration values in the ClusterCockpit system. A GET control message requests the current value of the specified control parameter.

Parameters:

  • name: The name of the control parameter to query
  • tags: Optional tags for categorizing the control message
  • meta: Optional metadata information
  • tm: Timestamp when the control message was created

Returns a CCMessage with the "control" field set to an empty string and a "method" tag set to "GET".

func NewJobStartEvent

func NewJobStartEvent(job *schema.Job) (CCMessage, error)

NewJobStartEvent creates an event message for a job start. The job information is serialized to JSON and embedded in the event payload.

Parameters:

  • job: Pointer to the schema.Job structure containing job information

Returns a CCMessage with name "start_job" and the job data serialized as JSON in the event field. The timestamp is set to the job's start time.

func NewJobStopEvent

func NewJobStopEvent(job *schema.Job) (CCMessage, error)

NewJobStopEvent creates an event message for a job stop. The job information is serialized to JSON and embedded in the event payload.

Parameters:

  • job: Pointer to the schema.Job structure containing job information

Returns a CCMessage with name "stop_job" and the job data serialized as JSON in the event field. The timestamp is set to the job's start time.

func NewLog

func NewLog(name string,
	tags map[string]string,
	meta map[string]string,
	log string,
	tm time.Time,
) (CCMessage, error)

NewLog creates a new log message. Log messages are used to transmit textual log data through the ClusterCockpit messaging system.

Parameters:

  • name: The name/category of the log message (e.g., "system_log", "application_log")
  • tags: Optional tags for categorizing the log message (e.g., severity, source)
  • meta: Optional metadata information
  • log: The log message content as a string
  • tm: Timestamp when the log message was generated

Returns a CCMessage with the "log" field set to the provided log content.

func NewMessage

func NewMessage(
	name string,
	tags map[string]string,
	meta map[string]string,
	fields map[string]any,
	tm time.Time,
) (CCMessage, error)

NewMessage creates a new CCMessage with the given name, tags, meta, fields, and timestamp.

func NewMetric

func NewMetric(name string,
	tags map[string]string,
	meta map[string]string,
	value any,
	tm time.Time,
) (CCMessage, error)

NewMetric creates a new metric message. Metrics represent numerical measurements from the monitored system, such as CPU usage, memory consumption, network throughput, etc. This is the primary message type in ClusterCockpit for performance monitoring data.

Parameters:

  • name: The metric name (e.g., "cpu_usage", "mem_used", "flops_any")
  • tags: Optional tags for categorizing the metric (e.g., "type": "node", "hostname": "node001")
  • meta: Optional metadata information (e.g., "unit": "bytes", "scope": "node")
  • value: The metric value (can be int, float, uint, or other numeric types)
  • tm: Timestamp when the metric was collected

Returns a CCMessage with the "value" field set to the provided metric value. Note: Unlike events and logs, metric values should be numeric, not strings.

func NewPutControl

func NewPutControl(name string,
	tags map[string]string,
	meta map[string]string,
	value string,
	tm time.Time,
) (CCMessage, error)

NewPutControl creates a new control message with a PUT method. Control messages are used to request or set configuration values in the ClusterCockpit system. A PUT control message sets a new value for the specified control parameter.

Parameters:

  • name: The name of the control parameter to set
  • tags: Optional tags for categorizing the control message
  • meta: Optional metadata information
  • value: The new value to set for the control parameter
  • tm: Timestamp when the control message was created

Returns a CCMessage with the "control" field set to the provided value and a "method" tag set to "PUT".

func NewQuery added in v0.2.0

func NewQuery(name string,
	tags map[string]string,
	meta map[string]string,
	q string,
	tm time.Time,
) (CCMessage, error)

NewQuery creates a new CCMessage of type Query

type CCMessageType

type CCMessageType int

CCMessageType defines the type of a CCMessage

const (
	CCMSG_TYPE_METRIC  CCMessageType = iota // Metric message type
	CCMSG_TYPE_EVENT                        // Event message type
	CCMSG_TYPE_LOG                          // Log message type
	CCMSG_TYPE_CONTROL                      // Control message type
)

func (CCMessageType) FieldKey

func (t CCMessageType) FieldKey() string

FieldKey returns the key used for the value field in the message

func (CCMessageType) String

func (t CCMessageType) String() string

String returns the string representation of the CCMessageType

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL