receivers

package
v2.0.0 Latest
Published: Dec 24, 2025 License: MIT Imports: 28 Imported by: 0

README

CCMetric receivers

The receivers package provides a modular system for collecting metrics, events, and logs from various sources. It defines a common Receiver interface and a ReceiveManager to orchestrate multiple receiver instances.

Architecture Overview

Receivers are the entry points for data into the ClusterCockpit monitoring stack. They collect or receive data from external sources, convert it into the internal CCMessage format, and send it to a unified sink channel.

Component Roles
  • Receiver: A component that gathers metrics from a specific source (e.g., HTTP POSTs, NATS subscriptions, or direct hardware queries like IPMI).
  • ReceiveManager: Orchestrates the lifecycle of all configured receivers. It initializes, starts, and stops them, and ensures they all point to the correct output channel.
  • Message Processor: An optional component within each receiver that can filter, rename, or transform messages before they are sent to the sink.
Message Flow
  1. Ingress: Data arrives at the receiver (e.g., an HTTP request or a scheduled sensor read).
  2. Decoding: The raw data is decoded into one or more CCMessage objects.
  3. Processing (Optional): If process_messages is configured, the messages pass through a processing pipeline.
  4. Egress: The processed messages are sent to the receiver's sink channel.
  5. Aggregation: All receivers send their messages to the same output channel managed by the ReceiveManager, which typically leads to the SinkManager.
graph LR
    S1[Source 1] --> R1[Receiver 1]
    S2[Source 2] --> R2[Receiver 2]
    R1 --> MP1[Message Processor 1]
    R2 --> MP2[Message Processor 2]
    MP1 --> OC[Output Channel]
    MP2 --> OC
    OC --> SM[Sink Manager]
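
The fan-in described in the message flow can be sketched with plain Go channels. This is only an illustration under stated assumptions: Message is a stand-in for lp.CCMessage, and runReceiver and collect are hypothetical helpers that do not exist in this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in for lp.CCMessage (hypothetical, for illustration only).
type Message struct {
	Source string
	Value  float64
}

// runReceiver simulates one receiver: it turns raw values into messages,
// applies an optional processing step, and writes the result to the shared sink.
func runReceiver(name string, raw []float64, process func(Message) (Message, bool), sink chan<- Message, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, v := range raw {
		msg := Message{Source: name, Value: v} // decoding
		if process != nil {
			var keep bool
			if msg, keep = process(msg); !keep { // processing (optional)
				continue
			}
		}
		sink <- msg // egress into the shared output channel
	}
}

// collect starts one goroutine per input, waits for all of them, and returns
// every message that reached the shared channel.
func collect(inputs map[string][]float64, process func(Message) (Message, bool)) []Message {
	sink := make(chan Message)
	var wg sync.WaitGroup
	for name, raw := range inputs {
		wg.Add(1)
		go runReceiver(name, raw, process, sink, &wg)
	}
	// Close the sink once all receivers are done so the range loop below ends.
	go func() { wg.Wait(); close(sink) }()

	var out []Message
	for m := range sink {
		out = append(out, m)
	}
	return out
}

func main() {
	dropNegative := func(m Message) (Message, bool) { return m, m.Value >= 0 }
	msgs := collect(map[string][]float64{"r1": {1, -2, 3}, "r2": {4}}, dropNegative)
	fmt.Println(len(msgs), "messages reached the output channel")
}
```

The order of messages on the shared channel is not deterministic, which is why the sketch only reports a count.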

Configuration

The ReceiveManager is configured with a JSON object where each key is a unique name for a receiver instance, and the value is its configuration.

Common Configuration Options

Every receiver supports these common fields:

  • type: (Required) The type of receiver to initialize (e.g., "http", "nats").
  • process_messages: (Optional) A list of message processing rules. See the messageProcessor documentation for details.

Example:

{
  "my_http_receiver" : {
    "type": "http",
    "port": "8080",
    "process_messages": [
      { "add_meta": { "source": "frontend-http" } }
    ]
  },
  "my_nats_receiver": {
    "type": "nats",
    "address": "nats.example.com",
    "subject": "metrics"
  }
}
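
To illustrate the shape of this configuration, the following sketch reads such a JSON object with the standard library and extracts the type of each instance. It is not the ReceiveManager's actual code; commonConfig and receiverTypes are hypothetical names used only here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// commonConfig mirrors the two common fields described above.
// The struct name is an assumption made for this sketch.
type commonConfig struct {
	Type            string            `json:"type"`
	ProcessMessages []json.RawMessage `json:"process_messages,omitempty"`
}

// receiverTypes maps each instance name to its "type" and returns an error
// for the first entry that cannot be parsed or lacks the required field.
func receiverTypes(raw []byte) (map[string]string, error) {
	var instances map[string]json.RawMessage
	if err := json.Unmarshal(raw, &instances); err != nil {
		return nil, err
	}
	types := make(map[string]string, len(instances))
	for name, cfg := range instances {
		var c commonConfig
		if err := json.Unmarshal(cfg, &c); err != nil {
			return nil, fmt.Errorf("receiver %q: %w", name, err)
		}
		if c.Type == "" {
			return nil, fmt.Errorf("receiver %q: missing required field \"type\"", name)
		}
		types[name] = c.Type
	}
	return types, nil
}

func main() {
	cfg := []byte(`{
	  "my_http_receiver": {"type": "http", "port": "8080"},
	  "my_nats_receiver": {"type": "nats", "address": "nats.example.com", "subject": "metrics"}
	}`)
	types, err := receiverTypes(cfg)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(types["my_http_receiver"], types["my_nats_receiver"])
}
```

Keeping each value as json.RawMessage defers parsing of the type-specific fields until the receiver type is known.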

The Receiver Interface

All receivers must implement the Receiver interface defined in metricReceiver.go:

type Receiver interface {
    Start()                         // Start begins the metric collection process
    Close()                         // Close stops the receiver and releases resources
    Name() string                   // Name returns the receiver's identifier
    SetSink(sink chan lp.CCMessage) // SetSink configures the output channel for collected metrics
}
  • Start(): This method is called to begin operation. For server-like receivers (e.g., HTTP), it starts the listener. For polling receivers (e.g., IPMI), it starts the collection loop. It should typically be non-blocking or start its own goroutines.
  • Close(): Gracefully shuts down the receiver, closing network connections, stopping goroutines, and releasing resources.
  • Name(): Returns a human-readable name for the receiver instance, usually including its type and the name from the configuration.
  • SetSink(): Receives the channel where all collected CCMessage objects should be sent.
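
A minimal sketch of a polling receiver that satisfies these four methods might look as follows. To stay self-contained it redeclares the interface with a stand-in Message type instead of lp.CCMessage; tickReceiver and its fields are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Message stands in for lp.CCMessage so the sketch compiles without cc-lib.
type Message struct {
	Name  string
	Value float64
}

// Receiver mirrors the interface above, with Message in place of lp.CCMessage.
type Receiver interface {
	Start()
	Close()
	Name() string
	SetSink(sink chan Message)
}

// tickReceiver is a hypothetical polling receiver that emits a running counter.
type tickReceiver struct {
	name     string
	interval time.Duration
	sink     chan Message
	done     chan struct{}
	wg       sync.WaitGroup
}

func (r *tickReceiver) Name() string              { return r.name }
func (r *tickReceiver) SetSink(sink chan Message) { r.sink = sink }

// Start launches the polling loop in its own goroutine and returns immediately.
func (r *tickReceiver) Start() {
	r.done = make(chan struct{})
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		ticker := time.NewTicker(r.interval)
		defer ticker.Stop()
		for n := 1.0; ; n++ {
			select {
			case <-r.done:
				return
			case <-ticker.C:
				// Also watch done while sending so Close cannot block forever.
				select {
				case r.sink <- Message{Name: "ticks", Value: n}:
				case <-r.done:
					return
				}
			}
		}
	}()
}

// Close signals the loop to stop and waits until the goroutine has exited.
func (r *tickReceiver) Close() {
	close(r.done)
	r.wg.Wait()
}

func main() {
	var r Receiver = &tickReceiver{name: "TickReceiver(demo)", interval: 10 * time.Millisecond}
	sink := make(chan Message)
	r.SetSink(sink)
	r.Start()
	fmt.Println(r.Name(), "sent", (<-sink).Value)
	r.Close()
}
```

The nested select is a design choice: if nobody reads from the sink any more, the goroutine still notices the stop signal and Close() returns.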

Available Receivers

Type        Description                                              Platform
http        Receives InfluxDB line protocol via HTTP POST requests.  All
nats        Subscribes to NATS subjects to receive metrics.          All
prometheus  Scrapes metrics from Prometheus-compatible endpoints.    All
eecpt       Specialized HTTP receiver for EECPT instrumentation.     All
ipmi        Polls hardware metrics via IPMI (requires freeipmi).     Linux
redfish     Polls hardware metrics via the Redfish API.              Linux

Utilities

InfluxDB Decoding

The file influxDecoder.go provides a helper function, DecodeInfluxMessage, that parses InfluxDB line protocol data into CCMessage objects. Most receivers that accept line protocol data use it.

import "github.com/ClusterCockpit/cc-lib/v2/receivers"
// ...
msg, err := receivers.DecodeInfluxMessage(decoder)

Contributing Your Own Receivers

To add a new receiver type:

  1. Define Configuration: Create a struct for your receiver's configuration, embedding defaultReceiverConfig.
  2. Implement the Interface: Create a struct that implements Receiver. Use the receiver base struct from metricReceiver.go to gain default implementations of Name() and SetSink().
  3. Implement Factory Function: Create a New<Type>Receiver(name string, config json.RawMessage) function.
  4. Register Receiver: Add your factory function to AvailableReceivers in availableReceivers.go (or availableReceiversLinux.go for Linux-only receivers).
  5. Documentation: Create a <type>Receiver.md file and update this README.

Refer to sampleReceiver.go for a complete, documented template.
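
The first four steps can be sketched in a self-contained form as shown below. This is not a copy of sampleReceiver.go: Message, defaultConfig, echoConfig, echoReceiver and the available map are stand-ins that only mimic the shapes of lp.CCMessage, defaultReceiverConfig and AvailableReceivers, and the "echo" type is invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message stands in for lp.CCMessage.
type Message struct{ Name string }

// Receiver mirrors the package interface with the stand-in message type.
type Receiver interface {
	Start()
	Close()
	Name() string
	SetSink(sink chan Message)
}

// defaultConfig stands in for the package's unexported defaultReceiverConfig.
type defaultConfig struct {
	Type string `json:"type"`
}

// Step 1: configuration struct for a hypothetical "echo" receiver.
type echoConfig struct {
	defaultConfig
	Addr string `json:"address"`
	Port string `json:"port"`
}

// Step 2: the receiver itself; Start and Close are left empty in this sketch.
type echoReceiver struct {
	name   string
	config echoConfig
	sink   chan Message
}

func (r *echoReceiver) Start()                    {}
func (r *echoReceiver) Close()                    {}
func (r *echoReceiver) Name() string              { return r.name }
func (r *echoReceiver) SetSink(sink chan Message) { r.sink = sink }

// Step 3: factory with the New<Type>Receiver(name, config) shape.
func NewEchoReceiver(name string, config json.RawMessage) (Receiver, error) {
	r := &echoReceiver{name: fmt.Sprintf("EchoReceiver(%s)", name)}
	r.config.Port = "8080" // default, overridden by the JSON if present
	if err := json.Unmarshal(config, &r.config); err != nil {
		return nil, fmt.Errorf("%s: cannot parse config: %w", r.name, err)
	}
	if r.config.Addr == "" {
		return nil, fmt.Errorf("%s: \"address\" is required", r.name)
	}
	return r, nil
}

// Step 4: registry with the same function signature as AvailableReceivers.
var available = map[string]func(name string, config json.RawMessage) (Receiver, error){
	"echo": NewEchoReceiver,
}

func main() {
	r, err := available["echo"]("demo", json.RawMessage(`{"type":"echo","address":"localhost"}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.Name())
}
```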

Best Practices
  • Goroutine Management: Always ensure goroutines started in Start() are cleaned up in Close(). Use sync.WaitGroup and stop channels.
  • Error Handling: Use the ccLogger package to log errors and debug information.
  • Message Processing: Always initialize and use a messageProcessor in your receiver to support the process_messages config option.
  • Testing: Provide a _test.go file. Use the ccMessage package to verify the messages produced by your receiver.

Troubleshooting

  • Receiver won't start: Check the logs for "ReceiveManager: SKIP" messages. This usually indicates a configuration error (e.g., missing type field or unknown receiver type).
  • No data received:
    • Verify the network connectivity and address/port configuration.
    • Check if basic authentication is required and configured correctly.
    • If using process_messages, verify that rules are not accidentally dropping all messages.
  • IPMI/Redfish issues: Ensure the required external tools (like freeipmi) are installed and that the user running the collector has the necessary permissions.

Testing Guidelines

Each receiver should have unit tests that:

  1. Initialize the receiver with a valid configuration.
  2. Simulate ingress data (e.g., sending a mock HTTP request or providing mock command output).
  3. Verify that the correct CCMessage objects are sent to the sink channel.
  4. Verify that Close() successfully stops all background activities.
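
Steps 3 and 4 usually need a timeout so that a broken receiver fails the test instead of hanging it. The helpers below are one possible sketch; awaitMessages and closesWithin are hypothetical names, and Message again stands in for lp.CCMessage.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Message stands in for lp.CCMessage (hypothetical).
type Message struct {
	Name  string
	Value float64
}

// awaitMessages reads exactly want messages from sink or fails after timeout.
func awaitMessages(sink <-chan Message, want int, timeout time.Duration) ([]Message, error) {
	deadline := time.After(timeout)
	got := make([]Message, 0, want)
	for len(got) < want {
		select {
		case m := <-sink:
			got = append(got, m)
		case <-deadline:
			return got, fmt.Errorf("timed out after %d of %d messages", len(got), want)
		}
	}
	return got, nil
}

// closesWithin runs closeFn and reports an error if it does not return in time,
// which usually points at a goroutine that ignores the stop signal.
func closesWithin(closeFn func(), timeout time.Duration) error {
	done := make(chan struct{})
	go func() { closeFn(); close(done) }()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errors.New("close did not return in time")
	}
}

func main() {
	sink := make(chan Message, 2)
	sink <- Message{Name: "temp", Value: 42} // simulated receiver output
	sink <- Message{Name: "power", Value: 110}

	msgs, err := awaitMessages(sink, 2, time.Second)
	fmt.Println(len(msgs), err)
	fmt.Println(closesWithin(func() {}, time.Second))
}
```

In a real _test.go file, the receiver's Close method would be passed as closeFn and the returned errors handed to t.Fatal.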

Documentation

Overview

Package receivers provides a modular system for collecting metrics, events, and logs from various sources. It defines a common Receiver interface and a ReceiveManager to orchestrate multiple receiver instances. Receivers collect data from external sources, convert it into CCMessage format, and send it to a unified sink channel.

Constants

const (
	CCCPT_RECEIVER_PORT = "8080"
)
const HTTP_RECEIVER_PORT = "8080"

Variables

var AvailableReceivers = map[string]func(name string, config json.RawMessage) (Receiver, error){
	"http":       NewHttpReceiver,
	"nats":       NewNatsReceiver,
	"eecpt":      NewEECPTReceiver,
	"prometheus": NewPrometheusReceiver,
	"ipmi":       NewIPMIReceiver,
	"redfish":    NewRedfishReceiver,
}

Map of all available receivers

Functions

func DecodeInfluxMessage

func DecodeInfluxMessage(d *influx.Decoder) (lp.CCMessage, error)

DecodeInfluxMessage decodes a single InfluxDB line protocol message from the decoder. It returns the decoded CCMessage, or an error if decoding fails.

Types

type EECPTReceiver

type EECPTReceiver struct {
	// contains filtered or unexported fields
}

func (*EECPTReceiver) Close

func (r *EECPTReceiver) Close()

func (*EECPTReceiver) Name

func (r *EECPTReceiver) Name() string

Name returns the name of the metric receiver

func (*EECPTReceiver) ServerHttp

func (r *EECPTReceiver) ServerHttp(w http.ResponseWriter, req *http.Request)

func (*EECPTReceiver) SetSink

func (r *EECPTReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*EECPTReceiver) Start

func (r *EECPTReceiver) Start()

type EECPTReceiverConfig

type EECPTReceiverConfig struct {
	Addr string `json:"address"`
	Port string `json:"port"`
	Path string `json:"path"`

	// Maximum amount of time to wait for the next request when keep-alives are enabled
	// should be larger than the measurement interval to keep the connection open
	IdleTimeout string `json:"idle_timeout"`

	// Controls whether HTTP keep-alives are enabled. By default, keep-alives are enabled
	KeepAlivesEnabled bool `json:"keep_alives_enabled"`

	// Basic authentication
	Username string `json:"username"`
	Password string `json:"password"`

	AnalysisBufferLength int    `json:"analysis_buffer_size"`
	AnalysisInterval     string `json:"analysis_interval"`
	AnalysisMetric       string `json:"analysis_metric"`
	// contains filtered or unexported fields
}

type EECPTReceiverJob

type EECPTReceiverJob struct {
	// contains filtered or unexported fields
}

func NewJob

func NewJob(ident string) *EECPTReceiverJob

func (*EECPTReceiverJob) Analyse

func (job *EECPTReceiverJob) Analyse() float64

Analyse performs a chi-square statistical test to detect phase transitions in application behavior. It computes the chi-square statistic by comparing the expected rate of change (prev) with the observed change (last) across all tasks in the job, and returns the value of the test statistic.

func (*EECPTReceiverJob) ChiSquareLimit

func (job *EECPTReceiverJob) ChiSquareLimit() float64

ChiSquareLimit returns the critical chi-square value at 95% confidence level (p=0.05) for the given number of degrees of freedom (number of tasks). These are pre-calculated chi-square distribution values used to determine if a phase transition is statistically significant.
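
As a rough illustration of how such a test fits together, the sketch below computes a Pearson chi-square statistic and compares it with textbook critical values for p = 0.05. It is not the package's implementation: chiSquare, critical95 and isPhaseTransition are hypothetical names, expected values are assumed to be positive, and the table covers only one to five degrees of freedom.

```go
package main

import "fmt"

// chiSquare returns the sum of (observed-expected)^2 / expected over all tasks.
// Both slices must have the same length. Entries with a zero expectation are
// skipped to avoid a division by zero (an assumption made for this sketch).
func chiSquare(expected, observed []float64) float64 {
	stat := 0.0
	for i := range expected {
		if expected[i] == 0 {
			continue
		}
		d := observed[i] - expected[i]
		stat += d * d / expected[i]
	}
	return stat
}

// critical95 holds standard chi-square critical values for p = 0.05.
// The index is the number of degrees of freedom; index 0 is unused.
var critical95 = []float64{0, 3.841, 5.991, 7.815, 9.488, 11.070}

// isPhaseTransition reports whether the statistic exceeds the critical value,
// using the number of tasks as degrees of freedom as described above.
func isPhaseTransition(expected, observed []float64) bool {
	df := len(expected)
	if df == 0 || df >= len(critical95) || len(observed) != df {
		return false
	}
	return chiSquare(expected, observed) > critical95[df]
}

func main() {
	expected := []float64{10, 10, 10}
	fmt.Println(isPhaseTransition(expected, []float64{10, 11, 9}))  // small deviation
	fmt.Println(isPhaseTransition(expected, []float64{10, 20, 10})) // one task jumps
}
```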

func (*EECPTReceiverJob) Reset

func (job *EECPTReceiverJob) Reset()

type EECPTReceiverTask

type EECPTReceiverTask struct {
	// contains filtered or unexported fields
}

func (*EECPTReceiverTask) Add

func (task *EECPTReceiverTask) Add(value float64)

func (*EECPTReceiverTask) Analyse

func (task *EECPTReceiverTask) Analyse() (float64, float64, error)

Analyse computes the expected rate of change (prev) and the last observed change (last) for this task's metric buffer. It returns (prev, last, error), where prev is the average rate of change over the buffer history and last is the most recent change.
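
One way to read these two quantities is sketched below. The documentation does not say whether the newest change is included in the average, so the sketch assumes it is excluded; rates is a hypothetical function, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
)

// rates returns (prev, last) for a buffer of samples ordered oldest to newest.
// last is the newest difference; prev is the mean of all earlier differences
// (assumption: the newest difference is not part of the average).
func rates(buf []float64) (prev, last float64, err error) {
	n := len(buf)
	if n < 3 {
		return 0, 0, errors.New("need at least three samples")
	}
	last = buf[n-1] - buf[n-2]
	// The earlier differences telescope to buf[n-2]-buf[0]; there are n-2 of them.
	prev = (buf[n-2] - buf[0]) / float64(n-2)
	return prev, last, nil
}

func main() {
	prev, last, err := rates([]float64{0, 10, 20, 30, 70})
	fmt.Println(prev, last, err)
}
```

For the buffer in main, the earlier samples grow by 10 per step while the newest step is 40, which is the kind of jump the chi-square test is meant to pick up.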

func (*EECPTReceiverTask) PrintBuffer

func (task *EECPTReceiverTask) PrintBuffer()

func (*EECPTReceiverTask) Reset

func (task *EECPTReceiverTask) Reset()

type HttpReceiver

type HttpReceiver struct {
	// contains filtered or unexported fields
}

func (*HttpReceiver) Close

func (r *HttpReceiver) Close()

func (*HttpReceiver) Name

func (r *HttpReceiver) Name() string

Name returns the name of the metric receiver

func (*HttpReceiver) ServerHttp

func (r *HttpReceiver) ServerHttp(w http.ResponseWriter, req *http.Request)

func (*HttpReceiver) SetSink

func (r *HttpReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*HttpReceiver) Start

func (r *HttpReceiver) Start()

type HttpReceiverConfig

type HttpReceiverConfig struct {
	Addr string `json:"address"` // Listen address (default: empty for all interfaces)
	Port string `json:"port"`    // Listen port (default: 8080)
	Path string `json:"path"`    // HTTP path to listen on

	IdleTimeout string `json:"idle_timeout"` // Max idle time for keep-alive connections (default: 120s)

	KeepAlivesEnabled bool `json:"keep_alives_enabled"` // Enable HTTP keep-alive (default: true)

	Username string `json:"username"` // Basic auth username (optional)
	Password string `json:"password"` // Basic auth password (optional)
	// contains filtered or unexported fields
}

HttpReceiverConfig configures the HTTP receiver for accepting metrics via POST requests.
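
Because keep_alives_enabled defaults to true, a zero-valued struct cannot express the defaults; a common Go pattern is to fill in the defaults first and then unmarshal the JSON on top. The sketch below shows that pattern with a local copy of the exported fields. httpConfig and parseHTTPConfig are hypothetical, and parsing idle_timeout with time.ParseDuration is an assumption based on the "120s" default.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// httpConfig copies the exported fields of HttpReceiverConfig shown above.
type httpConfig struct {
	Addr              string `json:"address"`
	Port              string `json:"port"`
	Path              string `json:"path"`
	IdleTimeout       string `json:"idle_timeout"`
	KeepAlivesEnabled bool   `json:"keep_alives_enabled"`
	Username          string `json:"username"`
	Password          string `json:"password"`
}

// parseHTTPConfig applies the documented defaults, overlays the JSON, and
// converts the idle timeout string into a time.Duration.
func parseHTTPConfig(raw []byte) (httpConfig, time.Duration, error) {
	cfg := httpConfig{Port: "8080", IdleTimeout: "120s", KeepAlivesEnabled: true}
	// Fields missing from the JSON keep their default values.
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, 0, err
	}
	idle, err := time.ParseDuration(cfg.IdleTimeout)
	if err != nil {
		return cfg, 0, fmt.Errorf("idle_timeout: %w", err)
	}
	return cfg, idle, nil
}

func main() {
	cfg, idle, err := parseHTTPConfig([]byte(`{"type": "http", "path": "/write"}`))
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Println(cfg.Port, cfg.KeepAlivesEnabled, idle)
}
```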

type IPMIReceiver

type IPMIReceiver struct {
	// contains filtered or unexported fields
}

func (*IPMIReceiver) Close

func (r *IPMIReceiver) Close()

Close receiver: close network connection, close files, close libraries, ...

func (*IPMIReceiver) Name

func (r *IPMIReceiver) Name() string

Name returns the name of the metric receiver

func (*IPMIReceiver) SetSink

func (r *IPMIReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*IPMIReceiver) Start

func (r *IPMIReceiver) Start()

type IPMIReceiverClientConfig

type IPMIReceiverClientConfig struct {
	// Hostname the IPMI service belongs to
	Protocol         string            // Protocol / tool to use for IPMI sensor reading
	DriverType       string            // Out of band IPMI driver
	Fanout           int               // Maximum number of simultaneous IPMI connections
	NumHosts         int               // Number of remote IPMI devices with the same configuration
	IPMIHosts        string            // List of remote IPMI devices to communicate with
	IPMI2HostMapping map[string]string // Mapping between IPMI device name and host name
	Username         string            // User name to authenticate with
	Password         string            // Password to use for authentication
	CLIOptions       []string          // Additional command line options for ipmi-sensors
	// contains filtered or unexported fields
}

type NatsReceiver

type NatsReceiver struct {
	// contains filtered or unexported fields
}

func (*NatsReceiver) Close

func (r *NatsReceiver) Close()

Close closes the connection to the NATS server

func (*NatsReceiver) Name

func (r *NatsReceiver) Name() string

Name returns the name of the metric receiver

func (*NatsReceiver) SetSink

func (r *NatsReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*NatsReceiver) Start

func (r *NatsReceiver) Start()

Start subscribes to the configured NATS subject. Messages will be handled by r._NatsReceive.

type NatsReceiverConfig

type NatsReceiverConfig struct {
	Addr     string `json:"address"`             // NATS server address (default: localhost)
	Port     string `json:"port"`                // NATS server port (default: 4222)
	Subject  string `json:"subject"`             // NATS subject to subscribe to (required)
	User     string `json:"user,omitempty"`      // Username for authentication
	Password string `json:"password,omitempty"`  // Password for authentication
	NkeyFile string `json:"nkey_file,omitempty"` // Path to NKey credentials file
	// contains filtered or unexported fields
}

NatsReceiverConfig configures the NATS receiver for subscribing to metric messages.

type PrometheusReceiver

type PrometheusReceiver struct {
	// contains filtered or unexported fields
}

func (*PrometheusReceiver) Close

func (r *PrometheusReceiver) Close()

func (*PrometheusReceiver) Name

func (r *PrometheusReceiver) Name() string

Name returns the name of the metric receiver

func (*PrometheusReceiver) SetSink

func (r *PrometheusReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*PrometheusReceiver) Start

func (r *PrometheusReceiver) Start()

type PrometheusReceiverConfig

type PrometheusReceiverConfig struct {
	Addr     string `json:"address"`
	Port     string `json:"port"`
	Path     string `json:"path"`
	Interval string `json:"interval"`
	SSL      bool   `json:"ssl"`
	// contains filtered or unexported fields
}

type ReceiveManager

type ReceiveManager interface {
	Init(wg *sync.WaitGroup, receiverConfig json.RawMessage) error
	AddInput(name string, rawConfig json.RawMessage) error
	AddOutput(output chan lp.CCMessage)
	Start()
	Close()
}

func New

func New(wg *sync.WaitGroup, receiverConfig json.RawMessage) (ReceiveManager, error)

type Receiver

type Receiver interface {
	Start()                         // Start begins the metric collection process
	Close()                         // Close stops the receiver and releases resources
	Name() string                   // Name returns the receiver's identifier
	SetSink(sink chan lp.CCMessage) // SetSink configures the output channel for collected metrics
}

Receiver is the interface all metric receivers must implement. Receivers collect metrics from various sources and send them to a sink channel.

func NewEECPTReceiver

func NewEECPTReceiver(name string, config json.RawMessage) (Receiver, error)

func NewHttpReceiver

func NewHttpReceiver(name string, config json.RawMessage) (Receiver, error)

func NewIPMIReceiver

func NewIPMIReceiver(name string, config json.RawMessage) (Receiver, error)

NewIPMIReceiver creates a new instance of the IPMI receiver. It initializes the receiver by giving it a name and reading in the config JSON.

func NewNatsReceiver

func NewNatsReceiver(name string, config json.RawMessage) (Receiver, error)

NewNatsReceiver creates a new Receiver which subscribes to messages from a NATS server

func NewPrometheusReceiver

func NewPrometheusReceiver(name string, config json.RawMessage) (Receiver, error)

func NewRedfishReceiver

func NewRedfishReceiver(name string, config json.RawMessage) (Receiver, error)

NewRedfishReceiver creates a new instance of the redfish receiver. It initializes the receiver by giving it a name and reading in the config JSON.

func NewSampleReceiver

func NewSampleReceiver(name string, config json.RawMessage) (Receiver, error)

NewSampleReceiver creates a new instance of the sample receiver. It initializes the receiver by giving it a name and reading in the config JSON.

type ReceiverConfig

type ReceiverConfig struct {
	Addr         string `json:"address"`                // Network address to bind/connect
	Port         string `json:"port"`                   // Network port
	Database     string `json:"database"`               // Database name (if applicable)
	Organization string `json:"organization,omitempty"` // Organization identifier
	Type         string `json:"type"`                   // Receiver type
}

ReceiverConfig is the legacy configuration structure for receivers. Deprecated: Most receivers now use type-specific configuration structs.

type RedfishReceiver

type RedfishReceiver struct {
	// contains filtered or unexported fields
}

RedfishReceiver configuration:

func (*RedfishReceiver) Close

func (r *RedfishReceiver) Close()

Close closes the redfish receiver

func (*RedfishReceiver) Name

func (r *RedfishReceiver) Name() string

Name returns the name of the metric receiver

func (*RedfishReceiver) SetSink

func (r *RedfishReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*RedfishReceiver) Start

func (r *RedfishReceiver) Start()

Start starts the redfish receiver

type RedfishReceiverClientConfig

type RedfishReceiverClientConfig struct {
	// Hostname the redfish service belongs to
	Hostname string
	// contains filtered or unexported fields
}

type SampleReceiver

type SampleReceiver struct {
	// contains filtered or unexported fields
}

func (*SampleReceiver) Close

func (r *SampleReceiver) Close()

Close receiver: close network connection, close files, close libraries, ...

func (*SampleReceiver) Name

func (r *SampleReceiver) Name() string

Name returns the name of the metric receiver

func (*SampleReceiver) SetSink

func (r *SampleReceiver) SetSink(sink chan lp.CCMessage)

SetSink sets the sink channel

func (*SampleReceiver) Start

func (r *SampleReceiver) Start()

type SampleReceiverConfig

type SampleReceiverConfig struct {
	Addr string `json:"address"`
	Port string `json:"port"`
	// contains filtered or unexported fields
}

SampleReceiverConfig holds the SampleReceiver configuration: receiver type, listen address, and port. The defaultReceiverConfig contains the keys 'type' and 'process_messages'.
