Documentation
¶
Overview ¶
This package provides inspection tasks for Kubernetes node logs from Google Cloud Logging. The following is a Mermaid graph of the task dependencies within this package.
```mermaid
graph TD
subgraph Inputs
InputProjectId
InputClusterName
InputNodeNameFilter
end
subgraph Log Fetching
ListLogEntries
end
subgraph Common Processing
LogSerializer
CommonFieldSetReader
end
subgraph Containerd Pipeline
ContainerdLogFilter
ContainerdLogGroup
ContainerdIDDiscovery
ContainerdLogToTimelineMapper
end
subgraph Kubelet Pipeline
KubeletLogFilter
KubeletLogGroup
KubeletLogToTimelineMapper
end
subgraph Other Pipeline
OtherLogFilter
OtherLogGroup
OtherLogToTimelineMapper
end
subgraph Finalization
Tail
end
%% Input Dependencies
InputProjectId --> ListLogEntries
InputClusterName --> ListLogEntries
InputNodeNameFilter --> ListLogEntries
%% Common Processing Dependencies
ListLogEntries --> LogSerializer
ListLogEntries --> CommonFieldSetReader
%% Containerd Pipeline Dependencies
CommonFieldSetReader --> ContainerdLogFilter
ContainerdLogFilter --> ContainerdLogGroup
ContainerdLogFilter --> ContainerdIDDiscovery
ContainerdIDDiscovery --> ContainerdLogToTimelineMapper
ContainerdLogGroup --> ContainerdLogToTimelineMapper
LogSerializer --> ContainerdLogToTimelineMapper
%% Kubelet Pipeline Dependencies
CommonFieldSetReader --> KubeletLogFilter
KubeletLogFilter --> KubeletLogGroup
ContainerdIDDiscovery --> KubeletLogToTimelineMapper
KubeletLogGroup --> KubeletLogToTimelineMapper
LogSerializer --> KubeletLogToTimelineMapper
%% Other Pipeline Dependencies
CommonFieldSetReader --> OtherLogFilter
OtherLogFilter --> OtherLogGroup
OtherLogGroup --> OtherLogToTimelineMapper
LogSerializer --> OtherLogToTimelineMapper
%% Finalization
ContainerdLogToTimelineMapper --> Tail
KubeletLogToTimelineMapper --> Tail
OtherLogToTimelineMapper --> Tail
```
Index ¶
Constants ¶
View Source
// ContainerdStartingMsg holds the literal text "starting containerd".
// NOTE(review): presumably the containerd log message that marks the daemon
// starting up; its consumer is not visible here — confirm.
const ContainerdStartingMsg = "starting containerd"
View Source
// ContainerdTerminationMsg holds the literal text "Stop CRI service".
// NOTE(review): presumably the containerd log message that marks the daemon
// shutting down; its consumer is not visible here — confirm.
const ContainerdTerminationMsg = "Stop CRI service"
Variables ¶
View Source
// CommonFieldSetReaderTask is the field-set read task registered under
// CommonFieldsetReaderTaskID. It takes the ListLogEntries task result as its
// input and applies a single K8sNodeLogCommonFieldSetReader whose structured
// log parser is a multi-text parser composed of a klog parser, a logfmt
// parser and a raw-text fallback parser.
// NOTE(review): the parsers are presumably tried in the listed order — confirm
// against logutil.NewMultiTextLogParser.
var CommonFieldSetReaderTask = inspectiontaskbase.NewFieldSetReadTask(googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID, googlecloudlogk8snode_contract.ListLogEntriesTaskID.Ref(), []log.FieldSetReader{ &googlecloudlogk8snode_contract.K8sNodeLogCommonFieldSetReader{ StructuredLogParser: logutil.NewMultiTextLogParser( logutil.NewKLogTextParser(true), logutil.NewLogfmtTextParser(), &logutil.FallbackRawTextLogParser{}, ), }, })
View Source
var ContainerIDDiscoveryTask = commonlogk8sauditv2_contract.ContainerIDInventoryBuilder.DiscoveryTask(googlecloudlogk8snode_contract.ContainerIDDiscoveryTaskID, []taskid.UntypedTaskReference{ googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(), }, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (commonlogk8sauditv2_contract.ContainerIDToContainerIdentity, error) { if taskMode == inspectioncore_contract.TaskModeDryRun { return nil, nil } logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref()) doneLogCount := atomic.Int32{} updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) { current := doneLogCount.Load() if len(logs) > 0 { tp.Percentage = float32(current) / float32(len(logs)) } tp.Message = fmt.Sprintf("%d/%d", current, len(logs)) }) updator.Start(ctx) defer updator.Done() result := commonlogk8sauditv2_contract.ContainerIDToContainerIdentity{} logChan := make(chan *log.Log) errGrp, childRoutineCtx := errgroup.WithContext(ctx) containerIdentitiesChan := make(chan *commonlogk8sauditv2_contract.ContainerIdentity, runtime.GOMAXPROCS(0)) for i := 0; i < runtime.GOMAXPROCS(0); i++ { errGrp.Go(func() error { for { select { case <-childRoutineCtx.Done(): return childRoutineCtx.Err() case l, ok := <-logChan: if !ok { return nil } processContainerIDDiscoveryForLog(ctx, l, containerIdentitiesChan) doneLogCount.Add(1) } } }) } consumerGrp, childConsumerRoutineCtx := errgroup.WithContext(ctx) consumerGrp.Go(func() error { for { select { case <-childConsumerRoutineCtx.Done(): return childConsumerRoutineCtx.Err() case c, ok := <-containerIdentitiesChan: if !ok { return nil } result[c.ContainerID] = c } } }) for _, l := range logs { logChan <- l } close(logChan) err := errGrp.Wait() close(containerIdentitiesChan) consumerErr := consumerGrp.Wait() if err != nil { return nil, err 
} if consumerErr != nil { return nil, consumerErr } return result, nil }, )
View Source
// ContainerdLogFilterTask is created by newParserTypeFilterTask under
// ContainerdLogFilterTaskID. It consumes the output of the common field-set
// reader task and is configured with the Containerd parser type.
// NOTE(review): presumably it keeps only logs classified as containerd —
// confirm against newParserTypeFilterTask.
var ContainerdLogFilterTask = newParserTypeFilterTask(googlecloudlogk8snode_contract.ContainerdLogFilterTaskID, googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID.Ref(), googlecloudlogk8snode_contract.Containerd)
View Source
// ContainerdLogGroupTask is created by newNodeAndComponentNameGrouperTask under
// ContainerdLogGroupTaskID and consumes the output of the containerd log filter
// task.
// NOTE(review): the helper name suggests grouping by node and component name —
// confirm against its implementation.
var ContainerdLogGroupTask = newNodeAndComponentNameGrouperTask(googlecloudlogk8snode_contract.ContainerdLogGroupTaskID, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref())
View Source
// ContainerdNodeLogLogToTimelineMapperTask is the log-to-timeline mapper task
// registered under ContainerdLogLogToTimelineMapperTaskID. Its behavior is
// supplied by a zero-valued containerdNodeLogLogToTimelineMapperSetting.
var ContainerdNodeLogLogToTimelineMapperTask = inspectiontaskbase.NewLogToTimelineMapperTask[struct{}](googlecloudlogk8snode_contract.ContainerdLogLogToTimelineMapperTaskID, &containerdNodeLogLogToTimelineMapperSetting{})
View Source
// KubeletLogFilterTask is created by newParserTypeFilterTask under
// KubeletLogFilterTaskID. It consumes the output of the common field-set reader
// task and is configured with the Kubelet parser type.
// NOTE(review): presumably it keeps only logs classified as kubelet — confirm
// against newParserTypeFilterTask.
var KubeletLogFilterTask = newParserTypeFilterTask(googlecloudlogk8snode_contract.KubeletLogFilterTaskID, googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID.Ref(), googlecloudlogk8snode_contract.Kubelet)
View Source
// KubeletLogGroupTask is created by newNodeAndComponentNameGrouperTask under
// KubeletLogGroupTaskID and consumes the output of the kubelet log filter task.
// NOTE(review): the helper name suggests grouping by node and component name —
// confirm against its implementation.
var KubeletLogGroupTask = newNodeAndComponentNameGrouperTask(googlecloudlogk8snode_contract.KubeletLogGroupTaskID, googlecloudlogk8snode_contract.KubeletLogFilterTaskID.Ref())
View Source
// KubeletLogLogToTimelineMapperTask is the log-to-timeline mapper task
// registered under KubeletLogLogToTimelineMapperTaskID. Its behavior is
// supplied by a zero-valued kubeletNodeLogLogToTimelineMapperSetting.
var KubeletLogLogToTimelineMapperTask = inspectiontaskbase.NewLogToTimelineMapperTask[struct{}](googlecloudlogk8snode_contract.KubeletLogLogToTimelineMapperTaskID, &kubeletNodeLogLogToTimelineMapperSetting{})
View Source
// ListLogEntriesTask is the log-listing task for this package, created by
// googlecloudcommon_contract.NewListLogEntriesTask from a zero-valued
// k8snodeListLogEntriesTaskSetting.
var ListLogEntriesTask = googlecloudcommon_contract.NewListLogEntriesTask(&k8snodeListLogEntriesTaskSetting{})
View Source
// LogIngesterTask is the log ingester task registered under LogIngesterTaskID.
// It takes the ListLogEntries task result directly as its input.
var LogIngesterTask = inspectiontaskbase.NewLogIngesterTask(googlecloudlogk8snode_contract.LogIngesterTaskID, googlecloudlogk8snode_contract.ListLogEntriesTaskID.Ref())
LogIngesterTask serializes logs to history so that timeline mappers in later tasks can associate events or revisions with them. No node logs are discarded, so this LogIngesterTask simply receives logs from the ListLogEntriesTask.
View Source
// OtherLogFilterTask is created by newParserTypeFilterTask under
// OtherLogFilterTaskID. It consumes the output of the common field-set reader
// task and is configured with the Other parser type.
// NOTE(review): presumably it keeps the logs that are neither containerd nor
// kubelet — confirm against newParserTypeFilterTask.
var OtherLogFilterTask = newParserTypeFilterTask(googlecloudlogk8snode_contract.OtherLogFilterTaskID, googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID.Ref(), googlecloudlogk8snode_contract.Other)
View Source
// OtherLogGroupTask is created by newNodeAndComponentNameGrouperTask under
// OtherLogGroupTaskID and consumes the output of the other-log filter task.
// NOTE(review): the helper name suggests grouping by node and component name —
// confirm against its implementation.
var OtherLogGroupTask = newNodeAndComponentNameGrouperTask(googlecloudlogk8snode_contract.OtherLogGroupTaskID, googlecloudlogk8snode_contract.OtherLogFilterTaskID.Ref())
View Source
// OtherLogLogToTimelineMapperTask is the log-to-timeline mapper task registered
// under OtherLogLogToTimelineMapperTaskID. Its setting carries two maps keyed
// by component name ("dockerd", "configure.sh", "configure-helper.sh"): one
// with the message text associated with the component starting and one with
// the message text associated with it terminating.
// NOTE(review): how the messages are matched (exact, prefix, substring) is
// decided by otherNodeLogLogToTimelineMapperSetting, which is not visible here.
var OtherLogLogToTimelineMapperTask = inspectiontaskbase.NewLogToTimelineMapperTask[struct{}](googlecloudlogk8snode_contract.OtherLogLogToTimelineMapperTaskID, &otherNodeLogLogToTimelineMapperSetting{ StartingMessagesByComponent: map[string]string{ "dockerd": "Starting up", "configure.sh": "Start to install kubernetes files", "configure-helper.sh": "Start to configure instance for kubernetes", }, TerminatingMessagesByComponent: map[string]string{ "dockerd": "Daemon shutdown complete", "configure.sh": "Done for installing kubernetes files", "configure-helper.sh": "Done for the configuration for kubernetes", }, })
View Source
var PodSandboxIDDiscoveryTask = inspectiontaskbase.NewProgressReportableInspectionTask(googlecloudlogk8snode_contract.PodSandboxIDDiscoveryTaskID, []taskid.UntypedTaskReference{ googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(), }, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (patternfinder.PatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo], error) { if taskMode == inspectioncore_contract.TaskModeDryRun { return nil, nil } logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref()) doneLogCount := atomic.Int32{} updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) { current := doneLogCount.Load() if len(logs) > 0 { tp.Percentage = float32(current) / float32(len(logs)) } tp.Message = fmt.Sprintf("%d/%d", current, len(logs)) }) updator.Start(ctx) defer updator.Done() logChan := make(chan *log.Log) errGrp, childCtx := errgroup.WithContext(ctx) podSandboxIDFinder := patternfinder.NewTriePatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo]() for i := 0; i < runtime.GOMAXPROCS(0); i++ { errGrp.Go(func() error { for { select { case <-childCtx.Done(): return childCtx.Err() case l, ok := <-logChan: if !ok { return nil } processPodSandboxIDDiscoveryForLog(ctx, l, podSandboxIDFinder) doneLogCount.Add(1) } } }) } for _, l := range logs { logChan <- l } close(logChan) errGrp.Wait() return podSandboxIDFinder, nil }, )
View Source
// TailTask is the terminal task of this package, registered under TailTaskID.
// It performs no work itself (its body returns an empty struct and a nil
// error); it only declares dependencies on the containerd, kubelet and other
// log-to-timeline mapper tasks and on the container ID discovery task. It
// carries the feature label "Kubernetes Node Logs" for the GCP Kubernetes
// cluster inspection types.
// NOTE(review): the meaning of the positional label arguments 3000 and false
// is defined by inspectioncore_contract.FeatureTaskLabel — confirm there.
var TailTask = inspectiontaskbase.NewInspectionTask(googlecloudlogk8snode_contract.TailTaskID, []taskid.UntypedTaskReference{ googlecloudlogk8snode_contract.ContainerdLogLogToTimelineMapperTaskID.Ref(), googlecloudlogk8snode_contract.KubeletLogLogToTimelineMapperTaskID.Ref(), googlecloudlogk8snode_contract.OtherLogLogToTimelineMapperTaskID.Ref(), googlecloudlogk8snode_contract.ContainerIDDiscoveryTaskID.Ref(), }, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (struct{}, error) { return struct{}{}, nil }, inspectioncore_contract.FeatureTaskLabel( "Kubernetes Node Logs", "Gather node components(e.g docker/container) logs. Log volume can be huge when the cluster has many nodes.", enum.LogTypeControlPlaneComponent, 3000, false, googlecloudinspectiontypegroup_contract.GCPK8sClusterInspectionTypes..., ), )
Functions ¶
func GenerateK8sNodeLogQuery ¶
func GenerateK8sNodeLogQuery(projectId string, clusterId string, nodeNameSubstrings []string) string
GenerateK8sNodeLogQuery generates a query for GKE node logs.
func Register ¶
func Register(registry coreinspection.InspectionTaskRegistry) error
Register registers all googlecloudlogk8snode inspection tasks to the registry.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.