controller

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2026 License: MIT Imports: 82 Imported by: 0

Documentation

Index

Constants

View Source
const (
	GlobalStrategyID = "global"
	CollectionName   = "metric_strategies"
)
View Source
const (
	AttachMetricRedisBusinessKey = "AttachMetricRedisBusinessKey"
	PubSubTopic                  = "metricAttachMsg"
)
View Source
const (
	CollectionTaskCollectionName = "collection_tasks"
)
View Source
const DefaultEtcdContainerName = "etcd-main"
View Source
const DefaultTelegrafResourceName = "telegraf"
View Source
const (
	GroupStrategyCollection = "group_metric_strategies"
)
View Source
const Version = "0.1.0"

Variables

View Source
var ErrNacosConfigNotFound = errors.New("nacos config not found")
View Source
var MinioResouceName = ""

ProviderSet 是一个 Wire 提供者集合,包含了所有控制器组件的提供者

View Source
var TelegrafContainerName = ""

Functions

func ProvideDBClient

func ProvideDBClient(config *Config) (*mongo.Client, error)

ProvideDBClient 创建并返回一个 MongoDB 客户端

func ProvideEtcdClient

func ProvideEtcdClient(config *Config) (*clientv3.Client, error)

ProvideEtcdClient 创建并返回一个 etcd 客户端 支持 Docker etcd 和外部 etcd

func ProvidePluginMappingConfigLoader

func ProvidePluginMappingConfigLoader(logger *zap.Logger) (*config.PluginMappingConfigLoader, error)

ProvidePluginMappingConfigLoader 提供插件映射配置加载器

func WithPort

func WithPort(port int) controllerOptionFunc

Types

type AgentConfigStruct

type AgentConfigStruct struct {
	Interval          string `toml:"interval,omitempty"`
	RoundInterval     bool   `toml:"round_interval,omitempty"`
	MetricBatchSize   int    `toml:"metric_batch_size,omitempty"`
	MetricBufferLimit int    `toml:"metric_buffer_limit,omitempty"`
	CollectionJitter  string `toml:"collection_jitter,omitempty"`
	FlushInterval     string `toml:"flush_interval,omitempty"`
	FlushJitter       string `toml:"flush_jitter,omitempty"`
	Precision         string `toml:"precision,omitempty"`
	Hostname          string `toml:"hostname,omitempty"`
	OmitHostname      bool   `toml:"omit_hostname,omitempty"`
}

AgentConfigStruct 表示 agent 配置(内部使用)

type AgentDiscovery

type AgentDiscovery struct {
	// contains filtered or unexported fields
}

AgentDiscovery Agent 自动发现模块

func NewAgentDiscovery

func NewAgentDiscovery(etcdClient *clientv3.Client, mongoClient *mongo.Client, watchPrefix string, eventNotifier *AgentEventNotifier, controllerID, functionArea string, logger *zap.Logger) *AgentDiscovery

NewAgentDiscovery 创建 Agent 发现模块

func (*AgentDiscovery) PeriodicSync

func (ad *AgentDiscovery) PeriodicSync(ctx context.Context)

PeriodicSync 定期从 etcd 同步 Agent 信息

func (*AgentDiscovery) Start

func (ad *AgentDiscovery) Start() error

Start 启动 Agent 发现

func (*AgentDiscovery) Stop

func (ad *AgentDiscovery) Stop()

Stop 停止 Agent 发现

func (*AgentDiscovery) SyncAgentsFromEtcd

func (ad *AgentDiscovery) SyncAgentsFromEtcd(ctx context.Context) error

SyncAgentsFromEtcd 从 etcd 同步所有 Agent 到 MongoDB

func (*AgentDiscovery) WatchAgents

func (ad *AgentDiscovery) WatchAgents(ctx context.Context)

WatchAgents 监听 etcd 中的 Agent 变化

type AgentEventNotifier

type AgentEventNotifier struct {
	// contains filtered or unexported fields
}

AgentEventNotifier Agent事件通知器

func NewAgentEventNotifier

func NewAgentEventNotifier(webhookURLs []string, secret, controllerID, functionArea string) *AgentEventNotifier

NewAgentEventNotifier 创建Agent事件通知器(使用默认配置)

func NewAgentEventNotifierWithConfig

func NewAgentEventNotifierWithConfig(webhookURLs []string, secret, controllerID, functionArea string, timeout, retryMaxAttempts, retryInterval int) *AgentEventNotifier

NewAgentEventNotifierWithConfig 创建Agent事件通知器(使用自定义配置)

func (*AgentEventNotifier) NotifyAgentRegistered

func (n *AgentEventNotifier) NotifyAgentRegistered(agent *models.Agent) error

NotifyAgentRegistered 通知Agent注册

func (*AgentEventNotifier) NotifyAgentUnregistered

func (n *AgentEventNotifier) NotifyAgentUnregistered(agentCode string) error

NotifyAgentUnregistered 通知Agent注销

func (*AgentEventNotifier) NotifyStatusChanged

func (n *AgentEventNotifier) NotifyStatusChanged(agentCode, oldStatus, newStatus string) error

NotifyStatusChanged 通知状态变化

func (*AgentEventNotifier) Start

func (n *AgentEventNotifier) Start()

Start 启动事件通知器

func (*AgentEventNotifier) Stop

func (n *AgentEventNotifier) Stop()

Stop 停止事件通知器

type AgentManager

type AgentManager struct {
	// contains filtered or unexported fields
}

func ProvideAgentManager

func ProvideAgentManager(registry *RegistryManager, rm *ResourceManager, cm *ConfigManager, km *KeyManager, mm *MinioManager, mongoClient *mongo.Client) (*AgentManager, error)

func (*AgentManager) ApplyPackageConfigs

func (am *AgentManager) ApplyPackageConfigs(ctx context.Context, agentCode, packageName string, configs []*pb.ConfigItem) error

func (*AgentManager) ExecuteCommandViaProxy

func (am *AgentManager) ExecuteCommandViaProxy(ctx context.Context, agentCode, command string, rmInfo *structs.L2DeviceRemoteInfo) (string, error)

func (*AgentManager) GetAgent

func (am *AgentManager) GetAgent(ctx context.Context, agentCode string) (*models.Agent, error)

func (*AgentManager) GetAgentDetail

func (am *AgentManager) GetAgentDetail(ctx context.Context, agentID string) (*models.Agent, error)

GetAgentDetail 获取 Agent 详情

func (*AgentManager) GetAgentHost

func (am *AgentManager) GetAgentHost(ctx context.Context, agentCode string) (string, error)

func (*AgentManager) GetAgentPackages

func (am *AgentManager) GetAgentPackages(ctx context.Context, agentID string) ([]*pb.PackItem, error)

GetAgentPackages 获取 Agent 管理的服务列表

func (*AgentManager) GetBatchOperator

func (am *AgentManager) GetBatchOperator() *BatchOperator

GetBatchOperator 获取批量操作模块

func (*AgentManager) GetFileOperation

func (am *AgentManager) GetFileOperation() *FileOperation

GetFileOperation 获取文件操作模块

func (*AgentManager) GetHealthMonitor

func (am *AgentManager) GetHealthMonitor() *HealthMonitor

GetHealthMonitor 获取健康监控模块

func (*AgentManager) GetMetricsQuery

func (am *AgentManager) GetMetricsQuery() *MetricsQuery

GetMetricsQuery 获取指标查询模块

func (*AgentManager) GetPackageConfigs

func (am *AgentManager) GetPackageConfigs(ctx context.Context, agentCode, packageName string) ([]*pb.ConfigItem, error)

GetConfigs 获取指定 agent 上特定包的配置

func (*AgentManager) GetPackageLogs

func (am *AgentManager) GetPackageLogs(ctx context.Context, agentCode, packageName string, count int32) ([]string, error)

func (*AgentManager) ListAgents

func (am *AgentManager) ListAgents(ctx context.Context, filter map[string]string, page, pageSize int) ([]models.Agent, int, error)

func (*AgentManager) PackageList

func (am *AgentManager) PackageList(ctx context.Context, agentCode string) ([]*pb.PackItem, error)

PackageList 列出指定 agent 上的所有包

func (*AgentManager) RestartAgent

func (am *AgentManager) RestartAgent(ctx context.Context, agentCode string) error

RestartAgent 重启Agent

func (*AgentManager) Start

func (am *AgentManager) Start() error

func (*AgentManager) StartPackage

func (am *AgentManager) StartPackage(ctx context.Context, agentCode, packageName string) error

Start 启动指定 agent 上的特定包

func (*AgentManager) StartPackageWatcher

func (am *AgentManager) StartPackageWatcher(ctx context.Context)

func (*AgentManager) Stop

func (am *AgentManager) Stop() error

func (*AgentManager) StopPackage

func (am *AgentManager) StopPackage(ctx context.Context, agentCode, packageName string) error

Stop 停止指定 agent 上的特定包

func (*AgentManager) UninstallAgent

func (am *AgentManager) UninstallAgent(ctx context.Context, agentCode string) error

UninstallAgent 卸载Agent

type AgentStatusEvent

type AgentStatusEvent struct {
	EventType     string    `json:"event_type"` // "agent_registered", "agent_unregistered", "status_changed"
	AgentCode     string    `json:"agent_code"`
	FunctionArea  string    `json:"function_area"`
	ControllerID  string    `json:"controller_id"`
	OldStatus     string    `json:"old_status,omitempty"`
	NewStatus     string    `json:"new_status"`
	HealthStatus  string    `json:"health_status,omitempty"`
	Address       string    `json:"address,omitempty"`
	Version       string    `json:"version,omitempty"`
	ServiceCount  int       `json:"service_count,omitempty"`
	LastHeartbeat string    `json:"last_heartbeat,omitempty"` // RFC3339格式
	Timestamp     time.Time `json:"timestamp"`
	Signature     string    `json:"signature"` // HMAC签名,用于验证
}

AgentStatusEvent Agent状态事件

type AppResolver

type AppResolver struct {
	ConfigManager *ConfigManager
}

func (*AppResolver) Resolve

func (ar *AppResolver) Resolve(ctx context.Context, agentID, appID string) (map[string]string, error)

type ApplicationTargetConverter

type ApplicationTargetConverter struct {
	// contains filtered or unexported fields
}

ApplicationTargetConverter 应用指标目标转换器

func NewApplicationTargetConverter

func NewApplicationTargetConverter(logger *zap.Logger) *ApplicationTargetConverter

NewApplicationTargetConverter 创建应用指标目标转换器

func (*ApplicationTargetConverter) Convert

func (c *ApplicationTargetConverter) Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)

Convert 转换应用指标目标

func (*ApplicationTargetConverter) Supports

func (c *ApplicationTargetConverter) Supports(collectionType string) bool

Supports 检查是否支持该采集类型

type BaseConfig

type BaseConfig struct {
	DefaultPort           int                            `yaml:"default_port"`
	Resources             map[models.ResourceType]string `yaml:"resources"`
	PrometheusLabelKey    string                         `yaml:"prometheusLabelKey"`
	LokiLabelKey          string                         `yaml:"lokiLabelKey"`
	HomePath              string                         `yaml:"home_path"`
	Templates             []string                       `yaml:"templates"`
	PreferredNetworks     []string                       `yaml:"preferred_networks"`
	SshProxy              int                            `yaml:"ssh_proxy"`
	GrpcProxy             int                            `yaml:"grpc_proxy"`
	Deployment            Deployment                     `yaml:"deployment"`
	LokiListenPath        string                         `yaml:"loki_listen_path"`
	PrometheusListenPath  string                         `yaml:"prometheus_listen_path"`
	UpstreamLokiUrl       string                         `yaml:"upstream_loki_url"`
	UpstreamPrometheusUrl string                         `yaml:"upstream_prometheus_url"`
	PipelineTemplates     string                         `yaml:"pipelineTemplates"`
	// FirewallTemplatePath 防火墙模板路径,默认为 "pkg/nodemap/node/device/firewall/common/v4/templates"
	FirewallTemplatePath string `yaml:"firewall_template_path" json:"firewall_template_path"`
	// JumpServer 连接空闲超时时间(秒),默认60秒,0表示禁用
	JumpServerIdleTimeout int `yaml:"jump_server_idle_timeout"`
}

type BatchOperator

type BatchOperator struct {
	// contains filtered or unexported fields
}

BatchOperator 批量操作模块

func NewBatchOperator

func NewBatchOperator(agentManager *AgentManager) *BatchOperator

NewBatchOperator 创建批量操作模块

func (*BatchOperator) BatchRestartPackages

BatchRestartPackages 批量重启服务

func (*BatchOperator) BatchStartPackages

BatchStartPackages 批量启动服务

func (*BatchOperator) BatchStopPackages

BatchStopPackages 批量停止服务

func (*BatchOperator) BatchUpdateConfigs

BatchUpdateConfigs 批量更新配置

type CollectionTargetConverter

type CollectionTargetConverter struct {
	// contains filtered or unexported fields
}

CollectionTargetConverter 采集目标转换器 用于将 OneOps 的采集目标和策略转换为 Controller 的 CollectionTask

func NewCollectionTargetConverter

func NewCollectionTargetConverter(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *CollectionTargetConverter

NewCollectionTargetConverter 创建采集目标转换器

func (*CollectionTargetConverter) ConvertTargetsToCollectionTasks

func (c *CollectionTargetConverter) ConvertTargetsToCollectionTasks(
	ctx context.Context,
	collectionType string,
	strategy *UnifiedCollectionStrategyDTO,
	targets []*CollectionTargetDTO,
) ([]models.CollectionTask, error)

ConvertTargetsToCollectionTasks 将采集目标和策略转换为 CollectionTasks

type CollectionTargetDTO

type CollectionTargetDTO struct {
	ID             string
	Address        string
	Port           int
	Config         map[string]interface{}
	CollectionType string
	TargetGroupID  string
}

CollectionTargetDTO OneOps 采集目标 DTO(简化版本,避免循环依赖)

type CollectionTaskService

type CollectionTaskService struct {
	pb.UnimplementedCollectionTaskServiceServer // 必须嵌入以支持向前兼容
	// contains filtered or unexported fields
}

CollectionTaskService 采集任务服务 负责接收OneOps分发的任务,存储到MongoDB,并转发给Agent

func NewCollectionTaskService

func NewCollectionTaskService(mongoClient *mongo.Client, logger *zap.Logger) *CollectionTaskService

NewCollectionTaskService 创建采集任务服务

func (*CollectionTaskService) DistributeTasks

DistributeTasks 接收OneOps分发的任务(gRPC服务端实现)

func (*CollectionTaskService) GetTaskStatus

GetTaskStatus 查询任务状态(gRPC服务端实现)

func (*CollectionTaskService) GetTasks

GetTasks 获取Agent的任务(gRPC服务端实现) 新架构:OneOps通过gRPC推送CollectionTask到Controller,Controller存储后直接返回给Agent

func (*CollectionTaskService) RegisterGRPCService

func (s *CollectionTaskService) RegisterGRPCService(grpcServer *grpc.Server)

RegisterGRPCService 注册gRPC服务

func (*CollectionTaskService) UpdateTaskStatus

UpdateTaskStatus 更新任务状态(gRPC服务端实现)

type CollectionTypeMapper

type CollectionTypeMapper interface {
	GetTaskType(collectionType string) string
	GetDefaultConfig(collectionType string) map[string]interface{}
}

CollectionTypeMapper 采集类型到任务类型的映射器接口

func NewConfigBasedCollectionTypeMapper

func NewConfigBasedCollectionTypeMapper(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) CollectionTypeMapper

NewConfigBasedCollectionTypeMapper 创建基于配置的映射器

type CollectionTypeMapping

type CollectionTypeMapping struct {
	TaskType      string
	DefaultConfig map[string]interface{}
}

CollectionTypeMapping 采集类型映射配置

type Config

type Config struct {
	Upstream       Upstream               `yaml:"upstream"`
	BaseConfig     BaseConfig             `yaml:"base_config" json:"base_config"`
	Server         ServerConfig           `yaml:"server" json:"server"` // 服务器配置(HTTP、gRPC端口等)
	GitConfig      GitConfig              `yaml:"git"`
	FunctionArea   string                 `yaml:"function_area" json:"function_area"`
	EtcdConfig     map[string]interface{} `yaml:"etcd" json:"etcd"`
	UseDockerEtcd  bool                   `yaml:"use_docker_etcd" json:"use_docker_etcd"` // 是否使用 Docker 启动 etcd,false 时使用外部 etcd,默认为 true(向后兼容)
	Telegraf       TelegrafConfig         `yaml:"telegraf" json:"telegraf"`
	Minio          MinioConfig            `yaml:"minio" json:"minio"`
	UniOpsConfig   UniOpsConfig           `yaml:"uniops" json:"uniops"`
	Database       DatabaseConfig         `yaml:"database" json:"database"`
	SyslogManager  SyslogManagerConfig    `yaml:"syslog_manager" json:"syslog_manager"`
	MetricsManager MetricsManagerConfig   `yaml:"metrics_manager" json:"metrics_manager"`
	Nacos          NacosConfig            `yaml:"nacos" json:"nacos"`
	Redis          RedisConfig            `yaml:"redis" json:"redis"`
	SkipInit       bool                   `yaml:"skip_init" json:"skip_init"`
}

Config 定义了控制器的配置

func LoadConfig

func LoadConfig(configPath string) (*Config, error)

LoadConfig 从指定的 YAML 文件加载配置

func ProvideConfig

func ProvideConfig(configPath string) (*Config, error)

ProvideConfig 提供配置对象,从本地YAML文件加载

type ConfigBasedCollectionTypeMapper

type ConfigBasedCollectionTypeMapper struct {
	// contains filtered or unexported fields
}

ConfigBasedCollectionTypeMapper 基于配置的映射器

func (*ConfigBasedCollectionTypeMapper) GetDefaultConfig

func (m *ConfigBasedCollectionTypeMapper) GetDefaultConfig(collectionType string) map[string]interface{}

GetDefaultConfig 获取默认配置

func (*ConfigBasedCollectionTypeMapper) GetTaskType

func (m *ConfigBasedCollectionTypeMapper) GetTaskType(collectionType string) string

GetTaskType 获取任务类型

type ConfigManager

type ConfigManager struct {
	EtcdClient   *clientv3.Client `wire:"-"`
	NacosManager *NacosManager
	KeyManager   *KeyManager

	Config *Config
	// contains filtered or unexported fields
}

func ProvideConfigManager

func ProvideConfigManager(km *KeyManager, etcdClient *clientv3.Client, config *Config, nm *NacosManager) (*ConfigManager, error)

ProvideConfigManager 为 Wire 依赖注入提供的构造函数

func (*ConfigManager) BatchGetConfigs

func (cm *ConfigManager) BatchGetConfigs(resourceType models.ResourceType, resourceIDs []string) (map[string]map[string]interface{}, error)

func (*ConfigManager) BatchUpdateConfigs

func (cm *ConfigManager) BatchUpdateConfigs(updates map[models.ResourceType]map[string]map[string]interface{}) error

func (*ConfigManager) CompareConfigVersions

func (cm *ConfigManager) CompareConfigVersions(resourceType models.ResourceType, resourceID string, version1, version2 string) (map[string]interface{}, error)

func (*ConfigManager) ContainerName

func (cm *ConfigManager) ContainerName(resourceType models.ResourceType) (string, error)

func (*ConfigManager) DeleteConfig

func (cm *ConfigManager) DeleteConfig(resourceType models.ResourceType, containerName string) error

func (*ConfigManager) FilterConfigs

func (cm *ConfigManager) FilterConfigs(resourceType models.ResourceType, filter func(map[string]interface{}) bool) ([]string, error)

func (*ConfigManager) GetAppVariables

func (cm *ConfigManager) GetAppVariables(appID string) (map[string]string, error)

func (*ConfigManager) GetConfig

func (cm *ConfigManager) GetConfig(resourceType models.ResourceType, containerName string) (map[string]interface{}, error)

func (*ConfigManager) GetConfigStats

func (cm *ConfigManager) GetConfigStats(resourceType models.ResourceType) (map[string]int, error)

func (*ConfigManager) GetConfigWithMeta

func (cm *ConfigManager) GetConfigWithMeta(resourceType models.ResourceType, containerName string) (config map[string]interface{}, meta map[string]interface{}, err error)

func (*ConfigManager) GetContainerNameByResourceType

func (cm *ConfigManager) GetContainerNameByResourceType(resourceType models.ResourceType) (string, error)

func (*ConfigManager) GetGlobalVariables

func (cm *ConfigManager) GetGlobalVariables() (map[string]string, error)

func (*ConfigManager) GetJson

func (cm *ConfigManager) GetJson(resourceType models.ResourceType) (string, error)

func (*ConfigManager) ListApps

func (cm *ConfigManager) ListApps() ([]string, error)

func (*ConfigManager) ListConfigKeys

func (cm *ConfigManager) ListConfigKeys(resourceType models.ResourceType) ([]string, error)

func (*ConfigManager) ListConfigsWithDetails

func (cm *ConfigManager) ListConfigsWithDetails(resourceType models.ResourceType) (map[string]map[string]interface{}, error)

func (*ConfigManager) ListResourceIDs

func (cm *ConfigManager) ListResourceIDs(resourceType models.ResourceType) ([]string, error)

func (*ConfigManager) SearchConfigs

func (cm *ConfigManager) SearchConfigs(resourceType models.ResourceType, searchTerm string) ([]string, error)

func (*ConfigManager) SetAppVariables

func (cm *ConfigManager) SetAppVariables(appID string, variables map[string]string) error

func (*ConfigManager) SetGlobalVariables

func (cm *ConfigManager) SetGlobalVariables(variables map[string]string) error

func (*ConfigManager) Start

func (cm *ConfigManager) Start() error

func (*ConfigManager) Stop

func (cm *ConfigManager) Stop() error

func (*ConfigManager) UpdateConfig

func (cm *ConfigManager) UpdateConfig(resourceType models.ResourceType, containerName string, config map[string]interface{}) error

func (*ConfigManager) UpdateConfigWithMeta

func (cm *ConfigManager) UpdateConfigWithMeta(resourceType models.ResourceType, containerName string, config map[string]interface{}, meta map[string]interface{}) error

type ConfigProvider

type ConfigProvider interface {
	GetConfig(serviceName string) (map[string]interface{}, error)
	SetConfig(serviceName string, config map[string]interface{}) error
}

type ConfigTemplate

type ConfigTemplate struct {
	// contains filtered or unexported fields
}

ConfigTemplate 是配置模板的主要结构

func NewConfigTemplate

func NewConfigTemplate(templateString string) (*ConfigTemplate, error)

NewConfigTemplate 创建一个新的配置模板

func (*ConfigTemplate) Generate

func (ct *ConfigTemplate) Generate(data TemplateData) (string, error)

Generate 根据提供的数据生成配置

type Controller

type Controller struct {
	ConfigManager     *ConfigManager
	ResourceManager   *ResourceManager
	RegistryManager   *RegistryManager
	KeyManager        *KeyManager
	MinioManager      *MinioManager
	DeploymentManager *DeploymentManager

	AgentManager          *AgentManager
	JumperServerManager   *JumperServerManager
	GrpcProxyManager      *GrpcProxyManager
	LokiForwarder         *LokiForwarder
	PrometheusForwarder   *PrometheusForwarder
	TelegrafManager       *TelegrafManager
	RedisManager          *RedisManager
	MonitoringService     *MonitoringService
	PluginTemplateService *PluginTemplateService
	// contains filtered or unexported fields
}

func InitializeControllerComponents

func InitializeControllerComponents(configPath string) (*Controller, error)

InitializeControllerComponents 初始化所有控制器组件

func ProvideController

ProvideController 为 Wire 依赖注入提供的构造函数

func (*Controller) CreateDeployment

func (c *Controller) CreateDeployment(req models.DeploymentRequest) (*models.Deployment, error)

POST /api/v1/deployments

func (*Controller) CreateResource

func (c *Controller) CreateResource(resourceType models.ResourceType, resourceID string, config map[string]interface{}) error

func (*Controller) DeleteConfig

func (c *Controller) DeleteConfig(resourceType models.ResourceType, resourceID string) error

func (*Controller) DeleteResource

func (c *Controller) DeleteResource(resourceType models.ResourceType, resourceID string) error

func (*Controller) GetAgentVariables

func (c *Controller) GetAgentVariables(ctx context.Context, agentID, appID string) (map[string]string, error)

func (*Controller) GetAssets

func (c *Controller) GetAssets(tags string) ([]*structs.L2DeviceRemoteInfo, error)

func (*Controller) GetConfig

func (c *Controller) GetConfig(resourceType models.ResourceType, resourceID string) (map[string]interface{}, error)

func (*Controller) GetMongoClient

func (c *Controller) GetMongoClient() *mongo.Client

GetMongoClient 返回 MongoDB 客户端(用于其他模块访问)

func (*Controller) GetRegisteredAgentsCount

func (c *Controller) GetRegisteredAgentsCount() int

func (*Controller) GetResourceStatus

func (c *Controller) GetResourceStatus(resourceType models.ResourceType, resourceID string) (string, error)

func (*Controller) GetStatus

func (c *Controller) GetStatus() (*models.ControllerStatus, error)

func (*Controller) ListConfigs

func (c *Controller) ListConfigs(resourceType models.ResourceType) ([]string, error)

func (*Controller) ListDeployments

func (c *Controller) ListDeployments(w http.ResponseWriter, r *http.Request)

GET /api/v1/deployments

func (*Controller) ListResources

func (c *Controller) ListResources() []string

func (*Controller) RegisterService

func (c *Controller) RegisterService() error

func (*Controller) RegisterToUpstreamEtcd

func (c *Controller) RegisterToUpstreamEtcd(upstreamEtcdEndpoints []string) error

func (*Controller) Start

func (c *Controller) Start(options ...controllerOptionFunc) error

func (*Controller) Stop

func (c *Controller) Stop() error

func (*Controller) UpdateConfig

func (c *Controller) UpdateConfig(resourceType models.ResourceType, resourceID string, config map[string]interface{}) error

type DataQualityChecker

type DataQualityChecker struct {
	// contains filtered or unexported fields
}

DataQualityChecker 数据质量检查器

func NewDataQualityChecker

func NewDataQualityChecker(qualityService *DataQualityService, metadataService *MetricMetadataService) *DataQualityChecker

NewDataQualityChecker 创建数据质量检查器

func (*DataQualityChecker) CheckMetric

func (c *DataQualityChecker) CheckMetric(ctx context.Context, metricName string, value float64, timestamp time.Time) ([]*models.QualityCheckResult, error)

CheckMetric 检查指标数据质量

func (*DataQualityChecker) CheckMetricsBatch

func (c *DataQualityChecker) CheckMetricsBatch(ctx context.Context, metrics map[string]float64, timestamp time.Time) (map[string][]*models.QualityCheckResult, error)

CheckMetricsBatch 批量检查指标数据质量

func (*DataQualityChecker) ClearCache

func (c *DataQualityChecker) ClearCache()

ClearCache 清除缓存

type DataQualityService

type DataQualityService struct {
	// contains filtered or unexported fields
}

DataQualityService 数据质量服务(从OneOps平台获取规则)

func NewDataQualityService

func NewDataQualityService(oneOpsBaseURL string) *DataQualityService

NewDataQualityService 创建数据质量服务

func ProvideDataQualityService

func ProvideDataQualityService(config *Config) *DataQualityService

ProvideDataQualityService 为 Wire 依赖注入提供的构造函数

func (*DataQualityService) GetRules

func (s *DataQualityService) GetRules(ctx context.Context, enabled bool) ([]*models.DataQualityRule, error)

GetRules 获取数据质量规则列表

func (*DataQualityService) GetRulesByMetric

func (s *DataQualityService) GetRulesByMetric(ctx context.Context, metricName string) ([]*models.DataQualityRule, error)

GetRulesByMetric 根据指标名称获取匹配的质量规则

type DatabaseConfig

type DatabaseConfig struct {
	Type        string `yaml:"type" json:"type"`                   // 数据库类型,这里应该是 "mongodb"
	URI         string `yaml:"uri" json:"uri"`                     // MongoDB 连接 URI
	Database    string `yaml:"database" json:"database"`           // 数据库名称
	MaxPoolSize uint64 `yaml:"max_pool_size" json:"max_pool_size"` // 连接池最大连接数
}

type DefaultTargetConverter

type DefaultTargetConverter struct {
	// contains filtered or unexported fields
}

DefaultTargetConverter 默认目标转换器

func NewDefaultTargetConverter

func NewDefaultTargetConverter(logger *zap.Logger) *DefaultTargetConverter

NewDefaultTargetConverter 创建默认目标转换器

func (*DefaultTargetConverter) Convert

func (c *DefaultTargetConverter) Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)

Convert 默认转换逻辑

func (*DefaultTargetConverter) Supports

func (c *DefaultTargetConverter) Supports(collectionType string) bool

Supports 默认转换器支持所有类型(作为回退)

type Deployment

type Deployment struct {
	// Timeout               int `yaml:"timeout"`
	ConcurrentDeployments int `yaml:"concurrent_deployments"`
}

type DeploymentManager

type DeploymentManager struct {
	ConfigManager   *ConfigManager
	RegistryManager *RegistryManager
	MinioManager    *MinioManager
	MongoClient     *mongo.Client
	// contains filtered or unexported fields
}

func ProvideDeployManager

func ProvideDeployManager(configManager *ConfigManager, registryManager *RegistryManager, minio *MinioManager, mongoClient *mongo.Client) *DeploymentManager

func (*DeploymentManager) CreateDeployment

func (dm *DeploymentManager) CreateDeployment(req models.DeploymentRequest) (*models.Deployment, error)

func (*DeploymentManager) GetDeployment

func (dm *DeploymentManager) GetDeployment(deploymentID string) (*models.Deployment, error)

在 DeploymentManager 中更新此方法

func (*DeploymentManager) ListDeployments

func (dm *DeploymentManager) ListDeployments() ([]models.Deployment, error)

func (*DeploymentManager) UpdateDeploymentDeviceStatus

func (dm *DeploymentManager) UpdateDeploymentDeviceStatus(deploymentID string, deviceStatus models.TargetDevice) error

type EffectiveStrategy

type EffectiveStrategy struct {
	GlobalStrategy   *models.EnhancedGlobalMetricStrategy
	GroupStrategies  []*models.GroupMetricStrategy
	InstanceStrategy *models.EnhancedInstanceMetricStrategy
	CollectionTasks  []models.CollectionTask
}

EffectiveStrategy 生效策略(合并后的最终策略)

type EnhancedStrategyConverter

type EnhancedStrategyConverter struct {
	// contains filtered or unexported fields
}

EnhancedStrategyConverter 增强的策略转换器 支持采集任务、处理规则、输出规则等完整功能 使用配置驱动,替代硬编码

func NewEnhancedStrategyConverter

func NewEnhancedStrategyConverter(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *EnhancedStrategyConverter

NewEnhancedStrategyConverter 创建增强的策略转换器

func (*EnhancedStrategyConverter) ConvertCollectionTaskToPluginConfig

func (esc *EnhancedStrategyConverter) ConvertCollectionTaskToPluginConfig(
	ctx context.Context,
	task *models.CollectionTask,
) (map[string]interface{}, []map[string]interface{}, []map[string]interface{}, error)

ConvertCollectionTaskToPluginConfig 将采集任务转换为插件配置

func (*EnhancedStrategyConverter) GetConfigLoader

GetConfigLoader 获取配置加载器(用于外部访问)

type FileOperation

type FileOperation struct {
	// contains filtered or unexported fields
}

FileOperation 文件操作模块

func NewFileOperation

func NewFileOperation(agentManager *AgentManager) *FileOperation

NewFileOperation 创建文件操作模块

func (*FileOperation) CreateDirectory

func (fo *FileOperation) CreateDirectory(ctx context.Context, agentCode, dirPath string, mode uint32) (map[string]interface{}, error)

CreateDirectory 创建目录

func (*FileOperation) DeleteFile

func (fo *FileOperation) DeleteFile(ctx context.Context, agentCode, filePath string, recursive bool) (map[string]interface{}, error)

DeleteFile 删除文件或目录

func (*FileOperation) DownloadFile

func (fo *FileOperation) DownloadFile(ctx context.Context, agentCode, filePath string, offset, length int64) (io.ReadCloser, error)

DownloadFile 下载文件(返回读取器)

func (*FileOperation) GetFileInfo

func (fo *FileOperation) GetFileInfo(ctx context.Context, agentCode, filePath string) (map[string]interface{}, error)

GetFileInfo 获取文件信息

func (*FileOperation) GetUploadStatus

func (fo *FileOperation) GetUploadStatus(ctx context.Context, agentCode, sessionID string) (map[string]interface{}, error)

GetUploadStatus 获取上传状态

func (*FileOperation) ListFiles

func (fo *FileOperation) ListFiles(ctx context.Context, agentCode, dirPath string) (map[string]interface{}, error)

ListFiles 列出文件

func (*FileOperation) StartFileUpload

func (fo *FileOperation) StartFileUpload(ctx context.Context, agentCode, filePath string, fileSize int64, md5, sha256 string) (map[string]interface{}, error)

StartFileUpload 开始文件上传

func (*FileOperation) UploadFileChunk

func (fo *FileOperation) UploadFileChunk(ctx context.Context, agentCode, sessionID string, chunkIndex int64, data []byte) error

UploadFileChunk 上传文件块

type GitConfig

type GitConfig struct {
	Address  string `yaml:"address"`
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	Branch   string `yaml:"branch"`
}

type GlobalResolver

type GlobalResolver struct {
	ConfigManager *ConfigManager
}

func ProvideGlobalResolver

func ProvideGlobalResolver(cm *ConfigManager) *GlobalResolver

func (*GlobalResolver) Resolve

func (gr *GlobalResolver) Resolve(ctx context.Context, agentID, appID string) (map[string]string, error)

type GroupStrategyManager

type GroupStrategyManager struct {
	// contains filtered or unexported fields
}

GroupStrategyManager 分组策略管理器

func NewGroupStrategyManager

func NewGroupStrategyManager(mongoClient *mongo.Client, logger *zap.Logger) *GroupStrategyManager

NewGroupStrategyManager 创建分组策略管理器

func (*GroupStrategyManager) CreateGroupStrategy

func (gsm *GroupStrategyManager) CreateGroupStrategy(ctx context.Context, strategy *models.GroupMetricStrategy) error

CreateGroupStrategy 创建分组策略

func (*GroupStrategyManager) DeleteGroupStrategy

func (gsm *GroupStrategyManager) DeleteGroupStrategy(ctx context.Context, id string) error

DeleteGroupStrategy 删除分组策略

func (*GroupStrategyManager) GetGroupStrategiesForAgent

func (gsm *GroupStrategyManager) GetGroupStrategiesForAgent(ctx context.Context, agentCode string, agentManager *AgentManager) ([]*models.GroupMetricStrategy, error)

GetGroupStrategiesForAgent 获取 Agent 匹配的所有分组策略(按优先级排序)

func (*GroupStrategyManager) GetGroupStrategy

func (gsm *GroupStrategyManager) GetGroupStrategy(ctx context.Context, id string) (*models.GroupMetricStrategy, error)

GetGroupStrategy 获取分组策略

func (*GroupStrategyManager) ListGroupStrategies

func (gsm *GroupStrategyManager) ListGroupStrategies(ctx context.Context) ([]*models.GroupMetricStrategy, error)

ListGroupStrategies 列出所有分组策略

func (*GroupStrategyManager) MatchGroupStrategies

func (gsm *GroupStrategyManager) MatchGroupStrategies(ctx context.Context, agentLabels map[string]string) ([]*models.GroupMetricStrategy, error)

MatchGroupStrategies 根据 Agent 标签匹配分组策略 支持多标签 AND 逻辑匹配和通配符匹配

func (*GroupStrategyManager) UpdateGroupStrategy

func (gsm *GroupStrategyManager) UpdateGroupStrategy(ctx context.Context, strategy *models.GroupMetricStrategy) error

UpdateGroupStrategy 更新分组策略

type GrpcProxyManager

type GrpcProxyManager struct {
	StreamDirector proxy.StreamDirector
	// contains filtered or unexported fields
}

func ProvideGrpcProxyManager

func ProvideGrpcProxyManager(registryManager *RegistryManager, am *AgentManager, km *KeyManager, config *Config) *GrpcProxyManager

func (*GrpcProxyManager) Start

func (gpm *GrpcProxyManager) Start() error

func (*GrpcProxyManager) Stop

func (gpm *GrpcProxyManager) Stop() error

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor 健康监控模块

func NewHealthMonitor

func NewHealthMonitor(agentManager *AgentManager) *HealthMonitor

NewHealthMonitor 创建健康监控模块

func (*HealthMonitor) GetAgentOverallHealth

func (hm *HealthMonitor) GetAgentOverallHealth(ctx context.Context, agentCode string) (map[string]interface{}, error)

GetAgentOverallHealth 获取 Agent 整体健康状态

func (*HealthMonitor) GetHealthHistory

func (hm *HealthMonitor) GetHealthHistory(ctx context.Context, agentCode, packageName string, limit int32) ([]*pb.HealthHistoryEntry, error)

GetHealthHistory 获取服务的健康检查历史

func (*HealthMonitor) GetServiceHealth

func (hm *HealthMonitor) GetServiceHealth(ctx context.Context, agentCode, packageName string) (*pb.ServiceHealth, error)

GetServiceHealth 获取指定服务的健康状态

func (*HealthMonitor) ListAgentHealth

func (hm *HealthMonitor) ListAgentHealth(ctx context.Context, agentCode string, filterStatus []pb.HealthStatus) ([]*pb.ServiceHealth, error)

ListAgentHealth 获取 Agent 所有服务的健康状态

type JumperServerManager

type JumperServerManager struct {
	// contains filtered or unexported fields
}

func ProvideJumperServerManager

func ProvideJumperServerManager(registryManager *RegistryManager, km *KeyManager, config *Config) *JumperServerManager

func (*JumperServerManager) Start

func (jsm *JumperServerManager) Start() error

func (*JumperServerManager) Stop

func (jsm *JumperServerManager) Stop() error

type KeyManager

type KeyManager struct {
	// contains filtered or unexported fields
}

func NewKeyManager

func NewKeyManager(config *Config) (*KeyManager, error)

func ProvideKeyManager

func ProvideKeyManager(config *Config) (*KeyManager, error)

ProvideKeyManager 为 Wire 依赖注入提供的构造函数

func (*KeyManager) GenerateResourceKey

func (km *KeyManager) GenerateResourceKey(resourceType, containerName string) (string, error)

func (*KeyManager) GenerateResourcePrefix

func (km *KeyManager) GenerateResourcePrefix(resourceType string) (string, error)

func (*KeyManager) GenerateServiceKey

func (km *KeyManager) GenerateServiceKey(serviceName, hostIdentifier, port string) (string, error)

func (*KeyManager) GenerateServicePrefix

func (km *KeyManager) GenerateServicePrefix(serviceName string) (string, error)

func (*KeyManager) ParseResourceKey

func (km *KeyManager) ParseResourceKey(key string) (string, string, string, error)

func (*KeyManager) ParseServiceKey

func (km *KeyManager) ParseServiceKey(key string) (string, string, string, string, string, error)

type LayerResolver

type LayerResolver interface {
	Resolve(ctx context.Context, agentID, appID string) (map[string]string, error)
}

type LayeredVariableResolver

type LayeredVariableResolver struct {
	// contains filtered or unexported fields
}

func NewLayeredVariableResolver

func NewLayeredVariableResolver(rm *RegistryManager, cm *ConfigManager) *LayeredVariableResolver

func (*LayeredVariableResolver) Resolve

func (lvr *LayeredVariableResolver) Resolve(ctx context.Context, agentID, appID string) (map[string]string, error)

type LokiForwarder

type LokiForwarder struct {
	UpstreamURL   string
	Client        *http.Client
	BufferSize    int
	FlushInterval time.Duration

	RedisManager *RedisManager
	LokiLabelKey string
	// contains filtered or unexported fields
}

func ProvideLokiForwarder

func ProvideLokiForwarder(config *ConfigManager, redisManager *RedisManager) *LokiForwarder

func (*LokiForwarder) ForwardHandler

func (lf *LokiForwarder) ForwardHandler(w http.ResponseWriter, r *http.Request)

type LokiPushRequest

type LokiPushRequest struct {
	Streams []LokiStream `json:"streams"`
}

LokiPushRequest represents the structure of a Loki push request

type LokiStream

type LokiStream struct {
	Stream map[string]string `json:"stream"`
	Values [][]string        `json:"values"` // [timestamp, log message]
}

LokiStream represents a stream of log entries in Loki

type Manager

type Manager interface {
	Start() error
	Stop() error
}

type MetricDataPoint

type MetricDataPoint struct {
	Value     float64
	Timestamp time.Time
}

MetricDataPoint 指标数据点

type MetricMetadataService

type MetricMetadataService struct {
	// contains filtered or unexported fields
}

MetricMetadataService 指标元数据服务(从OneOps平台获取)

func NewMetricMetadataService

func NewMetricMetadataService(oneOpsBaseURL string) *MetricMetadataService

NewMetricMetadataService 创建指标元数据服务

func ProvideMetricMetadataService

func ProvideMetricMetadataService(config *Config) *MetricMetadataService

ProvideMetricMetadataService 为 Wire 依赖注入提供的构造函数

func (*MetricMetadataService) GetMetadata

func (s *MetricMetadataService) GetMetadata(ctx context.Context, name string) (*models.MetricMetadata, error)

GetMetadata 根据名称获取指标元数据

func (*MetricMetadataService) ListMetadata

func (s *MetricMetadataService) ListMetadata(ctx context.Context, category string) ([]*models.MetricMetadata, error)

ListMetadata 获取指标元数据列表

type MetricsManagerConfig

type MetricsManagerConfig struct {
	Port           int    `yaml:"port" json:"port"`
	RemoteWriteURL string `yaml:"remoteWriteurl" json:"remoteWriteurl"`
}

MetricsManagerConfig 定义了指标管理器的配置

type MetricsQuery

type MetricsQuery struct {
	// contains filtered or unexported fields
}

MetricsQuery 指标查询模块

func NewMetricsQuery

func NewMetricsQuery(agentManager *AgentManager) *MetricsQuery

NewMetricsQuery 创建指标查询模块

func (*MetricsQuery) GetAgentMetricsSummary

func (mq *MetricsQuery) GetAgentMetricsSummary(ctx context.Context, agentCode string) (map[string]interface{}, error)

GetAgentMetricsSummary 获取 Agent 指标摘要

func (*MetricsQuery) GetApplicationMetrics

func (mq *MetricsQuery) GetApplicationMetrics(ctx context.Context, agentCode, serviceName string) (map[string]interface{}, error)

GetApplicationMetrics 获取应用指标

func (*MetricsQuery) GetMetricsHistory

func (mq *MetricsQuery) GetMetricsHistory(ctx context.Context, agentCode, metricType, serviceName string, startTime, endTime time.Time) (map[string]interface{}, error)

GetMetricsHistory 获取指标历史数据

func (*MetricsQuery) GetServiceMetrics

func (mq *MetricsQuery) GetServiceMetrics(ctx context.Context, agentCode, packageName string) (*pb.ServiceMetrics, error)

GetServiceMetrics 获取指定服务的指标

func (*MetricsQuery) GetSystemMetrics

func (mq *MetricsQuery) GetSystemMetrics(ctx context.Context, agentCode string) (*pb.SystemMetrics, error)

GetSystemMetrics 获取系统指标

func (*MetricsQuery) ListServiceMetrics

func (mq *MetricsQuery) ListServiceMetrics(ctx context.Context, agentCode string, runningOnly bool) ([]*pb.ServiceMetrics, error)

ListServiceMetrics 获取 Agent 所有服务的指标

type MetricsStrategyService

type MetricsStrategyService struct {
	// contains filtered or unexported fields
}

MetricsStrategyService 指标策略管理服务 注意:数据源是OneOps MySQL,Controller的MongoDB作为缓存/同步目标 策略数据流向:OneOps MySQL → Controller MongoDB → Agent 本服务主要用于Agent查询策略配置,实际的数据管理在OneOps后端完成 MongoDB中的数据由OneOps后端同步更新

func NewMetricsStrategyService

func NewMetricsStrategyService(mongoClient *mongo.Client, agentManager *AgentManager, registryManager *RegistryManager, logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *MetricsStrategyService

NewMetricsStrategyService 创建指标策略管理服务

func (*MetricsStrategyService) DeleteInstanceStrategy

func (s *MetricsStrategyService) DeleteInstanceStrategy(ctx context.Context, agentCode string) error

DeleteInstanceStrategy 删除实例策略

func (*MetricsStrategyService) GenerateConfigFromEnhancedStrategy

func (s *MetricsStrategyService) GenerateConfigFromEnhancedStrategy(
	ctx context.Context,
	agentCode string,
) (string, error)

GenerateConfigFromEnhancedStrategy 从增强策略生成 Telegraf 配置

func (*MetricsStrategyService) GetApplicationStrategy

func (s *MetricsStrategyService) GetApplicationStrategy(ctx context.Context, agentCode string) (*models.ApplicationMetricStrategy, error)

GetApplicationStrategy 获取应用指标策略(全局或实例)

func (*MetricsStrategyService) GetAvailableMetrics

func (s *MetricsStrategyService) GetAvailableMetrics(ctx context.Context, agentCode string) ([]string, error)

GetAvailableMetrics 获取可用指标列表

func (*MetricsStrategyService) GetConfigStatus

func (s *MetricsStrategyService) GetConfigStatus(ctx context.Context, agentCode string) (*models.ConfigStatus, error)

GetConfigStatus 获取配置生效状态 注意:新架构中,配置状态应该从任务服务(CollectionTaskService)查询,而不是从策略服务查询 此方法已废弃,请使用CollectionTaskService.GetTaskStatus替代

func (*MetricsStrategyService) GetEffectiveEnhancedStrategy

func (s *MetricsStrategyService) GetEffectiveEnhancedStrategy(ctx context.Context, agentCode string) (*EffectiveStrategy, error)

GetEffectiveEnhancedStrategy 获取生效的增强策略 注意:策略合并逻辑已在 OneOps 中完成,同步到 Controller 的 MongoDB 时已经是已合并的策略 Controller 直接返回从 MongoDB 读取的策略即可,不需要再次合并 这样可以避免重复逻辑,并确保两个服务返回的策略一致

func (*MetricsStrategyService) GetEnhancedGlobalStrategy

func (s *MetricsStrategyService) GetEnhancedGlobalStrategy(ctx context.Context) (*models.EnhancedGlobalMetricStrategy, error)

GetEnhancedGlobalStrategy 获取增强的全局策略

func (*MetricsStrategyService) GetEnhancedInstanceStrategy

func (s *MetricsStrategyService) GetEnhancedInstanceStrategy(ctx context.Context, agentCode string) (*models.EnhancedInstanceMetricStrategy, error)

GetEnhancedInstanceStrategy 获取增强的实例策略

func (*MetricsStrategyService) PreviewTask

PreviewTask 预览采集任务匹配的指标

func (*MetricsStrategyService) UpdateApplicationStrategy

func (s *MetricsStrategyService) UpdateApplicationStrategy(ctx context.Context, strategy *models.ApplicationMetricStrategy) error

UpdateApplicationStrategy 更新应用指标策略

type MinioConfig

type MinioConfig struct {
	Endpoint        string `yaml:"endpoint" json:"endpoint"`
	AccessKeyID     string `yaml:"accessKeyID" json:"access_key_id"`
	SecretAccessKey string `yaml:"secretAccessKey" json:"secret_access_key"`
	UseSSL          bool   `yaml:"useSSL" json:"use_ssl"`
	BucketName      string `yaml:"bucketName" json:"bucketName"`
	ProxyAddr       string `yaml:"proxyAddr" json:"proxy_addr"`
}

type MinioManager

type MinioManager struct {
	KeyManager      *KeyManager
	ConfigManager   *ConfigManager
	RegistryManager *RegistryManager
	// contains filtered or unexported fields
}

func ProvideMinioManager

func ProvideMinioManager(km *KeyManager, configManager *ConfigManager, registryManager *RegistryManager) (*MinioManager, error)

func (*MinioManager) CreateBucket

func (mm *MinioManager) CreateBucket(bucketName string) error

func (*MinioManager) DeleteObject

func (mm *MinioManager) DeleteObject(bucketName, objectName string) error

func (*MinioManager) DownloadFile

func (mm *MinioManager) DownloadFile(bucketName, objectName string, filePath string) error

func (*MinioManager) GetMiniManagerStatus

func (mm *MinioManager) GetMiniManagerStatus() (models.ResourceStatus, error)

func (*MinioManager) GetObjectInfo

func (mm *MinioManager) GetObjectInfo(bucketName, objectName string) (minio.ObjectInfo, error)

GetObjectInfo 获取对象信息

func (*MinioManager) GetPresignedURL

func (mm *MinioManager) GetPresignedURL(bucketName, objectName string, expiry time.Duration, method string) (string, error)

GetPresignedURL 获取预签名URL GetPresignedURL 获取预签名URL

func (*MinioManager) GetProxyLatestPackageURL

func (mm *MinioManager) GetProxyLatestPackageURL(bucketName, objectName string) (string, error)

func (*MinioManager) GetProxyPackageURL

func (mm *MinioManager) GetProxyPackageURL(bucketName, objectName string) (string, error)

func (*MinioManager) ListBuckets

func (mm *MinioManager) ListBuckets() ([]minio.BucketInfo, error)

func (*MinioManager) ListObjects

func (mm *MinioManager) ListObjects(bucketName string) ([]minio.ObjectInfo, error)

func (*MinioManager) ListPackages

func (mm *MinioManager) ListPackages(bucketName string) ([]string, error)

func (*MinioManager) RegisterService

func (mm *MinioManager) RegisterService() error

func (*MinioManager) Start

func (mm *MinioManager) Start() error

Start 启动Minio管理器和代理服务器

func (*MinioManager) Stop

func (mm *MinioManager) Stop() error

Stop 停止Minio管理器和代理服务器

func (*MinioManager) UploadFile

func (mm *MinioManager) UploadFile(bucketName, objectName string, filePath string) error

type MonitoringService

type MonitoringService struct {
	// contains filtered or unexported fields
}

func ProvideMonitoringService

func ProvideMonitoringService(
	mongoClient *mongo.Client,
	config *Config,
	configGenerator *TelegrafConfigGenerator,
	templateService *PluginTemplateService,
	telegrafManager *TelegrafManager,
) (*MonitoringService, error)

func (*MonitoringService) CreatePlugin

func (ms *MonitoringService) CreatePlugin(ctx context.Context, plugin *models.PluginConfig) error

CreatePlugin 创建独立插件配置

func (*MonitoringService) CreateTask

func (ms *MonitoringService) CreateTask(ctx context.Context, task *models.MonitoringTask) error

CreateTask 创建监控任务

func (*MonitoringService) DeletePlugin

func (ms *MonitoringService) DeletePlugin(ctx context.Context, pluginID string) error

DeletePlugin 删除插件配置

func (*MonitoringService) DeleteTask

func (ms *MonitoringService) DeleteTask(ctx context.Context, taskID string) error

DeleteTask 删除监控任务

func (*MonitoringService) EnsureIndexes

func (ms *MonitoringService) EnsureIndexes(ctx context.Context) error

EnsureIndexes 创建索引

func (*MonitoringService) GetPlugin

func (ms *MonitoringService) GetPlugin(ctx context.Context, pluginID string) (*models.PluginConfig, error)

GetPlugin 获取插件配置

func (*MonitoringService) GetStatus

GetStatus 获取监控平台状态

func (*MonitoringService) GetTask

func (ms *MonitoringService) GetTask(ctx context.Context, taskID string) (*models.MonitoringTask, error)

GetTask 获取监控任务

func (*MonitoringService) ListPlugins

func (ms *MonitoringService) ListPlugins(ctx context.Context, filter map[string]interface{}) ([]*models.PluginConfig, error)

ListPlugins 列出插件配置

func (*MonitoringService) ListTasks

func (ms *MonitoringService) ListTasks(ctx context.Context, filter map[string]interface{}) ([]*models.MonitoringTask, error)

ListTasks 列出监控任务

func (*MonitoringService) PauseTask

func (ms *MonitoringService) PauseTask(ctx context.Context, taskID string) error

PauseTask 暂停监控任务(从active状态转为paused)

func (*MonitoringService) PublishTask

func (ms *MonitoringService) PublishTask(ctx context.Context, taskID string) error

PublishTask 发布监控任务(从created状态转为published)

func (*MonitoringService) ReloadTelegrafConfig

func (ms *MonitoringService) ReloadTelegrafConfig(ctx context.Context) error

ReloadTelegrafConfig 重新加载 telegraf 配置(公开方法)

func (*MonitoringService) ResumeTask

func (ms *MonitoringService) ResumeTask(ctx context.Context, taskID string) error

ResumeTask 恢复监控任务(从paused状态转为active)

func (*MonitoringService) RevokeTask

func (ms *MonitoringService) RevokeTask(ctx context.Context, taskID string) error

RevokeTask 撤回监控任务

func (*MonitoringService) StartTask

func (ms *MonitoringService) StartTask(ctx context.Context, taskID string) error

StartTask 启动监控任务(从published状态转为active)

func (*MonitoringService) StopTask

func (ms *MonitoringService) StopTask(ctx context.Context, taskID string) error

StopTask 停止监控任务(从active或paused状态转为stopped)

func (*MonitoringService) UpdatePlugin

func (ms *MonitoringService) UpdatePlugin(ctx context.Context, pluginID string, plugin *models.PluginConfig) error

UpdatePlugin 更新插件配置

func (*MonitoringService) UpdateTask

func (ms *MonitoringService) UpdateTask(ctx context.Context, taskID string, task *models.MonitoringTask) error

UpdateTask 更新监控任务

type NacosConfig

type NacosConfig struct {
	Server     string `yaml:"server" json:"server"`
	Port       int    `yaml:"port" json:"port"`
	Namespace  string `yaml:"namespace" json:"namespace"`
	Group      string `yaml:"group" json:"group"`
	AgentGroup string `yaml:"agent_group" json:"agent_group"`
	DataID     string `yaml:"data_id" json:"data_id"`
	Username   string `yaml:"username" json:"username"`
	Password   string `yaml:"password" json:"password"`
	LogDir     string `yaml:"log_dir" json:"log_dir"`
	CacheDir   string `yaml:"cache_dir" json:"cache_dir"`
	LogLevel   string `yaml:"log_level" json:"log_level"`
}

定义NacosConfig结构体

type NacosManager

type NacosManager struct {
	// contains filtered or unexported fields
}

func ProvideNacosManager

func ProvideNacosManager(config *Config) (*NacosManager, error)

func (*NacosManager) BatchGetConfig

func (nm *NacosManager) BatchGetConfig(params []vo.ConfigParam) (map[string]string, error)

func (*NacosManager) BatchSetConfig

func (nm *NacosManager) BatchSetConfig(configs map[string]vo.ConfigParam) error

func (*NacosManager) DeleteConfig

func (nm *NacosManager) DeleteConfig(dataId, group string) error

func (*NacosManager) GetConfig

func (nm *NacosManager) GetConfig(dataId, group string) (string, error)

func (*NacosManager) GetConfigWithRetry

func (nm *NacosManager) GetConfigWithRetry(dataId, group string, maxRetries int) (string, error)

func (*NacosManager) ListenConfig

func (nm *NacosManager) ListenConfig(dataId, group string, onChange func(string)) error

func (*NacosManager) SetConfig

func (nm *NacosManager) SetConfig(dataId, group, content string) error

type PackageInfo

type PackageInfo struct {
	ID        string `bson:"_id"`
	AgentCode string `bson:"agent_code"`
	Name      string `bson:"name"`
	Version   string `bson:"version"`
}

type PackageWatcher

type PackageWatcher struct {
	// contains filtered or unexported fields
}

func (*PackageWatcher) Watch

func (pw *PackageWatcher) Watch(ctx context.Context)

type PluginTemplateService

type PluginTemplateService struct {
	// contains filtered or unexported fields
}

func ProvidePluginTemplateService

func ProvidePluginTemplateService(mongoClient *mongo.Client, config *Config) (*PluginTemplateService, error)

func (*PluginTemplateService) ConvertConfigToTOML

func (pts *PluginTemplateService) ConvertConfigToTOML(config map[string]interface{}) (string, error)

ConvertConfigToTOML 将配置 map 转换为 TOML 格式的字符串

func (*PluginTemplateService) CreateTemplate

func (pts *PluginTemplateService) CreateTemplate(ctx context.Context, tmpl *models.PluginTemplate) error

CreateTemplate 创建插件模板

func (*PluginTemplateService) DeleteTemplate

func (pts *PluginTemplateService) DeleteTemplate(ctx context.Context, templateID string) error

DeleteTemplate 删除插件模板

func (*PluginTemplateService) EnsureIndexes

func (pts *PluginTemplateService) EnsureIndexes(ctx context.Context) error

EnsureIndexes 创建索引

func (*PluginTemplateService) GetTemplate

func (pts *PluginTemplateService) GetTemplate(ctx context.Context, templateID string) (*models.PluginTemplate, error)

GetTemplate 获取插件模板

func (*PluginTemplateService) GetTemplatesByType

func (pts *PluginTemplateService) GetTemplatesByType(ctx context.Context, pluginType string) ([]*models.PluginTemplate, error)

GetTemplatesByType 根据插件类型获取模板列表

func (*PluginTemplateService) ListTemplates

func (pts *PluginTemplateService) ListTemplates(ctx context.Context, filter map[string]interface{}) ([]*models.PluginTemplate, error)

ListTemplates 列出所有插件模板

func (*PluginTemplateService) PresetTemplates

func (pts *PluginTemplateService) PresetTemplates(ctx context.Context) error

PresetTemplates 预置常用插件模板

func (*PluginTemplateService) RenderTemplate

func (pts *PluginTemplateService) RenderTemplate(ctx context.Context, templateID string, params map[string]interface{}) (string, error)

RenderTemplate 渲染模板

func (*PluginTemplateService) RenderTemplateFromContent

func (pts *PluginTemplateService) RenderTemplateFromContent(ctx context.Context, templateContent string, params map[string]interface{}) (string, error)

RenderTemplateFromContent 直接从模板内容渲染(不存储)

func (*PluginTemplateService) UpdateTemplate

func (pts *PluginTemplateService) UpdateTemplate(ctx context.Context, templateID string, tmpl *models.PluginTemplate) error

UpdateTemplate 更新插件模板

func (*PluginTemplateService) ValidateTemplate

func (pts *PluginTemplateService) ValidateTemplate(ctx context.Context, templateContent string) error

ValidateTemplate 验证模板语法

func (*PluginTemplateService) ValidateTemplateParameters

func (pts *PluginTemplateService) ValidateTemplateParameters(ctx context.Context, templateID string, params map[string]interface{}) error

ValidateTemplateParameters 验证模板参数

type PrometheusForwarder

type PrometheusForwarder struct {
	UpstreamURL   string
	Client        *http.Client
	BufferSize    int
	FlushInterval time.Duration

	AddressLabels  map[string]map[string]string
	RedisManager   *RedisManager
	AttachLabelKey string
	// contains filtered or unexported fields
}

func ProvidePrometheusForwarder

func ProvidePrometheusForwarder(config *ConfigManager, redisManager *RedisManager) *PrometheusForwarder

func (*PrometheusForwarder) ForwardHandler

func (pf *PrometheusForwarder) ForwardHandler(w http.ResponseWriter, r *http.Request)

type RedisConfig

type RedisConfig struct {
	Addresses []string `yaml:"addresses" json:"addresses"`
	DB        int      `yaml:"db" json:"db"`
	Password  string   `yaml:"password" json:"password"`
	PoolSize  int      `yaml:"pool_size" json:"pool_size"`
}

type RedisManager

type RedisManager struct {
	// contains filtered or unexported fields
}

func ProvideRedisManager

func ProvideRedisManager(config *Config) (*RedisManager, error)

func (*RedisManager) Close

func (rm *RedisManager) Close() error

func (*RedisManager) Get

func (rm *RedisManager) Get(key string) (map[string]map[string]any, error)

func (*RedisManager) GetAll

func (rm *RedisManager) GetAll() ([]map[string]map[string]map[string]any, error)

func (*RedisManager) GetClient

func (rm *RedisManager) GetClient() *redis.Client

GetClient 返回 Redis 客户端,用于其他模块访问

func (*RedisManager) Start

func (rm *RedisManager) Start() error

func (*RedisManager) Stop

func (rm *RedisManager) Stop() error

func (*RedisManager) Subscribe

func (rm *RedisManager) Subscribe() error

type RegistryManager

type RegistryManager struct {
	KeyManager    *KeyManager
	ConfigManager *ConfigManager

	HostIdentifier string
	// contains filtered or unexported fields
}

func ProvideRegistryManager

func ProvideRegistryManager(km *KeyManager, cm *ConfigManager, mongoClient *mongo.Client) (*RegistryManager, error)

ProvideRegistryManager 为 Wire 依赖注入提供的构造函数

func (*RegistryManager) FirstByName

func (rm *RegistryManager) FirstByName(serviceName string) (*models.ServiceInfo, error)

func (*RegistryManager) GetAgentCount

func (rm *RegistryManager) GetAgentCount() (int, error)

func (*RegistryManager) GetAgentVariables

func (rm *RegistryManager) GetAgentVariables(agentID string) (map[string]string, error)

func (*RegistryManager) GetControllerAddress

func (rm *RegistryManager) GetControllerAddress() (string, error)

GetControllerAddress 返回 Controller 服务的地址

func (*RegistryManager) GetEtcdAddress

func (rm *RegistryManager) GetEtcdAddress() (string, error)

GetEtcdAddress 返回 Etcd 服务的地址

func (*RegistryManager) GetService

func (rm *RegistryManager) GetService(key string) (*models.ServiceInfo, error)

func (*RegistryManager) GetVariables

func (rm *RegistryManager) GetVariables(ctx context.Context, agentID, appID string) (map[string]string, error)

func (*RegistryManager) ListAgents

func (rm *RegistryManager) ListAgents(ctx context.Context, filter map[string]string, page, pageSize int) ([]models.Agent, int, error)

func (*RegistryManager) ListServices

func (rm *RegistryManager) ListServices() []*models.ServiceInfo

func (*RegistryManager) RefreshService

func (rm *RegistryManager) RefreshService(serviceID string, ttl time.Duration) error

func (*RegistryManager) RegisterService

func (rm *RegistryManager) RegisterService(service *models.ServiceInfo, ttl time.Duration) error

func (*RegistryManager) SetAgentVariables

func (rm *RegistryManager) SetAgentVariables(agentID string, variables map[string]string) error

func (*RegistryManager) Start

func (rm *RegistryManager) Start() error

func (*RegistryManager) Stop

func (rm *RegistryManager) Stop() error

func (*RegistryManager) UnregisterService

func (rm *RegistryManager) UnregisterService(key string) error

func (*RegistryManager) UpdateServiceConfig

func (rm *RegistryManager) UpdateServiceConfig(serviceID string, config map[string]interface{}) error

func (*RegistryManager) WatchServices

func (rm *RegistryManager) WatchServices(ctx context.Context) (<-chan *models.ServiceEvent, error)

type ResourceManager

type ResourceManager struct {
	Config *Config

	Resources  *tools.SafeMap[string, *models.ResourceInfo]
	KeyManager *KeyManager
	// contains filtered or unexported fields
}

func ProvideResourceManager

func ProvideResourceManager(km *KeyManager, c *Config) (*ResourceManager, error)

ProvideResourceManager 为 Wire 依赖注入提供的构造函数

func (*ResourceManager) ClearContainers

func (rm *ResourceManager) ClearContainers() error

func (*ResourceManager) CreateResource

func (rm *ResourceManager) CreateResource(resourceType models.ResourceType, containerName string, config map[string]interface{}, autoStart bool) (string, error)

func (*ResourceManager) DeleteResource

func (rm *ResourceManager) DeleteResource(multiLevelKey string) error

func (*ResourceManager) GetAllResourcesStatus

func (rm *ResourceManager) GetAllResourcesStatus() map[string]string

func (*ResourceManager) GetResourceFromContainerID

func (rm *ResourceManager) GetResourceFromContainerID(containerID string) (string, error)

func (*ResourceManager) GetResourceInfo

func (rm *ResourceManager) GetResourceInfo(multiLevelKey string) (*models.ResourceInfo, error)

func (*ResourceManager) GetResourceLogs

func (rm *ResourceManager) GetResourceLogs(multiLevelKey string, tail int) (string, error)

func (*ResourceManager) GetResourceStatus

func (rm *ResourceManager) GetResourceStatus(multiLevelKey string) (string, error)

func (*ResourceManager) GetStatus

func (rm *ResourceManager) GetStatus() map[string]interface{}

func (*ResourceManager) ListAllResources

func (rm *ResourceManager) ListAllResources() []string

func (*ResourceManager) RestartResource

func (rm *ResourceManager) RestartResource(multiLevelKey string) error

func (*ResourceManager) Start

func (rm *ResourceManager) Start() error

func (*ResourceManager) StartResource

func (rm *ResourceManager) StartResource(multiLevelKey string) error

func (*ResourceManager) Stop

func (rm *ResourceManager) Stop() error

func (*ResourceManager) StopResource

func (rm *ResourceManager) StopResource(multiLevelKey string) error

type ResourceProvider

type ResourceProvider interface {
	GetServiceInfo(serviceName string) (*ServiceInfo, error)
	StartService(serviceName string) error
	StopService(serviceName string) error
	RestartService(serviceName string) error
}

type SNMPTargetConverter

type SNMPTargetConverter struct {
	// contains filtered or unexported fields
}

SNMPTargetConverter SNMP 目标转换器

func NewSNMPTargetConverter

func NewSNMPTargetConverter(defaultPort int, defaultCommunity string, logger *zap.Logger) *SNMPTargetConverter

NewSNMPTargetConverter 创建 SNMP 目标转换器

func (*SNMPTargetConverter) Convert

func (c *SNMPTargetConverter) Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)

Convert 转换 SNMP 目标

func (*SNMPTargetConverter) Supports

func (c *SNMPTargetConverter) Supports(collectionType string) bool

Supports 检查是否支持该采集类型

type SafeResourceMap

type SafeResourceMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type ServerConfig

type ServerConfig struct {
	GRPCPort int `yaml:"grpc_port" json:"grpc_port"` // gRPC端口,默认9512
	HTTPPort int `yaml:"http_port" json:"http_port"` // HTTP端口,默认8080
}

ServerConfig 服务器配置

type ServiceInfo

type ServiceInfo struct {
	ID      string
	Name    string
	Image   string
	Status  string
	Ports   []types.Port
	Volumes []string
	Env     []string
}

type StrategyMerger

type StrategyMerger struct {
	// contains filtered or unexported fields
}

StrategyMerger 策略合并器 负责合并全局、分组、实例策略,生成最终生效的策略

func NewStrategyMerger

func NewStrategyMerger(logger *zap.Logger) *StrategyMerger

NewStrategyMerger 创建策略合并器

func (*StrategyMerger) GetEffectiveStrategyForAgent

func (sm *StrategyMerger) GetEffectiveStrategyForAgent(
	ctx context.Context,
	agentCode string,
	globalStrategy *models.EnhancedGlobalMetricStrategy,
	groupStrategyManager *GroupStrategyManager,
	agentManager *AgentManager,
	instanceStrategy *models.EnhancedInstanceMetricStrategy,
) (*EffectiveStrategy, error)

GetEffectiveStrategyForAgent 获取 Agent 的生效策略

func (*StrategyMerger) MergeStrategies

func (sm *StrategyMerger) MergeStrategies(
	ctx context.Context,
	globalStrategy *models.EnhancedGlobalMetricStrategy,
	groupStrategies []*models.GroupMetricStrategy,
	instanceStrategy *models.EnhancedInstanceMetricStrategy,
) (*EffectiveStrategy, error)

MergeStrategies 合并策略 优先级:实例策略 > 分组策略 > 全局策略

type StrategyToConfigConverter

type StrategyToConfigConverter struct {
	// contains filtered or unexported fields
}

StrategyToConfigConverter 策略到配置转换器 负责将指标采集策略转换为 Telegraf 配置 使用配置驱动,替代硬编码的指标映射

func NewStrategyToConfigConverter

func NewStrategyToConfigConverter(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *StrategyToConfigConverter

NewStrategyToConfigConverter 创建策略到配置转换器

func (*StrategyToConfigConverter) ConvertStrategyToConfig

func (c *StrategyToConfigConverter) ConvertStrategyToConfig(
	ctx context.Context,
	globalStrategy *models.GlobalMetricStrategy,
	instanceStrategy *models.InstanceMetricStrategy,
) (map[string][]map[string]interface{}, map[string]interface{}, error)

ConvertStrategyToConfig 将策略转换为 Telegraf 配置(基于CollectionTasks) 返回插件配置映射:pluginType -> []pluginConfig

func (*StrategyToConfigConverter) GenerateConfigFromStrategy

func (c *StrategyToConfigConverter) GenerateConfigFromStrategy(
	ctx context.Context,
	configGenerator *TelegrafConfigGenerator,
	globalStrategy *models.GlobalMetricStrategy,
	instanceStrategy *models.InstanceMetricStrategy,
) (string, error)

GenerateConfigFromStrategy 从策略生成完整的 Telegraf 配置 这是一个便捷方法,整合了策略转换和配置生成

func (*StrategyToConfigConverter) GetMetricPrefixesForPlugin

func (c *StrategyToConfigConverter) GetMetricPrefixesForPlugin(pluginType string) []string

GetMetricPrefixesForPlugin 获取插件对应的指标前缀列表(公开方法)

func (*StrategyToConfigConverter) GetPluginForMetric

func (c *StrategyToConfigConverter) GetPluginForMetric(metricName string) string

GetPluginForMetric 获取指标对应的插件类型(公开方法)

type SyslogManagerConfig

type SyslogManagerConfig struct {
	Port         int    `yaml:"port" json:"port"`
	Protocol     string `yaml:"protocol" json:"protocol"`
	LokiEndpoint string `yaml:"loki_endpoint" json:"loki_endpoint"`
}

SyslogManagerConfig 定义了 Syslog 管理器的配置

type TargetConverter

type TargetConverter interface {
	Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)
	Supports(collectionType string) bool
}

TargetConverter 采集目标转换器接口

type TargetConverterRegistry

type TargetConverterRegistry struct {
	// contains filtered or unexported fields
}

TargetConverterRegistry 转换器注册表

func NewTargetConverterRegistry

func NewTargetConverterRegistry(logger *zap.Logger, mapper CollectionTypeMapper) *TargetConverterRegistry

NewTargetConverterRegistry 创建转换器注册表

func (*TargetConverterRegistry) GetConverter

func (r *TargetConverterRegistry) GetConverter(collectionType string) TargetConverter

GetConverter 获取转换器

func (*TargetConverterRegistry) Register

func (r *TargetConverterRegistry) Register(collectionType string, converter TargetConverter)

Register 注册转换器

type TelegrafConfig

type TelegrafConfig struct {
	Enable             *bool  `yaml:"enable" json:"enable"` // 是否启用TelegrafManager,nil表示未设置(默认启用),true表示启用,false表示禁用
	Image              string `yaml:"image"`
	Name               string `yaml:"name"`
	ConfigPath         string `yaml:"config_path" json:"config_path"`
	PrometheusPort     int    `yaml:"prometheus_port" json:"prometheus_port"`
	LokiEndpoint       string `yaml:"loki_endpoint" json:"loki_endpoint"`
	PrometheusEndpoint string `yaml:"prometheus_endpoint" json:"prometheus_endpoint"`
}

type TelegrafConfigGenerator

type TelegrafConfigGenerator struct {
	TemplateService           *PluginTemplateService
	BaseConfigPath            string
	MetricMetadataService     *MetricMetadataService     // 指标元数据服务(从OneOps获取)
	EnhancedConverter         *EnhancedStrategyConverter // 增强的策略转换器(处理CollectionTasks)
	CollectionTargetConverter *CollectionTargetConverter // 采集目标转换器(从OneOps DTO转换)
}

func ProvideTelegrafConfigGenerator

func ProvideTelegrafConfigGenerator(templateService *PluginTemplateService, config *Config, metricMetadataService *MetricMetadataService) (*TelegrafConfigGenerator, error)

func (*TelegrafConfigGenerator) ConfigToTOML

func (tcg *TelegrafConfigGenerator) ConfigToTOML(config map[string]interface{}) ([]byte, error)

ConfigToTOML 将配置转换为 TOML 格式(公开方法)

func (*TelegrafConfigGenerator) ExtractMetadataFromConfig

func (tcg *TelegrafConfigGenerator) ExtractMetadataFromConfig(ctx context.Context, configStr string) ([]*models.MetricMetadata, error)

ExtractMetadataFromConfig 从配置中提取指标元数据

func (*TelegrafConfigGenerator) GenerateApplicationMetricsConfig

func (tcg *TelegrafConfigGenerator) GenerateApplicationMetricsConfig(appTargets []map[string]interface{}) map[string]interface{}

GenerateApplicationMetricsConfig 从应用指标策略生成HTTP客户端配置

func (*TelegrafConfigGenerator) GenerateConfig

func (tcg *TelegrafConfigGenerator) GenerateConfig(ctx context.Context, tasks []*models.MonitoringTask, standalonePlugins []*models.PluginConfig) (string, error)

GenerateConfig 生成完整的 telegraf 配置

func (*TelegrafConfigGenerator) GenerateConfigFromOneOpsDTO

func (tcg *TelegrafConfigGenerator) GenerateConfigFromOneOpsDTO(
	ctx context.Context,
	collectionType string,
	strategy *UnifiedCollectionStrategyDTO,
	targets []*CollectionTargetDTO,
) (string, error)

GenerateConfigFromOneOpsDTO 从 OneOps DTO 生成配置 这是 OneOps 调用的主要方法,将 OneOps 的 DTO 转换为 Controller 的 CollectionTask,然后生成配置

func (*TelegrafConfigGenerator) GenerateConfigFromStrategy

func (tcg *TelegrafConfigGenerator) GenerateConfigFromStrategy(
	ctx context.Context,
	globalStrategy *models.GlobalMetricStrategy,
	instanceStrategy *models.InstanceMetricStrategy,
) (string, error)

GenerateConfigFromStrategy 基于策略生成 Telegraf 配置(已废弃) 注意:Agent现在直接基于CollectionTasks执行采集,不再需要TOML配置 此方法保留用于向后兼容,但返回空字符串 Deprecated: Agent现在直接执行CollectionTasks,不再需要TOML配置

func (*TelegrafConfigGenerator) GenerateEBPFConfig

func (tcg *TelegrafConfigGenerator) GenerateEBPFConfig(ctx context.Context, target map[string]interface{}) (map[string]interface{}, error)

GenerateEBPFConfig 生成eBPF采集配置(通过eBPF插件)

func (*TelegrafConfigGenerator) GenerateNETCONFConfig

func (tcg *TelegrafConfigGenerator) GenerateNETCONFConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)

GenerateNETCONFConfig 生成NETCONF采集配置(通过HTTP插件调用NETCONF API)

func (*TelegrafConfigGenerator) GeneratePacketCaptureConfig

func (tcg *TelegrafConfigGenerator) GeneratePacketCaptureConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)

GeneratePacketCaptureConfig 生成抓包采集配置(通过exec插件调用tcpdump)

func (*TelegrafConfigGenerator) GeneratePluginConfigFromRaw

func (tcg *TelegrafConfigGenerator) GeneratePluginConfigFromRaw(config map[string]interface{}) (string, error)

GeneratePluginConfigFromRaw 从原始配置生成插件配置

func (*TelegrafConfigGenerator) GeneratePluginConfigFromTemplate

func (tcg *TelegrafConfigGenerator) GeneratePluginConfigFromTemplate(ctx context.Context, templateID string, params map[string]interface{}) (string, error)

GeneratePluginConfigFromTemplate 从模板生成插件配置

func (*TelegrafConfigGenerator) GenerateSNMPConfig

func (tcg *TelegrafConfigGenerator) GenerateSNMPConfig(snmpTargets []map[string]interface{}) map[string]interface{}

GenerateSNMPConfig 从SNMP采集策略生成SNMP input配置

func (*TelegrafConfigGenerator) GenerateSSHConfig

