ccmessage

package
v2.0.0 Latest
Published: Dec 24, 2025 License: MIT Imports: 14 Imported by: 0

README

ClusterCockpit messages

As described in the ClusterCockpit specifications, the whole ClusterCockpit stack uses metrics, events and control in the InfluxDB line protocol format. This is also the input and output format for the ClusterCockpit Metric Collector but internally it uses an extended format while processing, named CCMessage.

It is basically a copy of the InfluxDB line protocol MutableMetric interface with one extension. Besides the tags and fields, it contains a list of meta information (re-using the Tag structure of the original protocol):

type ccMessage struct {
    name   string                 // Measurement name
    meta   map[string]string      // map of meta data tags
    tags   map[string]string      // map of tags
    fields map[string]interface{} // map of fields
    tm     time.Time              // timestamp
}

type CCMessage interface {
    ToPoint(metaAsTags map[string]bool) *write.Point  // Generate influxDB point for data type ccMessage
    ToLineProtocol(metaAsTags map[string]bool) string // Generate influxDB line protocol for data type ccMessage
    String() string                                   // Return line-protocol like string

    Name() string        // Get metric name
    SetName(name string) // Set metric name

    Time() time.Time     // Get timestamp
    SetTime(t time.Time) // Set timestamp

    Tags() map[string]string                   // Map of tags
    AddTag(key, value string)                  // Add a tag
    GetTag(key string) (value string, ok bool) // Get a tag by its key
    HasTag(key string) (ok bool)               // Check if a tag key is present
    RemoveTag(key string)                      // Remove a tag by its key

    Meta() map[string]string                    // Map of meta data tags
    AddMeta(key, value string)                  // Add a meta data tag
    GetMeta(key string) (value string, ok bool) // Get a meta data tag addressed by its key
    HasMeta(key string) (ok bool)               // Check if a meta data key is present
    RemoveMeta(key string)                      // Remove a meta data tag by its key

    Fields() map[string]interface{}                   // Map of fields
    AddField(key string, value interface{})           // Add a field
    GetField(key string) (value interface{}, ok bool) // Get a field addressed by its key
    HasField(key string) (ok bool)                    // Check if a field key is present
    RemoveField(key string)                           // Remove a field addressed by its key
}

func NewMessage(name string, tags map[string]string, meta map[string]string, fields map[string]interface{}, tm time.Time) (CCMessage, error)
func FromMessage(other CCMessage) CCMessage
func FromBytes(data []byte) ([]CCMessage, error)

The CCMessage interface provides the same methods as MutableMetric, such as {Add, Get, Remove, Has}{Tag, Field}, and additionally provides {Add, Get, Remove, Has}Meta.

The InfluxDB library creates a new metric with influx.New(name, tags, fields, time), while CCMessage uses ccMessage.NewMessage(name, tags, meta, fields, time), where tags and meta are both of type map[string]string. Unlike influx.New, NewMessage also returns an error when validation fails.

You can copy a CCMessage with FromMessage(other CCMessage) CCMessage. To parse InfluxDB line protocol data, use FromBytes(data []byte) ([]CCMessage, error) which decodes one or more messages from line protocol format.

Although the cc-specifications define only a single value field for the metric value, a CCMessage can still carry multiple fields, similar to the InfluxDB line protocol.

Design Decisions

Meta vs Tags

CCMessage extends InfluxDB line protocol with a separate meta map in addition to tags. This separation serves important purposes:

  • Tags are used for filtering and querying in time-series databases. They're indexed and optimized for search operations.
  • Meta fields contain descriptive metadata that doesn't need to be indexed (e.g., unit, scope, source).

When converting to InfluxDB line protocol via ToPoint() or ToLineProtocol(), you control which meta fields should be promoted to tags using the metaAsTags parameter. This provides flexibility without forcing all metadata into the tag space.

Example:

msg, _ := ccMessage.NewMetric(
    "cpu_usage",
    map[string]string{"hostname": "node001", "type": "node"},  // Tags for querying
    map[string]string{"unit": "percent", "scope": "hwthread"}, // Meta for context
    75.5,
    time.Now(),
)

// Convert with unit as tag, scope remains in meta (not exported)
lp := msg.ToLineProtocol(map[string]bool{"unit": true})

Tag Sorting

When serializing messages to InfluxDB line protocol (via Bytes() or ToLineProtocol()), tags are sorted alphabetically by key. This ensures:

  1. Deterministic output: Same message always produces identical serialization
  2. Test reliability: Output can be compared for equality in tests
  3. Cache efficiency: Consistent ordering improves cache hit rates in downstream systems
  4. Line protocol compliance: InfluxDB recommends sorted tags for optimal performance

The sorting happens only during serialization; tags are stored in an unsorted map internally for efficient access.
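The following standalone sketch illustrates the two ideas described above: promoting selected meta entries to tags and emitting tags in sorted key order. It uses only the standard library and does not call the ccMessage package; the function name serialize is hypothetical, and escaping of special characters is deliberately left out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// serialize renders a simplified line-protocol string (hypothetical helper).
// Tags and the meta entries enabled in metaAsTags are merged, then written
// in alphabetical key order. In this sketch a promoted meta entry overwrites
// a tag with the same key. No escaping is performed.
func serialize(name string, tags, meta map[string]string, metaAsTags map[string]bool, value float64, tsNanos int64) string {
	merged := make(map[string]string, len(tags)+len(meta))
	for k, v := range tags {
		merged[k] = v
	}
	for k, v := range meta {
		if metaAsTags[k] {
			merged[k] = v
		}
	}

	// Go map iteration order is random, so sort the keys explicitly.
	keys := make([]string, 0, len(merged))
	for k := range merged {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var sb strings.Builder
	sb.WriteString(name)
	for _, k := range keys {
		fmt.Fprintf(&sb, ",%s=%s", k, merged[k])
	}
	fmt.Fprintf(&sb, " value=%v %d", value, tsNanos)
	return sb.String()
}

func main() {
	line := serialize(
		"cpu_usage",
		map[string]string{"type": "node", "hostname": "node001"},
		map[string]string{"unit": "percent", "scope": "hwthread"},
		map[string]bool{"unit": true}, // scope stays in meta and is not exported
		75.5,
		1234567890000000000,
	)
	fmt.Println(line)
	// cpu_usage,hostname=node001,type=node,unit=percent value=75.5 1234567890000000000
}
```

Because the keys are sorted before writing, repeated calls with the same input return the same string, which is what makes equality comparisons in tests reliable.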

Message Type Helpers

The ccMessage package provides convenient helper functions to create different types of messages:

Metrics

Metrics are numerical measurements from the monitored system.

msg, err := ccMessage.NewMetric(
    "cpu_usage",                           // metric name
    map[string]string{"type": "node"},     // tags
    map[string]string{"unit": "percent"},  // meta
    75.5,                                  // value (numeric)
    time.Now(),                            // timestamp
)

Events

Events represent significant occurrences in the system.

msg, err := ccMessage.NewEvent(
    "node_down",                          // event name
    map[string]string{"severity": "critical"}, // tags
    nil,                                  // meta
    "Node node001 is unreachable",        // event payload
    time.Now(),                           // timestamp
)

For job-related events:

// Job start event
startMsg, err := ccMessage.NewJobStartEvent(job)

// Job stop event
stopMsg, err := ccMessage.NewJobStopEvent(job)

// Check if message is a job event
if eventName, ok := msg.IsJobEvent(); ok {
    job, err := msg.GetJob()  // Deserialize job data
}

Logs

Log messages transmit textual log data through the system.

msg, err := ccMessage.NewLog(
    "application_log",                       // log category
    map[string]string{"level": "error"},     // tags
    map[string]string{"source": "backend"}, // meta
    "Database connection failed: timeout",   // log message
    time.Now(),                              // timestamp
)

Control Messages

Control messages request or set configuration values.

// GET control - request current value
getMsg, err := ccMessage.NewGetControl(
    "sampling_rate",  // parameter name
    nil,              // tags
    nil,              // meta
    time.Now(),       // timestamp
)

// PUT control - set new value
putMsg, err := ccMessage.NewPutControl(
    "sampling_rate",  // parameter name
    nil,              // tags
    nil,              // meta
    "10",             // new value
    time.Now(),       // timestamp
)

Query Messages

Query messages contain database queries or search requests.

msg, err := ccMessage.NewQuery(
    "metrics_query",  // query name
    nil,              // tags
    nil,              // meta
    "SELECT * FROM metrics WHERE timestamp > NOW() - INTERVAL '1h'", // query string
    time.Now(),       // timestamp
)

Type Detection

CCMessage provides methods to detect the message type:

// Check message type. All Get*Value methods return (value, ok).
switch msg.MessageType() {
case ccMessage.CCMSG_TYPE_METRIC:
    value, ok := msg.GetMetricValue()
case ccMessage.CCMSG_TYPE_EVENT:
    event, ok := msg.GetEventValue()
case ccMessage.CCMSG_TYPE_LOG:
    logText, ok := msg.GetLogValue()
case ccMessage.CCMSG_TYPE_CONTROL:
    value, ok := msg.GetControlValue()
    method, ok := msg.GetControlMethod()  // "GET" or "PUT"
case ccMessage.CCMSG_TYPE_QUERY:
    query, ok := msg.GetQueryValue()
}

// Or use individual type checks
if msg.IsMetric() {
    value, ok := msg.GetMetricValue()
}
if msg.IsEvent() {
    payload, ok := msg.GetEventValue()
}
if msg.IsLog() {
    logText, ok := msg.GetLogValue()
}
if msg.IsControl() {
    value, ok := msg.GetControlValue()
    method, ok := msg.GetControlMethod()
}
if msg.IsQuery() {
    query, ok := msg.GetQueryValue()
}

Common Usage Patterns

Working with Metrics

// Create a metric with validation
msg, err := ccMessage.NewMetric(
    "memory_used",
    map[string]string{
        "hostname": "node001",
        "type":     "node",
    },
    map[string]string{
        "unit":  "bytes",
        "scope": "node",
    },
    int64(8589934592), // 8 GB
    time.Now(),
)
if err != nil {
    log.Fatalf("Failed to create metric: %v", err)
}

// Access metric value with type checking
if value, ok := msg.GetMetricValue(); ok {
    switch v := value.(type) {
    case int64:
        fmt.Printf("Integer metric: %d\n", v)
    case uint64:
        fmt.Printf("Unsigned metric: %d\n", v)
    case float64:
        fmt.Printf("Float metric: %.2f\n", v)
    }
}

// Convert to InfluxDB line protocol with unit as tag
lineProtocol := msg.ToLineProtocol(map[string]bool{"unit": true})
fmt.Println(lineProtocol)
// Example output (the timestamp reflects time.Now() and will differ):
// memory_used,hostname=node001,type=node,unit=bytes value=8589934592 1234567890000000000

Parsing Line Protocol

// Parse InfluxDB line protocol data
data := []byte(`cpu_usage,hostname=node001,type=node value=75.5 1234567890000000000
mem_used,hostname=node001,type=node value=8192 1234567890000000000`)

messages, err := ccMessage.FromBytes(data)
if err != nil {
    log.Fatalf("Failed to parse: %v", err)
}

for _, msg := range messages {
    // GetMetricValue returns two values, so unpack it before printing
    if value, ok := msg.GetMetricValue(); ok {
        fmt.Printf("Metric: %s = %v\n", msg.Name(), value)
    }
}

Handling Events with JSON Payloads

// Create event with structured data
eventData := map[string]interface{}{
    "node":      "node001",
    "status":    "down",
    "timestamp": time.Now().Unix(),
    "reason":    "network timeout",
}
jsonPayload, _ := json.Marshal(eventData)

event, err := ccMessage.NewEvent(
    "node_failure",
    map[string]string{"severity": "critical", "cluster": "production"},
    nil,
    string(jsonPayload),
    time.Now(),
)

// Parse event payload
if payload, ok := event.GetEventValue(); ok {
    var data map[string]interface{}
    if err := json.Unmarshal([]byte(payload), &data); err == nil {
        fmt.Printf("Node %s is %s\n", data["node"], data["status"])
    }
}

Message Transformation

// Clone and modify a message
original, _ := ccMessage.NewMetric("cpu_usage", nil, nil, 50.0, time.Now())

// Create independent copy
modified := ccMessage.FromMessage(original)
modified.AddTag("datacenter", "dc1")
modified.AddMeta("aggregated", "true")

// Original remains unchanged
fmt.Printf("Original tags: %v\n", original.Tags())   // map[]
fmt.Printf("Modified tags: %v\n", modified.Tags())   // map[datacenter:dc1]

Working with Control Messages

// Request current sampling rate
getRequest, _ := ccMessage.NewGetControl(
    "sampling_rate",
    map[string]string{"component": "collector"},
    nil,
    time.Now(),
)

// Check control method
if method, ok := getRequest.GetControlMethod(); ok {
    fmt.Printf("Control method: %s\n", method) // "GET"
}

// Update sampling rate
putRequest, _ := ccMessage.NewPutControl(
    "sampling_rate",
    map[string]string{"component": "collector"},
    nil,
    "5",
    time.Now(),
)

if value, ok := putRequest.GetControlValue(); ok {
    fmt.Printf("New value: %s\n", value) // "5"
}

Batch Processing

// Process multiple messages
metrics := []struct {
    name  string
    value float64
}{
    {"cpu_usage", 75.5},
    {"mem_usage", 82.3},
    {"disk_usage", 45.1},
}

var messages []ccMessage.CCMessage
for _, m := range metrics {
    msg, err := ccMessage.NewMetric(
        m.name,
        map[string]string{"hostname": "node001"},
        map[string]string{"unit": "percent"},
        m.value,
        time.Now(),
    )
    if err != nil {
        log.Printf("Skipping metric %s: %v", m.name, err)
        continue
    }
    messages = append(messages, msg)
}

// Convert all to line protocol
for _, msg := range messages {
    lp := msg.ToLineProtocol(map[string]bool{"unit": true})
    fmt.Println(lp)
}

Type-Safe Message Handling

func processMessage(msg ccMessage.CCMessage) {
    switch msg.MessageType() {
    case ccMessage.CCMSG_TYPE_METRIC:
        if value, ok := msg.GetMetricValue(); ok {
            fmt.Printf("Processing metric %s: %v\n", msg.Name(), value)
            // Send to time-series database
        }
    
    case ccMessage.CCMSG_TYPE_EVENT:
        if event, ok := msg.GetEventValue(); ok {
            fmt.Printf("Processing event %s: %s\n", msg.Name(), event)
            // Send to event log
        }
    
    case ccMessage.CCMSG_TYPE_LOG:
        if logMsg, ok := msg.GetLogValue(); ok {
            fmt.Printf("Processing log %s: %s\n", msg.Name(), logMsg)
            // Send to logging system
        }
    
    case ccMessage.CCMSG_TYPE_CONTROL:
        if method, ok := msg.GetControlMethod(); ok {
            value, _ := msg.GetControlValue()
            fmt.Printf("Control %s %s = %s\n", method, msg.Name(), value)
            // Handle configuration change
        }
    
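    case ccMessage.CCMSG_TYPE_QUERY:
        if query, ok := msg.GetQueryValue(); ok {
            fmt.Printf("Processing query %s: %s\n", msg.Name(), query)
            // Forward to the query handler
        }

    default:
        fmt.Printf("Unknown message type: %s\n", msg.Name())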
    }
}

Error Handling and Validation

Input Validation

All message creation functions perform validation and return errors for invalid inputs:

// Empty names are rejected
_, err := ccMessage.NewMetric("", nil, nil, 123, time.Now())
// Error: message name cannot be empty

// Zero timestamps are rejected
_, err = ccMessage.NewMetric("test", nil, nil, 123, time.Time{})
// Error: timestamp cannot be zero

// Empty keys are rejected
_, err = ccMessage.NewMetric("test",
    map[string]string{"": "value"}, // empty tag key
    nil, 123, time.Now())
// Error: tag keys cannot be empty

// NaN and Inf values are rejected
_, err = ccMessage.NewMetric("test", nil, nil, math.NaN(), time.Now())
// Error: field 'value' has invalid float value (NaN or Inf)

// At least one field is required
_, err = ccMessage.NewMessage("test", nil, nil,
    map[string]any{}, // no fields
    time.Now())
// Error: at least one field is required

Type Conversion and Handling

// Automatic type conversion
msg, _ := ccMessage.NewMetric("test", nil, nil, int32(100), time.Now())
value, _ := msg.GetMetricValue()
// value is int64(100), not int32

// Unsupported types become nil and are skipped
type customType struct{ value int }
msg, err := ccMessage.NewMessage("test", nil, nil,
    map[string]any{
        "valid": 123,
        "invalid": customType{42}, // unsupported type
    },
    time.Now())
// err == nil, but "invalid" field is not present in message

// Nil pointer values are skipped
var ptr *int64
_, err = ccMessage.NewMessage("test", nil, nil,
    map[string]any{"value": ptr},
    time.Now())
// The "value" field is dropped (nil pointers are skipped). Since no valid
// field remains, NewMessage is documented to return an error in this case.

Safe Type Assertions

// Always use the ok pattern
if value, ok := msg.GetMetricValue(); ok {
    // Safe to use value
    fmt.Printf("Metric value: %v\n", value)
} else {
    // Not a metric or no value field
    fmt.Println("Not a metric message")
}

// Type-specific value retrieval already checks type
if logMsg, ok := msg.GetLogValue(); ok {
    // Guaranteed to be a string
    fmt.Println(logMsg)
}

// Don't panic on type assertions
value, ok := msg.GetMetricValue()
if !ok {
    return errors.New("expected metric message")
}
// Now safe to use value

Concurrent Access Patterns

// WRONG: Concurrent modification without synchronization
msg, _ := ccMessage.NewMetric("test", nil, nil, 0.0, time.Now())
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(val int) {
        defer wg.Done()
        msg.AddTag(fmt.Sprintf("tag%d", val), "value") // RACE CONDITION!
    }(i)
}
wg.Wait()

// CORRECT: Use mutex for synchronization
var mu sync.Mutex
msg, _ := ccMessage.NewMetric("test", nil, nil, 0.0, time.Now())
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(val int) {
        defer wg.Done()
        mu.Lock()
        msg.AddTag(fmt.Sprintf("tag%d", val), "value")
        mu.Unlock()
    }(i)
}
wg.Wait()

// BETTER: Create separate messages per goroutine
original, _ := ccMessage.NewMetric("test", nil, nil, 0.0, time.Now())
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(val int) {
        defer wg.Done()
        // Each goroutine gets its own copy
        msg := ccMessage.FromMessage(original)
        msg.AddTag(fmt.Sprintf("tag%d", val), "value")
        // Process msg independently
    }(i)
}
wg.Wait()

Serialization Error Handling

// Handle serialization errors
msg, _ := ccMessage.NewMetric("test", nil, nil, 123.45, time.Now())

// Line protocol conversion
lp := msg.ToLineProtocol(map[string]bool{})
// Line protocol conversion doesn't return errors (uses panic recovery)

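// Bytes conversion can fail. Bytes() is not part of the CCMessage interface
// and the concrete type is unexported, so assert on the method set instead
// (assuming Bytes() returns ([]byte, error)).
if b, ok := msg.(interface{ Bytes() ([]byte, error) }); ok {
    raw, err := b.Bytes()
    if err != nil {
        log.Printf("Serialization failed: %v", err)
        // Error might indicate unsupported field type or encoding issue
    }
    _ = raw
}

// JSON conversion can fail
jsonData, err := msg.ToJSON(map[string]bool{})
if err != nil {
    log.Printf("JSON conversion failed: %v", err)
}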

// Parsing can fail with detailed errors
data := []byte("invalid line protocol !!!")
messages, err := ccMessage.FromBytes(data)
if err != nil {
    log.Printf("Failed to parse: %v", err)
    // Error will indicate what went wrong (invalid measurement, tags, etc.)
}

Best Practices

  1. Use appropriate message types: Choose the correct message type for your data. Use metrics for numerical measurements, events for significant occurrences, logs for textual output, and control messages for configuration.

  2. Leverage tags for categorization: Use tags to categorize messages for efficient filtering and querying. Common tags include type, hostname, cluster, severity.

  3. Store metadata in meta fields: Use meta fields for information that describes the data but shouldn't be used for filtering, such as unit, scope, source.

  4. Handle timestamps consistently: Always use appropriate timestamps for your messages. For job events, use the job's actual start time.

  5. Validate before use: Check error returns from message creation functions and type assertion operations.

  6. Deep copy when needed: Use FromMessage() to create independent copies of messages when you need to modify them without affecting the original.

  7. Type checking: Use the type detection methods (IsMetric(), IsEvent(), etc.) before accessing type-specific values to avoid runtime errors.

  8. Thread safety: CCMessage instances are NOT thread-safe. If you need to access a message from multiple goroutines, either use external synchronization (mutexes) or create separate copies with FromMessage() for each goroutine.

Documentation

Overview

Package ccmessage provides a message format and interface for ClusterCockpit.

CCMessage extends the InfluxDB line protocol with additional meta information, supporting multiple message types: metrics, events, logs, control messages, and queries.

Message Types

The package supports five message types:

  • Metric: Numerical measurements (CPU usage, memory, network throughput)
  • Event: Significant occurrences (job start/stop, node failures)
  • Log: Textual log messages
  • Control: Configuration requests (GET) or updates (PUT)
  • Query: Database queries or search requests

Basic Usage

// Create a metric
msg, err := ccmessage.NewMetric(
    "cpu_usage",
    map[string]string{"hostname": "node001"},
    map[string]string{"unit": "percent"},
    75.5,
    time.Now(),
)

// Check message type
if msg.IsMetric() {
    value, ok := msg.GetMetricValue()
    // ...
}

// Convert to InfluxDB line protocol
lineProtocol := msg.ToLineProtocol(map[string]bool{"unit": true})

Thread Safety

CCMessage instances are NOT thread-safe. For concurrent access, either use external synchronization or create separate copies with FromMessage().

Meta vs Tags

Tags are indexed in time-series databases for querying, while meta fields store descriptive metadata. Use the metaAsTags parameter when converting to control which meta fields become tags.

Constants

const (
	MIN_CCMSG_TYPE     = CCMSG_TYPE_METRIC
	MAX_CCMSG_TYPE     = CCMSG_TYPE_QUERY
	CCMSG_TYPE_INVALID = MAX_CCMSG_TYPE + 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CCMessage

type CCMessage interface {
	ToPoint(metaAsTags map[string]bool) *write.Point            // Generate influxDB point for data type ccMessage
	ToLineProtocol(metaAsTags map[string]bool) string           // Generate influxDB line protocol for data type ccMessage
	ToJSON(metaAsTags map[string]bool) (json.RawMessage, error) // Generate JSON representation

	Name() string        // Get metric name
	SetName(name string) // Set metric name

	Time() time.Time     // Get timestamp
	SetTime(t time.Time) // Set timestamp

	Tags() map[string]string                   // Map of tags
	AddTag(key, value string)                  // Add a tag
	GetTag(key string) (value string, ok bool) // Get a tag by its key
	HasTag(key string) (ok bool)               // Check if a tag key is present
	RemoveTag(key string)                      // Remove a tag by its key

	Meta() map[string]string                    // Map of meta data tags
	AddMeta(key, value string)                  // Add a meta data tag
	GetMeta(key string) (value string, ok bool) // Get a meta data tag addressed by its key
	HasMeta(key string) (ok bool)               // Check if a meta data key is present
	RemoveMeta(key string)                      // Remove a meta data tag by its key

	Fields() map[string]any                   // Map of fields
	AddField(key string, value any)           // Add a field
	GetField(key string) (value any, ok bool) // Get a field addressed by its key
	HasField(key string) (ok bool)            // Check if a field key is present
	RemoveField(key string)                   // Remove a field addressed by its key
	String() string                           // Return line-protocol like string

	MessageType() CCMessageType // Return message type
	IsMetric() bool             // Check if message is a metric
	GetMetricValue() (value any, ok bool)
	IsLog() bool // Check if message is a log
	GetLogValue() (value string, ok bool)
	IsEvent() bool // Check if message is an event
	GetEventValue() (value string, ok bool)
	IsControl() bool // Check if message is a control message
	GetControlValue() (value string, ok bool)
	GetControlMethod() (method string, ok bool)
	IsQuery() bool // Check if message is a query
	GetQueryValue() (value string, ok bool)
	IsJobEvent() (eventName string, ok bool) // Check if message is a job event (returns event name and bool)
	GetJob() (*schema.Job, error)
}

CCMessage is the interface for accessing and manipulating ClusterCockpit messages. It provides methods for converting to other formats (InfluxDB point, Line Protocol, JSON), accessing metadata (Name, Time, Tags, Meta, Fields), and checking message type.

Thread Safety: CCMessage instances are NOT thread-safe. Concurrent access to the same CCMessage from multiple goroutines must be synchronized externally. For concurrent use, either use locking mechanisms or create separate message instances using FromMessage().

func EmptyMessage

func EmptyMessage() CCMessage

EmptyMessage creates a new empty CCMessage.

func FromBytes

func FromBytes(data []byte) ([]CCMessage, error)

FromBytes creates a list of CCMessages from a byte slice containing InfluxDB line protocol data.

Example
data := []byte("cpu_usage,hostname=node001 value=75.5 1234567890000000000")

messages, err := ccmessage.FromBytes(data)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

for _, msg := range messages {
	fmt.Printf("Metric: %s\n", msg.Name())
	if value, ok := msg.GetMetricValue(); ok {
		fmt.Printf("Value: %v\n", value)
	}
}
Output:

Metric: cpu_usage
Value: 75.5
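To make the structure of a line-protocol record more concrete, here is a deliberately naive parser for a single line. It is a sketch under strong assumptions and not how FromBytes is implemented: it expects exactly three space-separated sections (name with tags, fields, timestamp), performs no unescaping, ignores quoted strings, and keeps field values as raw strings. The names point and parseLine are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// point is a hypothetical container for one parsed line.
type point struct {
	Name   string
	Tags   map[string]string
	Fields map[string]string // raw, untyped field values
	Time   int64             // nanoseconds since the Unix epoch
}

// parseKV splits "key=value" and rejects entries without a key.
func parseKV(kv string) (string, string, error) {
	k, v, ok := strings.Cut(kv, "=")
	if !ok || k == "" {
		return "", "", fmt.Errorf("invalid key/value pair %q", kv)
	}
	return k, v, nil
}

// parseLine parses "name,tag=v field=v timestamp" without escape handling.
func parseLine(line string) (point, error) {
	parts := strings.Fields(line)
	if len(parts) != 3 {
		return point{}, fmt.Errorf("expected 3 sections, got %d", len(parts))
	}
	head := strings.Split(parts[0], ",")
	if head[0] == "" {
		return point{}, fmt.Errorf("missing measurement name")
	}
	p := point{Name: head[0], Tags: map[string]string{}, Fields: map[string]string{}}
	for _, kv := range head[1:] {
		k, v, err := parseKV(kv)
		if err != nil {
			return point{}, err
		}
		p.Tags[k] = v
	}
	for _, kv := range strings.Split(parts[1], ",") {
		k, v, err := parseKV(kv)
		if err != nil {
			return point{}, err
		}
		p.Fields[k] = v
	}
	ts, err := strconv.ParseInt(parts[2], 10, 64)
	if err != nil {
		return point{}, fmt.Errorf("invalid timestamp: %w", err)
	}
	p.Time = ts
	return p, nil
}

func main() {
	p, err := parseLine("cpu_usage,hostname=node001 value=75.5 1234567890000000000")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(p.Name, p.Tags["hostname"], p.Fields["value"], p.Time)
	// cpu_usage node001 75.5 1234567890000000000
}
```

For real data, use FromBytes, which handles multiple lines and the parts of the format this sketch ignores.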

func FromInfluxMetric deprecated

func FromInfluxMetric(other lp1.Metric) CCMessage

FromInfluxMetric creates a CCMessage from an InfluxDB line protocol v1 metric.

Deprecated: This function depends on the deprecated line-protocol v1 library. Use FromBytes() instead for parsing line protocol data with the v2 library.

func FromJSON

func FromJSON(input json.RawMessage) (CCMessage, error)

FromJSON creates a CCMessage from a JSON representation.

func FromMessage

func FromMessage(other CCMessage) CCMessage

FromMessage creates a deep copy of the given CCMessage.

Example
original, _ := ccmessage.NewMetric("cpu_usage", nil, nil, 50.0, time.Now())

copy := ccmessage.FromMessage(original)
copy.AddTag("modified", "true")

fmt.Printf("Original has tag: %v\n", original.HasTag("modified"))
fmt.Printf("Copy has tag: %v\n", copy.HasTag("modified"))
Output:

Original has tag: false
Copy has tag: true
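Why a deep copy matters can be shown without the package itself. In Go, copying a struct that contains maps copies only the map headers, so both values keep pointing at the same data. The sketch below uses a hypothetical message type with a single tags map; it is an illustration of the concept, not the package's internal code.

```go
package main

import "fmt"

// message is a hypothetical, reduced stand-in for a message with a tag map.
type message struct {
	name string
	tags map[string]string
}

// shallowCopy copies the struct; both values still share one tags map.
func shallowCopy(m message) message { return m }

// deepCopy allocates a new map so the copy can be modified independently.
func deepCopy(m message) message {
	c := message{name: m.name, tags: make(map[string]string, len(m.tags))}
	for k, v := range m.tags {
		c.tags[k] = v
	}
	return c
}

func main() {
	original := message{name: "cpu_usage", tags: map[string]string{"type": "node"}}

	s := shallowCopy(original)
	s.tags["shallow"] = "true" // also visible through original

	d := deepCopy(original)
	d.tags["deep"] = "true" // only visible in d

	_, leaked := original.tags["shallow"]
	_, isolated := original.tags["deep"]
	fmt.Println(leaked, isolated) // true false
}
```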

func NewEvent

func NewEvent(name string,
	tags map[string]string,
	meta map[string]string,
	event string,
	tm time.Time,
) (CCMessage, error)

NewEvent creates a new event message. Events represent significant occurrences in the ClusterCockpit system, such as job starts/stops, system state changes, or other notable incidents.

Parameters:

  • name: The name/type of the event (e.g., "start_job", "stop_job", "node_down")
  • tags: Optional tags for categorizing the event
  • meta: Optional metadata information
  • event: The event payload as a string (can be JSON, plain text, or any other format)
  • tm: Timestamp when the event occurred

Returns a CCMessage with the "event" field set to the provided event payload.

Example
msg, err := ccmessage.NewEvent(
	"node_down",
	map[string]string{"severity": "critical"},
	nil,
	"Node node001 is unreachable",
	time.Unix(1234567890, 0),
)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Event: %s\n", msg.Name())
if event, ok := msg.GetEventValue(); ok {
	fmt.Printf("Details: %s\n", event)
}
Output:

Event: node_down
Details: Node node001 is unreachable

func NewGetControl

func NewGetControl(name string,
	tags map[string]string,
	meta map[string]string,
	tm time.Time,
) (CCMessage, error)

NewGetControl creates a new control message with a GET method. Control messages are used to request or set configuration values in the ClusterCockpit system. A GET control message requests the current value of the specified control parameter.

Parameters:

  • name: The name of the control parameter to query
  • tags: Optional tags for categorizing the control message
  • meta: Optional metadata information
  • tm: Timestamp when the control message was created

Returns a CCMessage with the "control" field set to an empty string and a "method" tag set to "GET".

Example
msg, _ := ccmessage.NewGetControl(
	"sampling_rate",
	map[string]string{"component": "collector"},
	nil,
	time.Unix(1234567890, 0),
)

if method, ok := msg.GetControlMethod(); ok {
	fmt.Printf("Method: %s\n", method)
}
fmt.Printf("Parameter: %s\n", msg.Name())
Output:

Method: GET
Parameter: sampling_rate

func NewJobStartEvent

func NewJobStartEvent(job *schema.Job) (CCMessage, error)

NewJobStartEvent creates an event message for a job start. The job information is serialized to JSON and embedded in the event payload.

Parameters:

  • job: Pointer to the schema.Job structure containing job information

Returns a CCMessage with name "start_job" and the job data serialized as JSON in the event field. The timestamp is set to the job's start time.

func NewJobStopEvent

func NewJobStopEvent(job *schema.Job) (CCMessage, error)

NewJobStopEvent creates an event message for a job stop. The job information is serialized to JSON and embedded in the event payload.

Parameters:

  • job: Pointer to the schema.Job structure containing job information

Returns a CCMessage with name "stop_job" and the job data serialized as JSON in the event field. The timestamp is set to the job's start time.
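The JSON round trip behind job events can be sketched with the standard library alone. The job struct below is a hypothetical, heavily reduced stand-in for schema.Job (its fields and JSON keys are assumptions), and encodeJob and decodeJob are illustrative helpers rather than package functions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a hypothetical stand-in for schema.Job with assumed field names.
type job struct {
	JobID     int64  `json:"jobId"`
	Cluster   string `json:"cluster"`
	StartTime int64  `json:"startTime"` // Unix seconds
}

// encodeJob serializes a job to the JSON string used as event payload.
func encodeJob(j job) (string, error) {
	b, err := json.Marshal(j)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeJob restores a job from an event payload.
func decodeJob(payload string) (job, error) {
	var j job
	err := json.Unmarshal([]byte(payload), &j)
	return j, err
}

func main() {
	payload, err := encodeJob(job{JobID: 4711, Cluster: "testcluster", StartTime: 1234567890})
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(payload)
	// {"jobId":4711,"cluster":"testcluster","startTime":1234567890}

	restored, err := decodeJob(payload)
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	fmt.Println(restored.JobID, restored.Cluster) // 4711 testcluster
}
```

With the real package, the same round trip is NewJobStartEvent or NewJobStopEvent on the sending side and IsJobEvent followed by GetJob on the receiving side.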

func NewLog

func NewLog(name string,
	tags map[string]string,
	meta map[string]string,
	log string,
	tm time.Time,
) (CCMessage, error)

NewLog creates a new log message. Log messages are used to transmit textual log data through the ClusterCockpit messaging system.

Parameters:

  • name: The name/category of the log message (e.g., "system_log", "application_log")
  • tags: Optional tags for categorizing the log message (e.g., severity, source)
  • meta: Optional metadata information
  • log: The log message content as a string
  • tm: Timestamp when the log message was generated

Returns a CCMessage with the "log" field set to the provided log content.

func NewMessage

func NewMessage(
	name string,
	tags map[string]string,
	meta map[string]string,
	fields map[string]any,
	tm time.Time,
) (CCMessage, error)

NewMessage creates a new CCMessage with the given name, tags, meta, fields, and timestamp.

Parameters:

  • name: Message/metric name (must not be empty or whitespace-only)
  • tags: Key-value pairs for indexing and querying (keys must not be empty)
  • meta: Metadata for context (keys must not be empty)
  • fields: Data fields (at least one valid field required, keys must not be empty)
  • tm: Timestamp (must not be zero value)

Returns an error if:

  • name is empty or whitespace-only
  • timestamp is zero
  • any tag, meta, or field key is empty or whitespace-only
  • no fields provided
  • all field values are nil or invalid after type conversion
  • any float field is NaN or Inf
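The checks listed above could be approximated as follows. This is a minimal sketch, not the package's validation code: the function names and error texts are illustrative, and it inspects only float64 fields for NaN and Inf instead of converting all types first.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"strings"
	"time"
)

// hasBlankKey reports whether any key is empty or whitespace-only.
func hasBlankKey[V any](m map[string]V) bool {
	for k := range m {
		if strings.TrimSpace(k) == "" {
			return true
		}
	}
	return false
}

// validate is a hypothetical helper mirroring the documented error cases.
func validate(name string, tags, meta map[string]string, fields map[string]any, tm time.Time) error {
	switch {
	case strings.TrimSpace(name) == "":
		return errors.New("message name cannot be empty")
	case tm.IsZero():
		return errors.New("timestamp cannot be zero")
	case hasBlankKey(tags), hasBlankKey(meta), hasBlankKey(fields):
		return errors.New("keys cannot be empty")
	case len(fields) == 0:
		return errors.New("at least one field is required")
	}
	for key, v := range fields {
		if f, ok := v.(float64); ok && (math.IsNaN(f) || math.IsInf(f, 0)) {
			return fmt.Errorf("field %q has invalid float value (NaN or Inf)", key)
		}
	}
	return nil
}

// runChecks exercises five invalid inputs followed by one valid input.
func runChecks() []error {
	now := time.Now()
	ok := map[string]any{"value": 1.0}
	return []error{
		validate("", nil, nil, ok, now),
		validate("test", nil, nil, ok, time.Time{}),
		validate("test", map[string]string{"": "x"}, nil, ok, now),
		validate("test", nil, nil, map[string]any{}, now),
		validate("test", nil, nil, map[string]any{"value": math.NaN()}, now),
		validate("test", nil, nil, ok, now),
	}
}

func main() {
	for _, err := range runChecks() {
		fmt.Println(err) // five error messages, then <nil>
	}
}
```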

Field values are automatically converted to standard types:

  • All integer types → int64 or uint64
  • All float types → float64
  • []byte → string
  • Pointer types are dereferenced

Example:

msg, err := NewMessage(
    "cpu_usage",
    map[string]string{"hostname": "node001", "type": "node"},
    map[string]string{"unit": "percent", "scope": "hwthread"},
    map[string]any{"value": 75.5},
    time.Now(),
)
if err != nil {
    // handle validation error
}
Example (Validation)
_, err1 := ccmessage.NewMessage("", nil, nil, map[string]any{"value": 1}, time.Now())
_, err2 := ccmessage.NewMessage("test", nil, nil, map[string]any{"value": 1}, time.Time{})
_, err3 := ccmessage.NewMessage("test", nil, nil, map[string]any{}, time.Now())

fmt.Printf("Empty name error: %v\n", err1 != nil)
fmt.Printf("Zero timestamp error: %v\n", err2 != nil)
fmt.Printf("No fields error: %v\n", err3 != nil)
Output:

Empty name error: true
Zero timestamp error: true
No fields error: true
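The field conversion rules listed for NewMessage (integers to int64 or uint64, floats to float64, []byte to string, pointers dereferenced) can be illustrated with a reflection-based sketch. The helper normalizeField is hypothetical and only approximates the documented behavior; accepting string and bool values unchanged is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"reflect"
)

var errInvalidFloat = errors.New("invalid float value (NaN or Inf)")

// normalizeField converts v to a standard field type. It returns (nil, nil)
// for values that should be skipped (nil pointers, unsupported types).
func normalizeField(v any) (any, error) {
	rv := reflect.ValueOf(v)
	// Follow pointers until a concrete value or a nil pointer is reached.
	for rv.IsValid() && rv.Kind() == reflect.Pointer {
		if rv.IsNil() {
			return nil, nil
		}
		rv = rv.Elem()
	}
	if !rv.IsValid() {
		return nil, nil // untyped nil
	}
	switch rv.Kind() {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
		return rv.Int(), nil // int64
	case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return rv.Uint(), nil // uint64
	case reflect.Float32, reflect.Float64:
		f := rv.Float()
		if math.IsNaN(f) || math.IsInf(f, 0) {
			return nil, errInvalidFloat
		}
		return f, nil // float64
	case reflect.String:
		return rv.String(), nil
	case reflect.Bool:
		return rv.Bool(), nil
	case reflect.Slice:
		if rv.Type().Elem().Kind() == reflect.Uint8 {
			return string(rv.Bytes()), nil // []byte becomes string
		}
	}
	return nil, nil // unsupported type: skipped
}

func main() {
	n := int64(7)
	samples := []any{int32(100), uint8(3), float32(1.5), []byte("abc"), &n, (*int64)(nil), struct{}{}}
	for _, s := range samples {
		v, err := normalizeField(s)
		fmt.Printf("%T -> %v (%T), err=%v\n", s, v, v, err)
	}
}
```

Returning a nil value with a nil error for skipped inputs lets the caller drop the field silently and then decide, as NewMessage does, whether any valid field is left.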

func NewMetric

func NewMetric(name string,
	tags map[string]string,
	meta map[string]string,
	value any,
	tm time.Time,
) (CCMessage, error)

NewMetric creates a new metric message. Metrics represent numerical measurements from the monitored system, such as CPU usage, memory consumption, network throughput, etc. This is the primary message type in ClusterCockpit for performance monitoring data.

Parameters:

  • name: The metric name (e.g., "cpu_usage", "mem_used", "flops_any")
  • tags: Optional tags for categorizing the metric (e.g., "type": "node", "hostname": "node001")
  • meta: Optional metadata information (e.g., "unit": "bytes", "scope": "node")
  • value: The metric value (can be int, float, uint, or other numeric types)
  • tm: Timestamp when the metric was collected

Returns a CCMessage with the "value" field set to the provided metric value. Note: Unlike events and logs, metric values should be numeric, not strings.

Example
msg, err := ccmessage.NewMetric(
	"cpu_usage",
	map[string]string{"hostname": "node001", "type": "node"},
	map[string]string{"unit": "percent"},
	75.5,
	time.Unix(1234567890, 0),
)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

fmt.Printf("Name: %s\n", msg.Name())
if value, ok := msg.GetMetricValue(); ok {
	fmt.Printf("Value: %.1f\n", value)
}
Output:

Name: cpu_usage
Value: 75.5

func NewPutControl

func NewPutControl(name string,
	tags map[string]string,
	meta map[string]string,
	value string,
	tm time.Time,
) (CCMessage, error)

NewPutControl creates a new control message with a PUT method. Control messages are used to request or set configuration values in the ClusterCockpit system. A PUT control message sets a new value for the specified control parameter.

Parameters:

  • name: The name of the control parameter to set
  • tags: Optional tags for categorizing the control message
  • meta: Optional metadata information
  • value: The new value to set for the control parameter
  • tm: Timestamp when the control message was created

Returns a CCMessage with the "control" field set to the provided value and a "method" tag set to "PUT".

Example
msg, _ := ccmessage.NewPutControl(
	"sampling_rate",
	nil,
	nil,
	"10",
	time.Unix(1234567890, 0),
)

if method, ok := msg.GetControlMethod(); ok {
	fmt.Printf("Method: %s\n", method)
}
if value, ok := msg.GetControlValue(); ok {
	fmt.Printf("New value: %s\n", value)
}
Output:

Method: PUT
New value: 10

func NewQuery

func NewQuery(name string,
	tags map[string]string,
	meta map[string]string,
	q string,
	tm time.Time,
) (CCMessage, error)

NewQuery creates a new CCMessage of type Query

type CCMessageType

type CCMessageType int

CCMessageType defines the type of a CCMessage

const (
	CCMSG_TYPE_METRIC  CCMessageType = iota // Metric message type
	CCMSG_TYPE_EVENT                        // Event message type
	CCMSG_TYPE_LOG                          // Log message type
	CCMSG_TYPE_CONTROL                      // Control message type
	CCMSG_TYPE_QUERY                        // Query message type
)

func (CCMessageType) FieldKey

func (t CCMessageType) FieldKey() string

FieldKey returns the key used for the value field in the message

func (CCMessageType) String

func (t CCMessageType) String() string

String returns the string representation of the CCMessageType
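One plausible way to relate message types to field keys is sketched below. How the package actually determines the type is not shown in this documentation, so treat this as an assumption-laden illustration: the type and constant names are hypothetical, the keys "value", "event", "log" and "control" follow the constructor descriptions above, and the key "query" is a guess.

```go
package main

import "fmt"

// msgType is a hypothetical counterpart of CCMessageType.
type msgType int

const (
	typeMetric msgType = iota
	typeEvent
	typeLog
	typeControl
	typeQuery
	typeInvalid // one past the last valid type
)

// fieldKeys maps each type to the field that carries its payload.
// The "query" key is an assumption made for this sketch.
var fieldKeys = [...]string{"value", "event", "log", "control", "query"}

// FieldKey returns the payload field key, or "" for invalid types.
func (t msgType) FieldKey() string {
	if t < typeMetric || t > typeQuery {
		return ""
	}
	return fieldKeys[t]
}

// detect returns the first type whose payload field is present.
func detect(fields map[string]any) msgType {
	for t := typeMetric; t <= typeQuery; t++ {
		if _, ok := fields[t.FieldKey()]; ok {
			return t
		}
	}
	return typeInvalid
}

func main() {
	fmt.Println(detect(map[string]any{"value": 75.5}) == typeMetric)   // true
	fmt.Println(detect(map[string]any{"log": "disk full"}) == typeLog) // true
	fmt.Println(detect(map[string]any{"other": 1}) == typeInvalid)     // true
}
```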
