roproc

package module
v0.0.0-...-e745bd2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

README

Proc Plugin

The proc plugin provides sources for monitoring system resources and processes using the gopsutil library.

Installation

go get github.com/samber/ro/plugins/proc

Sources

Memory Monitoring
VirtualMemoryWatcher

Monitors virtual memory usage statistics.

import (
    "fmt"
    "time"

    "github.com/samber/ro"
    roproc "github.com/samber/ro/plugins/proc"
    "github.com/shirou/gopsutil/v4/mem"
)

observable := roproc.NewVirtualMemoryWatcher(5 * time.Second)

subscription := observable.Subscribe(ro.NewObserver(
    func(stats *mem.VirtualMemoryStat) {
        fmt.Printf("Total: %d, Used: %d, Free: %d\n", 
            stats.Total, stats.Used, stats.Free)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))

// Output:
// Total: 16777216000, Used: 8589934592, Free: 8187281408
// Total: 16777216000, Used: 8590065664, Free: 8187149824
// ...
SwapMemoryWatcher

Monitors swap memory usage statistics.

observable := roproc.NewSwapMemoryWatcher(10 * time.Second)

subscription := observable.Subscribe(ro.NewObserver(
    func(stats *mem.SwapMemoryStat) {
        fmt.Printf("Swap Total: %d, Used: %d, Free: %d\n", 
            stats.Total, stats.Used, stats.Free)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
CPU Monitoring
CPUInfoWatcher

Monitors CPU information and statistics.

observable := roproc.NewCPUInfoWatcher(2 * time.Second)

subscription := observable.Subscribe(ro.NewObserver(
    func(cpuInfo cpu.InfoStat) {
        fmt.Printf("CPU: %d, Cores: %d, Model: %s\n",
            cpuInfo.CPU, cpuInfo.Cores, cpuInfo.ModelName)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
Disk Monitoring
DiskUsageWatcher

Monitors disk usage for a specific mountpoint or device.

observable := roproc.NewDiskUsageWatcher(30 * time.Second, "/")

subscription := observable.Subscribe(ro.NewObserver(
    func(usage *disk.UsageStat) {
        fmt.Printf("Path: %s, Total: %d, Used: %d, Free: %d\n", 
            usage.Path, usage.Total, usage.Used, usage.Free)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
DiskIOCountersWatcher

Monitors disk I/O counters for specified devices.

observable := roproc.NewDiskIOCountersWatcher(5 * time.Second, "sda", "sdb")

subscription := observable.Subscribe(ro.NewObserver(
    func(counters map[string]disk.IOCountersStat) {
        for device, stats := range counters {
            fmt.Printf("Device: %s, ReadBytes: %d, WriteBytes: %d\n", 
                device, stats.ReadBytes, stats.WriteBytes)
        }
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
Network Monitoring
NetIOCountersWatcher

Monitors network I/O counters. The boolean argument is the perNIC flag (pass true to request per-interface counters).

observable := roproc.NewNetIOCountersWatcher(5 * time.Second, true)

subscription := observable.Subscribe(ro.NewObserver(
    func(counters net.IOCountersStat) {
        fmt.Printf("Interface: %s, BytesSent: %d, BytesRecv: %d\n", 
            counters.Name, counters.BytesSent, counters.BytesRecv)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
Host Information
HostInfoWatcher

Monitors host information.

observable := roproc.NewHostInfoWatcher(60 * time.Second)

subscription := observable.Subscribe(ro.NewObserver(
    func(info *host.InfoStat) {
        fmt.Printf("Hostname: %s, OS: %s, Platform: %s\n", 
            info.Hostname, info.OS, info.Platform)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
Load Average
LoadAverageWatcher

Monitors system load average.

observable := roproc.NewLoadAverageWatcher(5 * time.Second)

subscription := observable.Subscribe(ro.NewObserver(
    func(load *load.AvgStat) {
        fmt.Printf("Load1: %.2f, Load5: %.2f, Load15: %.2f\n", 
            load.Load1, load.Load5, load.Load15)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
Sensors
SensorsTemperatureWatcher

Monitors temperature sensors.

observable := roproc.NewSensorsTemperatureWatcher(10 * time.Second, false)

subscription := observable.Subscribe(ro.NewObserver(
    func(temp sensors.TemperatureStat) {
        fmt.Printf("Sensor: %s, Temperature: %.2f°C\n",
            temp.SensorKey, temp.Temperature)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))

Advanced Usage

Combining Multiple Sources
import (
    "fmt"
    "time"

    "github.com/samber/ro"
    roproc "github.com/samber/ro/plugins/proc"
    "github.com/shirou/gopsutil/v4/cpu"
    "github.com/shirou/gopsutil/v4/mem"
)

// Monitor both CPU and memory
cpuObservable := roproc.NewCPUInfoWatcher(2 * time.Second)
memObservable := roproc.NewVirtualMemoryWatcher(2 * time.Second)

// Combine them using Merge
combined := ro.Merge(cpuObservable, memObservable)

subscription := combined.Subscribe(ro.NewObserver(
    func(value interface{}) {
        switch v := value.(type) {
        case cpu.InfoStat:
            fmt.Printf("CPU: %s\n", v.ModelName)
        case *mem.VirtualMemoryStat:
            fmt.Printf("Memory Used: %d\n", v.Used)
        }
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))
Filtering and Processing
import (
    "fmt"
    "time"

    "github.com/samber/ro"
    roproc "github.com/samber/ro/plugins/proc"
    "github.com/shirou/gopsutil/v4/mem"
)

// Monitor memory and filter high usage
observable := ro.Pipe1(
    roproc.NewVirtualMemoryWatcher(1 * time.Second),
    ro.Filter(func(stats *mem.VirtualMemoryStat) bool {
        return float64(stats.Used)/float64(stats.Total) > 0.8 // > 80% usage
    }),
)

subscription := observable.Subscribe(ro.NewObserver(
    func(stats *mem.VirtualMemoryStat) {
        usage := float64(stats.Used) / float64(stats.Total) * 100
        fmt.Printf("High memory usage: %.1f%%\n", usage)
    },
    func(err error) {
        fmt.Printf("Error: %v\n", err)
    },
    func() {
        fmt.Println("Completed")
    },
))

Error Handling

All sources handle errors gracefully and will emit error notifications if the underlying system calls fail. The error handling follows the reactive stream pattern:

observable := roproc.NewVirtualMemoryWatcher(5 * time.Second)

subscription := observable.Subscribe(ro.NewObserver(
    func(stats *mem.VirtualMemoryStat) {
        // Handle successful data
    },
    func(err error) {
        // Handle errors (e.g., permission denied, system call failed)
        log.Printf("Monitoring error: %v", err)
    },
    func() {
        // Handle completion
    },
))

Dependencies

This plugin requires the gopsutil library:

go get github.com/shirou/gopsutil/v4

Performance Considerations

  • Choose appropriate intervals for your monitoring needs
  • Consider using ro.ObserveOn to control the scheduler for CPU-intensive operations
  • Use ro.SubscribeOn to control where the source runs
  • Be mindful of system resources when monitoring frequently

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCPUInfoWatcher

func NewCPUInfoWatcher(interval time.Duration) ro.Observable[cpu.InfoStat]

NewCPUInfoWatcher creates an observable that emits CPU information statistics at regular intervals.

Example
// Monitor CPU information
observable := NewCPUInfoWatcher(2 * time.Second)

subscription := observable.Subscribe(ro.NoopObserver[cpu.InfoStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(4 * time.Second)

func NewDiskIOCountersWatcher

func NewDiskIOCountersWatcher(interval time.Duration, names ...string) ro.Observable[map[string]disk.IOCountersStat]

NewDiskIOCountersWatcher creates an observable that emits disk I/O counters at regular intervals.

func NewDiskPartitionWatcher

func NewDiskPartitionWatcher(interval time.Duration) ro.Observable[disk.PartitionStat]

NewDiskPartitionWatcher creates an observable that emits disk partition information at regular intervals.

func NewDiskUsageWatcher

func NewDiskUsageWatcher(interval time.Duration, mountpointOrDevicePath string) ro.Observable[*disk.UsageStat]

NewDiskUsageWatcher creates an observable that emits disk usage statistics at regular intervals.

Example
// Monitor disk usage for root filesystem
observable := NewDiskUsageWatcher(5*time.Second, "/")

subscription := observable.Subscribe(ro.NoopObserver[*disk.UsageStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(10 * time.Second)

func NewHostInfoWatcher

func NewHostInfoWatcher(interval time.Duration) ro.Observable[*host.InfoStat]

NewHostInfoWatcher creates an observable that emits host information at regular intervals.

Example
// Monitor host information
observable := NewHostInfoWatcher(10 * time.Second)

subscription := observable.Subscribe(ro.NoopObserver[*host.InfoStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(12 * time.Second)

func NewHostUserWatcher

func NewHostUserWatcher(interval time.Duration) ro.Observable[host.UserStat]

NewHostUserWatcher creates an observable that emits host user information at regular intervals.

func NewLoadAverageWatcher

func NewLoadAverageWatcher(interval time.Duration) ro.Observable[*load.AvgStat]

NewLoadAverageWatcher creates an observable that emits load average statistics at regular intervals.

Example
// Monitor system load average
observable := NewLoadAverageWatcher(2 * time.Second)

subscription := observable.Subscribe(ro.NoopObserver[*load.AvgStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(4 * time.Second)

func NewLoadMiscWatcher

func NewLoadMiscWatcher(interval time.Duration) ro.Observable[*load.MiscStat]

NewLoadMiscWatcher creates an observable that emits miscellaneous load statistics at regular intervals.

func NewNetConnectionsWatcher

func NewNetConnectionsWatcher(interval time.Duration) ro.Observable[net.ConnectionStat]

NewNetConnectionsWatcher creates an observable that emits network connection statistics at regular intervals.

func NewNetConntrackWatcher

func NewNetConntrackWatcher(interval time.Duration, perCPU bool) ro.Observable[net.ConntrackStat]

NewNetConntrackWatcher creates an observable that emits network conntrack statistics at regular intervals.

func NewNetFilterCountersWatcher

func NewNetFilterCountersWatcher(interval time.Duration) ro.Observable[net.FilterStat]

NewNetFilterCountersWatcher creates an observable that emits network filter counters at regular intervals.

func NewNetIOCountersWatcher

func NewNetIOCountersWatcher(interval time.Duration, perNIC bool) ro.Observable[net.IOCountersStat]

NewNetIOCountersWatcher creates an observable that emits network I/O counters at regular intervals.

Example
// Monitor network I/O counters
observable := NewNetIOCountersWatcher(3*time.Second, true)

subscription := observable.Subscribe(ro.NoopObserver[net.IOCountersStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(6 * time.Second)

func NewSensorsTemperatureWatcher

func NewSensorsTemperatureWatcher(interval time.Duration, perNIC bool) ro.Observable[sensors.TemperatureStat]

NewSensorsTemperatureWatcher creates an observable that emits sensor temperature statistics at regular intervals.

Example
// Monitor temperature sensors
observable := NewSensorsTemperatureWatcher(5*time.Second, false)

subscription := observable.Subscribe(ro.NoopObserver[sensors.TemperatureStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(10 * time.Second)

func NewSwapDeviceWatcher

func NewSwapDeviceWatcher(interval time.Duration) ro.Observable[*mem.SwapDevice]

NewSwapDeviceWatcher creates an observable that emits swap device information at regular intervals.

func NewSwapMemoryWatcher

func NewSwapMemoryWatcher(interval time.Duration) ro.Observable[*mem.SwapMemoryStat]

NewSwapMemoryWatcher creates an observable that emits swap memory statistics at regular intervals.

func NewVirtualMemoryWatcher

func NewVirtualMemoryWatcher(interval time.Duration) ro.Observable[*mem.VirtualMemoryStat]

NewVirtualMemoryWatcher creates an observable that emits virtual memory statistics at regular intervals.

Example
// Monitor virtual memory usage
observable := NewVirtualMemoryWatcher(1 * time.Second)

subscription := observable.Subscribe(ro.NoopObserver[*mem.VirtualMemoryStat]())
defer subscription.Unsubscribe()

// Let it run for a few seconds
time.Sleep(3 * time.Second)

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL