inspectiontaskbase

package
v0.52.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Inventory related tasks defined in this file provides a framework for discovering and merging inventory data from various sources.

In many inspection scenarios, it's necessary to associate information across different log sources. For example, a log might contain an IP address, while another log maps that IP to a specific VM or container name. However, the availability of these log sources is not always guaranteed, and consumers of this inventory data should not need to be aware of the specific tasks that provide it.

This framework introduces two main components to address this:

  1. DiscoveryTask: A task responsible for extracting a inventory map from a single data source. Providers of a discovery task must ensure it is added to the task graph when a task that may require its data is included. This is achieved by using the coretask.NewSubsequentTaskRefsTaskLabel, which links the discovery task to the merger task.

  2. InventoryTask: A task that aggregates the results from all relevant DiscoveryTasks. Consumers can simply depend on this single merger task to access the complete, consolidated inventory map without needing to know about the individual discovery tasks.

This approach decouples data consumers from data providers, allowing for a flexible and extensible inspection system.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCachedTask

func NewCachedTask[T any](taskID taskid.TaskImplementationID[T], depdendencies []taskid.UntypedTaskReference, f func(ctx context.Context, prevValue CacheableTaskResult[T]) (CacheableTaskResult[T], error), labelOpt ...coretask.LabelOpt) coretask.Task[T]

NewCachedTask generates a task which can reuse the value last time.

func NewFieldSetReadTask

func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], logTask taskid.TaskReference[[]*log.Log], fieldSetReaders []log.FieldSetReader, labelOpts ...coretask.LabelOpt) coretask.Task[[]*log.Log]

NewFieldSetReadTask creates a task that consumes a list of logs and applies a set of FieldSetReaders to each log concurrently. This allows for parallel processing of log entries to extract specific fields needed in later tasks. Later parser tasks usually process logs from older to newer with grouped by resource, thus it can't be done in parallel. The process of extracting log fields must not depend on the other logs and it can be done in parallel.

func NewInspectionTask

func NewInspectionTask[T any](taskId taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc InspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]

NewInspectionTask creates a basic inspection task. The task is executed based on the task mode retrieved from the context.

Parameters:

  • taskId: The unique identifier for the task.
  • dependencies: A list of task references that this task depends on.
  • taskFunc: The function to execute for the task.
  • labelOpts: Optional labels to apply to the task.

Returns:

An inspection task.

func NewLogFilterTask

func NewLogFilterTask(tid taskid.TaskImplementationID[[]*log.Log], sourceLogs taskid.TaskReference[[]*log.Log], logFilter LogFilterFunc) coretask.Task[[]*log.Log]

NewLogFilterTask creates a task that consumes a list of logs and returns a new list containing only the logs that satisfy the filter function.

func NewLogGrouperTask

func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouperFunc) coretask.Task[LogGroupMap]

NewLogGrouperTask creates a task that groups logs based on a grouper function. It processes a list of logs and organizes them into a map of LogGroup, where each group contains logs with the same key.

func NewLogIngesterTask added in v0.50.0

func NewLogIngesterTask(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log]

NewLogIngesterTask returns a task that stores its given logs to history to prepare the history type to have ChangeSet associated with the log. This must be called before LogToTimelineMapperTask and Logs must be discarded before this task if it shouldn't be included in the result.

func NewLogSorterByTimeTask added in v0.50.0

func NewLogSorterByTimeTask(taskID taskid.TaskImplementationID[[]*log.Log], logSource taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log]

func NewLogToTimelineMapperTask added in v0.50.0

func NewLogToTimelineMapperTask[T any](tid taskid.TaskImplementationID[struct{}], mapper LogToTimelineMapper[T], labels ...coretask.LabelOpt) coretask.Task[struct{}]

NewLogToTimelineMapperTask creates a task that modifies the history builder based on grouped logs. It processes logs in parallel and applies the logic from the provided LogToTimelineMapper to build a comprehensive history of events.

func NewProgressReportableInspectionTask

func NewProgressReportableInspectionTask[T any](taskId taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]

NewProgressReportableInspectionTask generates a task with progress reporting capabilities. This task can report its progress during execution through the TaskProgress object. Use NewInspectionTask for tasks that complete immediately.

Parameters:

  • taskId: The unique identifier for the task.
  • dependencies: A list of task references that this task depends on.
  • taskFunc: The function to execute, which includes progress reporting.
  • labelOpts: Optional labels to apply to the task.

Returns:

A task with progress reporting capabilities.

Types

type CacheableTaskResult

type CacheableTaskResult[T any] struct {
	// Value is the value used previous run.
	Value T
	// DependencyDigest is a string representation of digest of its inputs.
	// Task must generate a different value for the different combination of the input and task should compare the current digest generated from the current inputs and the previous value digest, then it should return the previous value only when the digest is not changed.
	DependencyDigest string
}

CacheableTaskResult is the combination of the cached value and a digest of its dependency.

type InspectionTaskFunc

type InspectionTaskFunc[T any] = func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (T, error)

InspectionTaskFunc is a type for basic inspection task functions.

type InventoryMergerStrategy added in v0.50.0

type InventoryMergerStrategy[T any] interface {

	// Merge defines the logic to combine multiple results from various InventoryDiscoveryTasks
	// into a single, consolidated result.
	Merge(results []T) (T, error)
}

InventoryMergerStrategy defines the strategy how the generated InventoryTask merges results received from multiple discovery tasks.

type InventoryTaskBuilder added in v0.50.0

type InventoryTaskBuilder[T any] struct {
	// contains filtered or unexported fields
}

InventoryTaskBuilder builds a inventory task and discovery tasks. Inventory task merges information found in logs from multiple discovery tasks.

func NewInventoryTaskBuilder added in v0.50.0

func NewInventoryTaskBuilder[T any](inventoryTaskID taskid.TaskImplementationID[T]) *InventoryTaskBuilder[T]

func (*InventoryTaskBuilder[T]) DiscoveryTask added in v0.50.0

func (s *InventoryTaskBuilder[T]) DiscoveryTask(taskID taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]

DiscoveryTask builds a discovery task the returned value from discovery tasks are aggregated in inventory task

func (*InventoryTaskBuilder[T]) InventoryTask added in v0.50.0

func (s *InventoryTaskBuilder[T]) InventoryTask(strategy InventoryMergerStrategy[T]) coretask.Task[T]

InventoryTask builds a inventory task with given merger strategy.

type LogFilterFunc

type LogFilterFunc = func(ctx context.Context, log *log.Log) bool

LogFilterFunc defines the function signature for filtering logs. It returns true if the log should be kept.

type LogGroup

type LogGroup struct {
	Group string
	Logs  []*log.Log
}

LogGroup holds a collection of logs that belong to the same group.

type LogGroupMap

type LogGroupMap = map[string]*LogGroup

LogGroupMap is a map of log groups, where the key is the group identifier.

type LogGrouperFunc added in v0.50.0

type LogGrouperFunc = func(ctx context.Context, log *log.Log) string

LogGrouperFunc defines a function that returns a group key for a given log.

type LogToTimelineMapper added in v0.50.0

type LogToTimelineMapper[T any] interface {
	// LogIngesterTask is one of prerequisite task of LogToTimelineMapper ingesting logs to history data before processing with this mapper.
	LogIngesterTask() taskid.TaskReference[[]*log.Log]
	// Dependencies are the additional references used in timeline mapper.
	Dependencies() []taskid.UntypedTaskReference
	// GroupedLogTask returns a reference to the task that provides the grouped logs.
	GroupedLogTask() taskid.TaskReference[LogGroupMap]
	// ProcessLogByGroup is called for each log entry to modify the corresponding ChangeSet.
	// This method allows for custom logic to be applied during the history building process.
	// The prevGroupData is the returned value from the last procesed log in the same group.
	ProcessLogByGroup(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}

LogToTimelineMapper defines the interface for mapping logs to timeline elements (events or revisions). Implementations of this interface can be used to customize how log data is transformed into timeline elements. To process data generated from processing the last log in the same group, the method ProcessLogByGroup receives and returns a variable typed T.

type ProgressReportableInspectionTaskFunc

type ProgressReportableInspectionTaskFunc[T any] = func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (T, error)

ProgressReportableInspectionTaskFunc is a type for inspection task functions with progress reporting capabilities.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL