Documentation
¶
Overview ¶
This package provides inspection tasks for Kubernetes node logs from Google Cloud Logging. The following is a Mermaid graph of the task dependencies within this package.
```mermaid
graph TD
subgraph Inputs
InputProjectId
InputClusterName
InputNodeNameFilter
end
subgraph Log Fetching
ListLogEntries
end
subgraph Common Processing
LogSerializer
CommonFieldSetReader
end
subgraph Containerd Pipeline
ContainerdLogFilter
ContainerdLogGroup
ContainerdIDDiscovery
ContainerdHistoryModifier
end
subgraph Kubelet Pipeline
KubeletLogFilter
KubeletLogGroup
KubeletHistoryModifier
end
subgraph Other Pipeline
OtherLogFilter
OtherLogGroup
OtherHistoryModifier
end
subgraph Finalization
Tail
end
%% Input Dependencies
InputProjectId --> ListLogEntries
InputClusterName --> ListLogEntries
InputNodeNameFilter --> ListLogEntries
%% Common Processing Dependencies
ListLogEntries --> LogSerializer
ListLogEntries --> CommonFieldSetReader
%% Containerd Pipeline Dependencies
CommonFieldSetReader --> ContainerdLogFilter
ContainerdLogFilter --> ContainerdLogGroup
ContainerdLogFilter --> ContainerdIDDiscovery
ContainerdIDDiscovery --> ContainerdHistoryModifier
ContainerdLogGroup --> ContainerdHistoryModifier
LogSerializer --> ContainerdHistoryModifier
%% Kubelet Pipeline Dependencies
CommonFieldSetReader --> KubeletLogFilter
KubeletLogFilter --> KubeletLogGroup
ContainerdIDDiscovery --> KubeletHistoryModifier
KubeletLogGroup --> KubeletHistoryModifier
LogSerializer --> KubeletHistoryModifier
%% Other Pipeline Dependencies
CommonFieldSetReader --> OtherLogFilter
OtherLogFilter --> OtherLogGroup
OtherLogGroup --> OtherHistoryModifier
LogSerializer --> OtherHistoryModifier
%% Finalization
ContainerdHistoryModifier --> Tail
KubeletHistoryModifier --> Tail
OtherHistoryModifier --> Tail
ContainerdIDDiscovery --> Tail
```
Index ¶
Constants ¶
View Source
// ContainerdStartingMsg is the literal text "starting containerd".
// NOTE(review): presumably matched against containerd log messages to detect
// that containerd is starting; its use is not visible here — confirm.
const ContainerdStartingMsg = "starting containerd"
View Source
// ContainerdTerminationMsg is the literal text "Stop CRI service".
// NOTE(review): presumably matched against containerd log messages to detect
// that containerd is shutting down; its use is not visible here — confirm.
const ContainerdTerminationMsg = "Stop CRI service"
Variables ¶
View Source
// CommonFieldSetReaderTask is a field-set read task that takes the result of
// the ListLogEntries task as its input. It registers a single reader,
// K8sNodeLogCommonFieldSetReader, whose StructuredLogParser is a multi-text
// parser composed of a klog parser, a logfmt parser and a raw-text fallback.
var CommonFieldSetReaderTask = inspectiontaskbase.NewFieldSetReadTask(
	googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID,
	googlecloudlogk8snode_contract.ListLogEntriesTaskID.Ref(),
	[]log.FieldSetReader{
		&googlecloudlogk8snode_contract.K8sNodeLogCommonFieldSetReader{
			StructuredLogParser: logutil.NewMultiTextLogParser(
				logutil.NewKLogTextParser(true),
				logutil.NewLogfmtTextParser(),
				&logutil.FallbackRawTextLogParser{},
			),
		},
	},
)
View Source
var ContainerIDDiscoveryTask = commonlogk8sauditv2_contract.ContainerIDInventoryBuilder.DiscoveryTask(googlecloudlogk8snode_contract.ContainerIDDiscoveryTaskID, []taskid.UntypedTaskReference{ googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(), }, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (commonlogk8sauditv2_contract.ContainerIDToContainerIdentity, error) { if taskMode == inspectioncore_contract.TaskModeDryRun { return nil, nil } logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref()) doneLogCount := atomic.Int32{} updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) { current := doneLogCount.Load() if len(logs) > 0 { tp.Percentage = float32(current) / float32(len(logs)) } tp.Message = fmt.Sprintf("%d/%d", current, len(logs)) }) updator.Start(ctx) defer updator.Done() result := commonlogk8sauditv2_contract.ContainerIDToContainerIdentity{} logChan := make(chan *log.Log) errGrp, childRoutineCtx := errgroup.WithContext(ctx) containerIdentitiesChan := make(chan *commonlogk8sauditv2_contract.ContainerIdentity, runtime.GOMAXPROCS(0)) for i := 0; i < runtime.GOMAXPROCS(0); i++ { errGrp.Go(func() error { for { select { case <-childRoutineCtx.Done(): return childRoutineCtx.Err() case l, ok := <-logChan: if !ok { return nil } processContainerIDDiscoveryForLog(ctx, l, containerIdentitiesChan) doneLogCount.Add(1) } } }) } consumerGrp, childConsumerRoutineCtx := errgroup.WithContext(ctx) consumerGrp.Go(func() error { for { select { case <-childConsumerRoutineCtx.Done(): return childConsumerRoutineCtx.Err() case c, ok := <-containerIdentitiesChan: if !ok { return nil } result[c.ContainerID] = c } } }) for _, l := range logs { logChan <- l } close(logChan) err := errGrp.Wait() close(containerIdentitiesChan) consumerErr := consumerGrp.Wait() if err != nil { return nil, err 
} if consumerErr != nil { return nil, consumerErr } return result, nil }, )
View Source
// ContainerdLogFilterTask is created by newParserTypeFilterTask from the output
// of the common field-set reader task, with Containerd as the parser type.
// NOTE(review): presumably keeps only containerd logs — confirm against
// newParserTypeFilterTask.
var ContainerdLogFilterTask = newParserTypeFilterTask(
	googlecloudlogk8snode_contract.ContainerdLogFilterTaskID,
	googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID.Ref(),
	googlecloudlogk8snode_contract.Containerd,
)
View Source
// ContainerdLogGroupTask is created by newNodeAndComponentNameGrouperTask and
// takes the output of the ContainerdLogFilter task as its input.
var ContainerdLogGroupTask = newNodeAndComponentNameGrouperTask(
	googlecloudlogk8snode_contract.ContainerdLogGroupTaskID,
	googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(),
)
View Source
// ContainerdNodeLogHistoryModifierTask is the history modifier task for
// containerd logs, configured by a zero-value
// containerdNodeLogHistoryModifierSetting.
var ContainerdNodeLogHistoryModifierTask = inspectiontaskbase.NewHistoryModifierTask[struct{}](
	googlecloudlogk8snode_contract.ContainerdLogHistoryModifierTaskID,
	&containerdNodeLogHistoryModifierSetting{},
)
View Source
// KubeletLogFilterTask is created by newParserTypeFilterTask from the output of
// the common field-set reader task, with Kubelet as the parser type.
// NOTE(review): presumably keeps only kubelet logs — confirm against
// newParserTypeFilterTask.
var KubeletLogFilterTask = newParserTypeFilterTask(
	googlecloudlogk8snode_contract.KubeletLogFilterTaskID,
	googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID.Ref(),
	googlecloudlogk8snode_contract.Kubelet,
)
View Source
// KubeletLogGroupTask is created by newNodeAndComponentNameGrouperTask and
// takes the output of the KubeletLogFilter task as its input.
var KubeletLogGroupTask = newNodeAndComponentNameGrouperTask(
	googlecloudlogk8snode_contract.KubeletLogGroupTaskID,
	googlecloudlogk8snode_contract.KubeletLogFilterTaskID.Ref(),
)
View Source
// KubeletLogHistoryModifierTask is the history modifier task for kubelet logs,
// configured by a zero-value kubeletNodeLogHistoryModifierSetting.
var KubeletLogHistoryModifierTask = inspectiontaskbase.NewHistoryModifierTask[struct{}](
	googlecloudlogk8snode_contract.KubeletLogHistoryModifierTaskID,
	&kubeletNodeLogHistoryModifierSetting{},
)
View Source
// ListLogEntriesTask is the list-log-entries task for this package, built from
// a zero-value k8snodeListLogEntriesTaskSetting.
var ListLogEntriesTask = googlecloudcommon_contract.NewListLogEntriesTask(
	&k8snodeListLogEntriesTaskSetting{},
)
View Source
// LogSerializerTask is a log serializer task whose input is the result of the
// ListLogEntries task.
var LogSerializerTask = inspectiontaskbase.NewLogSerializerTask(
	googlecloudlogk8snode_contract.LogSerializerTaskID,
	googlecloudlogk8snode_contract.ListLogEntriesTaskID.Ref(),
)
LogSerializerTask serializes logs to history so that history modifiers in later tasks can associate events or revisions with them. No node logs are discarded, so this LogSerializerTask simply receives logs from the ListLogEntriesTask.
View Source
// OtherLogFilterTask is created by newParserTypeFilterTask from the output of
// the common field-set reader task, with Other as the parser type.
// NOTE(review): presumably keeps the logs that are neither containerd nor
// kubelet — confirm against newParserTypeFilterTask.
var OtherLogFilterTask = newParserTypeFilterTask(
	googlecloudlogk8snode_contract.OtherLogFilterTaskID,
	googlecloudlogk8snode_contract.CommonFieldsetReaderTaskID.Ref(),
	googlecloudlogk8snode_contract.Other,
)
View Source
// OtherLogGroupTask is created by newNodeAndComponentNameGrouperTask and takes
// the output of the OtherLogFilter task as its input.
var OtherLogGroupTask = newNodeAndComponentNameGrouperTask(
	googlecloudlogk8snode_contract.OtherLogGroupTaskID,
	googlecloudlogk8snode_contract.OtherLogFilterTaskID.Ref(),
)
View Source
// OtherLogHistoryModifierTask is the history modifier task for the remaining
// node components. Its setting carries two tables keyed by component name
// (dockerd, configure.sh, configure-helper.sh): one with a starting message and
// one with a terminating message per component.
var OtherLogHistoryModifierTask = inspectiontaskbase.NewHistoryModifierTask[struct{}](
	googlecloudlogk8snode_contract.OtherLogHistoryModifierTaskID,
	&otherNodeLogHistoryModifierSetting{
		StartingMessagesByComponent: map[string]string{
			"dockerd":             "Starting up",
			"configure.sh":        "Start to install kubernetes files",
			"configure-helper.sh": "Start to configure instance for kubernetes",
		},
		TerminatingMessagesByComponent: map[string]string{
			"dockerd":             "Daemon shutdown complete",
			"configure.sh":        "Done for installing kubernetes files",
			"configure-helper.sh": "Done for the configuration for kubernetes",
		},
	},
)
View Source
var PodSandboxIDDiscoveryTask = inspectiontaskbase.NewProgressReportableInspectionTask(googlecloudlogk8snode_contract.PodSandboxIDDiscoveryTaskID, []taskid.UntypedTaskReference{ googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(), }, func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (patternfinder.PatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo], error) { if taskMode == inspectioncore_contract.TaskModeDryRun { return nil, nil } logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref()) doneLogCount := atomic.Int32{} updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) { current := doneLogCount.Load() if len(logs) > 0 { tp.Percentage = float32(current) / float32(len(logs)) } tp.Message = fmt.Sprintf("%d/%d", current, len(logs)) }) updator.Start(ctx) defer updator.Done() logChan := make(chan *log.Log) errGrp, childCtx := errgroup.WithContext(ctx) podSandboxIDFinder := patternfinder.NewTriePatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo]() for i := 0; i < runtime.GOMAXPROCS(0); i++ { errGrp.Go(func() error { for { select { case <-childCtx.Done(): return childCtx.Err() case l, ok := <-logChan: if !ok { return nil } processPodSandboxIDDiscoveryForLog(ctx, l, podSandboxIDFinder) doneLogCount.Add(1) } } }) } for _, l := range logs { logChan <- l } close(logChan) errGrp.Wait() return podSandboxIDFinder, nil }, )
View Source
// TailTask is the terminal task of this package. It depends on the three
// history modifier tasks (containerd, kubelet, other) and on the container ID
// discovery task, does no work of its own, and returns an empty struct. It also
// carries the feature label "Kubernetes Node Logs".
var TailTask = inspectiontaskbase.NewInspectionTask(
	googlecloudlogk8snode_contract.TailTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudlogk8snode_contract.ContainerdLogHistoryModifierTaskID.Ref(),
		googlecloudlogk8snode_contract.KubeletLogHistoryModifierTaskID.Ref(),
		googlecloudlogk8snode_contract.OtherLogHistoryModifierTaskID.Ref(),
		googlecloudlogk8snode_contract.ContainerIDDiscoveryTaskID.Ref(),
	},
	// The body is intentionally a no-op: the task exists only for its
	// dependency list and its label.
	func(_ context.Context, _ inspectioncore_contract.InspectionTaskModeType) (struct{}, error) {
		return struct{}{}, nil
	},
	inspectioncore_contract.FeatureTaskLabel(
		"Kubernetes Node Logs",
		"Gather node components(e.g docker/container) logs. Log volume can be huge when the cluster has many nodes.",
		enum.LogTypeControlPlaneComponent,
		3000,
		false,
		googlecloudinspectiontypegroup_contract.GCPK8sClusterInspectionTypes...,
	),
)
Functions ¶
func GenerateK8sNodeLogQuery ¶
func GenerateK8sNodeLogQuery(projectId string, clusterId string, nodeNameSubstrings []string) string
GenerateK8sNodeLogQuery generates a query for GKE node logs.
func Register ¶
func Register(registry coreinspection.InspectionTaskRegistry) error
Register registers all googlecloudlogk8snode inspection tasks to the registry.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.