Documentation
¶
Index ¶
- Constants
- Variables
- func ProvideDBClient(config *Config) (*mongo.Client, error)
- func ProvideEtcdClient(config *Config) (*clientv3.Client, error)
- func ProvidePluginMappingConfigLoader(logger *zap.Logger) (*config.PluginMappingConfigLoader, error)
- func WithPort(port int) controllerOptionFunc
- type AgentConfigStruct
- type AgentDiscovery
- type AgentEventNotifier
- func (n *AgentEventNotifier) NotifyAgentRegistered(agent *models.Agent) error
- func (n *AgentEventNotifier) NotifyAgentUnregistered(agentCode string) error
- func (n *AgentEventNotifier) NotifyStatusChanged(agentCode, oldStatus, newStatus string) error
- func (n *AgentEventNotifier) Start()
- func (n *AgentEventNotifier) Stop()
- type AgentManager
- func (am *AgentManager) ApplyPackageConfigs(ctx context.Context, agentCode, packageName string, configs []*pb.ConfigItem) error
- func (am *AgentManager) ExecuteCommandViaProxy(ctx context.Context, agentCode, command string, ...) (string, error)
- func (am *AgentManager) GetAgent(ctx context.Context, agentCode string) (*models.Agent, error)
- func (am *AgentManager) GetAgentDetail(ctx context.Context, agentID string) (*models.Agent, error)
- func (am *AgentManager) GetAgentHost(ctx context.Context, agentCode string) (string, error)
- func (am *AgentManager) GetAgentPackages(ctx context.Context, agentID string) ([]*pb.PackItem, error)
- func (am *AgentManager) GetBatchOperator() *BatchOperator
- func (am *AgentManager) GetFileOperation() *FileOperation
- func (am *AgentManager) GetHealthMonitor() *HealthMonitor
- func (am *AgentManager) GetMetricsQuery() *MetricsQuery
- func (am *AgentManager) GetPackageConfigs(ctx context.Context, agentCode, packageName string) ([]*pb.ConfigItem, error)
- func (am *AgentManager) GetPackageLogs(ctx context.Context, agentCode, packageName string, count int32) ([]string, error)
- func (am *AgentManager) ListAgents(ctx context.Context, filter map[string]string, page, pageSize int) ([]models.Agent, int, error)
- func (am *AgentManager) PackageList(ctx context.Context, agentCode string) ([]*pb.PackItem, error)
- func (am *AgentManager) RestartAgent(ctx context.Context, agentCode string) error
- func (am *AgentManager) Start() error
- func (am *AgentManager) StartPackage(ctx context.Context, agentCode, packageName string) error
- func (am *AgentManager) StartPackageWatcher(ctx context.Context)
- func (am *AgentManager) Stop() error
- func (am *AgentManager) StopPackage(ctx context.Context, agentCode, packageName string) error
- func (am *AgentManager) UninstallAgent(ctx context.Context, agentCode string) error
- type AgentStatusEvent
- type AppResolver
- type ApplicationTargetConverter
- type BaseConfig
- type BatchOperator
- func (bo *BatchOperator) BatchRestartPackages(ctx context.Context, req *models.BatchOperationRequest) (*models.BatchOperationResult, error)
- func (bo *BatchOperator) BatchStartPackages(ctx context.Context, req *models.BatchOperationRequest) (*models.BatchOperationResult, error)
- func (bo *BatchOperator) BatchStopPackages(ctx context.Context, req *models.BatchOperationRequest) (*models.BatchOperationResult, error)
- func (bo *BatchOperator) BatchUpdateConfigs(ctx context.Context, req *models.BatchConfigRequest) (*models.BatchOperationResult, error)
- type CollectionTargetConverter
- type CollectionTargetDTO
- type CollectionTaskService
- func (s *CollectionTaskService) DistributeTasks(ctx context.Context, req *pb.DistributeTasksReq) (*pb.DistributeTasksResp, error)
- func (s *CollectionTaskService) GetTaskStatus(ctx context.Context, req *pb.GetTaskStatusReq) (*pb.GetTaskStatusResp, error)
- func (s *CollectionTaskService) GetTasks(ctx context.Context, req *pb.GetTasksReq) (*pb.GetTasksResp, error)
- func (s *CollectionTaskService) RegisterGRPCService(grpcServer *grpc.Server)
- func (s *CollectionTaskService) UpdateTaskStatus(ctx context.Context, req *pb.UpdateTaskStatusReq) (*pb.UpdateTaskStatusResp, error)
- type CollectionTypeMapper
- type CollectionTypeMapping
- type Config
- type ConfigBasedCollectionTypeMapper
- type ConfigManager
- func (cm *ConfigManager) BatchGetConfigs(resourceType models.ResourceType, resourceIDs []string) (map[string]map[string]interface{}, error)
- func (cm *ConfigManager) BatchUpdateConfigs(updates map[models.ResourceType]map[string]map[string]interface{}) error
- func (cm *ConfigManager) CompareConfigVersions(resourceType models.ResourceType, resourceID string, version1, version2 string) (map[string]interface{}, error)
- func (cm *ConfigManager) ContainerName(resourceType models.ResourceType) (string, error)
- func (cm *ConfigManager) DeleteConfig(resourceType models.ResourceType, containerName string) error
- func (cm *ConfigManager) FilterConfigs(resourceType models.ResourceType, filter func(map[string]interface{}) bool) ([]string, error)
- func (cm *ConfigManager) GetAppVariables(appID string) (map[string]string, error)
- func (cm *ConfigManager) GetConfig(resourceType models.ResourceType, containerName string) (map[string]interface{}, error)
- func (cm *ConfigManager) GetConfigStats(resourceType models.ResourceType) (map[string]int, error)
- func (cm *ConfigManager) GetConfigWithMeta(resourceType models.ResourceType, containerName string) (config map[string]interface{}, meta map[string]interface{}, err error)
- func (cm *ConfigManager) GetContainerNameByResourceType(resourceType models.ResourceType) (string, error)
- func (cm *ConfigManager) GetGlobalVariables() (map[string]string, error)
- func (cm *ConfigManager) GetJson(resourceType models.ResourceType) (string, error)
- func (cm *ConfigManager) ListApps() ([]string, error)
- func (cm *ConfigManager) ListConfigKeys(resourceType models.ResourceType) ([]string, error)
- func (cm *ConfigManager) ListConfigsWithDetails(resourceType models.ResourceType) (map[string]map[string]interface{}, error)
- func (cm *ConfigManager) ListResourceIDs(resourceType models.ResourceType) ([]string, error)
- func (cm *ConfigManager) SearchConfigs(resourceType models.ResourceType, searchTerm string) ([]string, error)
- func (cm *ConfigManager) SetAppVariables(appID string, variables map[string]string) error
- func (cm *ConfigManager) SetGlobalVariables(variables map[string]string) error
- func (cm *ConfigManager) Start() error
- func (cm *ConfigManager) Stop() error
- func (cm *ConfigManager) UpdateConfig(resourceType models.ResourceType, containerName string, ...) error
- func (cm *ConfigManager) UpdateConfigWithMeta(resourceType models.ResourceType, containerName string, ...) error
- type ConfigProvider
- type ConfigTemplate
- type Controller
- func (c *Controller) CreateDeployment(req models.DeploymentRequest) (*models.Deployment, error)
- func (c *Controller) CreateResource(resourceType models.ResourceType, resourceID string, ...) error
- func (c *Controller) DeleteConfig(resourceType models.ResourceType, resourceID string) error
- func (c *Controller) DeleteResource(resourceType models.ResourceType, resourceID string) error
- func (c *Controller) GetAgentVariables(ctx context.Context, agentID, appID string) (map[string]string, error)
- func (c *Controller) GetAssets(tags string) ([]*structs.L2DeviceRemoteInfo, error)
- func (c *Controller) GetConfig(resourceType models.ResourceType, resourceID string) (map[string]interface{}, error)
- func (c *Controller) GetMongoClient() *mongo.Client
- func (c *Controller) GetRegisteredAgentsCount() int
- func (c *Controller) GetResourceStatus(resourceType models.ResourceType, resourceID string) (string, error)
- func (c *Controller) GetStatus() (*models.ControllerStatus, error)
- func (c *Controller) ListConfigs(resourceType models.ResourceType) ([]string, error)
- func (c *Controller) ListDeployments(w http.ResponseWriter, r *http.Request)
- func (c *Controller) ListResources() []string
- func (c *Controller) RegisterService() error
- func (c *Controller) RegisterToUpstreamEtcd(upstreamEtcdEndpoints []string) error
- func (c *Controller) Start(options ...controllerOptionFunc) error
- func (c *Controller) Stop() error
- func (c *Controller) UpdateConfig(resourceType models.ResourceType, resourceID string, ...) error
- type DataQualityChecker
- func (c *DataQualityChecker) CheckMetric(ctx context.Context, metricName string, value float64, timestamp time.Time) ([]*models.QualityCheckResult, error)
- func (c *DataQualityChecker) CheckMetricsBatch(ctx context.Context, metrics map[string]float64, timestamp time.Time) (map[string][]*models.QualityCheckResult, error)
- func (c *DataQualityChecker) ClearCache()
- type DataQualityService
- type DatabaseConfig
- type DefaultTargetConverter
- type Deployment
- type DeploymentManager
- func (dm *DeploymentManager) CreateDeployment(req models.DeploymentRequest) (*models.Deployment, error)
- func (dm *DeploymentManager) GetDeployment(deploymentID string) (*models.Deployment, error)
- func (dm *DeploymentManager) ListDeployments() ([]models.Deployment, error)
- func (dm *DeploymentManager) UpdateDeploymentDeviceStatus(deploymentID string, deviceStatus models.TargetDevice) error
- type EffectiveStrategy
- type EnhancedStrategyConverter
- type FileOperation
- func (fo *FileOperation) CreateDirectory(ctx context.Context, agentCode, dirPath string, mode uint32) (map[string]interface{}, error)
- func (fo *FileOperation) DeleteFile(ctx context.Context, agentCode, filePath string, recursive bool) (map[string]interface{}, error)
- func (fo *FileOperation) DownloadFile(ctx context.Context, agentCode, filePath string, offset, length int64) (io.ReadCloser, error)
- func (fo *FileOperation) GetFileInfo(ctx context.Context, agentCode, filePath string) (map[string]interface{}, error)
- func (fo *FileOperation) GetUploadStatus(ctx context.Context, agentCode, sessionID string) (map[string]interface{}, error)
- func (fo *FileOperation) ListFiles(ctx context.Context, agentCode, dirPath string) (map[string]interface{}, error)
- func (fo *FileOperation) StartFileUpload(ctx context.Context, agentCode, filePath string, fileSize int64, ...) (map[string]interface{}, error)
- func (fo *FileOperation) UploadFileChunk(ctx context.Context, agentCode, sessionID string, chunkIndex int64, ...) error
- type GitConfig
- type GlobalResolver
- type GroupStrategyManager
- func (gsm *GroupStrategyManager) CreateGroupStrategy(ctx context.Context, strategy *models.GroupMetricStrategy) error
- func (gsm *GroupStrategyManager) DeleteGroupStrategy(ctx context.Context, id string) error
- func (gsm *GroupStrategyManager) GetGroupStrategiesForAgent(ctx context.Context, agentCode string, agentManager *AgentManager) ([]*models.GroupMetricStrategy, error)
- func (gsm *GroupStrategyManager) GetGroupStrategy(ctx context.Context, id string) (*models.GroupMetricStrategy, error)
- func (gsm *GroupStrategyManager) ListGroupStrategies(ctx context.Context) ([]*models.GroupMetricStrategy, error)
- func (gsm *GroupStrategyManager) MatchGroupStrategies(ctx context.Context, agentLabels map[string]string) ([]*models.GroupMetricStrategy, error)
- func (gsm *GroupStrategyManager) UpdateGroupStrategy(ctx context.Context, strategy *models.GroupMetricStrategy) error
- type GrpcProxyManager
- type HealthMonitor
- func (hm *HealthMonitor) GetAgentOverallHealth(ctx context.Context, agentCode string) (map[string]interface{}, error)
- func (hm *HealthMonitor) GetHealthHistory(ctx context.Context, agentCode, packageName string, limit int32) ([]*pb.HealthHistoryEntry, error)
- func (hm *HealthMonitor) GetServiceHealth(ctx context.Context, agentCode, packageName string) (*pb.ServiceHealth, error)
- func (hm *HealthMonitor) ListAgentHealth(ctx context.Context, agentCode string, filterStatus []pb.HealthStatus) ([]*pb.ServiceHealth, error)
- type JumperServerManager
- type KeyManager
- func (km *KeyManager) GenerateResourceKey(resourceType, containerName string) (string, error)
- func (km *KeyManager) GenerateResourcePrefix(resourceType string) (string, error)
- func (km *KeyManager) GenerateServiceKey(serviceName, hostIdentifier, port string) (string, error)
- func (km *KeyManager) GenerateServicePrefix(serviceName string) (string, error)
- func (km *KeyManager) ParseResourceKey(key string) (string, string, string, error)
- func (km *KeyManager) ParseServiceKey(key string) (string, string, string, string, string, error)
- type LayerResolver
- type LayeredVariableResolver
- type LokiForwarder
- type LokiPushRequest
- type LokiStream
- type Manager
- type MetricDataPoint
- type MetricMetadataService
- type MetricsManagerConfig
- type MetricsQuery
- func (mq *MetricsQuery) GetAgentMetricsSummary(ctx context.Context, agentCode string) (map[string]interface{}, error)
- func (mq *MetricsQuery) GetApplicationMetrics(ctx context.Context, agentCode, serviceName string) (map[string]interface{}, error)
- func (mq *MetricsQuery) GetMetricsHistory(ctx context.Context, agentCode, metricType, serviceName string, ...) (map[string]interface{}, error)
- func (mq *MetricsQuery) GetServiceMetrics(ctx context.Context, agentCode, packageName string) (*pb.ServiceMetrics, error)
- func (mq *MetricsQuery) GetSystemMetrics(ctx context.Context, agentCode string) (*pb.SystemMetrics, error)
- func (mq *MetricsQuery) ListServiceMetrics(ctx context.Context, agentCode string, runningOnly bool) ([]*pb.ServiceMetrics, error)
- type MetricsStrategyService
- func (s *MetricsStrategyService) DeleteInstanceStrategy(ctx context.Context, agentCode string) error
- func (s *MetricsStrategyService) GenerateConfigFromEnhancedStrategy(ctx context.Context, agentCode string) (string, error)
- func (s *MetricsStrategyService) GetApplicationStrategy(ctx context.Context, agentCode string) (*models.ApplicationMetricStrategy, error)
- func (s *MetricsStrategyService) GetAvailableMetrics(ctx context.Context, agentCode string) ([]string, error)
- func (s *MetricsStrategyService) GetConfigStatus(ctx context.Context, agentCode string) (*models.ConfigStatus, error)
- func (s *MetricsStrategyService) GetEffectiveEnhancedStrategy(ctx context.Context, agentCode string) (*EffectiveStrategy, error)
- func (s *MetricsStrategyService) GetEnhancedGlobalStrategy(ctx context.Context) (*models.EnhancedGlobalMetricStrategy, error)
- func (s *MetricsStrategyService) GetEnhancedInstanceStrategy(ctx context.Context, agentCode string) (*models.EnhancedInstanceMetricStrategy, error)
- func (s *MetricsStrategyService) PreviewTask(ctx context.Context, task models.CollectionTask) (*models.MetricRulePreview, error)
- func (s *MetricsStrategyService) UpdateApplicationStrategy(ctx context.Context, strategy *models.ApplicationMetricStrategy) error
- type MinioConfig
- type MinioManager
- func (mm *MinioManager) CreateBucket(bucketName string) error
- func (mm *MinioManager) DeleteObject(bucketName, objectName string) error
- func (mm *MinioManager) DownloadFile(bucketName, objectName string, filePath string) error
- func (mm *MinioManager) GetMiniManagerStatus() (models.ResourceStatus, error)
- func (mm *MinioManager) GetObjectInfo(bucketName, objectName string) (minio.ObjectInfo, error)
- func (mm *MinioManager) GetPresignedURL(bucketName, objectName string, expiry time.Duration, method string) (string, error)
- func (mm *MinioManager) GetProxyLatestPackageURL(bucketName, objectName string) (string, error)
- func (mm *MinioManager) GetProxyPackageURL(bucketName, objectName string) (string, error)
- func (mm *MinioManager) ListBuckets() ([]minio.BucketInfo, error)
- func (mm *MinioManager) ListObjects(bucketName string) ([]minio.ObjectInfo, error)
- func (mm *MinioManager) ListPackages(bucketName string) ([]string, error)
- func (mm *MinioManager) RegisterService() error
- func (mm *MinioManager) Start() error
- func (mm *MinioManager) Stop() error
- func (mm *MinioManager) UploadFile(bucketName, objectName string, filePath string) error
- type MonitoringService
- func (ms *MonitoringService) CreatePlugin(ctx context.Context, plugin *models.PluginConfig) error
- func (ms *MonitoringService) CreateTask(ctx context.Context, task *models.MonitoringTask) error
- func (ms *MonitoringService) DeletePlugin(ctx context.Context, pluginID string) error
- func (ms *MonitoringService) DeleteTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) EnsureIndexes(ctx context.Context) error
- func (ms *MonitoringService) GetPlugin(ctx context.Context, pluginID string) (*models.PluginConfig, error)
- func (ms *MonitoringService) GetStatus(ctx context.Context) (*models.MonitoringStatus, error)
- func (ms *MonitoringService) GetTask(ctx context.Context, taskID string) (*models.MonitoringTask, error)
- func (ms *MonitoringService) ListPlugins(ctx context.Context, filter map[string]interface{}) ([]*models.PluginConfig, error)
- func (ms *MonitoringService) ListTasks(ctx context.Context, filter map[string]interface{}) ([]*models.MonitoringTask, error)
- func (ms *MonitoringService) PauseTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) PublishTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) ReloadTelegrafConfig(ctx context.Context) error
- func (ms *MonitoringService) ResumeTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) RevokeTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) StartTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) StopTask(ctx context.Context, taskID string) error
- func (ms *MonitoringService) UpdatePlugin(ctx context.Context, pluginID string, plugin *models.PluginConfig) error
- func (ms *MonitoringService) UpdateTask(ctx context.Context, taskID string, task *models.MonitoringTask) error
- type NacosConfig
- type NacosManager
- func (nm *NacosManager) BatchGetConfig(params []vo.ConfigParam) (map[string]string, error)
- func (nm *NacosManager) BatchSetConfig(configs map[string]vo.ConfigParam) error
- func (nm *NacosManager) DeleteConfig(dataId, group string) error
- func (nm *NacosManager) GetConfig(dataId, group string) (string, error)
- func (nm *NacosManager) GetConfigWithRetry(dataId, group string, maxRetries int) (string, error)
- func (nm *NacosManager) ListenConfig(dataId, group string, onChange func(string)) error
- func (nm *NacosManager) SetConfig(dataId, group, content string) error
- type PackageInfo
- type PackageWatcher
- type PluginTemplateService
- func (pts *PluginTemplateService) ConvertConfigToTOML(config map[string]interface{}) (string, error)
- func (pts *PluginTemplateService) CreateTemplate(ctx context.Context, tmpl *models.PluginTemplate) error
- func (pts *PluginTemplateService) DeleteTemplate(ctx context.Context, templateID string) error
- func (pts *PluginTemplateService) EnsureIndexes(ctx context.Context) error
- func (pts *PluginTemplateService) GetTemplate(ctx context.Context, templateID string) (*models.PluginTemplate, error)
- func (pts *PluginTemplateService) GetTemplatesByType(ctx context.Context, pluginType string) ([]*models.PluginTemplate, error)
- func (pts *PluginTemplateService) ListTemplates(ctx context.Context, filter map[string]interface{}) ([]*models.PluginTemplate, error)
- func (pts *PluginTemplateService) PresetTemplates(ctx context.Context) error
- func (pts *PluginTemplateService) RenderTemplate(ctx context.Context, templateID string, params map[string]interface{}) (string, error)
- func (pts *PluginTemplateService) RenderTemplateFromContent(ctx context.Context, templateContent string, params map[string]interface{}) (string, error)
- func (pts *PluginTemplateService) UpdateTemplate(ctx context.Context, templateID string, tmpl *models.PluginTemplate) error
- func (pts *PluginTemplateService) ValidateTemplate(ctx context.Context, templateContent string) error
- func (pts *PluginTemplateService) ValidateTemplateParameters(ctx context.Context, templateID string, params map[string]interface{}) error
- type PrometheusForwarder
- type RedisConfig
- type RedisManager
- func (rm *RedisManager) Close() error
- func (rm *RedisManager) Get(key string) (map[string]map[string]any, error)
- func (rm *RedisManager) GetAll() ([]map[string]map[string]map[string]any, error)
- func (rm *RedisManager) GetClient() *redis.Client
- func (rm *RedisManager) Start() error
- func (rm *RedisManager) Stop() error
- func (rm *RedisManager) Subscribe() error
- type RegistryManager
- func (rm *RegistryManager) FirstByName(serviceName string) (*models.ServiceInfo, error)
- func (rm *RegistryManager) GetAgentCount() (int, error)
- func (rm *RegistryManager) GetAgentVariables(agentID string) (map[string]string, error)
- func (rm *RegistryManager) GetControllerAddress() (string, error)
- func (rm *RegistryManager) GetEtcdAddress() (string, error)
- func (rm *RegistryManager) GetService(key string) (*models.ServiceInfo, error)
- func (rm *RegistryManager) GetVariables(ctx context.Context, agentID, appID string) (map[string]string, error)
- func (rm *RegistryManager) ListAgents(ctx context.Context, filter map[string]string, page, pageSize int) ([]models.Agent, int, error)
- func (rm *RegistryManager) ListServices() []*models.ServiceInfo
- func (rm *RegistryManager) RefreshService(serviceID string, ttl time.Duration) error
- func (rm *RegistryManager) RegisterService(service *models.ServiceInfo, ttl time.Duration) error
- func (rm *RegistryManager) SetAgentVariables(agentID string, variables map[string]string) error
- func (rm *RegistryManager) Start() error
- func (rm *RegistryManager) Stop() error
- func (rm *RegistryManager) UnregisterService(key string) error
- func (rm *RegistryManager) UpdateServiceConfig(serviceID string, config map[string]interface{}) error
- func (rm *RegistryManager) WatchServices(ctx context.Context) (<-chan *models.ServiceEvent, error)
- type ResourceManager
- func (rm *ResourceManager) ClearContainers() error
- func (rm *ResourceManager) CreateResource(resourceType models.ResourceType, containerName string, ...) (string, error)
- func (rm *ResourceManager) DeleteResource(multiLevelKey string) error
- func (rm *ResourceManager) GetAllResourcesStatus() map[string]string
- func (rm *ResourceManager) GetResourceFromContainerID(containerID string) (string, error)
- func (rm *ResourceManager) GetResourceInfo(multiLevelKey string) (*models.ResourceInfo, error)
- func (rm *ResourceManager) GetResourceLogs(multiLevelKey string, tail int) (string, error)
- func (rm *ResourceManager) GetResourceStatus(multiLevelKey string) (string, error)
- func (rm *ResourceManager) GetStatus() map[string]interface{}
- func (rm *ResourceManager) ListAllResources() []string
- func (rm *ResourceManager) RestartResource(multiLevelKey string) error
- func (rm *ResourceManager) Start() error
- func (rm *ResourceManager) StartResource(multiLevelKey string) error
- func (rm *ResourceManager) Stop() error
- func (rm *ResourceManager) StopResource(multiLevelKey string) error
- type ResourceProvider
- type SNMPTargetConverter
- type SafeResourceMap
- type ServerConfig
- type ServiceInfo
- type StrategyMerger
- type StrategyToConfigConverter
- func (c *StrategyToConfigConverter) ConvertStrategyToConfig(ctx context.Context, globalStrategy *models.GlobalMetricStrategy, ...) (map[string][]map[string]interface{}, map[string]interface{}, error)
- func (c *StrategyToConfigConverter) GenerateConfigFromStrategy(ctx context.Context, configGenerator *TelegrafConfigGenerator, ...) (string, error)
- func (c *StrategyToConfigConverter) GetMetricPrefixesForPlugin(pluginType string) []string
- func (c *StrategyToConfigConverter) GetPluginForMetric(metricName string) string
- type SyslogManagerConfig
- type TargetConverter
- type TargetConverterRegistry
- type TelegrafConfig
- type TelegrafConfigGenerator
- func (tcg *TelegrafConfigGenerator) ConfigToTOML(config map[string]interface{}) ([]byte, error)
- func (tcg *TelegrafConfigGenerator) ExtractMetadataFromConfig(ctx context.Context, configStr string) ([]*models.MetricMetadata, error)
- func (tcg *TelegrafConfigGenerator) GenerateApplicationMetricsConfig(appTargets []map[string]interface{}) map[string]interface{}
- func (tcg *TelegrafConfigGenerator) GenerateConfig(ctx context.Context, tasks []*models.MonitoringTask, ...) (string, error)
- func (tcg *TelegrafConfigGenerator) GenerateConfigFromOneOpsDTO(ctx context.Context, collectionType string, ...) (string, error)
- func (tcg *TelegrafConfigGenerator) GenerateConfigFromStrategy(ctx context.Context, globalStrategy *models.GlobalMetricStrategy, ...) (string, error)
- func (tcg *TelegrafConfigGenerator) GenerateEBPFConfig(ctx context.Context, target map[string]interface{}) (map[string]interface{}, error)
- func (tcg *TelegrafConfigGenerator) GenerateNETCONFConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)
- func (tcg *TelegrafConfigGenerator) GeneratePacketCaptureConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)
- func (tcg *TelegrafConfigGenerator) GeneratePluginConfigFromRaw(config map[string]interface{}) (string, error)
- func (tcg *TelegrafConfigGenerator) GeneratePluginConfigFromTemplate(ctx context.Context, templateID string, params map[string]interface{}) (string, error)
- func (tcg *TelegrafConfigGenerator) GenerateSNMPConfig(snmpTargets []map[string]interface{}) map[string]interface{}
- func (tcg *TelegrafConfigGenerator) GenerateSSHConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)
- func (tcg *TelegrafConfigGenerator) ValidateConfig(configStr string) error
- type TelegrafConfigStruct
- type TelegrafConfigValidator
- type TelegrafManager
- func (tm *TelegrafManager) GetConfig() (map[string]interface{}, error)
- func (tm *TelegrafManager) GetConfigVersion() string
- func (tm *TelegrafManager) GetLastReloadTime() *time.Time
- func (tm *TelegrafManager) GetStatus() map[string]interface{}
- func (tm *TelegrafManager) GetTelegrafStatus() (string, error)
- func (tm *TelegrafManager) MultiLevelKey() string
- func (tm *TelegrafManager) ReloadConfig() error
- func (tm *TelegrafManager) Restart() error
- func (tm *TelegrafManager) Start() error
- func (tm *TelegrafManager) Stop() error
- func (tm *TelegrafManager) UpdateConfig(config map[string]interface{}) error
- func (tm *TelegrafManager) UpdateConfigFromTasks(ctx context.Context, tasks []*models.MonitoringTask, ...) error
- func (tm *TelegrafManager) UpdateTelegrafConfig(config map[string]interface{}) error
- type TemplateData
- type UniOpsConfig
- type UnifiedCollectionStrategyDTO
- type Upstream
- type WebhookConfig
Constants ¶
const (
GlobalStrategyID = "global"
CollectionName = "metric_strategies"
)
const (
AttachMetricRedisBusinessKey = "AttachMetricRedisBusinessKey"
PubSubTopic = "metricAttachMsg"
)
const (
CollectionTaskCollectionName = "collection_tasks"
)
const DefaultEtcdContainerName = "etcd-main"
const DefaultTelegrafResourceName = "telegraf"
const (
GroupStrategyCollection = "group_metric_strategies"
)
const Version = "0.1.0"
Variables ¶
var ErrNacosConfigNotFound = errors.New("nacos config not found")
var MinioResouceName = ""
var ProviderSet = wire.NewSet(
ProvideKeyManager,
ProvideAgentManager,
ProvideMinioManager,
ProvideResourceManager,
ProvideConfigManager,
ProvideRegistryManager,
ProvideEtcdClient,
ProvideConfig,
ProvideController,
ProvideDeployManager,
ProvideDBClient,
ProvideJumperServerManager,
ProvideGrpcProxyManager,
ProvideLokiForwarder,
ProvidePrometheusForwarder,
ProvidePluginTemplateService,
ProvideMetricMetadataService,
ProvideDataQualityService,
ProvideTelegrafConfigGenerator,
ProvideTelegrafManager,
ProvideMonitoringService,
ProvideNacosManager,
ProvideRedisManager,
)
ProviderSet 是一个 Wire 提供者集合,包含了所有控制器组件的提供者
var TelegrafContainerName = ""
Functions ¶
func ProvideDBClient ¶
func ProvideDBClient(config *Config) (*mongo.Client, error)
ProvideDBClient 创建并返回一个 MongoDB 客户端
func ProvideEtcdClient ¶
func ProvideEtcdClient(config *Config) (*clientv3.Client, error)
ProvideEtcdClient 创建并返回一个 etcd 客户端,支持 Docker etcd 和外部 etcd
func ProvidePluginMappingConfigLoader ¶
func ProvidePluginMappingConfigLoader(logger *zap.Logger) (*config.PluginMappingConfigLoader, error)
ProvidePluginMappingConfigLoader 提供插件映射配置加载器
Types ¶
type AgentConfigStruct ¶
type AgentConfigStruct struct {
Interval string `toml:"interval,omitempty"`
RoundInterval bool `toml:"round_interval,omitempty"`
MetricBatchSize int `toml:"metric_batch_size,omitempty"`
MetricBufferLimit int `toml:"metric_buffer_limit,omitempty"`
CollectionJitter string `toml:"collection_jitter,omitempty"`
FlushInterval string `toml:"flush_interval,omitempty"`
FlushJitter string `toml:"flush_jitter,omitempty"`
Precision string `toml:"precision,omitempty"`
Hostname string `toml:"hostname,omitempty"`
OmitHostname bool `toml:"omit_hostname,omitempty"`
}
AgentConfigStruct 表示 agent 配置(内部使用)
type AgentDiscovery ¶
type AgentDiscovery struct {
// contains filtered or unexported fields
}
AgentDiscovery Agent 自动发现模块
func NewAgentDiscovery ¶
func NewAgentDiscovery(etcdClient *clientv3.Client, mongoClient *mongo.Client, watchPrefix string, eventNotifier *AgentEventNotifier, controllerID, functionArea string, logger *zap.Logger) *AgentDiscovery
NewAgentDiscovery 创建 Agent 发现模块
func (*AgentDiscovery) PeriodicSync ¶
func (ad *AgentDiscovery) PeriodicSync(ctx context.Context)
PeriodicSync 定期从 etcd 同步 Agent 信息
func (*AgentDiscovery) SyncAgentsFromEtcd ¶
func (ad *AgentDiscovery) SyncAgentsFromEtcd(ctx context.Context) error
SyncAgentsFromEtcd 从 etcd 同步所有 Agent 到 MongoDB
func (*AgentDiscovery) WatchAgents ¶
func (ad *AgentDiscovery) WatchAgents(ctx context.Context)
WatchAgents 监听 etcd 中的 Agent 变化
type AgentEventNotifier ¶
type AgentEventNotifier struct {
// contains filtered or unexported fields
}
AgentEventNotifier Agent事件通知器
func NewAgentEventNotifier ¶
func NewAgentEventNotifier(webhookURLs []string, secret, controllerID, functionArea string) *AgentEventNotifier
NewAgentEventNotifier 创建Agent事件通知器(使用默认配置)
func NewAgentEventNotifierWithConfig ¶
func NewAgentEventNotifierWithConfig(webhookURLs []string, secret, controllerID, functionArea string, timeout, retryMaxAttempts, retryInterval int) *AgentEventNotifier
NewAgentEventNotifierWithConfig 创建Agent事件通知器(使用自定义配置)
func (*AgentEventNotifier) NotifyAgentRegistered ¶
func (n *AgentEventNotifier) NotifyAgentRegistered(agent *models.Agent) error
NotifyAgentRegistered 通知Agent注册
func (*AgentEventNotifier) NotifyAgentUnregistered ¶
func (n *AgentEventNotifier) NotifyAgentUnregistered(agentCode string) error
NotifyAgentUnregistered 通知Agent注销
func (*AgentEventNotifier) NotifyStatusChanged ¶
func (n *AgentEventNotifier) NotifyStatusChanged(agentCode, oldStatus, newStatus string) error
NotifyStatusChanged 通知状态变化
type AgentManager ¶
type AgentManager struct {
// contains filtered or unexported fields
}
func ProvideAgentManager ¶
func ProvideAgentManager(registry *RegistryManager, rm *ResourceManager, cm *ConfigManager, km *KeyManager, mm *MinioManager, mongoClient *mongo.Client) (*AgentManager, error)
func (*AgentManager) ApplyPackageConfigs ¶
func (am *AgentManager) ApplyPackageConfigs(ctx context.Context, agentCode, packageName string, configs []*pb.ConfigItem) error
func (*AgentManager) ExecuteCommandViaProxy ¶
func (am *AgentManager) ExecuteCommandViaProxy(ctx context.Context, agentCode, command string, rmInfo *structs.L2DeviceRemoteInfo) (string, error)
func (*AgentManager) GetAgentDetail ¶
func (am *AgentManager) GetAgentDetail(ctx context.Context, agentID string) (*models.Agent, error)
GetAgentDetail 获取 Agent 详情
func (*AgentManager) GetAgentHost ¶
func (am *AgentManager) GetAgentHost(ctx context.Context, agentCode string) (string, error)
func (*AgentManager) GetAgentPackages ¶
func (am *AgentManager) GetAgentPackages(ctx context.Context, agentID string) ([]*pb.PackItem, error)
GetAgentPackages 获取 Agent 管理的服务列表
func (*AgentManager) GetBatchOperator ¶
func (am *AgentManager) GetBatchOperator() *BatchOperator
GetBatchOperator 获取批量操作模块
func (*AgentManager) GetFileOperation ¶
func (am *AgentManager) GetFileOperation() *FileOperation
GetFileOperation 获取文件操作模块
func (*AgentManager) GetHealthMonitor ¶
func (am *AgentManager) GetHealthMonitor() *HealthMonitor
GetHealthMonitor 获取健康监控模块
func (*AgentManager) GetMetricsQuery ¶
func (am *AgentManager) GetMetricsQuery() *MetricsQuery
GetMetricsQuery 获取指标查询模块
func (*AgentManager) GetPackageConfigs ¶
func (am *AgentManager) GetPackageConfigs(ctx context.Context, agentCode, packageName string) ([]*pb.ConfigItem, error)
GetPackageConfigs 获取指定 agent 上特定包的配置
func (*AgentManager) GetPackageLogs ¶
func (*AgentManager) ListAgents ¶
func (*AgentManager) PackageList ¶
PackageList 列出指定 agent 上的所有包
func (*AgentManager) RestartAgent ¶
func (am *AgentManager) RestartAgent(ctx context.Context, agentCode string) error
RestartAgent 重启Agent
func (*AgentManager) Start ¶
func (am *AgentManager) Start() error
func (*AgentManager) StartPackage ¶
func (am *AgentManager) StartPackage(ctx context.Context, agentCode, packageName string) error
StartPackage 启动指定 agent 上的特定包
func (*AgentManager) StartPackageWatcher ¶
func (am *AgentManager) StartPackageWatcher(ctx context.Context)
func (*AgentManager) Stop ¶
func (am *AgentManager) Stop() error
func (*AgentManager) StopPackage ¶
func (am *AgentManager) StopPackage(ctx context.Context, agentCode, packageName string) error
StopPackage 停止指定 agent 上的特定包
func (*AgentManager) UninstallAgent ¶
func (am *AgentManager) UninstallAgent(ctx context.Context, agentCode string) error
UninstallAgent 卸载Agent
type AgentStatusEvent ¶
type AgentStatusEvent struct {
EventType string `json:"event_type"` // "agent_registered", "agent_unregistered", "status_changed"
AgentCode string `json:"agent_code"`
FunctionArea string `json:"function_area"`
ControllerID string `json:"controller_id"`
OldStatus string `json:"old_status,omitempty"`
NewStatus string `json:"new_status"`
HealthStatus string `json:"health_status,omitempty"`
Address string `json:"address,omitempty"`
Version string `json:"version,omitempty"`
ServiceCount int `json:"service_count,omitempty"`
LastHeartbeat string `json:"last_heartbeat,omitempty"` // RFC3339格式
Timestamp time.Time `json:"timestamp"`
Signature string `json:"signature"` // HMAC签名,用于验证
}
AgentStatusEvent Agent状态事件
type AppResolver ¶
type AppResolver struct {
ConfigManager *ConfigManager
}
type ApplicationTargetConverter ¶
type ApplicationTargetConverter struct {
// contains filtered or unexported fields
}
ApplicationTargetConverter 应用指标目标转换器
func NewApplicationTargetConverter ¶
func NewApplicationTargetConverter(logger *zap.Logger) *ApplicationTargetConverter
NewApplicationTargetConverter 创建应用指标目标转换器
func (*ApplicationTargetConverter) Convert ¶
func (c *ApplicationTargetConverter) Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)
Convert 转换应用指标目标
func (*ApplicationTargetConverter) Supports ¶
func (c *ApplicationTargetConverter) Supports(collectionType string) bool
Supports 检查是否支持该采集类型
type BaseConfig ¶
type BaseConfig struct {
DefaultPort int `yaml:"default_port"`
Resources map[models.ResourceType]string `yaml:"resources"`
PrometheusLabelKey string `yaml:"prometheusLabelKey"`
LokiLabelKey string `yaml:"lokiLabelKey"`
HomePath string `yaml:"home_path"`
Templates []string `yaml:"templates"`
PreferredNetworks []string `yaml:"preferred_networks"`
SshProxy int `yaml:"ssh_proxy"`
GrpcProxy int `yaml:"grpc_proxy"`
Deployment Deployment `yaml:"deployment"`
LokiListenPath string `yaml:"loki_listen_path"`
PrometheusListenPath string `yaml:"prometheus_listen_path"`
UpstreamLokiUrl string `yaml:"upstream_loki_url"`
UpstreamPrometheusUrl string `yaml:"upstream_prometheus_url"`
PipelineTemplates string `yaml:"pipelineTemplates"`
// FirewallTemplatePath 防火墙模板路径,默认为 "pkg/nodemap/node/device/firewall/common/v4/templates"
FirewallTemplatePath string `yaml:"firewall_template_path" json:"firewall_template_path"`
// JumpServer 连接空闲超时时间(秒),默认60秒,0表示禁用
JumpServerIdleTimeout int `yaml:"jump_server_idle_timeout"`
}
type BatchOperator ¶
type BatchOperator struct {
// contains filtered or unexported fields
}
BatchOperator 批量操作模块
func NewBatchOperator ¶
func NewBatchOperator(agentManager *AgentManager) *BatchOperator
NewBatchOperator 创建批量操作模块
func (*BatchOperator) BatchRestartPackages ¶
func (bo *BatchOperator) BatchRestartPackages(ctx context.Context, req *models.BatchOperationRequest) (*models.BatchOperationResult, error)
BatchRestartPackages 批量重启服务
func (*BatchOperator) BatchStartPackages ¶
func (bo *BatchOperator) BatchStartPackages(ctx context.Context, req *models.BatchOperationRequest) (*models.BatchOperationResult, error)
BatchStartPackages 批量启动服务
func (*BatchOperator) BatchStopPackages ¶
func (bo *BatchOperator) BatchStopPackages(ctx context.Context, req *models.BatchOperationRequest) (*models.BatchOperationResult, error)
BatchStopPackages 批量停止服务
func (*BatchOperator) BatchUpdateConfigs ¶
func (bo *BatchOperator) BatchUpdateConfigs(ctx context.Context, req *models.BatchConfigRequest) (*models.BatchOperationResult, error)
BatchUpdateConfigs 批量更新配置
type CollectionTargetConverter ¶
type CollectionTargetConverter struct {
// contains filtered or unexported fields
}
CollectionTargetConverter 采集目标转换器,用于将 OneOps 的采集目标和策略转换为 Controller 的 CollectionTask
func NewCollectionTargetConverter ¶
func NewCollectionTargetConverter(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *CollectionTargetConverter
NewCollectionTargetConverter 创建采集目标转换器
func (*CollectionTargetConverter) ConvertTargetsToCollectionTasks ¶
func (c *CollectionTargetConverter) ConvertTargetsToCollectionTasks( ctx context.Context, collectionType string, strategy *UnifiedCollectionStrategyDTO, targets []*CollectionTargetDTO, ) ([]models.CollectionTask, error)
ConvertTargetsToCollectionTasks 将采集目标和策略转换为 CollectionTasks
type CollectionTargetDTO ¶
type CollectionTargetDTO struct {
ID string
Address string
Port int
Config map[string]interface{}
CollectionType string
TargetGroupID string
}
CollectionTargetDTO OneOps 采集目标 DTO(简化版本,避免循环依赖)
type CollectionTaskService ¶
type CollectionTaskService struct {
pb.UnimplementedCollectionTaskServiceServer // 必须嵌入以支持向前兼容
// contains filtered or unexported fields
}
CollectionTaskService 采集任务服务,负责接收 OneOps 分发的任务,存储到 MongoDB,并转发给 Agent
func NewCollectionTaskService ¶
func NewCollectionTaskService(mongoClient *mongo.Client, logger *zap.Logger) *CollectionTaskService
NewCollectionTaskService 创建采集任务服务
func (*CollectionTaskService) DistributeTasks ¶
func (s *CollectionTaskService) DistributeTasks(ctx context.Context, req *pb.DistributeTasksReq) (*pb.DistributeTasksResp, error)
DistributeTasks 接收OneOps分发的任务(gRPC服务端实现)
func (*CollectionTaskService) GetTaskStatus ¶
func (s *CollectionTaskService) GetTaskStatus(ctx context.Context, req *pb.GetTaskStatusReq) (*pb.GetTaskStatusResp, error)
GetTaskStatus 查询任务状态(gRPC服务端实现)
func (*CollectionTaskService) GetTasks ¶
func (s *CollectionTaskService) GetTasks(ctx context.Context, req *pb.GetTasksReq) (*pb.GetTasksResp, error)
GetTasks 获取 Agent 的任务(gRPC 服务端实现)。新架构:OneOps 通过 gRPC 推送 CollectionTask 到 Controller,Controller 存储后直接返回给 Agent
func (*CollectionTaskService) RegisterGRPCService ¶
func (s *CollectionTaskService) RegisterGRPCService(grpcServer *grpc.Server)
RegisterGRPCService 注册gRPC服务
func (*CollectionTaskService) UpdateTaskStatus ¶
func (s *CollectionTaskService) UpdateTaskStatus(ctx context.Context, req *pb.UpdateTaskStatusReq) (*pb.UpdateTaskStatusResp, error)
UpdateTaskStatus 更新任务状态(gRPC服务端实现)
type CollectionTypeMapper ¶
type CollectionTypeMapper interface {
GetTaskType(collectionType string) string
GetDefaultConfig(collectionType string) map[string]interface{}
}
CollectionTypeMapper 采集类型到任务类型的映射器接口
func NewConfigBasedCollectionTypeMapper ¶
func NewConfigBasedCollectionTypeMapper(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) CollectionTypeMapper
NewConfigBasedCollectionTypeMapper 创建基于配置的映射器
type CollectionTypeMapping ¶
CollectionTypeMapping 采集类型映射配置
type Config ¶
type Config struct {
Upstream Upstream `yaml:"upstream"`
BaseConfig BaseConfig `yaml:"base_config" json:"base_config"`
Server ServerConfig `yaml:"server" json:"server"` // 服务器配置(HTTP、gRPC端口等)
GitConfig GitConfig `yaml:"git"`
FunctionArea string `yaml:"function_area" json:"function_area"`
EtcdConfig map[string]interface{} `yaml:"etcd" json:"etcd"`
UseDockerEtcd bool `yaml:"use_docker_etcd" json:"use_docker_etcd"` // 是否使用 Docker 启动 etcd,false 时使用外部 etcd,默认为 true(向后兼容)
Telegraf TelegrafConfig `yaml:"telegraf" json:"telegraf"`
Minio MinioConfig `yaml:"minio" json:"minio"`
UniOpsConfig UniOpsConfig `yaml:"uniops" json:"uniops"`
Database DatabaseConfig `yaml:"database" json:"database"`
SyslogManager SyslogManagerConfig `yaml:"syslog_manager" json:"syslog_manager"`
MetricsManager MetricsManagerConfig `yaml:"metrics_manager" json:"metrics_manager"`
Nacos NacosConfig `yaml:"nacos" json:"nacos"`
Redis RedisConfig `yaml:"redis" json:"redis"`
SkipInit bool `yaml:"skip_init" json:"skip_init"`
}
Config 定义了控制器的配置
func ProvideConfig ¶
ProvideConfig 提供配置对象,从本地YAML文件加载
type ConfigBasedCollectionTypeMapper ¶
type ConfigBasedCollectionTypeMapper struct {
// contains filtered or unexported fields
}
ConfigBasedCollectionTypeMapper 基于配置的映射器
func (*ConfigBasedCollectionTypeMapper) GetDefaultConfig ¶
func (m *ConfigBasedCollectionTypeMapper) GetDefaultConfig(collectionType string) map[string]interface{}
GetDefaultConfig 获取默认配置
func (*ConfigBasedCollectionTypeMapper) GetTaskType ¶
func (m *ConfigBasedCollectionTypeMapper) GetTaskType(collectionType string) string
GetTaskType 获取任务类型
type ConfigManager ¶
type ConfigManager struct {
EtcdClient *clientv3.Client `wire:"-"`
NacosManager *NacosManager
KeyManager *KeyManager
Config *Config
// contains filtered or unexported fields
}
func ProvideConfigManager ¶
func ProvideConfigManager(km *KeyManager, etcdClient *clientv3.Client, config *Config, nm *NacosManager) (*ConfigManager, error)
ProvideConfigManager 为 Wire 依赖注入提供的构造函数
func (*ConfigManager) BatchGetConfigs ¶
func (cm *ConfigManager) BatchGetConfigs(resourceType models.ResourceType, resourceIDs []string) (map[string]map[string]interface{}, error)
func (*ConfigManager) BatchUpdateConfigs ¶
func (cm *ConfigManager) BatchUpdateConfigs(updates map[models.ResourceType]map[string]map[string]interface{}) error
func (*ConfigManager) CompareConfigVersions ¶
func (cm *ConfigManager) CompareConfigVersions(resourceType models.ResourceType, resourceID string, version1, version2 string) (map[string]interface{}, error)
func (*ConfigManager) ContainerName ¶
func (cm *ConfigManager) ContainerName(resourceType models.ResourceType) (string, error)
func (*ConfigManager) DeleteConfig ¶
func (cm *ConfigManager) DeleteConfig(resourceType models.ResourceType, containerName string) error
func (*ConfigManager) FilterConfigs ¶
func (cm *ConfigManager) FilterConfigs(resourceType models.ResourceType, filter func(map[string]interface{}) bool) ([]string, error)
func (*ConfigManager) GetAppVariables ¶
func (cm *ConfigManager) GetAppVariables(appID string) (map[string]string, error)
func (*ConfigManager) GetConfig ¶
func (cm *ConfigManager) GetConfig(resourceType models.ResourceType, containerName string) (map[string]interface{}, error)
func (*ConfigManager) GetConfigStats ¶
func (cm *ConfigManager) GetConfigStats(resourceType models.ResourceType) (map[string]int, error)
func (*ConfigManager) GetConfigWithMeta ¶
func (cm *ConfigManager) GetConfigWithMeta(resourceType models.ResourceType, containerName string) (config map[string]interface{}, meta map[string]interface{}, err error)
func (*ConfigManager) GetContainerNameByResourceType ¶
func (cm *ConfigManager) GetContainerNameByResourceType(resourceType models.ResourceType) (string, error)
func (*ConfigManager) GetGlobalVariables ¶
func (cm *ConfigManager) GetGlobalVariables() (map[string]string, error)
func (*ConfigManager) GetJson ¶
func (cm *ConfigManager) GetJson(resourceType models.ResourceType) (string, error)
func (*ConfigManager) ListApps ¶
func (cm *ConfigManager) ListApps() ([]string, error)
func (*ConfigManager) ListConfigKeys ¶
func (cm *ConfigManager) ListConfigKeys(resourceType models.ResourceType) ([]string, error)
func (*ConfigManager) ListConfigsWithDetails ¶
func (cm *ConfigManager) ListConfigsWithDetails(resourceType models.ResourceType) (map[string]map[string]interface{}, error)
func (*ConfigManager) ListResourceIDs ¶
func (cm *ConfigManager) ListResourceIDs(resourceType models.ResourceType) ([]string, error)
func (*ConfigManager) SearchConfigs ¶
func (cm *ConfigManager) SearchConfigs(resourceType models.ResourceType, searchTerm string) ([]string, error)
func (*ConfigManager) SetAppVariables ¶
func (cm *ConfigManager) SetAppVariables(appID string, variables map[string]string) error
func (*ConfigManager) SetGlobalVariables ¶
func (cm *ConfigManager) SetGlobalVariables(variables map[string]string) error
func (*ConfigManager) Start ¶
func (cm *ConfigManager) Start() error
func (*ConfigManager) Stop ¶
func (cm *ConfigManager) Stop() error
func (*ConfigManager) UpdateConfig ¶
func (cm *ConfigManager) UpdateConfig(resourceType models.ResourceType, containerName string, config map[string]interface{}) error
func (*ConfigManager) UpdateConfigWithMeta ¶
func (cm *ConfigManager) UpdateConfigWithMeta(resourceType models.ResourceType, containerName string, config map[string]interface{}, meta map[string]interface{}) error
type ConfigProvider ¶
type ConfigTemplate ¶
type ConfigTemplate struct {
// contains filtered or unexported fields
}
ConfigTemplate 是配置模板的主要结构
func NewConfigTemplate ¶
func NewConfigTemplate(templateString string) (*ConfigTemplate, error)
NewConfigTemplate 创建一个新的配置模板
func (*ConfigTemplate) Generate ¶
func (ct *ConfigTemplate) Generate(data TemplateData) (string, error)
Generate 根据提供的数据生成配置
type Controller ¶
type Controller struct {
ConfigManager *ConfigManager
ResourceManager *ResourceManager
RegistryManager *RegistryManager
KeyManager *KeyManager
MinioManager *MinioManager
DeploymentManager *DeploymentManager
AgentManager *AgentManager
JumperServerManager *JumperServerManager
GrpcProxyManager *GrpcProxyManager
LokiForwarder *LokiForwarder
PrometheusForwarder *PrometheusForwarder
TelegrafManager *TelegrafManager
RedisManager *RedisManager
MonitoringService *MonitoringService
PluginTemplateService *PluginTemplateService
// contains filtered or unexported fields
}
func InitializeControllerComponents ¶
func InitializeControllerComponents(configPath string) (*Controller, error)
InitializeControllerComponents 初始化所有控制器组件
func ProvideController ¶
func ProvideController( cm *ConfigManager, rm *ResourceManager, regm *RegistryManager, km *KeyManager, mm *MinioManager, ds *DeploymentManager, mongoClient *mongo.Client, am *AgentManager, jpm *JumperServerManager, gpm *GrpcProxyManager, lf *LokiForwarder, pf *PrometheusForwarder, tgm *TelegrafManager, rd *RedisManager, ms *MonitoringService, pts *PluginTemplateService, ) (*Controller, error)
ProvideController 为 Wire 依赖注入提供的构造函数
func (*Controller) CreateDeployment ¶
func (c *Controller) CreateDeployment(req models.DeploymentRequest) (*models.Deployment, error)
CreateDeployment 创建部署,对应接口 POST /api/v1/deployments
func (*Controller) CreateResource ¶
func (c *Controller) CreateResource(resourceType models.ResourceType, resourceID string, config map[string]interface{}) error
func (*Controller) DeleteConfig ¶
func (c *Controller) DeleteConfig(resourceType models.ResourceType, resourceID string) error
func (*Controller) DeleteResource ¶
func (c *Controller) DeleteResource(resourceType models.ResourceType, resourceID string) error
func (*Controller) GetAgentVariables ¶
func (*Controller) GetAssets ¶
func (c *Controller) GetAssets(tags string) ([]*structs.L2DeviceRemoteInfo, error)
func (*Controller) GetConfig ¶
func (c *Controller) GetConfig(resourceType models.ResourceType, resourceID string) (map[string]interface{}, error)
func (*Controller) GetMongoClient ¶
func (c *Controller) GetMongoClient() *mongo.Client
GetMongoClient 返回 MongoDB 客户端(用于其他模块访问)
func (*Controller) GetRegisteredAgentsCount ¶
func (c *Controller) GetRegisteredAgentsCount() int
func (*Controller) GetResourceStatus ¶
func (c *Controller) GetResourceStatus(resourceType models.ResourceType, resourceID string) (string, error)
func (*Controller) GetStatus ¶
func (c *Controller) GetStatus() (*models.ControllerStatus, error)
func (*Controller) ListConfigs ¶
func (c *Controller) ListConfigs(resourceType models.ResourceType) ([]string, error)
func (*Controller) ListDeployments ¶
func (c *Controller) ListDeployments(w http.ResponseWriter, r *http.Request)
ListDeployments 列出部署,对应接口 GET /api/v1/deployments
func (*Controller) ListResources ¶
func (c *Controller) ListResources() []string
func (*Controller) RegisterService ¶
func (c *Controller) RegisterService() error
func (*Controller) RegisterToUpstreamEtcd ¶
func (c *Controller) RegisterToUpstreamEtcd(upstreamEtcdEndpoints []string) error
func (*Controller) Start ¶
func (c *Controller) Start(options ...controllerOptionFunc) error
func (*Controller) Stop ¶
func (c *Controller) Stop() error
func (*Controller) UpdateConfig ¶
func (c *Controller) UpdateConfig(resourceType models.ResourceType, resourceID string, config map[string]interface{}) error
type DataQualityChecker ¶
type DataQualityChecker struct {
// contains filtered or unexported fields
}
DataQualityChecker 数据质量检查器
func NewDataQualityChecker ¶
func NewDataQualityChecker(qualityService *DataQualityService, metadataService *MetricMetadataService) *DataQualityChecker
NewDataQualityChecker 创建数据质量检查器
func (*DataQualityChecker) CheckMetric ¶
func (c *DataQualityChecker) CheckMetric(ctx context.Context, metricName string, value float64, timestamp time.Time) ([]*models.QualityCheckResult, error)
CheckMetric 检查指标数据质量
func (*DataQualityChecker) CheckMetricsBatch ¶
func (c *DataQualityChecker) CheckMetricsBatch(ctx context.Context, metrics map[string]float64, timestamp time.Time) (map[string][]*models.QualityCheckResult, error)
CheckMetricsBatch 批量检查指标数据质量
type DataQualityService ¶
type DataQualityService struct {
// contains filtered or unexported fields
}
DataQualityService 数据质量服务(从OneOps平台获取规则)
func NewDataQualityService ¶
func NewDataQualityService(oneOpsBaseURL string) *DataQualityService
NewDataQualityService 创建数据质量服务
func ProvideDataQualityService ¶
func ProvideDataQualityService(config *Config) *DataQualityService
ProvideDataQualityService 为 Wire 依赖注入提供的构造函数
func (*DataQualityService) GetRules ¶
func (s *DataQualityService) GetRules(ctx context.Context, enabled bool) ([]*models.DataQualityRule, error)
GetRules 获取数据质量规则列表
func (*DataQualityService) GetRulesByMetric ¶
func (s *DataQualityService) GetRulesByMetric(ctx context.Context, metricName string) ([]*models.DataQualityRule, error)
GetRulesByMetric 根据指标名称获取匹配的质量规则
type DatabaseConfig ¶
type DefaultTargetConverter ¶
type DefaultTargetConverter struct {
// contains filtered or unexported fields
}
DefaultTargetConverter 默认目标转换器
func NewDefaultTargetConverter ¶
func NewDefaultTargetConverter(logger *zap.Logger) *DefaultTargetConverter
NewDefaultTargetConverter 创建默认目标转换器
func (*DefaultTargetConverter) Convert ¶
func (c *DefaultTargetConverter) Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)
Convert 默认转换逻辑
func (*DefaultTargetConverter) Supports ¶
func (c *DefaultTargetConverter) Supports(collectionType string) bool
Supports 默认转换器支持所有类型(作为回退)
type Deployment ¶
type Deployment struct {
// Timeout int `yaml:"timeout"`
ConcurrentDeployments int `yaml:"concurrent_deployments"`
}
type DeploymentManager ¶
type DeploymentManager struct {
ConfigManager *ConfigManager
RegistryManager *RegistryManager
MinioManager *MinioManager
MongoClient *mongo.Client
// contains filtered or unexported fields
}
func ProvideDeployManager ¶
func ProvideDeployManager(configManager *ConfigManager, registryManager *RegistryManager, minio *MinioManager, mongoClient *mongo.Client) *DeploymentManager
func (*DeploymentManager) CreateDeployment ¶
func (dm *DeploymentManager) CreateDeployment(req models.DeploymentRequest) (*models.Deployment, error)
func (*DeploymentManager) GetDeployment ¶
func (dm *DeploymentManager) GetDeployment(deploymentID string) (*models.Deployment, error)
GetDeployment 根据部署 ID 获取部署信息
func (*DeploymentManager) ListDeployments ¶
func (dm *DeploymentManager) ListDeployments() ([]models.Deployment, error)
func (*DeploymentManager) UpdateDeploymentDeviceStatus ¶
func (dm *DeploymentManager) UpdateDeploymentDeviceStatus(deploymentID string, deviceStatus models.TargetDevice) error
type EffectiveStrategy ¶
type EffectiveStrategy struct {
GlobalStrategy *models.EnhancedGlobalMetricStrategy
GroupStrategies []*models.GroupMetricStrategy
InstanceStrategy *models.EnhancedInstanceMetricStrategy
CollectionTasks []models.CollectionTask
}
EffectiveStrategy 生效策略(合并后的最终策略)
type EnhancedStrategyConverter ¶
type EnhancedStrategyConverter struct {
// contains filtered or unexported fields
}
EnhancedStrategyConverter 增强的策略转换器,支持采集任务、处理规则、输出规则等完整功能;使用配置驱动,替代硬编码
func NewEnhancedStrategyConverter ¶
func NewEnhancedStrategyConverter(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *EnhancedStrategyConverter
NewEnhancedStrategyConverter 创建增强的策略转换器
func (*EnhancedStrategyConverter) ConvertCollectionTaskToPluginConfig ¶
func (esc *EnhancedStrategyConverter) ConvertCollectionTaskToPluginConfig( ctx context.Context, task *models.CollectionTask, ) (map[string]interface{}, []map[string]interface{}, []map[string]interface{}, error)
ConvertCollectionTaskToPluginConfig 将采集任务转换为插件配置
func (*EnhancedStrategyConverter) GetConfigLoader ¶
func (esc *EnhancedStrategyConverter) GetConfigLoader() *config.PluginMappingConfigLoader
GetConfigLoader 获取配置加载器(用于外部访问)
type FileOperation ¶
type FileOperation struct {
// contains filtered or unexported fields
}
FileOperation 文件操作模块
func NewFileOperation ¶
func NewFileOperation(agentManager *AgentManager) *FileOperation
NewFileOperation 创建文件操作模块
func (*FileOperation) CreateDirectory ¶
func (fo *FileOperation) CreateDirectory(ctx context.Context, agentCode, dirPath string, mode uint32) (map[string]interface{}, error)
CreateDirectory 创建目录
func (*FileOperation) DeleteFile ¶
func (fo *FileOperation) DeleteFile(ctx context.Context, agentCode, filePath string, recursive bool) (map[string]interface{}, error)
DeleteFile 删除文件或目录
func (*FileOperation) DownloadFile ¶
func (fo *FileOperation) DownloadFile(ctx context.Context, agentCode, filePath string, offset, length int64) (io.ReadCloser, error)
DownloadFile 下载文件(返回读取器)
func (*FileOperation) GetFileInfo ¶
func (fo *FileOperation) GetFileInfo(ctx context.Context, agentCode, filePath string) (map[string]interface{}, error)
GetFileInfo 获取文件信息
func (*FileOperation) GetUploadStatus ¶
func (fo *FileOperation) GetUploadStatus(ctx context.Context, agentCode, sessionID string) (map[string]interface{}, error)
GetUploadStatus 获取上传状态
func (*FileOperation) ListFiles ¶
func (fo *FileOperation) ListFiles(ctx context.Context, agentCode, dirPath string) (map[string]interface{}, error)
ListFiles 列出文件
func (*FileOperation) StartFileUpload ¶
func (fo *FileOperation) StartFileUpload(ctx context.Context, agentCode, filePath string, fileSize int64, md5, sha256 string) (map[string]interface{}, error)
StartFileUpload 开始文件上传
func (*FileOperation) UploadFileChunk ¶
func (fo *FileOperation) UploadFileChunk(ctx context.Context, agentCode, sessionID string, chunkIndex int64, data []byte) error
UploadFileChunk 上传文件块
type GlobalResolver ¶
type GlobalResolver struct {
ConfigManager *ConfigManager
}
func ProvideGlobalResolver ¶
func ProvideGlobalResolver(cm *ConfigManager) *GlobalResolver
type GroupStrategyManager ¶
type GroupStrategyManager struct {
// contains filtered or unexported fields
}
GroupStrategyManager 分组策略管理器
func NewGroupStrategyManager ¶
func NewGroupStrategyManager(mongoClient *mongo.Client, logger *zap.Logger) *GroupStrategyManager
NewGroupStrategyManager 创建分组策略管理器
func (*GroupStrategyManager) CreateGroupStrategy ¶
func (gsm *GroupStrategyManager) CreateGroupStrategy(ctx context.Context, strategy *models.GroupMetricStrategy) error
CreateGroupStrategy 创建分组策略
func (*GroupStrategyManager) DeleteGroupStrategy ¶
func (gsm *GroupStrategyManager) DeleteGroupStrategy(ctx context.Context, id string) error
DeleteGroupStrategy 删除分组策略
func (*GroupStrategyManager) GetGroupStrategiesForAgent ¶
func (gsm *GroupStrategyManager) GetGroupStrategiesForAgent(ctx context.Context, agentCode string, agentManager *AgentManager) ([]*models.GroupMetricStrategy, error)
GetGroupStrategiesForAgent 获取 Agent 匹配的所有分组策略(按优先级排序)
func (*GroupStrategyManager) GetGroupStrategy ¶
func (gsm *GroupStrategyManager) GetGroupStrategy(ctx context.Context, id string) (*models.GroupMetricStrategy, error)
GetGroupStrategy 获取分组策略
func (*GroupStrategyManager) ListGroupStrategies ¶
func (gsm *GroupStrategyManager) ListGroupStrategies(ctx context.Context) ([]*models.GroupMetricStrategy, error)
ListGroupStrategies 列出所有分组策略
func (*GroupStrategyManager) MatchGroupStrategies ¶
func (gsm *GroupStrategyManager) MatchGroupStrategies(ctx context.Context, agentLabels map[string]string) ([]*models.GroupMetricStrategy, error)
MatchGroupStrategies 根据 Agent 标签匹配分组策略,支持多标签 AND 逻辑匹配和通配符匹配
func (*GroupStrategyManager) UpdateGroupStrategy ¶
func (gsm *GroupStrategyManager) UpdateGroupStrategy(ctx context.Context, strategy *models.GroupMetricStrategy) error
UpdateGroupStrategy 更新分组策略
type GrpcProxyManager ¶
type GrpcProxyManager struct {
StreamDirector proxy.StreamDirector
// contains filtered or unexported fields
}
func ProvideGrpcProxyManager ¶
func ProvideGrpcProxyManager(registryManager *RegistryManager, am *AgentManager, km *KeyManager, config *Config) *GrpcProxyManager
func (*GrpcProxyManager) Start ¶
func (gpm *GrpcProxyManager) Start() error
func (*GrpcProxyManager) Stop ¶
func (gpm *GrpcProxyManager) Stop() error
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor 健康监控模块
func NewHealthMonitor ¶
func NewHealthMonitor(agentManager *AgentManager) *HealthMonitor
NewHealthMonitor 创建健康监控模块
func (*HealthMonitor) GetAgentOverallHealth ¶
func (hm *HealthMonitor) GetAgentOverallHealth(ctx context.Context, agentCode string) (map[string]interface{}, error)
GetAgentOverallHealth 获取 Agent 整体健康状态
func (*HealthMonitor) GetHealthHistory ¶
func (hm *HealthMonitor) GetHealthHistory(ctx context.Context, agentCode, packageName string, limit int32) ([]*pb.HealthHistoryEntry, error)
GetHealthHistory 获取服务的健康检查历史
func (*HealthMonitor) GetServiceHealth ¶
func (hm *HealthMonitor) GetServiceHealth(ctx context.Context, agentCode, packageName string) (*pb.ServiceHealth, error)
GetServiceHealth 获取指定服务的健康状态
func (*HealthMonitor) ListAgentHealth ¶
func (hm *HealthMonitor) ListAgentHealth(ctx context.Context, agentCode string, filterStatus []pb.HealthStatus) ([]*pb.ServiceHealth, error)
ListAgentHealth 获取 Agent 所有服务的健康状态
type JumperServerManager ¶
type JumperServerManager struct {
// contains filtered or unexported fields
}
func ProvideJumperServerManager ¶
func ProvideJumperServerManager(registryManager *RegistryManager, km *KeyManager, config *Config) *JumperServerManager
func (*JumperServerManager) Start ¶
func (jsm *JumperServerManager) Start() error
func (*JumperServerManager) Stop ¶
func (jsm *JumperServerManager) Stop() error
type KeyManager ¶
type KeyManager struct {
// contains filtered or unexported fields
}
func NewKeyManager ¶
func NewKeyManager(config *Config) (*KeyManager, error)
func ProvideKeyManager ¶
func ProvideKeyManager(config *Config) (*KeyManager, error)
ProvideKeyManager 为 Wire 依赖注入提供的构造函数
func (*KeyManager) GenerateResourceKey ¶
func (km *KeyManager) GenerateResourceKey(resourceType, containerName string) (string, error)
func (*KeyManager) GenerateResourcePrefix ¶
func (km *KeyManager) GenerateResourcePrefix(resourceType string) (string, error)
func (*KeyManager) GenerateServiceKey ¶
func (km *KeyManager) GenerateServiceKey(serviceName, hostIdentifier, port string) (string, error)
func (*KeyManager) GenerateServicePrefix ¶
func (km *KeyManager) GenerateServicePrefix(serviceName string) (string, error)
func (*KeyManager) ParseResourceKey ¶
func (*KeyManager) ParseServiceKey ¶
type LayerResolver ¶
type LayeredVariableResolver ¶
type LayeredVariableResolver struct {
// contains filtered or unexported fields
}
func NewLayeredVariableResolver ¶
func NewLayeredVariableResolver(rm *RegistryManager, cm *ConfigManager) *LayeredVariableResolver
type LokiForwarder ¶
type LokiForwarder struct {
UpstreamURL string
Client *http.Client
BufferSize int
FlushInterval time.Duration
RedisManager *RedisManager
LokiLabelKey string
// contains filtered or unexported fields
}
func ProvideLokiForwarder ¶
func ProvideLokiForwarder(config *ConfigManager, redisManager *RedisManager) *LokiForwarder
func (*LokiForwarder) ForwardHandler ¶
func (lf *LokiForwarder) ForwardHandler(w http.ResponseWriter, r *http.Request)
type LokiPushRequest ¶
type LokiPushRequest struct {
Streams []LokiStream `json:"streams"`
}
LokiPushRequest represents the structure of a Loki push request
type LokiStream ¶
type LokiStream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"` // [timestamp, log message]
}
LokiStream represents a stream of log entries in Loki
type MetricDataPoint ¶
MetricDataPoint 指标数据点
type MetricMetadataService ¶
type MetricMetadataService struct {
// contains filtered or unexported fields
}
MetricMetadataService 指标元数据服务(从OneOps平台获取)
func NewMetricMetadataService ¶
func NewMetricMetadataService(oneOpsBaseURL string) *MetricMetadataService
NewMetricMetadataService 创建指标元数据服务
func ProvideMetricMetadataService ¶
func ProvideMetricMetadataService(config *Config) *MetricMetadataService
ProvideMetricMetadataService 为 Wire 依赖注入提供的构造函数
func (*MetricMetadataService) GetMetadata ¶
func (s *MetricMetadataService) GetMetadata(ctx context.Context, name string) (*models.MetricMetadata, error)
GetMetadata 根据名称获取指标元数据
func (*MetricMetadataService) ListMetadata ¶
func (s *MetricMetadataService) ListMetadata(ctx context.Context, category string) ([]*models.MetricMetadata, error)
ListMetadata 获取指标元数据列表
type MetricsManagerConfig ¶
type MetricsManagerConfig struct {
Port int `yaml:"port" json:"port"`
RemoteWriteURL string `yaml:"remoteWriteurl" json:"remoteWriteurl"`
}
MetricsManagerConfig 定义了指标管理器的配置
type MetricsQuery ¶
type MetricsQuery struct {
// contains filtered or unexported fields
}
MetricsQuery 指标查询模块
func NewMetricsQuery ¶
func NewMetricsQuery(agentManager *AgentManager) *MetricsQuery
NewMetricsQuery 创建指标查询模块
func (*MetricsQuery) GetAgentMetricsSummary ¶
func (mq *MetricsQuery) GetAgentMetricsSummary(ctx context.Context, agentCode string) (map[string]interface{}, error)
GetAgentMetricsSummary 获取 Agent 指标摘要
func (*MetricsQuery) GetApplicationMetrics ¶
func (mq *MetricsQuery) GetApplicationMetrics(ctx context.Context, agentCode, serviceName string) (map[string]interface{}, error)
GetApplicationMetrics 获取应用指标
func (*MetricsQuery) GetMetricsHistory ¶
func (mq *MetricsQuery) GetMetricsHistory(ctx context.Context, agentCode, metricType, serviceName string, startTime, endTime time.Time) (map[string]interface{}, error)
GetMetricsHistory 获取指标历史数据
func (*MetricsQuery) GetServiceMetrics ¶
func (mq *MetricsQuery) GetServiceMetrics(ctx context.Context, agentCode, packageName string) (*pb.ServiceMetrics, error)
GetServiceMetrics 获取指定服务的指标
func (*MetricsQuery) GetSystemMetrics ¶
func (mq *MetricsQuery) GetSystemMetrics(ctx context.Context, agentCode string) (*pb.SystemMetrics, error)
GetSystemMetrics 获取系统指标
func (*MetricsQuery) ListServiceMetrics ¶
func (mq *MetricsQuery) ListServiceMetrics(ctx context.Context, agentCode string, runningOnly bool) ([]*pb.ServiceMetrics, error)
ListServiceMetrics 获取 Agent 所有服务的指标
type MetricsStrategyService ¶
type MetricsStrategyService struct {
// contains filtered or unexported fields
}
MetricsStrategyService 指标策略管理服务。注意:数据源是 OneOps MySQL,Controller 的 MongoDB 作为缓存/同步目标。策略数据流向:OneOps MySQL → Controller MongoDB → Agent。本服务主要用于 Agent 查询策略配置,实际的数据管理在 OneOps 后端完成,MongoDB 中的数据由 OneOps 后端同步更新。
func NewMetricsStrategyService ¶
func NewMetricsStrategyService(mongoClient *mongo.Client, agentManager *AgentManager, registryManager *RegistryManager, logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *MetricsStrategyService
NewMetricsStrategyService 创建指标策略管理服务
func (*MetricsStrategyService) DeleteInstanceStrategy ¶
func (s *MetricsStrategyService) DeleteInstanceStrategy(ctx context.Context, agentCode string) error
DeleteInstanceStrategy 删除实例策略
func (*MetricsStrategyService) GenerateConfigFromEnhancedStrategy ¶
func (s *MetricsStrategyService) GenerateConfigFromEnhancedStrategy( ctx context.Context, agentCode string, ) (string, error)
GenerateConfigFromEnhancedStrategy 从增强策略生成 Telegraf 配置
func (*MetricsStrategyService) GetApplicationStrategy ¶
func (s *MetricsStrategyService) GetApplicationStrategy(ctx context.Context, agentCode string) (*models.ApplicationMetricStrategy, error)
GetApplicationStrategy 获取应用指标策略(全局或实例)
func (*MetricsStrategyService) GetAvailableMetrics ¶
func (s *MetricsStrategyService) GetAvailableMetrics(ctx context.Context, agentCode string) ([]string, error)
GetAvailableMetrics 获取可用指标列表
func (*MetricsStrategyService) GetConfigStatus ¶
func (s *MetricsStrategyService) GetConfigStatus(ctx context.Context, agentCode string) (*models.ConfigStatus, error)
GetConfigStatus 获取配置生效状态。注意:新架构中,配置状态应该从任务服务(CollectionTaskService)查询,而不是从策略服务查询。此方法已废弃,请使用 CollectionTaskService.GetTaskStatus 替代。
func (*MetricsStrategyService) GetEffectiveEnhancedStrategy ¶
func (s *MetricsStrategyService) GetEffectiveEnhancedStrategy(ctx context.Context, agentCode string) (*EffectiveStrategy, error)
GetEffectiveEnhancedStrategy 获取生效的增强策略。注意:策略合并逻辑已在 OneOps 中完成,同步到 Controller 的 MongoDB 时已经是合并后的策略;Controller 直接返回从 MongoDB 读取的策略即可,不需要再次合并。这样可以避免重复逻辑,并确保两个服务返回的策略一致。
func (*MetricsStrategyService) GetEnhancedGlobalStrategy ¶
func (s *MetricsStrategyService) GetEnhancedGlobalStrategy(ctx context.Context) (*models.EnhancedGlobalMetricStrategy, error)
GetEnhancedGlobalStrategy 获取增强的全局策略
func (*MetricsStrategyService) GetEnhancedInstanceStrategy ¶
func (s *MetricsStrategyService) GetEnhancedInstanceStrategy(ctx context.Context, agentCode string) (*models.EnhancedInstanceMetricStrategy, error)
GetEnhancedInstanceStrategy 获取增强的实例策略
func (*MetricsStrategyService) PreviewTask ¶
func (s *MetricsStrategyService) PreviewTask(ctx context.Context, task models.CollectionTask) (*models.MetricRulePreview, error)
PreviewTask 预览采集任务匹配的指标
func (*MetricsStrategyService) UpdateApplicationStrategy ¶
func (s *MetricsStrategyService) UpdateApplicationStrategy(ctx context.Context, strategy *models.ApplicationMetricStrategy) error
UpdateApplicationStrategy 更新应用指标策略
type MinioConfig ¶
type MinioConfig struct {
Endpoint string `yaml:"endpoint" json:"endpoint"`
AccessKeyID string `yaml:"accessKeyID" json:"access_key_id"`
SecretAccessKey string `yaml:"secretAccessKey" json:"secret_access_key"`
UseSSL bool `yaml:"useSSL" json:"use_ssl"`
BucketName string `yaml:"bucketName" json:"bucketName"`
ProxyAddr string `yaml:"proxyAddr" json:"proxy_addr"`
}
type MinioManager ¶
type MinioManager struct {
KeyManager *KeyManager
ConfigManager *ConfigManager
RegistryManager *RegistryManager
// contains filtered or unexported fields
}
func ProvideMinioManager ¶
func ProvideMinioManager(km *KeyManager, configManager *ConfigManager, registryManager *RegistryManager) (*MinioManager, error)
func (*MinioManager) CreateBucket ¶
func (mm *MinioManager) CreateBucket(bucketName string) error
func (*MinioManager) DeleteObject ¶
func (mm *MinioManager) DeleteObject(bucketName, objectName string) error
func (*MinioManager) DownloadFile ¶
func (mm *MinioManager) DownloadFile(bucketName, objectName string, filePath string) error
func (*MinioManager) GetMiniManagerStatus ¶
func (mm *MinioManager) GetMiniManagerStatus() (models.ResourceStatus, error)
func (*MinioManager) GetObjectInfo ¶
func (mm *MinioManager) GetObjectInfo(bucketName, objectName string) (minio.ObjectInfo, error)
GetObjectInfo 获取对象信息
func (*MinioManager) GetPresignedURL ¶
func (mm *MinioManager) GetPresignedURL(bucketName, objectName string, expiry time.Duration, method string) (string, error)
GetPresignedURL 获取预签名URL
func (*MinioManager) GetProxyLatestPackageURL ¶
func (mm *MinioManager) GetProxyLatestPackageURL(bucketName, objectName string) (string, error)
func (*MinioManager) GetProxyPackageURL ¶
func (mm *MinioManager) GetProxyPackageURL(bucketName, objectName string) (string, error)
func (*MinioManager) ListBuckets ¶
func (mm *MinioManager) ListBuckets() ([]minio.BucketInfo, error)
func (*MinioManager) ListObjects ¶
func (mm *MinioManager) ListObjects(bucketName string) ([]minio.ObjectInfo, error)
func (*MinioManager) ListPackages ¶
func (mm *MinioManager) ListPackages(bucketName string) ([]string, error)
func (*MinioManager) RegisterService ¶
func (mm *MinioManager) RegisterService() error
func (*MinioManager) UploadFile ¶
func (mm *MinioManager) UploadFile(bucketName, objectName string, filePath string) error
type MonitoringService ¶
type MonitoringService struct {
// contains filtered or unexported fields
}
func ProvideMonitoringService ¶
func ProvideMonitoringService( mongoClient *mongo.Client, config *Config, configGenerator *TelegrafConfigGenerator, templateService *PluginTemplateService, telegrafManager *TelegrafManager, ) (*MonitoringService, error)
func (*MonitoringService) CreatePlugin ¶
func (ms *MonitoringService) CreatePlugin(ctx context.Context, plugin *models.PluginConfig) error
CreatePlugin 创建独立插件配置
func (*MonitoringService) CreateTask ¶
func (ms *MonitoringService) CreateTask(ctx context.Context, task *models.MonitoringTask) error
CreateTask 创建监控任务
func (*MonitoringService) DeletePlugin ¶
func (ms *MonitoringService) DeletePlugin(ctx context.Context, pluginID string) error
DeletePlugin 删除插件配置
func (*MonitoringService) DeleteTask ¶
func (ms *MonitoringService) DeleteTask(ctx context.Context, taskID string) error
DeleteTask 删除监控任务
func (*MonitoringService) EnsureIndexes ¶
func (ms *MonitoringService) EnsureIndexes(ctx context.Context) error
EnsureIndexes 创建索引
func (*MonitoringService) GetPlugin ¶
func (ms *MonitoringService) GetPlugin(ctx context.Context, pluginID string) (*models.PluginConfig, error)
GetPlugin 获取插件配置
func (*MonitoringService) GetStatus ¶
func (ms *MonitoringService) GetStatus(ctx context.Context) (*models.MonitoringStatus, error)
GetStatus 获取监控平台状态
func (*MonitoringService) GetTask ¶
func (ms *MonitoringService) GetTask(ctx context.Context, taskID string) (*models.MonitoringTask, error)
GetTask 获取监控任务
func (*MonitoringService) ListPlugins ¶
func (ms *MonitoringService) ListPlugins(ctx context.Context, filter map[string]interface{}) ([]*models.PluginConfig, error)
ListPlugins 列出插件配置
func (*MonitoringService) ListTasks ¶
func (ms *MonitoringService) ListTasks(ctx context.Context, filter map[string]interface{}) ([]*models.MonitoringTask, error)
ListTasks 列出监控任务
func (*MonitoringService) PauseTask ¶
func (ms *MonitoringService) PauseTask(ctx context.Context, taskID string) error
PauseTask 暂停监控任务(从active状态转为paused)
func (*MonitoringService) PublishTask ¶
func (ms *MonitoringService) PublishTask(ctx context.Context, taskID string) error
PublishTask 发布监控任务(从created状态转为published)
func (*MonitoringService) ReloadTelegrafConfig ¶
func (ms *MonitoringService) ReloadTelegrafConfig(ctx context.Context) error
ReloadTelegrafConfig 重新加载 telegraf 配置(公开方法)
func (*MonitoringService) ResumeTask ¶
func (ms *MonitoringService) ResumeTask(ctx context.Context, taskID string) error
ResumeTask 恢复监控任务(从paused状态转为active)
func (*MonitoringService) RevokeTask ¶
func (ms *MonitoringService) RevokeTask(ctx context.Context, taskID string) error
RevokeTask 撤回监控任务
func (*MonitoringService) StartTask ¶
func (ms *MonitoringService) StartTask(ctx context.Context, taskID string) error
StartTask 启动监控任务(从published状态转为active)
func (*MonitoringService) StopTask ¶
func (ms *MonitoringService) StopTask(ctx context.Context, taskID string) error
StopTask 停止监控任务(从active或paused状态转为stopped)
func (*MonitoringService) UpdatePlugin ¶
func (ms *MonitoringService) UpdatePlugin(ctx context.Context, pluginID string, plugin *models.PluginConfig) error
UpdatePlugin 更新插件配置
func (*MonitoringService) UpdateTask ¶
func (ms *MonitoringService) UpdateTask(ctx context.Context, taskID string, task *models.MonitoringTask) error
UpdateTask 更新监控任务
type NacosConfig ¶
type NacosConfig struct {
Server string `yaml:"server" json:"server"`
Port int `yaml:"port" json:"port"`
Namespace string `yaml:"namespace" json:"namespace"`
Group string `yaml:"group" json:"group"`
AgentGroup string `yaml:"agent_group" json:"agent_group"`
DataID string `yaml:"data_id" json:"data_id"`
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
LogDir string `yaml:"log_dir" json:"log_dir"`
CacheDir string `yaml:"cache_dir" json:"cache_dir"`
LogLevel string `yaml:"log_level" json:"log_level"`
}
NacosConfig 定义 Nacos 配置结构体
type NacosManager ¶
type NacosManager struct {
// contains filtered or unexported fields
}
func ProvideNacosManager ¶
func ProvideNacosManager(config *Config) (*NacosManager, error)
func (*NacosManager) BatchGetConfig ¶
func (nm *NacosManager) BatchGetConfig(params []vo.ConfigParam) (map[string]string, error)
func (*NacosManager) BatchSetConfig ¶
func (nm *NacosManager) BatchSetConfig(configs map[string]vo.ConfigParam) error
func (*NacosManager) DeleteConfig ¶
func (nm *NacosManager) DeleteConfig(dataId, group string) error
func (*NacosManager) GetConfig ¶
func (nm *NacosManager) GetConfig(dataId, group string) (string, error)
func (*NacosManager) GetConfigWithRetry ¶
func (nm *NacosManager) GetConfigWithRetry(dataId, group string, maxRetries int) (string, error)
func (*NacosManager) ListenConfig ¶
func (nm *NacosManager) ListenConfig(dataId, group string, onChange func(string)) error
func (*NacosManager) SetConfig ¶
func (nm *NacosManager) SetConfig(dataId, group, content string) error
type PackageInfo ¶
type PackageWatcher ¶
type PackageWatcher struct {
// contains filtered or unexported fields
}
func (*PackageWatcher) Watch ¶
func (pw *PackageWatcher) Watch(ctx context.Context)
type PluginTemplateService ¶
type PluginTemplateService struct {
// contains filtered or unexported fields
}
func ProvidePluginTemplateService ¶
func ProvidePluginTemplateService(mongoClient *mongo.Client, config *Config) (*PluginTemplateService, error)
func (*PluginTemplateService) ConvertConfigToTOML ¶
func (pts *PluginTemplateService) ConvertConfigToTOML(config map[string]interface{}) (string, error)
ConvertConfigToTOML 将配置 map 转换为 TOML 格式的字符串
func (*PluginTemplateService) CreateTemplate ¶
func (pts *PluginTemplateService) CreateTemplate(ctx context.Context, tmpl *models.PluginTemplate) error
CreateTemplate 创建插件模板
func (*PluginTemplateService) DeleteTemplate ¶
func (pts *PluginTemplateService) DeleteTemplate(ctx context.Context, templateID string) error
DeleteTemplate 删除插件模板
func (*PluginTemplateService) EnsureIndexes ¶
func (pts *PluginTemplateService) EnsureIndexes(ctx context.Context) error
EnsureIndexes 创建索引
func (*PluginTemplateService) GetTemplate ¶
func (pts *PluginTemplateService) GetTemplate(ctx context.Context, templateID string) (*models.PluginTemplate, error)
GetTemplate 获取插件模板
func (*PluginTemplateService) GetTemplatesByType ¶
func (pts *PluginTemplateService) GetTemplatesByType(ctx context.Context, pluginType string) ([]*models.PluginTemplate, error)
GetTemplatesByType 根据插件类型获取模板列表
func (*PluginTemplateService) ListTemplates ¶
func (pts *PluginTemplateService) ListTemplates(ctx context.Context, filter map[string]interface{}) ([]*models.PluginTemplate, error)
ListTemplates 列出所有插件模板
func (*PluginTemplateService) PresetTemplates ¶
func (pts *PluginTemplateService) PresetTemplates(ctx context.Context) error
PresetTemplates 预置常用插件模板
func (*PluginTemplateService) RenderTemplate ¶
func (pts *PluginTemplateService) RenderTemplate(ctx context.Context, templateID string, params map[string]interface{}) (string, error)
RenderTemplate 渲染模板
func (*PluginTemplateService) RenderTemplateFromContent ¶
func (pts *PluginTemplateService) RenderTemplateFromContent(ctx context.Context, templateContent string, params map[string]interface{}) (string, error)
RenderTemplateFromContent 直接从模板内容渲染(不存储)
func (*PluginTemplateService) UpdateTemplate ¶
func (pts *PluginTemplateService) UpdateTemplate(ctx context.Context, templateID string, tmpl *models.PluginTemplate) error
UpdateTemplate 更新插件模板
func (*PluginTemplateService) ValidateTemplate ¶
func (pts *PluginTemplateService) ValidateTemplate(ctx context.Context, templateContent string) error
ValidateTemplate 验证模板语法
func (*PluginTemplateService) ValidateTemplateParameters ¶
func (pts *PluginTemplateService) ValidateTemplateParameters(ctx context.Context, templateID string, params map[string]interface{}) error
ValidateTemplateParameters 验证模板参数
type PrometheusForwarder ¶
type PrometheusForwarder struct {
UpstreamURL string
Client *http.Client
BufferSize int
FlushInterval time.Duration
AddressLabels map[string]map[string]string
RedisManager *RedisManager
AttachLabelKey string
// contains filtered or unexported fields
}
func ProvidePrometheusForwarder ¶
func ProvidePrometheusForwarder(config *ConfigManager, redisManager *RedisManager) *PrometheusForwarder
func (*PrometheusForwarder) ForwardHandler ¶
func (pf *PrometheusForwarder) ForwardHandler(w http.ResponseWriter, r *http.Request)
type RedisConfig ¶
type RedisManager ¶
type RedisManager struct {
// contains filtered or unexported fields
}
func ProvideRedisManager ¶
func ProvideRedisManager(config *Config) (*RedisManager, error)
func (*RedisManager) Close ¶
func (rm *RedisManager) Close() error
func (*RedisManager) GetClient ¶
func (rm *RedisManager) GetClient() *redis.Client
GetClient 返回 Redis 客户端,用于其他模块访问
func (*RedisManager) Start ¶
func (rm *RedisManager) Start() error
func (*RedisManager) Stop ¶
func (rm *RedisManager) Stop() error
func (*RedisManager) Subscribe ¶
func (rm *RedisManager) Subscribe() error
type RegistryManager ¶
type RegistryManager struct {
KeyManager *KeyManager
ConfigManager *ConfigManager
HostIdentifier string
// contains filtered or unexported fields
}
func ProvideRegistryManager ¶
func ProvideRegistryManager(km *KeyManager, cm *ConfigManager, mongoClient *mongo.Client) (*RegistryManager, error)
ProvideRegistryManager 为 Wire 依赖注入提供的构造函数
func (*RegistryManager) FirstByName ¶
func (rm *RegistryManager) FirstByName(serviceName string) (*models.ServiceInfo, error)
func (*RegistryManager) GetAgentCount ¶
func (rm *RegistryManager) GetAgentCount() (int, error)
func (*RegistryManager) GetAgentVariables ¶
func (rm *RegistryManager) GetAgentVariables(agentID string) (map[string]string, error)
func (*RegistryManager) GetControllerAddress ¶
func (rm *RegistryManager) GetControllerAddress() (string, error)
GetControllerAddress 返回 Controller 服务的地址
func (*RegistryManager) GetEtcdAddress ¶
func (rm *RegistryManager) GetEtcdAddress() (string, error)
GetEtcdAddress 返回 Etcd 服务的地址
func (*RegistryManager) GetService ¶
func (rm *RegistryManager) GetService(key string) (*models.ServiceInfo, error)
func (*RegistryManager) GetVariables ¶
func (*RegistryManager) ListAgents ¶
func (*RegistryManager) ListServices ¶
func (rm *RegistryManager) ListServices() []*models.ServiceInfo
func (*RegistryManager) RefreshService ¶
func (rm *RegistryManager) RefreshService(serviceID string, ttl time.Duration) error
func (*RegistryManager) RegisterService ¶
func (rm *RegistryManager) RegisterService(service *models.ServiceInfo, ttl time.Duration) error
func (*RegistryManager) SetAgentVariables ¶
func (rm *RegistryManager) SetAgentVariables(agentID string, variables map[string]string) error
func (*RegistryManager) Start ¶
func (rm *RegistryManager) Start() error
func (*RegistryManager) Stop ¶
func (rm *RegistryManager) Stop() error
func (*RegistryManager) UnregisterService ¶
func (rm *RegistryManager) UnregisterService(key string) error
func (*RegistryManager) UpdateServiceConfig ¶
func (rm *RegistryManager) UpdateServiceConfig(serviceID string, config map[string]interface{}) error
func (*RegistryManager) WatchServices ¶
func (rm *RegistryManager) WatchServices(ctx context.Context) (<-chan *models.ServiceEvent, error)
type ResourceManager ¶
type ResourceManager struct {
Config *Config
Resources *tools.SafeMap[string, *models.ResourceInfo]
KeyManager *KeyManager
// contains filtered or unexported fields
}
func ProvideResourceManager ¶
func ProvideResourceManager(km *KeyManager, c *Config) (*ResourceManager, error)
ProvideResourceManager 为 Wire 依赖注入提供的构造函数
func (*ResourceManager) ClearContainers ¶
func (rm *ResourceManager) ClearContainers() error
func (*ResourceManager) CreateResource ¶
func (rm *ResourceManager) CreateResource(resourceType models.ResourceType, containerName string, config map[string]interface{}, autoStart bool) (string, error)
func (*ResourceManager) DeleteResource ¶
func (rm *ResourceManager) DeleteResource(multiLevelKey string) error
func (*ResourceManager) GetAllResourcesStatus ¶
func (rm *ResourceManager) GetAllResourcesStatus() map[string]string
func (*ResourceManager) GetResourceFromContainerID ¶
func (rm *ResourceManager) GetResourceFromContainerID(containerID string) (string, error)
func (*ResourceManager) GetResourceInfo ¶
func (rm *ResourceManager) GetResourceInfo(multiLevelKey string) (*models.ResourceInfo, error)
func (*ResourceManager) GetResourceLogs ¶
func (rm *ResourceManager) GetResourceLogs(multiLevelKey string, tail int) (string, error)
func (*ResourceManager) GetResourceStatus ¶
func (rm *ResourceManager) GetResourceStatus(multiLevelKey string) (string, error)
func (*ResourceManager) GetStatus ¶
func (rm *ResourceManager) GetStatus() map[string]interface{}
func (*ResourceManager) ListAllResources ¶
func (rm *ResourceManager) ListAllResources() []string
func (*ResourceManager) RestartResource ¶
func (rm *ResourceManager) RestartResource(multiLevelKey string) error
func (*ResourceManager) Start ¶
func (rm *ResourceManager) Start() error
func (*ResourceManager) StartResource ¶
func (rm *ResourceManager) StartResource(multiLevelKey string) error
func (*ResourceManager) Stop ¶
func (rm *ResourceManager) Stop() error
func (*ResourceManager) StopResource ¶
func (rm *ResourceManager) StopResource(multiLevelKey string) error
type ResourceProvider ¶
type SNMPTargetConverter ¶
type SNMPTargetConverter struct {
// contains filtered or unexported fields
}
SNMPTargetConverter SNMP 目标转换器
func NewSNMPTargetConverter ¶
func NewSNMPTargetConverter(defaultPort int, defaultCommunity string, logger *zap.Logger) *SNMPTargetConverter
NewSNMPTargetConverter 创建 SNMP 目标转换器
func (*SNMPTargetConverter) Convert ¶
func (c *SNMPTargetConverter) Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)
Convert 转换 SNMP 目标
func (*SNMPTargetConverter) Supports ¶
func (c *SNMPTargetConverter) Supports(collectionType string) bool
Supports 检查是否支持该采集类型
type SafeResourceMap ¶
type ServerConfig ¶
type ServerConfig struct {
GRPCPort int `yaml:"grpc_port" json:"grpc_port"` // gRPC端口,默认9512
HTTPPort int `yaml:"http_port" json:"http_port"` // HTTP端口,默认8080
}
ServerConfig 服务器配置
type ServiceInfo ¶
type StrategyMerger ¶
type StrategyMerger struct {
// contains filtered or unexported fields
}
StrategyMerger 策略合并器,负责合并全局、分组、实例策略,生成最终生效的策略。
func NewStrategyMerger ¶
func NewStrategyMerger(logger *zap.Logger) *StrategyMerger
NewStrategyMerger 创建策略合并器
func (*StrategyMerger) GetEffectiveStrategyForAgent ¶
func (sm *StrategyMerger) GetEffectiveStrategyForAgent( ctx context.Context, agentCode string, globalStrategy *models.EnhancedGlobalMetricStrategy, groupStrategyManager *GroupStrategyManager, agentManager *AgentManager, instanceStrategy *models.EnhancedInstanceMetricStrategy, ) (*EffectiveStrategy, error)
GetEffectiveStrategyForAgent 获取 Agent 的生效策略
func (*StrategyMerger) MergeStrategies ¶
func (sm *StrategyMerger) MergeStrategies( ctx context.Context, globalStrategy *models.EnhancedGlobalMetricStrategy, groupStrategies []*models.GroupMetricStrategy, instanceStrategy *models.EnhancedInstanceMetricStrategy, ) (*EffectiveStrategy, error)
MergeStrategies 合并策略。优先级:实例策略 > 分组策略 > 全局策略。
type StrategyToConfigConverter ¶
type StrategyToConfigConverter struct {
// contains filtered or unexported fields
}
StrategyToConfigConverter 策略到配置转换器,负责将指标采集策略转换为 Telegraf 配置;使用配置驱动,替代硬编码的指标映射。
func NewStrategyToConfigConverter ¶
func NewStrategyToConfigConverter(logger *zap.Logger, configLoader *config.PluginMappingConfigLoader) *StrategyToConfigConverter
NewStrategyToConfigConverter 创建策略到配置转换器
func (*StrategyToConfigConverter) ConvertStrategyToConfig ¶
func (c *StrategyToConfigConverter) ConvertStrategyToConfig( ctx context.Context, globalStrategy *models.GlobalMetricStrategy, instanceStrategy *models.InstanceMetricStrategy, ) (map[string][]map[string]interface{}, map[string]interface{}, error)
ConvertStrategyToConfig 将策略转换为 Telegraf 配置(基于CollectionTasks)。返回插件配置映射:pluginType -> []pluginConfig。
func (*StrategyToConfigConverter) GenerateConfigFromStrategy ¶
func (c *StrategyToConfigConverter) GenerateConfigFromStrategy( ctx context.Context, configGenerator *TelegrafConfigGenerator, globalStrategy *models.GlobalMetricStrategy, instanceStrategy *models.InstanceMetricStrategy, ) (string, error)
GenerateConfigFromStrategy 从策略生成完整的 Telegraf 配置。这是一个便捷方法,整合了策略转换和配置生成。
func (*StrategyToConfigConverter) GetMetricPrefixesForPlugin ¶
func (c *StrategyToConfigConverter) GetMetricPrefixesForPlugin(pluginType string) []string
GetMetricPrefixesForPlugin 获取插件对应的指标前缀列表(公开方法)
func (*StrategyToConfigConverter) GetPluginForMetric ¶
func (c *StrategyToConfigConverter) GetPluginForMetric(metricName string) string
GetPluginForMetric 获取指标对应的插件类型(公开方法)
type SyslogManagerConfig ¶
type SyslogManagerConfig struct {
Port int `yaml:"port" json:"port"`
Protocol string `yaml:"protocol" json:"protocol"`
LokiEndpoint string `yaml:"loki_endpoint" json:"loki_endpoint"`
}
SyslogManagerConfig 定义了 Syslog 管理器的配置
type TargetConverter ¶
type TargetConverter interface {
Convert(targets []*CollectionTargetDTO, strategy *UnifiedCollectionStrategyDTO) (map[string]interface{}, error)
Supports(collectionType string) bool
}
TargetConverter 采集目标转换器接口
type TargetConverterRegistry ¶
type TargetConverterRegistry struct {
// contains filtered or unexported fields
}
TargetConverterRegistry 转换器注册表
func NewTargetConverterRegistry ¶
func NewTargetConverterRegistry(logger *zap.Logger, mapper CollectionTypeMapper) *TargetConverterRegistry
NewTargetConverterRegistry 创建转换器注册表
func (*TargetConverterRegistry) GetConverter ¶
func (r *TargetConverterRegistry) GetConverter(collectionType string) TargetConverter
GetConverter 获取转换器
func (*TargetConverterRegistry) Register ¶
func (r *TargetConverterRegistry) Register(collectionType string, converter TargetConverter)
Register 注册转换器
type TelegrafConfig ¶
type TelegrafConfig struct {
Enable *bool `yaml:"enable" json:"enable"` // 是否启用TelegrafManager,nil表示未设置(默认启用),true表示启用,false表示禁用
Image string `yaml:"image"`
Name string `yaml:"name"`
ConfigPath string `yaml:"config_path" json:"config_path"`
PrometheusPort int `yaml:"prometheus_port" json:"prometheus_port"`
LokiEndpoint string `yaml:"loki_endpoint" json:"loki_endpoint"`
PrometheusEndpoint string `yaml:"prometheus_endpoint" json:"prometheus_endpoint"`
}
type TelegrafConfigGenerator ¶
type TelegrafConfigGenerator struct {
TemplateService *PluginTemplateService
BaseConfigPath string
MetricMetadataService *MetricMetadataService // 指标元数据服务(从OneOps获取)
EnhancedConverter *EnhancedStrategyConverter // 增强的策略转换器(处理CollectionTasks)
CollectionTargetConverter *CollectionTargetConverter // 采集目标转换器(从OneOps DTO转换)
}
func ProvideTelegrafConfigGenerator ¶
func ProvideTelegrafConfigGenerator(templateService *PluginTemplateService, config *Config, metricMetadataService *MetricMetadataService) (*TelegrafConfigGenerator, error)
func (*TelegrafConfigGenerator) ConfigToTOML ¶
func (tcg *TelegrafConfigGenerator) ConfigToTOML(config map[string]interface{}) ([]byte, error)
ConfigToTOML 将配置转换为 TOML 格式(公开方法)
func (*TelegrafConfigGenerator) ExtractMetadataFromConfig ¶
func (tcg *TelegrafConfigGenerator) ExtractMetadataFromConfig(ctx context.Context, configStr string) ([]*models.MetricMetadata, error)
ExtractMetadataFromConfig 从配置中提取指标元数据
func (*TelegrafConfigGenerator) GenerateApplicationMetricsConfig ¶
func (tcg *TelegrafConfigGenerator) GenerateApplicationMetricsConfig(appTargets []map[string]interface{}) map[string]interface{}
GenerateApplicationMetricsConfig 从应用指标策略生成HTTP客户端配置
func (*TelegrafConfigGenerator) GenerateConfig ¶
func (tcg *TelegrafConfigGenerator) GenerateConfig(ctx context.Context, tasks []*models.MonitoringTask, standalonePlugins []*models.PluginConfig) (string, error)
GenerateConfig 生成完整的 telegraf 配置
func (*TelegrafConfigGenerator) GenerateConfigFromOneOpsDTO ¶
func (tcg *TelegrafConfigGenerator) GenerateConfigFromOneOpsDTO( ctx context.Context, collectionType string, strategy *UnifiedCollectionStrategyDTO, targets []*CollectionTargetDTO, ) (string, error)
GenerateConfigFromOneOpsDTO 从 OneOps DTO 生成配置。这是 OneOps 调用的主要方法,将 OneOps 的 DTO 转换为 Controller 的 CollectionTask,然后生成配置。
func (*TelegrafConfigGenerator) GenerateConfigFromStrategy ¶
func (tcg *TelegrafConfigGenerator) GenerateConfigFromStrategy( ctx context.Context, globalStrategy *models.GlobalMetricStrategy, instanceStrategy *models.InstanceMetricStrategy, ) (string, error)
GenerateConfigFromStrategy 基于策略生成 Telegraf 配置(已废弃)。注意:Agent 现在直接基于 CollectionTasks 执行采集,不再需要 TOML 配置;此方法保留用于向后兼容,但返回空字符串。Deprecated: Agent 现在直接执行 CollectionTasks,不再需要 TOML 配置。
func (*TelegrafConfigGenerator) GenerateEBPFConfig ¶
func (tcg *TelegrafConfigGenerator) GenerateEBPFConfig(ctx context.Context, target map[string]interface{}) (map[string]interface{}, error)
GenerateEBPFConfig 生成eBPF采集配置(通过eBPF插件)
func (*TelegrafConfigGenerator) GenerateNETCONFConfig ¶
func (tcg *TelegrafConfigGenerator) GenerateNETCONFConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)
GenerateNETCONFConfig 生成NETCONF采集配置(通过HTTP插件调用NETCONF API)
func (*TelegrafConfigGenerator) GeneratePacketCaptureConfig ¶
func (tcg *TelegrafConfigGenerator) GeneratePacketCaptureConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)
GeneratePacketCaptureConfig 生成抓包采集配置(通过exec插件调用tcpdump)
func (*TelegrafConfigGenerator) GeneratePluginConfigFromRaw ¶
func (tcg *TelegrafConfigGenerator) GeneratePluginConfigFromRaw(config map[string]interface{}) (string, error)
GeneratePluginConfigFromRaw 从原始配置生成插件配置
func (*TelegrafConfigGenerator) GeneratePluginConfigFromTemplate ¶
func (tcg *TelegrafConfigGenerator) GeneratePluginConfigFromTemplate(ctx context.Context, templateID string, params map[string]interface{}) (string, error)
GeneratePluginConfigFromTemplate 从模板生成插件配置
func (*TelegrafConfigGenerator) GenerateSNMPConfig ¶
func (tcg *TelegrafConfigGenerator) GenerateSNMPConfig(snmpTargets []map[string]interface{}) map[string]interface{}
GenerateSNMPConfig 从SNMP采集策略生成SNMP input配置
func (*TelegrafConfigGenerator) GenerateSSHConfig ¶
func (tcg *TelegrafConfigGenerator) GenerateSSHConfig(ctx context.Context, targets []map[string]interface{}) ([]map[string]interface{}, error)
GenerateSSHConfig 生成SSH采集配置(通过exec插件执行SSH命令)
func (*TelegrafConfigGenerator) ValidateConfig ¶
func (tcg *TelegrafConfigGenerator) ValidateConfig(configStr string) error
ValidateConfig 验证配置有效性
type TelegrafConfigStruct ¶
type TelegrafConfigStruct struct {
GlobalTags map[string]string `toml:"global_tags,omitempty"`
Agent *AgentConfigStruct `toml:"agent,omitempty"`
Inputs map[string][]map[string]interface{} `toml:"inputs,omitempty"`
Outputs map[string][]map[string]interface{} `toml:"outputs,omitempty"`
}
TelegrafConfigStruct 表示 telegraf 配置结构(内部使用)
type TelegrafConfigValidator ¶
type TelegrafConfigValidator struct {
MetricMetadataService *MetricMetadataService
}
TelegrafConfigValidator Telegraf配置验证器
func NewTelegrafConfigValidator ¶
func NewTelegrafConfigValidator(metricMetadataService *MetricMetadataService) *TelegrafConfigValidator
NewTelegrafConfigValidator 创建配置验证器
func (*TelegrafConfigValidator) ValidateConfig ¶
func (v *TelegrafConfigValidator) ValidateConfig(ctx context.Context, configStr string) error
ValidateConfig 验证配置是否符合指标元数据规范
func (*TelegrafConfigValidator) ValidatePluginConfig ¶
func (v *TelegrafConfigValidator) ValidatePluginConfig(ctx context.Context, pluginType string, config map[string]interface{}) error
ValidatePluginConfig 验证单个插件配置
type TelegrafManager ¶
type TelegrafManager struct {
Config *Config
ResourceManager *ResourceManager
RegistryManager *RegistryManager
ConfigManager *ConfigManager
KeyManager *KeyManager
ConfigGenerator *TelegrafConfigGenerator
// contains filtered or unexported fields
}
func ProvideTelegrafManager ¶
func ProvideTelegrafManager(rm *ResourceManager, cm *ConfigManager, registry *RegistryManager, km *KeyManager, config *Config, configGen *TelegrafConfigGenerator) (*TelegrafManager, error)
func (*TelegrafManager) GetConfig ¶
func (tm *TelegrafManager) GetConfig() (map[string]interface{}, error)
func (*TelegrafManager) GetConfigVersion ¶
func (tm *TelegrafManager) GetConfigVersion() string
GetConfigVersion 获取配置版本
func (*TelegrafManager) GetLastReloadTime ¶
func (tm *TelegrafManager) GetLastReloadTime() *time.Time
GetLastReloadTime 获取最后重载时间
func (*TelegrafManager) GetStatus ¶
func (tm *TelegrafManager) GetStatus() map[string]interface{}
func (*TelegrafManager) GetTelegrafStatus ¶
func (tm *TelegrafManager) GetTelegrafStatus() (string, error)
func (*TelegrafManager) MultiLevelKey ¶
func (tm *TelegrafManager) MultiLevelKey() string
func (*TelegrafManager) ReloadConfig ¶
func (tm *TelegrafManager) ReloadConfig() error
ReloadConfig 重新加载 telegraf 配置
func (*TelegrafManager) Restart ¶
func (tm *TelegrafManager) Restart() error
func (*TelegrafManager) Start ¶
func (tm *TelegrafManager) Start() error
func (*TelegrafManager) Stop ¶
func (tm *TelegrafManager) Stop() error
func (*TelegrafManager) UpdateConfig ¶
func (tm *TelegrafManager) UpdateConfig(config map[string]interface{}) error
func (*TelegrafManager) UpdateConfigFromTasks ¶
func (tm *TelegrafManager) UpdateConfigFromTasks(ctx context.Context, tasks []*models.MonitoringTask, standalonePlugins []*models.PluginConfig) error
UpdateConfigFromTasks 从监控任务更新配置
func (*TelegrafManager) UpdateTelegrafConfig ¶
func (tm *TelegrafManager) UpdateTelegrafConfig(config map[string]interface{}) error
type UniOpsConfig ¶
type UniOpsConfig struct {
Address string `yaml:"address"`
Account string `yaml:"account"`
Password string `yaml:"password"`
// Webhook配置(用于事件驱动模式)
Webhook WebhookConfig `yaml:"webhook" json:"webhook"`
}
type UnifiedCollectionStrategyDTO ¶
type UnifiedCollectionStrategyDTO struct {
CollectionType string
DefaultInterval int
EffectiveConfig map[string]interface{}
}
UnifiedCollectionStrategyDTO OneOps 统一采集策略 DTO(简化版本)
type WebhookConfig ¶
type WebhookConfig struct {
URLs []string `yaml:"urls" json:"urls"` // OneOps Webhook URL列表
Secret string `yaml:"secret" json:"secret"` // 签名密钥
Timeout int `yaml:"timeout" json:"timeout"` // 请求超时(秒),默认10秒
RetryMaxAttempts int `yaml:"retry_max_attempts" json:"retry_max_attempts"` // 最大重试次数,默认3次
RetryInterval int `yaml:"retry_interval" json:"retry_interval"` // 重试间隔(秒),默认30秒
}
WebhookConfig Webhook配置
Source Files
¶
- agent_discovery.go
- agent_event_notifier.go
- agent_manager.go
- batch_operator.go
- collection_target_converter.go
- collection_task_service.go
- collection_type_mapper.go
- config_manager.go
- config_providers.go
- controller.go
- data_quality_checker.go
- data_quality_service.go
- database.go
- deployment.go
- enhanced_strategy_converter.go
- file_operation.go
- group_strategy_manager.go
- grpc_proxy_manager.go
- health_monitor.go
- jump_server_manager.go
- key_mananger.go
- loki_forwarder.go
- metric_metadata_service.go
- metrics_query.go
- metrics_strategy.go
- minio_manager.go
- monitoring_service.go
- nacos.go
- plugin_template_service.go
- prometheus_forwarder.go
- redis_manager.go
- registry_manager.go
- resource_manager.go
- strategy_merger.go
- strategy_to_config_converter.go
- target_converter.go
- target_converter_registry.go
- telegraf_config_generator.go
- telegraf_config_testdata.go
- telegraf_config_validator.go
- telegraf_manager.go
- template.go
- types.go
- variable_resover.go
- version.go
- wire_gen.go