inspectiontaskbase

package
v0.49.0
Published: Nov 17, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

The relationship-related tasks defined in this file provide a framework for discovering and merging relational data from various sources.

In many inspection scenarios, it's necessary to associate information across different log sources. For example, a log might contain an IP address, while another log maps that IP to a specific VM or container name. However, the availability of these log sources is not always guaranteed, and consumers of this relational data should not need to be aware of the specific tasks that provide it.

This framework introduces two main components to address this:

  1. RelationshipDiscoveryTask: A task responsible for extracting a relationship map from a single data source. Providers of a discovery task must ensure it is added to the task graph whenever a task that may require its data is included. This is achieved with coretask.NewSubsequentTaskRefsTaskLabel, which links the discovery task to the merger task.

  2. RelationshipMergerTask: A task that aggregates the results from all relevant RelationshipDiscoveryTasks. Consumers can simply depend on this single merger task to access the complete, consolidated relationship map without needing to know about the individual discovery tasks.

This approach decouples data consumers from data providers, allowing for a flexible and extensible inspection system.
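
The decoupling described above can be illustrated with a small standalone sketch. It does not use this package's API: relationMap, discoverFunc, and idSource are hypothetical stand-ins for a relationship type, a discovery task, and the ID source that links discovery tasks to a merger. The overwrite-on-conflict rule is an arbitrary choice made only for this example.

```go
package main

import "fmt"

// relationMap is a hypothetical relationship type: IP address -> resource name.
type relationMap map[string]string

// discoverFunc is a stand-in for a discovery task bound to one data source.
type discoverFunc func() relationMap

// idSource is a stand-in for the registry that links discovery tasks to a merger.
type idSource struct {
	discoveries []discoverFunc
}

// register adds a discovery function. Providers call this; consumers never do.
func (s *idSource) register(f discoverFunc) {
	s.discoveries = append(s.discoveries, f)
}

// merged runs every registered discovery and combines the results.
// Later registrations overwrite earlier ones on key conflicts
// (an arbitrary rule chosen for this sketch).
func (s *idSource) merged() relationMap {
	out := relationMap{}
	for _, f := range s.discoveries {
		for k, v := range f() {
			out[k] = v
		}
	}
	return out
}

func main() {
	src := &idSource{}
	// Two independent providers; either one could be absent.
	src.register(func() relationMap { return relationMap{"10.0.0.1": "vm-a"} })
	src.register(func() relationMap { return relationMap{"10.0.0.2": "pod-b"} })

	// The consumer only sees the merged view.
	m := src.merged()
	fmt.Println(m["10.0.0.1"], m["10.0.0.2"])
}
```

The consumer code in main would stay the same if a third provider were registered or one of the two were removed, which is the property the framework is designed to provide.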

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCachedTask

func NewCachedTask[T any](taskID taskid.TaskImplementationID[T], depdendencies []taskid.UntypedTaskReference, f func(ctx context.Context, prevValue CacheableTaskResult[T]) (CacheableTaskResult[T], error), labelOpt ...coretask.LabelOpt) coretask.Task[T]

NewCachedTask creates a task that can reuse the value from its previous run.

func NewFieldSetReadTask

func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], logTask taskid.TaskReference[[]*log.Log], fieldSetReaders []log.FieldSetReader) coretask.Task[[]*log.Log]

NewFieldSetReadTask creates a task that consumes a list of logs and applies a set of FieldSetReaders to each log concurrently. This allows log entries to be processed in parallel to extract the specific fields needed by later tasks. Later parser tasks usually process logs from oldest to newest, grouped by resource, so they cannot run in parallel. Field extraction must not depend on other logs, which is what allows it to run in parallel.
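
As a rough illustration of why per-log field extraction parallelizes cleanly, here is a standalone sketch that does not use this package. The entry and reader types and the verbReader function are hypothetical stand-ins for a log entry and a FieldSetReader. The sketch starts one goroutine per entry for brevity; the actual task may well bound concurrency differently.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// entry is a hypothetical stand-in for a log entry with extracted fields.
type entry struct {
	Raw    string
	Fields map[string]string
}

// reader is a hypothetical stand-in for a field set reader.
// It must look only at the entry it is given.
type reader func(e *entry)

// readAll applies every reader to every entry, one goroutine per entry.
// No locking is needed because each goroutine touches only its own entry.
func readAll(entries []*entry, readers []reader) {
	var wg sync.WaitGroup
	for _, e := range entries {
		wg.Add(1)
		go func(e *entry) {
			defer wg.Done()
			for _, r := range readers {
				r(e)
			}
		}(e)
	}
	wg.Wait()
}

// verbReader extracts the first word of the raw line as the "verb" field.
func verbReader(e *entry) {
	verb, _, _ := strings.Cut(e.Raw, " ")
	e.Fields["verb"] = verb
}

func main() {
	entries := []*entry{
		{Raw: "create pod-a", Fields: map[string]string{}},
		{Raw: "delete pod-b", Fields: map[string]string{}},
	}
	readAll(entries, []reader{verbReader})
	for _, e := range entries {
		fmt.Println(e.Fields["verb"])
	}
}
```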

func NewHistoryModifierTask

func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], historyModifier HistoryModifer[T], labels ...coretask.LabelOpt) coretask.Task[struct{}]

NewHistoryModifierTask creates a task that modifies the history builder based on grouped logs. It processes logs in parallel and applies the logic from the provided HistoryModifer to build a comprehensive history of events.

func NewInspectionTask

func NewInspectionTask[T any](taskId taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc InspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]

NewInspectionTask creates a basic inspection task. The task is executed based on the task mode retrieved from the context.

Parameters:

  • taskId: The unique identifier for the task.
  • dependencies: A list of task references that this task depends on.
  • taskFunc: The function to execute for the task.
  • labelOpts: Optional labels to apply to the task.

Returns:

An inspection task.

func NewLogFilterTask

func NewLogFilterTask(tid taskid.TaskImplementationID[[]*log.Log], sourceLogs taskid.TaskReference[[]*log.Log], logFilter LogFilterFunc) coretask.Task[[]*log.Log]

NewLogFilterTask creates a task that consumes a list of logs and returns a new list containing only the logs that satisfy the filter function.
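
The filtering behavior can be sketched without this package as follows. The entry type is a hypothetical stand-in for *log.Log, and filterFunc only mirrors the shape of LogFilterFunc; filterEntries and isError are illustrative names, not part of the API.

```go
package main

import (
	"context"
	"fmt"
)

// entry is a hypothetical stand-in for *log.Log.
type entry struct {
	Severity string
	Message  string
}

// filterFunc mirrors the shape of LogFilterFunc, using the stand-in entry type.
type filterFunc = func(ctx context.Context, e *entry) bool

// filterEntries returns a new slice holding only the entries accepted by keep.
// The input slice is left unmodified.
func filterEntries(ctx context.Context, in []*entry, keep filterFunc) []*entry {
	out := make([]*entry, 0, len(in))
	for _, e := range in {
		if keep(ctx, e) {
			out = append(out, e)
		}
	}
	return out
}

// isError is a sample filter that keeps only ERROR entries.
func isError(_ context.Context, e *entry) bool {
	return e.Severity == "ERROR"
}

func main() {
	logs := []*entry{
		{"ERROR", "disk full"},
		{"INFO", "started"},
		{"ERROR", "timeout"},
	}
	errorsOnly := filterEntries(context.Background(), logs, isError)
	fmt.Println(len(errorsOnly), len(logs))
}
```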

func NewLogGrouperTask

func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouper) coretask.Task[LogGroupMap]

NewLogGrouperTask creates a task that groups logs based on a grouper function. It processes a list of logs and organizes them into a map of LogGroup, where each group contains logs with the same key.
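
A minimal standalone sketch of the grouping step, under stated assumptions: entry is a hypothetical stand-in for *log.Log, group mirrors the shape of LogGroup, and the grouper here omits the context parameter that LogGrouper takes. Whether the real task preserves input order inside each group is not stated in this documentation; the sketch does.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for *log.Log.
type entry struct {
	Resource string
	Message  string
}

// group mirrors the shape of LogGroup, using the stand-in entry type.
type group struct {
	Group string
	Logs  []*entry
}

// groupEntries buckets entries by the key returned from keyOf,
// preserving the input order inside each group.
func groupEntries(in []*entry, keyOf func(*entry) string) map[string]*group {
	out := map[string]*group{}
	for _, e := range in {
		k := keyOf(e)
		g, ok := out[k]
		if !ok {
			g = &group{Group: k}
			out[k] = g
		}
		g.Logs = append(g.Logs, e)
	}
	return out
}

// byResource is a sample grouper keyed on the resource name.
func byResource(e *entry) string { return e.Resource }

func main() {
	groups := groupEntries([]*entry{
		{"pod-a", "created"},
		{"pod-b", "created"},
		{"pod-a", "deleted"},
	}, byResource)
	fmt.Println(len(groups), len(groups["pod-a"].Logs))
}
```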

func NewLogSerializerTask

func NewLogSerializerTask(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log]

NewLogSerializerTask stores the given logs in the history so that each log can have a ChangeSet associated with it. This task must run before the HistoryModifier task, and any logs that should not appear in the result must be discarded before this task.

func NewProgressReportableInspectionTask

func NewProgressReportableInspectionTask[T any](taskId taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]

NewProgressReportableInspectionTask generates a task with progress reporting capabilities. This task can report its progress during execution through the TaskProgress object. Use NewInspectionTask for tasks that complete immediately.

Parameters:

  • taskId: The unique identifier for the task.
  • dependencies: A list of task references that this task depends on.
  • taskFunc: The function to execute, which includes progress reporting.
  • labelOpts: Optional labels to apply to the task.

Returns:

A task with progress reporting capabilities.

func NewRelationshipDiscoveryTask

func NewRelationshipDiscoveryTask[T any](taskID taskid.TaskImplementationID[T], idSource *RelationshipTaskIDSource[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]

NewRelationshipDiscoveryTask creates a task that discovers relationships from a specific data source, such as mapping an IP address to a VM name from a log source. The discovered relationship map is then provided to the RelationshipMergerTask. Implementers of a discovery task must ensure that it is added to the task graph whenever its data is potentially required. This is enforced by adding a reference to the discovery task to the coretask.NewSubsequentTaskRefsTaskLabel on its parent task, which requires the discovery task to depend on its parent whenever it is included.

func NewRelationshipMergerTask

func NewRelationshipMergerTask[T any](setting RelationshipMergerTaskSetting[T]) coretask.Task[T]

NewRelationshipMergerTask creates a task that merges relationship maps from all related RelationshipDiscoveryTasks. Consumers of the relationship data can simply depend on this merger task without needing to know about the specific discovery tasks. It retrieves results from all discovery tasks registered with the provided IDSource and merges them using the logic defined in the RelationshipMergerTaskSetting.

Types

type CacheableTaskResult

type CacheableTaskResult[T any] struct {
	// Value is the value used in the previous run.
	Value T
	// DependencyDigest is a string representation of a digest of the task's inputs.
	// The task must generate a different digest value for each distinct combination of inputs.
	// It should compare the digest generated from the current inputs with the digest of the
	// previous value, and return the previous value only when the digest has not changed.
	DependencyDigest string
}

CacheableTaskResult is the combination of a cached value and a digest of its dependencies.
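
The digest comparison described in the field comments can be sketched as follows. This is a standalone illustration, not the package's implementation: cachedResult mirrors the shape of CacheableTaskResult for a string value, while digestOf, run, and computeCalls are hypothetical helpers. SHA-256 over NUL-joined inputs is an assumption made for this example, and it presumes the inputs themselves contain no NUL bytes.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// cachedResult mirrors the shape of CacheableTaskResult for a string value.
type cachedResult struct {
	Value            string
	DependencyDigest string
}

// computeCalls counts how often the expensive path runs (demonstration only).
var computeCalls int

// digestOf derives a digest from the inputs.
// The NUL separator keeps ("ab", "c") and ("a", "bc") distinct.
func digestOf(inputs ...string) string {
	sum := sha256.Sum256([]byte(strings.Join(inputs, "\x00")))
	return hex.EncodeToString(sum[:])
}

// run returns prev unchanged when the digest matches; otherwise it recomputes.
func run(prev cachedResult, inputs ...string) cachedResult {
	d := digestOf(inputs...)
	if prev.DependencyDigest == d {
		return prev // inputs unchanged: reuse the previous value
	}
	computeCalls++
	return cachedResult{
		Value:            strings.ToUpper(strings.Join(inputs, ",")),
		DependencyDigest: d,
	}
}

func main() {
	first := run(cachedResult{}, "project-a", "cluster-1")
	second := run(first, "project-a", "cluster-1") // same inputs: reused
	third := run(second, "project-a", "cluster-2") // changed input: recomputed
	fmt.Println(first.Value, second.Value, third.Value, computeCalls)
}
```

A zero-value cachedResult has an empty digest, which never equals a SHA-256 hex string, so the first run always computes.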

type HistoryModifer

type HistoryModifer[T any] interface {
	// LogSerializerTask returns a reference to one of the prerequisite tasks of the HistoryModifier, which serializes its logs to history data before they are processed by this modifier.
	LogSerializerTask() taskid.TaskReference[[]*log.Log]
	// Dependencies returns the additional task references used by the history modifier.
	Dependencies() []taskid.UntypedTaskReference
	// GroupedLogTask returns a reference to the task that provides the grouped logs.
	GroupedLogTask() taskid.TaskReference[LogGroupMap]
	// ModifyChangeSetFromLog is called for each log entry to modify the corresponding ChangeSet.
	// This method allows for custom logic to be applied during the history building process.
	// The prevGroupData is the value returned for the last processed log in the same group.
	ModifyChangeSetFromLog(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}

HistoryModifer defines the interface for modifying the History with change sets based on log entries. Implementations of this interface can be used to customize how log data is transformed into structured history. To carry over data produced while processing the previous log in the same group, the method ModifyChangeSetFromLog receives and returns a value of type T.
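
The way a value of type T is threaded from one log to the next within a group can be sketched in reduced form. This standalone example does not use the package: entry is a hypothetical stand-in for *log.Log, modify plays the role of ModifyChangeSetFromLog without the ChangeSet and Builder parameters, and T is an int. Passing the zero value of T for the first log in a group is an assumption of this sketch.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for *log.Log.
type entry struct {
	Verb string
}

// modify plays the role of ModifyChangeSetFromLog in reduced form.
// It receives the state returned for the previous log in the group
// and returns the state for the next one.
// Here the state counts the "create" verbs seen so far.
func modify(e *entry, prev int) (int, error) {
	if e.Verb == "" {
		return prev, fmt.Errorf("log has no verb")
	}
	if e.Verb == "create" {
		return prev + 1, nil
	}
	return prev, nil
}

// processGroup walks one group in order and threads the state through each call.
func processGroup(logs []*entry) (int, error) {
	var state int // assumption: the first log receives the zero value
	for _, e := range logs {
		next, err := modify(e, state)
		if err != nil {
			return state, err
		}
		state = next
	}
	return state, nil
}

func main() {
	n, err := processGroup([]*entry{{"create"}, {"update"}, {"create"}})
	fmt.Println(n, err)
}
```

Because each call depends on the result of the previous one, logs inside a single group have to be handled sequentially, while separate groups remain independent of each other.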

type InspectionTaskFunc

type InspectionTaskFunc[T any] = func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (T, error)

InspectionTaskFunc is a type for basic inspection task functions.

type LogFilterFunc

type LogFilterFunc = func(ctx context.Context, log *log.Log) bool

LogFilterFunc defines the function signature for filtering logs. It returns true if the log should be kept.

type LogGroup

type LogGroup struct {
	Group string
	Logs  []*log.Log
}

LogGroup holds a collection of logs that belong to the same group.

type LogGroupMap

type LogGroupMap = map[string]*LogGroup

LogGroupMap is a map of log groups, where the key is the group identifier.

type LogGrouper

type LogGrouper = func(ctx context.Context, log *log.Log) string

LogGrouper defines a function that returns a group key for a given log.

type ProgressReportableInspectionTaskFunc

type ProgressReportableInspectionTaskFunc[T any] = func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (T, error)

ProgressReportableInspectionTaskFunc is a type for inspection task functions with progress reporting capabilities.

type RelationshipMergerTaskSetting

type RelationshipMergerTaskSetting[T any] interface {
	// IDSource returns the pointer to the RelationshipTaskIDSource that manages the task IDs.
	IDSource() *RelationshipTaskIDSource[T]

	// Merge defines the logic to combine multiple results from various RelationshipDiscoveryTasks
	// into a single, consolidated result.
	Merge(results []T) (T, error)
}

RelationshipMergerTaskSetting defines the settings for a RelationshipMergerTask. It provides the ID source and the logic to merge results from multiple discovery tasks.
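
A Merge implementation might look like the following standalone sketch. The ipToName type and ipMergerSetting struct are hypothetical, only the Merge method is shown (IDSource is omitted because it needs the real RelationshipTaskIDSource), and rejecting conflicting entries is one possible policy rather than the one this package prescribes.

```go
package main

import "fmt"

// ipToName is a hypothetical relationship type: IP address -> resource name.
type ipToName map[string]string

// ipMergerSetting is a hypothetical setting; only Merge is sketched here.
type ipMergerSetting struct{}

// Merge combines the maps from all discovery results.
// It returns an error when two sources disagree about the same IP.
func (ipMergerSetting) Merge(results []ipToName) (ipToName, error) {
	merged := ipToName{}
	for _, r := range results {
		for ip, name := range r {
			if prev, ok := merged[ip]; ok && prev != name {
				return nil, fmt.Errorf("conflicting names for %s: %q and %q", ip, prev, name)
			}
			merged[ip] = name
		}
	}
	return merged, nil
}

func main() {
	merged, err := ipMergerSetting{}.Merge([]ipToName{
		{"10.0.0.1": "vm-a"},
		{"10.0.0.2": "pod-b", "10.0.0.1": "vm-a"},
	})
	fmt.Println(len(merged), err)
}
```

Other policies, such as preferring the first or the last source, fit the same method signature; the choice depends on how trustworthy each data source is.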

type RelationshipTaskIDSource

type RelationshipTaskIDSource[T any] struct {
	// contains filtered or unexported fields
}

RelationshipTaskIDSource manages the task IDs for RelationshipMergerTask and RelationshipDiscoveryTasks, maintaining the references between them.

func NewRelationshipTaskIDSource

func NewRelationshipTaskIDSource[T any](mergerTaskID taskid.TaskImplementationID[T]) *RelationshipTaskIDSource[T]

func (*RelationshipTaskIDSource[T]) GenerateDefaultRelationshipDiscoveryTaskID

func (s *RelationshipTaskIDSource[T]) GenerateDefaultRelationshipDiscoveryTaskID(taskReferenceID string) taskid.TaskImplementationID[T]

func (*RelationshipTaskIDSource[T]) MergerTaskRef

func (s *RelationshipTaskIDSource[T]) MergerTaskRef() taskid.TaskReference[T]
