Documentation ¶
Overview ¶
Package ccmessage provides a message format and interface for ClusterCockpit.
CCMessage extends the InfluxDB line protocol with additional meta information, supporting multiple message types: metrics, events, logs, control messages, and queries.
Message Types ¶
The package supports five message types:
- Metric: Numerical measurements (CPU usage, memory, network throughput)
- Event: Significant occurrences (job start/stop, node failures)
- Log: Textual log messages
- Control: Configuration requests (GET) or updates (PUT)
- Query: Database queries or search requests
Basic Usage ¶
// Create a metric
msg, err := ccmessage.NewMetric(
	"cpu_usage",
	map[string]string{"hostname": "node001"},
	map[string]string{"unit": "percent"},
	75.5,
	time.Now(),
)
if err != nil {
	// handle validation error
}

// Check message type
if msg.IsMetric() {
	value, ok := msg.GetMetricValue()
	// ...
}

// Convert to InfluxDB line protocol
lineProtocol := msg.ToLineProtocol(map[string]bool{"unit": true})
Thread Safety ¶
CCMessage instances are NOT thread-safe. For concurrent access, either use external synchronization or create separate copies with FromMessage().
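The external-synchronization option might look like the sketch below. It is not part of the package: `message` is a hypothetical stand-in that models only a tag map, and `lockedMessage` is an illustrative wrapper that guards one shared instance with a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a CCMessage; it only models tags.
type message struct {
	tags map[string]string
}

func (m *message) AddTag(key, value string) { m.tags[key] = value }

// lockedMessage guards a single shared message with a mutex
// (external synchronization; not part of the ccmessage package).
type lockedMessage struct {
	mu  sync.Mutex
	msg *message
}

// AddTag serializes writes so concurrent goroutines do not race on the tag map.
func (l *lockedMessage) AddTag(key, value string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.msg.AddTag(key, value)
}

// TagCount returns the number of tags while holding the same lock.
func (l *lockedMessage) TagCount() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return len(l.msg.tags)
}

// addTagsConcurrently adds n distinct tags from n goroutines and returns
// the resulting tag count.
func addTagsConcurrently(n int) int {
	shared := &lockedMessage{msg: &message{tags: map[string]string{}}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			shared.AddTag(fmt.Sprintf("tag%d", i), "x")
		}(i)
	}
	wg.Wait()
	return shared.TagCount()
}

func main() {
	fmt.Println(addTagsConcurrently(100)) // 100
}
```

When goroutines only need to read or modify their own view of a message, handing each one a separate copy avoids the lock entirely.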
Meta vs Tags ¶
Tags are indexed in time-series databases for querying, while meta fields store descriptive metadata. Use the metaAsTags parameter when converting to control which meta fields become tags.
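As an illustration of the metaAsTags idea, the sketch below promotes selected meta fields into the tag set before rendering it. `promoteMeta` and `tagSet` are hypothetical helpers written for this example; the package's actual conversion logic, escaping, and collision handling may differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// promoteMeta returns a new tag map containing all tags plus every meta
// field whose key is marked true in metaAsTags. Inputs are not modified.
// On a key collision the meta value wins in this sketch (an assumption).
func promoteMeta(tags, meta map[string]string, metaAsTags map[string]bool) map[string]string {
	out := make(map[string]string, len(tags)+len(meta))
	for k, v := range tags {
		out[k] = v
	}
	for k, v := range meta {
		if metaAsTags[k] {
			out[k] = v
		}
	}
	return out
}

// tagSet renders tags as "k1=v1,k2=v2" with keys sorted, similar to the tag
// set of a line-protocol line (escaping omitted for brevity).
func tagSet(tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+tags[k])
	}
	return strings.Join(parts, ",")
}

func main() {
	tags := map[string]string{"hostname": "node001"}
	meta := map[string]string{"unit": "percent", "scope": "node"}
	merged := promoteMeta(tags, meta, map[string]bool{"unit": true})
	fmt.Println(tagSet(merged)) // hostname=node001,unit=percent
}
```

Here "unit" becomes a tag because it is enabled in the map, while "scope" stays out of the rendered tag set.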
Index ¶
- Constants
- type CCMessage
- func EmptyMessage() CCMessage
- func FromBytes(data []byte) ([]CCMessage, error)
- func FromInfluxMetric(other lp1.Metric) CCMessage (deprecated)
- func FromJSON(input json.RawMessage) (CCMessage, error)
- func FromMessage(other CCMessage) CCMessage
- func NewEvent(name string, tags map[string]string, meta map[string]string, event string, ...) (CCMessage, error)
- func NewGetControl(name string, tags map[string]string, meta map[string]string, tm time.Time) (CCMessage, error)
- func NewJobStartEvent(job *schema.Job) (CCMessage, error)
- func NewJobStopEvent(job *schema.Job) (CCMessage, error)
- func NewLog(name string, tags map[string]string, meta map[string]string, log string, ...) (CCMessage, error)
- func NewMessage(name string, tags map[string]string, meta map[string]string, ...) (CCMessage, error)
- func NewMetric(name string, tags map[string]string, meta map[string]string, value any, ...) (CCMessage, error)
- func NewPutControl(name string, tags map[string]string, meta map[string]string, value string, ...) (CCMessage, error)
- func NewQuery(name string, tags map[string]string, meta map[string]string, q string, ...) (CCMessage, error)
- type CCMessageType
Constants ¶
const (
	MIN_CCMSG_TYPE     = CCMSG_TYPE_METRIC
	MAX_CCMSG_TYPE     = CCMSG_TYPE_QUERY
	CCMSG_TYPE_INVALID = MAX_CCMSG_TYPE + 1
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CCMessage ¶
type CCMessage interface {
ToPoint(metaAsTags map[string]bool) *write.Point // Generate InfluxDB point for data type ccMessage
ToLineProtocol(metaAsTags map[string]bool) string // Generate InfluxDB line protocol for data type ccMessage
ToJSON(metaAsTags map[string]bool) (json.RawMessage, error) // Generate JSON representation
Name() string // Get metric name
SetName(name string) // Set metric name
Time() time.Time // Get timestamp
SetTime(t time.Time) // Set timestamp
Tags() map[string]string // Map of tags
AddTag(key, value string) // Add a tag
GetTag(key string) (value string, ok bool) // Get a tag by its key
HasTag(key string) (ok bool) // Check if a tag key is present
RemoveTag(key string) // Remove a tag by its key
Meta() map[string]string // Map of meta data tags
AddMeta(key, value string) // Add a meta data tag
GetMeta(key string) (value string, ok bool) // Get a meta data tag addressed by its key
HasMeta(key string) (ok bool) // Check if a meta data key is present
RemoveMeta(key string) // Remove a meta data tag by its key
Fields() map[string]any // Map of fields
AddField(key string, value any) // Add a field
GetField(key string) (value any, ok bool) // Get a field addressed by its key
HasField(key string) (ok bool) // Check if a field key is present
RemoveField(key string) // Remove a field addressed by its key
String() string // Return line-protocol like string
MessageType() CCMessageType // Return message type
IsMetric() bool // Check if message is a metric
GetMetricValue() (value any, ok bool)
IsLog() bool // Check if message is a log
GetLogValue() (value string, ok bool)
IsEvent() bool // Check if message is an event
GetEventValue() (value string, ok bool)
IsControl() bool // Check if message is a control message
GetControlValue() (value string, ok bool)
GetControlMethod() (method string, ok bool)
IsQuery() bool // Check if message is a query
GetQueryValue() (value string, ok bool)
IsJobEvent() (eventName string, ok bool) // Check if message is a job event (returns event name and bool)
GetJob() (*schema.Job, error)
}
CCMessage is the interface for accessing and manipulating ClusterCockpit messages. It provides methods for converting to other formats (InfluxDB point, Line Protocol, JSON), accessing metadata (Name, Time, Tags, Meta, Fields), and checking message type.
Thread Safety: CCMessage instances are NOT thread-safe. Concurrent access to the same CCMessage from multiple goroutines must be synchronized externally. For concurrent use, either use locking mechanisms or create separate message instances using FromMessage().
func FromBytes ¶
func FromBytes(data []byte) ([]CCMessage, error)
FromBytes creates a list of CCMessages from a byte slice containing InfluxDB line protocol data.
Example ¶
data := []byte("cpu_usage,hostname=node001 value=75.5 1234567890000000000")
messages, err := ccmessage.FromBytes(data)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}
for _, msg := range messages {
	fmt.Printf("Metric: %s\n", msg.Name())
	if value, ok := msg.GetMetricValue(); ok {
		fmt.Printf("Value: %v\n", value)
	}
}
Output:
Metric: cpu_usage
Value: 75.5
func FromInfluxMetric (deprecated)
func FromJSON ¶
func FromJSON(input json.RawMessage) (CCMessage, error)
FromJSON creates a CCMessage from a JSON representation.
func FromMessage ¶
func FromMessage(other CCMessage) CCMessage
FromMessage creates a deep copy of the given CCMessage.
Example ¶
original, _ := ccmessage.NewMetric("cpu_usage", nil, nil, 50.0, time.Now())
copy := ccmessage.FromMessage(original)
copy.AddTag("modified", "true")
fmt.Printf("Original has tag: %v\n", original.HasTag("modified"))
fmt.Printf("Copy has tag: %v\n", copy.HasTag("modified"))
Output:
Original has tag: false
Copy has tag: true
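Why a deep copy matters can be seen in a small standalone sketch. `message` and `deepCopy` below are hypothetical stand-ins, not the package's types: a plain struct copy still shares the underlying tag map, whereas a deep copy gets its own.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a CCMessage; it only models tags.
type message struct {
	name string
	tags map[string]string
}

// deepCopy duplicates the tag map so the copy can be changed independently.
func deepCopy(m message) message {
	tags := make(map[string]string, len(m.tags))
	for k, v := range m.tags {
		tags[k] = v
	}
	return message{name: m.name, tags: tags}
}

func main() {
	original := message{name: "cpu_usage", tags: map[string]string{"hostname": "node001"}}

	shallow := original // struct copy: both values still share one map
	shallow.tags["shared"] = "true"

	deep := deepCopy(original)
	deep.tags["modified"] = "true"

	_, leaked := original.tags["shared"]
	_, isolated := original.tags["modified"]
	fmt.Println(leaked, isolated) // true false
}
```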
func NewEvent ¶
func NewEvent(name string, tags map[string]string, meta map[string]string, event string, tm time.Time) (CCMessage, error)
NewEvent creates a new event message. Events represent significant occurrences in the ClusterCockpit system, such as job starts/stops, system state changes, or other notable incidents.
Parameters:
- name: The name/type of the event (e.g., "start_job", "stop_job", "node_down")
- tags: Optional tags for categorizing the event
- meta: Optional metadata information
- event: The event payload as a string (can be JSON, plain text, or any other format)
- tm: Timestamp when the event occurred
Returns a CCMessage with the "event" field set to the provided event payload.
Example ¶
msg, err := ccmessage.NewEvent(
	"node_down",
	map[string]string{"severity": "critical"},
	nil,
	"Node node001 is unreachable",
	time.Unix(1234567890, 0),
)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}
fmt.Printf("Event: %s\n", msg.Name())
if event, ok := msg.GetEventValue(); ok {
	fmt.Printf("Details: %s\n", event)
}
Output:
Event: node_down
Details: Node node001 is unreachable
func NewGetControl ¶
func NewGetControl(name string, tags map[string]string, meta map[string]string, tm time.Time) (CCMessage, error)
NewGetControl creates a new control message with a GET method. Control messages are used to request or set configuration values in the ClusterCockpit system. A GET control message requests the current value of the specified control parameter.
Parameters:
- name: The name of the control parameter to query
- tags: Optional tags for categorizing the control message
- meta: Optional metadata information
- tm: Timestamp when the control message was created
Returns a CCMessage with the "control" field set to an empty string and a "method" tag set to "GET".
Example ¶
msg, _ := ccmessage.NewGetControl(
	"sampling_rate",
	map[string]string{"component": "collector"},
	nil,
	time.Unix(1234567890, 0),
)
if method, ok := msg.GetControlMethod(); ok {
	fmt.Printf("Method: %s\n", method)
}
fmt.Printf("Parameter: %s\n", msg.Name())
Output:
Method: GET
Parameter: sampling_rate
func NewJobStartEvent ¶
func NewJobStartEvent(job *schema.Job) (CCMessage, error)
NewJobStartEvent creates an event message for a job start. The job information is serialized to JSON and embedded in the event payload.
Parameters:
- job: Pointer to the schema.Job structure containing job information
Returns a CCMessage with name "start_job" and the job data serialized as JSON in the event field. The timestamp is set to the job's start time.
func NewJobStopEvent ¶
func NewJobStopEvent(job *schema.Job) (CCMessage, error)
NewJobStopEvent creates an event message for a job stop. The job information is serialized to JSON and embedded in the event payload.
Parameters:
- job: Pointer to the schema.Job structure containing job information
Returns a CCMessage with name "stop_job" and the job data serialized as JSON in the event field. The timestamp is set to the job's start time.
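The serialization step shared by both job event constructors can be sketched roughly as follows. `jobStub` is a hypothetical stand-in for schema.Job with two illustrative fields; the real struct has its own fields and JSON tags, and treating the start time as Unix seconds is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// jobStub is a hypothetical stand-in for schema.Job.
type jobStub struct {
	JobID     int64 `json:"jobId"`
	StartTime int64 `json:"startTime"` // Unix seconds (assumption)
}

// jobEventPayload returns the JSON payload for the event field and the
// timestamp derived from the job's start time.
func jobEventPayload(job *jobStub) (string, time.Time, error) {
	payload, err := json.Marshal(job)
	if err != nil {
		return "", time.Time{}, err
	}
	return string(payload), time.Unix(job.StartTime, 0), nil
}

func main() {
	payload, tm, err := jobEventPayload(&jobStub{JobID: 42, StartTime: 1234567890})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(payload)   // {"jobId":42,"startTime":1234567890}
	fmt.Println(tm.Unix()) // 1234567890
}
```

A receiver would reverse the step by unmarshalling the event field back into a job structure, which is presumably what GetJob does.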
func NewLog ¶
func NewLog(name string, tags map[string]string, meta map[string]string, log string, tm time.Time) (CCMessage, error)
NewLog creates a new log message. Log messages are used to transmit textual log data through the ClusterCockpit messaging system.
Parameters:
- name: The name/category of the log message (e.g., "system_log", "application_log")
- tags: Optional tags for categorizing the log message (e.g., severity, source)
- meta: Optional metadata information
- log: The log message content as a string
- tm: Timestamp when the log message was generated
Returns a CCMessage with the "log" field set to the provided log content.
func NewMessage ¶
func NewMessage(name string, tags map[string]string, meta map[string]string, fields map[string]any, tm time.Time) (CCMessage, error)
NewMessage creates a new CCMessage with the given name, tags, meta, fields, and timestamp.
Parameters:
- name: Message/metric name (must not be empty or whitespace-only)
- tags: Key-value pairs for indexing and querying (keys must not be empty)
- meta: Metadata for context (keys must not be empty)
- fields: Data fields (at least one valid field required, keys must not be empty)
- tm: Timestamp (must not be zero value)
Returns an error if:
- name is empty or whitespace-only
- timestamp is zero
- any tag, meta, or field key is empty or whitespace-only
- no fields provided
- all field values are nil or invalid after type conversion
- any float field is NaN or Inf
Field values are automatically converted to standard types:
- All integer types → int64 or uint64
- All float types → float64
- []byte → string
- Pointer types are dereferenced
Example:
msg, err := NewMessage(
	"cpu_usage",
	map[string]string{"hostname": "node001", "type": "node"},
	map[string]string{"unit": "percent", "scope": "hwthread"},
	map[string]any{"value": 75.5},
	time.Now(),
)
if err != nil {
	// handle validation error
}
Example (Validation) ¶
_, err1 := ccmessage.NewMessage("", nil, nil, map[string]any{"value": 1}, time.Now())
_, err2 := ccmessage.NewMessage("test", nil, nil, map[string]any{"value": 1}, time.Time{})
_, err3 := ccmessage.NewMessage("test", nil, nil, map[string]any{}, time.Now())
fmt.Printf("Empty name error: %v\n", err1 != nil)
fmt.Printf("Zero timestamp error: %v\n", err2 != nil)
fmt.Printf("No fields error: %v\n", err3 != nil)
Output:
Empty name error: true
Zero timestamp error: true
No fields error: true
func NewMetric ¶
func NewMetric(name string, tags map[string]string, meta map[string]string, value any, tm time.Time) (CCMessage, error)
NewMetric creates a new metric message. Metrics represent numerical measurements from the monitored system, such as CPU usage, memory consumption, network throughput, etc. This is the primary message type in ClusterCockpit for performance monitoring data.
Parameters:
- name: The metric name (e.g., "cpu_usage", "mem_used", "flops_any")
- tags: Optional tags for categorizing the metric (e.g., "type": "node", "hostname": "node001")
- meta: Optional metadata information (e.g., "unit": "bytes", "scope": "node")
- value: The metric value (can be int, float, uint, or other numeric types)
- tm: Timestamp when the metric was collected
Returns a CCMessage with the "value" field set to the provided metric value. Note: Unlike events and logs, metric values should be numeric, not strings.
Example ¶
msg, err := ccmessage.NewMetric(
	"cpu_usage",
	map[string]string{"hostname": "node001", "type": "node"},
	map[string]string{"unit": "percent"},
	75.5,
	time.Unix(1234567890, 0),
)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}
fmt.Printf("Name: %s\n", msg.Name())
if value, ok := msg.GetMetricValue(); ok {
	fmt.Printf("Value: %.1f\n", value)
}
Output:
Name: cpu_usage
Value: 75.5
func NewPutControl ¶
func NewPutControl(name string, tags map[string]string, meta map[string]string, value string, tm time.Time) (CCMessage, error)
NewPutControl creates a new control message with a PUT method. Control messages are used to request or set configuration values in the ClusterCockpit system. A PUT control message sets a new value for the specified control parameter.
Parameters:
- name: The name of the control parameter to set
- tags: Optional tags for categorizing the control message
- meta: Optional metadata information
- value: The new value to set for the control parameter
- tm: Timestamp when the control message was created
Returns a CCMessage with the "control" field set to the provided value and a "method" tag set to "PUT".
Example ¶
msg, _ := ccmessage.NewPutControl(
	"sampling_rate",
	nil,
	nil,
	"10",
	time.Unix(1234567890, 0),
)
if method, ok := msg.GetControlMethod(); ok {
	fmt.Printf("Method: %s\n", method)
}
if value, ok := msg.GetControlValue(); ok {
	fmt.Printf("New value: %s\n", value)
}
Output:
Method: PUT
New value: 10
type CCMessageType ¶
type CCMessageType int
CCMessageType defines the type of a CCMessage.
const (
	CCMSG_TYPE_METRIC  CCMessageType = iota // Metric message type
	CCMSG_TYPE_EVENT                        // Event message type
	CCMSG_TYPE_LOG                          // Log message type
	CCMSG_TYPE_CONTROL                      // Control message type
	CCMSG_TYPE_QUERY                        // Query message type
)
func (CCMessageType) FieldKey ¶
func (t CCMessageType) FieldKey() string
FieldKey returns the key used for the value field in the message.
func (CCMessageType) String ¶
func (t CCMessageType) String() string
String returns the string representation of the CCMessageType.
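To show how the type constants, the MIN/MAX bounds, and a per-type field key fit together, here is a standalone sketch that redeclares the constants locally. `fieldKey` and `valid` are hypothetical helpers: the keys "value", "event", "log", and "control" follow the constructor descriptions in this documentation, while "query" and the empty string for unknown types are assumptions.

```go
package main

import "fmt"

// Local redeclaration mirroring the documented constants.
type CCMessageType int

const (
	CCMSG_TYPE_METRIC CCMessageType = iota
	CCMSG_TYPE_EVENT
	CCMSG_TYPE_LOG
	CCMSG_TYPE_CONTROL
	CCMSG_TYPE_QUERY
)

const (
	MIN_CCMSG_TYPE     = CCMSG_TYPE_METRIC
	MAX_CCMSG_TYPE     = CCMSG_TYPE_QUERY
	CCMSG_TYPE_INVALID = MAX_CCMSG_TYPE + 1
)

// fieldKey returns the field assumed to carry the payload for each type.
func fieldKey(t CCMessageType) string {
	switch t {
	case CCMSG_TYPE_METRIC:
		return "value"
	case CCMSG_TYPE_EVENT:
		return "event"
	case CCMSG_TYPE_LOG:
		return "log"
	case CCMSG_TYPE_CONTROL:
		return "control"
	case CCMSG_TYPE_QUERY:
		return "query" // assumption: not stated in the documentation
	default:
		return "" // assumption: unknown types have no field key
	}
}

// valid reports whether t lies in [MIN_CCMSG_TYPE, MAX_CCMSG_TYPE].
func valid(t CCMessageType) bool {
	return t >= MIN_CCMSG_TYPE && t <= MAX_CCMSG_TYPE
}

func main() {
	for t := MIN_CCMSG_TYPE; t <= MAX_CCMSG_TYPE; t++ {
		fmt.Printf("%d -> %q\n", t, fieldKey(t))
	}
	fmt.Println(valid(CCMSG_TYPE_INVALID)) // false
}
```

Because CCMSG_TYPE_INVALID is defined as MAX_CCMSG_TYPE + 1, a range check against the two bounds stays correct if further types are appended before the maximum is updated.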