func (tcg *TelegrafConfigGenerator) GenerateSSHConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)

GenerateSSHConfig 生成SSH采集配置(通过exec插件执行SSH命令)

func (*TelegrafConfigGenerator) ValidateConfig

func (tcg *TelegrafConfigGenerator) ValidateConfig(configStr string) error

ValidateConfig 验证配置有效性

type TelegrafConfigStruct

type TelegrafConfigStruct struct {
	GlobalTags map[string]string                   `toml:"global_tags,omitempty"`
	Agent      *AgentConfigStruct                  `toml:"agent,omitempty"`
	Inputs     map[string][]map[string]interface{} `toml:"inputs,omitempty"`
	Outputs    map[string][]map[string]interface{} `toml:"outputs,omitempty"`
}

TelegrafConfigStruct 表示 telegraf 配置结构(内部使用)

type TelegrafConfigValidator

type TelegrafConfigValidator struct {
	MetricMetadataService *MetricMetadataService
}

TelegrafConfigValidator Telegraf配置验证器

func NewTelegrafConfigValidator

func NewTelegrafConfigValidator(metricMetadataService *MetricMetadataService) *TelegrafConfigValidator

NewTelegrafConfigValidator 创建配置验证器

func (*TelegrafConfigValidator) ValidateConfig

func (v *TelegrafConfigValidator) ValidateConfig(ctx context.Context, configStr string) error

ValidateConfig 验证配置是否符合指标元数据规范

func (*TelegrafConfigValidator) ValidatePluginConfig

func (v *TelegrafConfigValidator) ValidatePluginConfig(ctx context.Context, pluginType string, config map[string]interface{}) error

ValidatePluginConfig 验证单个插件配置

type TelegrafManager

type TelegrafManager struct {
	Config          *Config
	ResourceManager *ResourceManager
	RegistryManager *RegistryManager
	ConfigManager   *ConfigManager
	KeyManager      *KeyManager
	ConfigGenerator *TelegrafConfigGenerator
	// contains filtered or unexported fields
}

func ProvideTelegrafManager

func ProvideTelegrafManager(rm *ResourceManager, cm *ConfigManager, registry *RegistryManager, km *KeyManager, config *Config, configGen *TelegrafConfigGenerator) (*TelegrafManager, error)

func (*TelegrafManager) GetConfig

func (tm *TelegrafManager) GetConfig() (map[string]interface{}, error)

func (*TelegrafManager) GetConfigVersion

func (tm *TelegrafManager) GetConfigVersion() string

GetConfigVersion 获取配置版本

func (*TelegrafManager) GetLastReloadTime

func (tm *TelegrafManager) GetLastReloadTime() *time.Time

GetLastReloadTime 获取最后重载时间

func (*TelegrafManager) GetStatus

func (tm *TelegrafManager) GetStatus() map[string]interface{}

func (*TelegrafManager) GetTelegrafStatus

func (tm *TelegrafManager) GetTelegrafStatus() (string, error)

func (*TelegrafManager) MultiLevelKey

func (tm *TelegrafManager) MultiLevelKey() string

func (*TelegrafManager) ReloadConfig

func (tm *TelegrafManager) ReloadConfig() error

ReloadConfig 重新加载 telegraf 配置

func (*TelegrafManager) Restart

func (tm *TelegrafManager) Restart() error

func (*TelegrafManager) Start

func (tm *TelegrafManager) Start() error

func (*TelegrafManager) Stop

func (tm *TelegrafManager) Stop() error

func (*TelegrafManager) UpdateConfig

func (tm *TelegrafManager) UpdateConfig(config map[string]interface{}) error

func (*TelegrafManager) UpdateConfigFromTasks

func (tm *TelegrafManager) UpdateConfigFromTasks(ctx context.Context, tasks []*models.MonitoringTask, standalonePlugins []*models.PluginConfig) error

UpdateConfigFromTasks 从监控任务更新配置

func (*TelegrafManager) UpdateTelegrafConfig

func (tm *TelegrafManager) UpdateTelegrafConfig(config map[string]interface{}) error

type TemplateData

type TemplateData map[string]interface{}

TemplateData 是一个通用的结构,用于存储模板数据

type UniOpsConfig

type UniOpsConfig struct {
	Address  string `yaml:"address"`
	Account  string `yaml:"account"`
	Password string `yaml:"password"`
	// Webhook配置(用于事件驱动模式)
	Webhook WebhookConfig `yaml:"webhook" json:"webhook"`
}

type UnifiedCollectionStrategyDTO

type UnifiedCollectionStrategyDTO struct {
	CollectionType  string
	DefaultInterval int
	EffectiveConfig map[string]interface{}
}

UnifiedCollectionStrategyDTO OneOps 统一采集策略 DTO(简化版本)

type Upstream

type Upstream struct {
	EtcdAddresses []string `yaml:"etcd_addresses"`
	Watch         string   `yaml:"watch"`
	Username      string   `yaml:"username"` // 新增
	Password      string   `yaml:"password"` // 新增
}

type WebhookConfig

type WebhookConfig struct {
	URLs             []string `yaml:"urls" json:"urls"`                             // OneOps Webhook URL列表
	Secret           string   `yaml:"secret" json:"secret"`                         // 签名密钥
	Timeout          int      `yaml:"timeout" json:"timeout"`                       // 请求超时(秒),默认10秒
	RetryMaxAttempts int      `yaml:"retry_max_attempts" json:"retry_max_attempts"` // 最大重试次数,默认3次
	RetryInterval    int      `yaml:"retry_interval" json:"retry_interval"`         // 重试间隔(秒),默认30秒
}

WebhookConfig Webhook配置

Directories

Path Synopsis
telnetproxy

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL