googlecloudlogk8snode_impl

package
v0.51.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

This package provides inspection tasks for Kubernetes node logs from Google Cloud Logging. The following is a Mermaid graph of the task dependencies within this package.

```mermaid graph TD

subgraph Inputs
    InputProjectId
    InputClusterName
    InputNodeNameFilter
end

subgraph Log Fetching
    ListLogEntries
end

subgraph Common Processing
    LogSerializer
    CommonFieldSetReader
end

subgraph Containerd Pipeline
    ContainerdLogFilter
    ContainerdLogGroup
    ContainerdIDDiscovery
    ContainerdLogToTimelineMapper
end

subgraph Kubelet Pipeline
    KubeletLogFilter
    KubeletLogGroup
    KubeletLogToTimelineMapper
end

subgraph Other Pipeline
    OtherLogFilter
    OtherLogGroup
    OtherLogToTimelineMapper
end

subgraph Finalization
    Tail
end

%% Input Dependencies
InputProjectId --> ListLogEntries
InputClusterName --> ListLogEntries
InputNodeNameFilter --> ListLogEntries

%% Common Processing Dependencies
ListLogEntries --> LogSerializer
ListLogEntries --> CommonFieldSetReader

%% Containerd Pipeline Dependencies
CommonFieldSetReader --> ContainerdLogFilter
ContainerdLogFilter --> ContainerdLogGroup
ContainerdLogFilter --> ContainerdIDDiscovery
ContainerdIDDiscovery --> ContainerdLogToTimelineMapper
ContainerdLogGroup --> ContainerdLogToTimelineMapper
LogSerializer --> ContainerdLogToTimelineMapper

%% Kubelet Pipeline Dependencies
CommonFieldSetReader --> KubeletLogFilter
KubeletLogFilter --> KubeletLogGroup
ContainerdIDDiscovery --> KubeletLogToTimelineMapper
KubeletLogGroup --> KubeletLogToTimelineMapper
LogSerializer --> KubeletLogToTimelineMapper

%% Other Pipeline Dependencies
CommonFieldSetReader --> OtherLogFilter
OtherLogFilter --> OtherLogGroup
OtherLogGroup --> OtherLogToTimelineMapper
LogSerializer --> OtherLogToTimelineMapper

%% Finalization
ContainerdLogToTimelineMapper --> Tail
KubeletLogToTimelineMapper --> Tail
OtherLogToTimelineMapper --> Tail

```

Index

Constants

View Source
const ContainerdStartingMsg = "starting containerd"
View Source
const ContainerdTerminationMsg = "Stop CRI service"

Variables

View Source
var ContainerIDDiscoveryTask = commonlogk8sauditv2_contract.ContainerIDInventoryBuilder.DiscoveryTask(googlecloudlogk8snode_contract.ContainerIDDiscoveryTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(),
	},
	func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (commonlogk8sauditv2_contract.ContainerIDToContainerIdentity, error) {
		if taskMode == inspectioncore_contract.TaskModeDryRun {
			return nil, nil
		}

		logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref())

		doneLogCount := atomic.Int32{}
		updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) {
			current := doneLogCount.Load()
			if len(logs) > 0 {
				tp.Percentage = float32(current) / float32(len(logs))
			}
			tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
		})
		updator.Start(ctx)
		defer updator.Done()

		result := commonlogk8sauditv2_contract.ContainerIDToContainerIdentity{}
		logChan := make(chan *log.Log)
		errGrp, childRoutineCtx := errgroup.WithContext(ctx)
		containerIdentitiesChan := make(chan *commonlogk8sauditv2_contract.ContainerIdentity, runtime.GOMAXPROCS(0))
		for i := 0; i < runtime.GOMAXPROCS(0); i++ {
			errGrp.Go(func() error {
				for {
					select {
					case <-childRoutineCtx.Done():
						return childRoutineCtx.Err()
					case l, ok := <-logChan:
						if !ok {
							return nil
						}
						processContainerIDDiscoveryForLog(ctx, l, containerIdentitiesChan)
						doneLogCount.Add(1)
					}
				}
			})
		}
		consumerGrp, childConsumerRoutineCtx := errgroup.WithContext(ctx)
		consumerGrp.Go(func() error {
			for {
				select {
				case <-childConsumerRoutineCtx.Done():
					return childConsumerRoutineCtx.Err()
				case c, ok := <-containerIdentitiesChan:
					if !ok {
						return nil
					}
					result[c.ContainerID] = c
				}
			}
		})

		for _, l := range logs {
			logChan <- l
		}
		close(logChan)
		err := errGrp.Wait()
		close(containerIdentitiesChan)
		consumerErr := consumerGrp.Wait()
		if err != nil {
			return nil, err
		}
		if consumerErr != nil {
			return nil, consumerErr
		}

		return result, nil
	},
)
View Source
var ContainerdNodeLogLogToTimelineMapperTask = inspectiontaskbase.NewLogToTimelineMapperTask[struct{}](googlecloudlogk8snode_contract.ContainerdLogLogToTimelineMapperTaskID, &containerdNodeLogLogToTimelineMapperSetting{})
View Source
var KubeletLogLogToTimelineMapperTask = inspectiontaskbase.NewLogToTimelineMapperTask[struct{}](googlecloudlogk8snode_contract.KubeletLogLogToTimelineMapperTaskID, &kubeletNodeLogLogToTimelineMapperSetting{})
View Source
var ListLogEntriesTask = googlecloudcommon_contract.NewListLogEntriesTask(&k8snodeListLogEntriesTaskSetting{})

LogIngesterTask serializes logs to history for timeline mappers to associate event or revisions in later tasks. No node logs are discarded, thus this LogIngesterTask simply receives logs from the ListLogEntriesTask.

View Source
var OtherLogLogToTimelineMapperTask = inspectiontaskbase.NewLogToTimelineMapperTask[struct{}](googlecloudlogk8snode_contract.OtherLogLogToTimelineMapperTaskID, &otherNodeLogLogToTimelineMapperSetting{
	StartingMessagesByComponent: map[string]string{
		"dockerd":             "Starting up",
		"configure.sh":        "Start to install kubernetes files",
		"configure-helper.sh": "Start to configure instance for kubernetes",
	},
	TerminatingMessagesByComponent: map[string]string{
		"dockerd":             "Daemon shutdown complete",
		"configure.sh":        "Done for installing kubernetes files",
		"configure-helper.sh": "Done for the configuration for kubernetes",
	},
})
View Source
var PodSandboxIDDiscoveryTask = inspectiontaskbase.NewProgressReportableInspectionTask(googlecloudlogk8snode_contract.PodSandboxIDDiscoveryTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(),
	},
	func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (patternfinder.PatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo], error) {
		if taskMode == inspectioncore_contract.TaskModeDryRun {
			return nil, nil
		}
		logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref())

		doneLogCount := atomic.Int32{}
		updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) {
			current := doneLogCount.Load()
			if len(logs) > 0 {
				tp.Percentage = float32(current) / float32(len(logs))
			}
			tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
		})
		updator.Start(ctx)
		defer updator.Done()

		logChan := make(chan *log.Log)
		errGrp, childCtx := errgroup.WithContext(ctx)
		podSandboxIDFinder := patternfinder.NewTriePatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo]()
		for i := 0; i < runtime.GOMAXPROCS(0); i++ {
			errGrp.Go(func() error {
				for {
					select {
					case <-childCtx.Done():
						return childCtx.Err()
					case l, ok := <-logChan:
						if !ok {
							return nil
						}
						processPodSandboxIDDiscoveryForLog(ctx, l, podSandboxIDFinder)
						doneLogCount.Add(1)
					}
				}
			})
		}

		for _, l := range logs {
			logChan <- l
		}
		close(logChan)
		errGrp.Wait()

		return podSandboxIDFinder, nil
	},
)

Functions

func GenerateK8sNodeLogQuery

func GenerateK8sNodeLogQuery(projectId string, clusterId string, nodeNameSubstrings []string) string

GenerateK8sNodeLogQuery generates a query for GKE node logs.

func Register

func Register(registry coreinspection.InspectionTaskRegistry) error

Register registers all googlecloudlogk8snode inspection tasks to the registry.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL