googlecloudlogk8snode_impl

package
v0.50.0-beta-2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

This package provides inspection tasks for Kubernetes node logs from Google Cloud Logging. The following is a Mermaid graph of the task dependencies within this package.

```mermaid
graph TD

subgraph Inputs
    InputProjectId
    InputClusterName
    InputNodeNameFilter
end

subgraph Log Fetching
    ListLogEntries
end

subgraph Common Processing
    LogSerializer
    CommonFieldSetReader
end

subgraph Containerd Pipeline
    ContainerdLogFilter
    ContainerdLogGroup
    ContainerdIDDiscovery
    ContainerdHistoryModifier
end

subgraph Kubelet Pipeline
    KubeletLogFilter
    KubeletLogGroup
    KubeletHistoryModifier
end

subgraph Other Pipeline
    OtherLogFilter
    OtherLogGroup
    OtherHistoryModifier
end

subgraph Finalization
    Tail
end

%% Input Dependencies
InputProjectId --> ListLogEntries
InputClusterName --> ListLogEntries
InputNodeNameFilter --> ListLogEntries

%% Common Processing Dependencies
ListLogEntries --> LogSerializer
ListLogEntries --> CommonFieldSetReader

%% Containerd Pipeline Dependencies
CommonFieldSetReader --> ContainerdLogFilter
ContainerdLogFilter --> ContainerdLogGroup
ContainerdLogFilter --> ContainerdIDDiscovery
ContainerdIDDiscovery --> ContainerdHistoryModifier
ContainerdLogGroup --> ContainerdHistoryModifier
LogSerializer --> ContainerdHistoryModifier

%% Kubelet Pipeline Dependencies
CommonFieldSetReader --> KubeletLogFilter
KubeletLogFilter --> KubeletLogGroup
ContainerdIDDiscovery --> KubeletHistoryModifier
KubeletLogGroup --> KubeletHistoryModifier
LogSerializer --> KubeletHistoryModifier

%% Other Pipeline Dependencies
CommonFieldSetReader --> OtherLogFilter
OtherLogFilter --> OtherLogGroup
OtherLogGroup --> OtherHistoryModifier
LogSerializer --> OtherHistoryModifier

%% Finalization
ContainerdHistoryModifier --> Tail
KubeletHistoryModifier --> Tail
OtherHistoryModifier --> Tail

```

Index

Constants

View Source
// ContainerdStartingMsg is the literal message text "starting containerd".
// NOTE(review): presumably matched against containerd node logs to detect the
// start of the containerd process — confirm against its usage.
const ContainerdStartingMsg = "starting containerd"
View Source
// ContainerdTerminationMsg is the literal message text "Stop CRI service".
// NOTE(review): presumably matched against containerd node logs to detect the
// termination of the containerd process — confirm against its usage.
const ContainerdTerminationMsg = "Stop CRI service"

Variables

View Source
// ContainerIDDiscoveryTask builds a ContainerIDToContainerIdentity map from the
// logs returned by the containerd log filter task.
//
// In dry-run mode it returns nil without reading any logs. Otherwise the logs
// are fanned out to runtime.GOMAXPROCS(0) workers; each worker hands a log and
// the identities channel to processContainerIDDiscoveryForLog, and a single
// consumer goroutine stores every received identity in the result map keyed by
// its ContainerID. Progress is reported as "done/total" through a progress
// updator created with a time.Second interval.
//
// When the context is cancelled the task stops feeding logs and returns the
// context error instead of a partial result.
var ContainerIDDiscoveryTask = commonlogk8sauditv2_contract.ContainerIDInventoryBuilder.DiscoveryTask(googlecloudlogk8snode_contract.ContainerIDDiscoveryTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(),
	},
	func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (commonlogk8sauditv2_contract.ContainerIDToContainerIdentity, error) {
		if taskMode == inspectioncore_contract.TaskModeDryRun {
			return nil, nil
		}

		logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref())

		// Counter shared between the workers and the progress reporter.
		var doneLogCount atomic.Int32
		updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) {
			current := doneLogCount.Load()
			// Guard against division by zero when there are no logs.
			if len(logs) > 0 {
				tp.Percentage = float32(current) / float32(len(logs))
			}
			tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
		})
		updator.Start(ctx)
		defer updator.Done()

		result := commonlogk8sauditv2_contract.ContainerIDToContainerIdentity{}
		logChan := make(chan *log.Log)
		errGrp, childRoutineCtx := errgroup.WithContext(ctx)
		containerIdentitiesChan := make(chan *commonlogk8sauditv2_contract.ContainerIdentity, runtime.GOMAXPROCS(0))
		for i := 0; i < runtime.GOMAXPROCS(0); i++ {
			errGrp.Go(func() error {
				for {
					select {
					case <-childRoutineCtx.Done():
						return childRoutineCtx.Err()
					case l, ok := <-logChan:
						if !ok {
							return nil
						}
						// Use the group context so the per-log work observes the
						// same cancellation as the worker loop.
						processContainerIDDiscoveryForLog(childRoutineCtx, l, containerIdentitiesChan)
						doneLogCount.Add(1)
					}
				}
			})
		}
		// Only this single consumer goroutine writes to result, so the map needs
		// no additional locking.
		// NOTE(review): the consumer stops on cancellation; if
		// processContainerIDDiscoveryForLog does not honor its context while
		// sending, workers could block on a full channel — confirm.
		consumerGrp, childConsumerRoutineCtx := errgroup.WithContext(ctx)
		consumerGrp.Go(func() error {
			for {
				select {
				case <-childConsumerRoutineCtx.Done():
					return childConsumerRoutineCtx.Err()
				case c, ok := <-containerIdentitiesChan:
					if !ok {
						return nil
					}
					result[c.ContainerID] = c
				}
			}
		})

		// Feed the workers. The send must be cancellable: once the context is
		// done every worker returns, and a bare send on the unbuffered channel
		// would then block forever.
	feed:
		for _, l := range logs {
			select {
			case <-childRoutineCtx.Done():
				break feed
			case logChan <- l:
			}
		}
		close(logChan)
		err := errGrp.Wait()
		// All producers of identities have finished; let the consumer drain and exit.
		close(containerIdentitiesChan)
		consumerErr := consumerGrp.Wait()
		if err != nil {
			return nil, err
		}
		if consumerErr != nil {
			return nil, consumerErr
		}

		return result, nil
	},
)
View Source
// ContainerdNodeLogHistoryModifierTask is the history modifier task identified by
// ContainerdLogHistoryModifierTaskID; its behavior is supplied by a
// containerdNodeLogHistoryModifierSetting value.
var ContainerdNodeLogHistoryModifierTask = inspectiontaskbase.NewHistoryModifierTask[struct{}](
	googlecloudlogk8snode_contract.ContainerdLogHistoryModifierTaskID,
	&containerdNodeLogHistoryModifierSetting{},
)
View Source
// KubeletLogHistoryModifierTask is the history modifier task identified by
// KubeletLogHistoryModifierTaskID; its behavior is supplied by a
// kubeletNodeLogHistoryModifierSetting value.
var KubeletLogHistoryModifierTask = inspectiontaskbase.NewHistoryModifierTask[struct{}](
	googlecloudlogk8snode_contract.KubeletLogHistoryModifierTaskID,
	&kubeletNodeLogHistoryModifierSetting{},
)
View Source
// ListLogEntriesTask is the log-listing task created by
// googlecloudcommon_contract.NewListLogEntriesTask from a
// k8snodeListLogEntriesTaskSetting value.
// NOTE(review): per the package overview graph it is fed by the project ID,
// cluster name and node name filter inputs — confirm in the setting type.
var ListLogEntriesTask = googlecloudcommon_contract.NewListLogEntriesTask(&k8snodeListLogEntriesTaskSetting{})

LogSerializerTask serializes logs to history so that history modifiers in later tasks can associate events or revisions with them. No node logs are discarded, so LogSerializerTask simply receives its logs from ListLogEntriesTask.

View Source
// OtherLogHistoryModifierTask is the history modifier task identified by
// OtherLogHistoryModifierTaskID. Its setting carries, per component name
// (dockerd, configure.sh, configure-helper.sh), the message registered as that
// component's starting message and the one registered as its terminating message.
var OtherLogHistoryModifierTask = inspectiontaskbase.NewHistoryModifierTask[struct{}](
	googlecloudlogk8snode_contract.OtherLogHistoryModifierTaskID,
	&otherNodeLogHistoryModifierSetting{
		// Messages registered as the start of each component.
		StartingMessagesByComponent: map[string]string{
			"configure-helper.sh": "Start to configure instance for kubernetes",
			"configure.sh":        "Start to install kubernetes files",
			"dockerd":             "Starting up",
		},
		// Messages registered as the termination of each component.
		TerminatingMessagesByComponent: map[string]string{
			"configure-helper.sh": "Done for the configuration for kubernetes",
			"configure.sh":        "Done for installing kubernetes files",
			"dockerd":             "Daemon shutdown complete",
		},
	},
)
View Source
// PodSandboxIDDiscoveryTask builds a pattern finder of PodSandboxIDInfo values
// from the logs returned by the containerd log filter task.
//
// In dry-run mode it returns nil without reading any logs. Otherwise the logs
// are fanned out to runtime.GOMAXPROCS(0) workers, each of which passes a log
// and the shared finder to processPodSandboxIDDiscoveryForLog. Progress is
// reported as "done/total" through a progress updator created with a
// time.Second interval.
//
// When the context is cancelled the task stops feeding logs and returns the
// context error instead of a partially populated finder.
var PodSandboxIDDiscoveryTask = inspectiontaskbase.NewProgressReportableInspectionTask(googlecloudlogk8snode_contract.PodSandboxIDDiscoveryTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref(),
	},
	func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (patternfinder.PatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo], error) {
		if taskMode == inspectioncore_contract.TaskModeDryRun {
			return nil, nil
		}
		logs := coretask.GetTaskResult(ctx, googlecloudlogk8snode_contract.ContainerdLogFilterTaskID.Ref())

		// Counter shared between the workers and the progress reporter.
		var doneLogCount atomic.Int32
		updator := progressutil.NewProgressUpdator(progress, time.Second, func(tp *inspectionmetadata.TaskProgressMetadata) {
			current := doneLogCount.Load()
			// Guard against division by zero when there are no logs.
			if len(logs) > 0 {
				tp.Percentage = float32(current) / float32(len(logs))
			}
			tp.Message = fmt.Sprintf("%d/%d", current, len(logs))
		})
		updator.Start(ctx)
		defer updator.Done()

		logChan := make(chan *log.Log)
		errGrp, childCtx := errgroup.WithContext(ctx)
		// NOTE(review): the finder is shared by all workers; this assumes it is
		// safe for concurrent use — confirm in patternfinder.
		podSandboxIDFinder := patternfinder.NewTriePatternFinder[*googlecloudlogk8snode_contract.PodSandboxIDInfo]()
		for i := 0; i < runtime.GOMAXPROCS(0); i++ {
			errGrp.Go(func() error {
				for {
					select {
					case <-childCtx.Done():
						return childCtx.Err()
					case l, ok := <-logChan:
						if !ok {
							return nil
						}
						// Use the group context so the per-log work observes the
						// same cancellation as the worker loop.
						processPodSandboxIDDiscoveryForLog(childCtx, l, podSandboxIDFinder)
						doneLogCount.Add(1)
					}
				}
			})
		}

		// Feed the workers. The send must be cancellable: once the context is
		// done every worker returns, and a bare send on the unbuffered channel
		// would then block forever.
	feed:
		for _, l := range logs {
			select {
			case <-childCtx.Done():
				break feed
			case logChan <- l:
			}
		}
		close(logChan)
		// Surface cancellation instead of returning a partially populated finder.
		if err := errGrp.Wait(); err != nil {
			return nil, err
		}

		return podSandboxIDFinder, nil
	},
)

Functions

func GenerateK8sNodeLogQuery

func GenerateK8sNodeLogQuery(projectId string, clusterId string, nodeNameSubstrings []string) string

GenerateK8sNodeLogQuery generates a query for GKE node logs.

func Register

func Register(registry coreinspection.InspectionTaskRegistry) error

Register registers all googlecloudlogk8snode inspection tasks to the registry.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL