Documentation ¶
Overview ¶
The relationship-related tasks defined in this file provide a framework for discovering and merging relational data from various sources.
In many inspection scenarios, it's necessary to associate information across different log sources. For example, a log might contain an IP address, while another log maps that IP to a specific VM or container name. However, the availability of these log sources is not always guaranteed, and consumers of this relational data should not need to be aware of the specific tasks that provide it.
This framework introduces two main components to address this:
- RelationshipDiscoveryTask: A task responsible for extracting a relationship map from a single data source. Providers of a discovery task must ensure it is added to the task graph when a task that may require its data is included. This is achieved by using the coretask.NewSubsequentTaskRefsTaskLabel, which links the discovery task to the merger task.
- RelationshipMergerTask: A task that aggregates the results from all relevant RelationshipDiscoveryTasks. Consumers can simply depend on this single merger task to access the complete, consolidated relationship map without needing to know about the individual discovery tasks.
This approach decouples data consumers from data providers, allowing for a flexible and extensible inspection system.
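The decoupling described above can be illustrated with a minimal sketch. It does not use the package's real types: `relationshipMap`, `discoveryFunc`, and `registry` are hypothetical stand-ins for a relationship result, a discovery task body, and the link between discovery tasks and the merger. Later entries overwriting earlier ones is also an assumption made for brevity.

```go
package main

import (
	"fmt"
	"sort"
)

// relationshipMap is a hypothetical result type: IP address -> resource name.
type relationshipMap map[string]string

// discoveryFunc is a hypothetical stand-in for the body of a discovery task.
type discoveryFunc func() relationshipMap

// registry is a hypothetical stand-in for the link between discovery tasks
// and the merger task.
type registry struct {
	discoveries []discoveryFunc
}

// register adds a discovery source. Consumers never call this.
func (r *registry) register(d discoveryFunc) {
	r.discoveries = append(r.discoveries, d)
}

// merged plays the role of the merger task: it runs every registered
// discovery and combines the results into one map.
func (r *registry) merged() relationshipMap {
	out := relationshipMap{}
	for _, d := range r.discoveries {
		for k, v := range d() {
			out[k] = v // assumption: later sources overwrite earlier ones
		}
	}
	return out
}

func main() {
	r := &registry{}
	r.register(func() relationshipMap { return relationshipMap{"10.0.0.1": "vm-a"} })
	r.register(func() relationshipMap { return relationshipMap{"10.0.0.2": "container-b"} })

	// The consumer depends only on the merged view.
	m := r.merged()
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, "->", m[k])
	}
}
```

The consumer code in `main` reads only the merged map, so adding or removing a discovery source does not change it.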
Index ¶
- func NewCachedTask[T any](taskID taskid.TaskImplementationID[T], ...) coretask.Task[T]
- func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], ...) coretask.Task[[]*log.Log]
- func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], historyModifier HistoryModifer[T], ...) coretask.Task[struct{}]
- func NewInspectionTask[T any](taskId taskid.TaskImplementationID[T], ...) coretask.Task[T]
- func NewLogFilterTask(tid taskid.TaskImplementationID[[]*log.Log], ...) coretask.Task[[]*log.Log]
- func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], ...) coretask.Task[LogGroupMap]
- func NewLogSerializerTask(taskID taskid.TaskImplementationID[[]*log.Log], ...) coretask.Task[[]*log.Log]
- func NewProgressReportableInspectionTask[T any](taskId taskid.TaskImplementationID[T], ...) coretask.Task[T]
- func NewRelationshipDiscoveryTask[T any](taskID taskid.TaskImplementationID[T], idSource *RelationshipTaskIDSource[T], ...) coretask.Task[T]
- func NewRelationshipMergerTask[T any](setting RelationshipMergerTaskSetting[T]) coretask.Task[T]
- type CacheableTaskResult
- type HistoryModifer
- type InspectionTaskFunc
- type LogFilterFunc
- type LogGroup
- type LogGroupMap
- type LogGrouper
- type ProgressReportableInspectionTaskFunc
- type RelationshipMergerTaskSetting
- type RelationshipTaskIDSource
Functions ¶
func NewCachedTask ¶
func NewCachedTask[T any](taskID taskid.TaskImplementationID[T], depdendencies []taskid.UntypedTaskReference, f func(ctx context.Context, prevValue CacheableTaskResult[T]) (CacheableTaskResult[T], error), labelOpt ...coretask.LabelOpt) coretask.Task[T]
NewCachedTask creates a task that can reuse the value produced by its previous run.
func NewFieldSetReadTask ¶
func NewFieldSetReadTask(taskId taskid.TaskImplementationID[[]*log.Log], logTask taskid.TaskReference[[]*log.Log], fieldSetReaders []log.FieldSetReader) coretask.Task[[]*log.Log]
NewFieldSetReadTask creates a task that consumes a list of logs and applies a set of FieldSetReaders to each log concurrently. This allows log entries to be processed in parallel to extract the fields needed by later tasks. Later parser tasks usually process logs from oldest to newest, grouped by resource, so they cannot be parallelized. Extracting fields from a log must not depend on any other log, which is why it can be done in parallel.
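The reason field extraction parallelizes safely can be sketched as follows. This is not the package's implementation: `logEntry` and `fieldReader` are hypothetical stand-ins for `*log.Log` and `log.FieldSetReader`, whose real definitions are not shown here. The point is that each goroutine touches only its own log.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// logEntry is a hypothetical stand-in for *log.Log.
type logEntry struct {
	Raw    string
	Fields map[string]string
}

// fieldReader is a hypothetical stand-in for log.FieldSetReader.
// It must look only at the log it is given.
type fieldReader func(raw string) (key, value string)

// severityReader extracts the first space-separated token as the severity.
func severityReader(raw string) (string, string) {
	parts := strings.SplitN(raw, " ", 2)
	return "severity", parts[0]
}

// readFields applies every reader to every log, one goroutine per log.
// No goroutine reads or writes another log, so no locking is needed.
func readFields(logs []*logEntry, readers []fieldReader) {
	var wg sync.WaitGroup
	for _, l := range logs {
		wg.Add(1)
		go func(l *logEntry) {
			defer wg.Done()
			l.Fields = make(map[string]string, len(readers))
			for _, r := range readers {
				k, v := r(l.Raw)
				l.Fields[k] = v
			}
		}(l)
	}
	wg.Wait()
}

func main() {
	logs := []*logEntry{{Raw: "ERROR disk full"}, {Raw: "INFO started"}}
	readFields(logs, []fieldReader{severityReader})
	for _, l := range logs {
		fmt.Println(l.Fields["severity"])
	}
}
```

A production version would likely bound the number of goroutines. One goroutine per log is used here only to keep the sketch short.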
func NewHistoryModifierTask ¶
func NewHistoryModifierTask[T any](tid taskid.TaskImplementationID[struct{}], historyModifier HistoryModifer[T], labels ...coretask.LabelOpt) coretask.Task[struct{}]
NewHistoryModifierTask creates a task that modifies the history builder based on grouped logs. It processes logs in parallel and applies the logic from the provided HistoryModifer to build a comprehensive history of events.
func NewInspectionTask ¶
func NewInspectionTask[T any](taskId taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc InspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]
NewInspectionTask creates a basic inspection task. The task is executed based on the task mode retrieved from the context.
Parameters:
- taskId: The unique identifier for the task.
- dependencies: A list of task references that this task depends on.
- taskFunc: The function to execute for the task.
- labelOpts: Optional labels to apply to the task.
Returns:
An inspection task.
func NewLogFilterTask ¶
func NewLogFilterTask(tid taskid.TaskImplementationID[[]*log.Log], sourceLogs taskid.TaskReference[[]*log.Log], logFilter LogFilterFunc) coretask.Task[[]*log.Log]
NewLogFilterTask creates a task that consumes a list of logs and returns a new list containing only the logs that satisfy the filter function.
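The filtering behavior can be sketched in isolation. The signature of LogFilterFunc is not shown on this page, so `filterFunc` and `logEntry` below are hypothetical stand-ins, and the sketch only illustrates the "keep when true" rule.

```go
package main

import "fmt"

// logEntry is a hypothetical stand-in for *log.Log.
type logEntry struct {
	Severity string
	Message  string
}

// filterFunc is a hypothetical stand-in for LogFilterFunc.
// It returns true if the log should be kept.
type filterFunc func(l *logEntry) bool

// filterLogs returns a new slice holding only the logs accepted by keep.
// The input slice is left unchanged and input order is preserved.
func filterLogs(logs []*logEntry, keep filterFunc) []*logEntry {
	result := make([]*logEntry, 0, len(logs))
	for _, l := range logs {
		if keep(l) {
			result = append(result, l)
		}
	}
	return result
}

func main() {
	logs := []*logEntry{
		{Severity: "ERROR", Message: "disk full"},
		{Severity: "INFO", Message: "started"},
		{Severity: "ERROR", Message: "timeout"},
	}
	errorsOnly := filterLogs(logs, func(l *logEntry) bool { return l.Severity == "ERROR" })
	for _, l := range errorsOnly {
		fmt.Println(l.Message)
	}
}
```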
func NewLogGrouperTask ¶
func NewLogGrouperTask(taskId taskid.TaskImplementationID[LogGroupMap], logTask taskid.TaskReference[[]*log.Log], grouper LogGrouper) coretask.Task[LogGroupMap]
NewLogGrouperTask creates a task that groups logs based on a grouper function. It processes a list of logs and organizes them into a map of LogGroup, where each group contains logs with the same key.
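A minimal sketch of grouping by key follows. The definitions of LogGroup, LogGroupMap, and LogGrouper are not shown on this page, so `logGroup`, `grouper`, and `logEntry` are hypothetical stand-ins, and their fields are assumptions.

```go
package main

import "fmt"

// logEntry is a hypothetical stand-in for *log.Log.
type logEntry struct {
	Resource string
	Message  string
}

// grouper is a hypothetical stand-in for LogGrouper: it returns the group key.
type grouper func(l *logEntry) string

// logGroup is a hypothetical stand-in for LogGroup.
type logGroup struct {
	Key  string
	Logs []*logEntry
}

// groupLogs organizes logs into groups keyed by the grouper's result.
// Logs keep their input order inside each group.
func groupLogs(logs []*logEntry, g grouper) map[string]*logGroup {
	groups := map[string]*logGroup{}
	for _, l := range logs {
		key := g(l)
		group, ok := groups[key]
		if !ok {
			group = &logGroup{Key: key}
			groups[key] = group
		}
		group.Logs = append(group.Logs, l)
	}
	return groups
}

func main() {
	logs := []*logEntry{
		{Resource: "pod-a", Message: "created"},
		{Resource: "pod-b", Message: "created"},
		{Resource: "pod-a", Message: "deleted"},
	}
	groups := groupLogs(logs, func(l *logEntry) string { return l.Resource })
	fmt.Println(len(groups), len(groups["pod-a"].Logs), len(groups["pod-b"].Logs))
}
```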
func NewLogSerializerTask ¶
func NewLogSerializerTask(taskID taskid.TaskImplementationID[[]*log.Log], input taskid.TaskReference[[]*log.Log]) coretask.Task[[]*log.Log]
NewLogSerializerTask stores the given logs in the history so that the history can later associate a ChangeSet with each log. This task must run before the HistoryModifier, and any logs that should not be included in the result must be discarded before this task.
func NewProgressReportableInspectionTask ¶
func NewProgressReportableInspectionTask[T any](taskId taskid.TaskImplementationID[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]
NewProgressReportableInspectionTask generates a task with progress reporting capabilities. This task can report its progress during execution through the TaskProgress object. Use NewInspectionTask for tasks that complete immediately.
Parameters:
- taskId: The unique identifier for the task.
- dependencies: A list of task references that this task depends on.
- taskFunc: The function to execute, which includes progress reporting.
- labelOpts: Optional labels to apply to the task.
Returns:
A task with progress reporting capabilities.
func NewRelationshipDiscoveryTask ¶
func NewRelationshipDiscoveryTask[T any](taskID taskid.TaskImplementationID[T], idSource *RelationshipTaskIDSource[T], dependencies []taskid.UntypedTaskReference, taskFunc ProgressReportableInspectionTaskFunc[T], labelOpts ...coretask.LabelOpt) coretask.Task[T]
NewRelationshipDiscoveryTask creates a task that discovers relationships from a specific data source, such as mapping an IP address to a VM name from a log source. The discovered relationship map is then provided to the RelationshipMergerTask. Implementers of a discovery task must ensure that it is added to the task graph whenever its data may be required. This is enforced by adding a task reference to this discovery task to the coretask.NewSubsequentTaskRefsTaskLabel on its parent task, which requires this discovery task to depend on its parent when it is included.
func NewRelationshipMergerTask ¶
func NewRelationshipMergerTask[T any](setting RelationshipMergerTaskSetting[T]) coretask.Task[T]
NewRelationshipMergerTask creates a task that merges relationship maps from all related RelationshipDiscoveryTasks. Consumers of the relationship data can simply depend on this merger task without needing to know about the specific discovery tasks. It retrieves results from all discovery tasks registered with the provided IDSource and merges them using the logic defined in the RelationshipMergerTaskSetting.
Types ¶
type CacheableTaskResult ¶
type CacheableTaskResult[T any] struct {
	// Value is the value produced by the previous run.
	Value T
	// DependencyDigest is a string representation of the digest of the task's inputs.
	// A task must generate a different digest for each distinct combination of inputs.
	// It should compare the digest generated from the current inputs with the digest of
	// the previous value, and return the previous value only when the digest has not changed.
	DependencyDigest string
}
CacheableTaskResult is the combination of the cached value and a digest of its dependency.
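The digest comparison described in the struct comment can be sketched as below. The struct is copied from this page. Everything else is an assumption for illustration: `digestOf`, `cachedUpper`, the SHA-256 choice, and the `calls` counter are hypothetical and not part of the package.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// CacheableTaskResult mirrors the struct documented above.
type CacheableTaskResult[T any] struct {
	Value            T
	DependencyDigest string
}

// digestOf is a hypothetical helper that derives a digest from the input.
func digestOf(input string) string {
	sum := sha256.Sum256([]byte(input))
	return hex.EncodeToString(sum[:])
}

// cachedUpper is a hypothetical task body. It recomputes only when the
// digest of the current input differs from the previous digest.
// calls counts how many times the computation actually ran.
func cachedUpper(prev CacheableTaskResult[string], input string, calls *int) CacheableTaskResult[string] {
	digest := digestOf(input)
	if prev.DependencyDigest == digest {
		return prev // inputs unchanged: reuse the previous value
	}
	*calls++
	return CacheableTaskResult[string]{
		Value:            strings.ToUpper(input),
		DependencyDigest: digest,
	}
}

func main() {
	calls := 0
	r := cachedUpper(CacheableTaskResult[string]{}, "abc", &calls) // computes
	r = cachedUpper(r, "abc", &calls)                              // reuses
	r = cachedUpper(r, "xyz", &calls)                              // computes again
	fmt.Println(r.Value, calls)
}
```

A zero-value previous result has an empty digest, which never matches a hex-encoded SHA-256 digest, so the first run always computes.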
type HistoryModifer ¶
type HistoryModifer[T any] interface {
	// LogSerializerTask returns a reference to the log serializer task, a prerequisite of the
	// HistoryModifier that serializes its logs to history data before they are processed by this modifier.
	LogSerializerTask() taskid.TaskReference[[]*log.Log]
	// Dependencies returns the additional task references used by the history modifier.
	Dependencies() []taskid.UntypedTaskReference
	// GroupedLogTask returns a reference to the task that provides the grouped logs.
	GroupedLogTask() taskid.TaskReference[LogGroupMap]
	// ModifyChangeSetFromLog is called for each log entry to modify the corresponding ChangeSet.
	// This method allows for custom logic to be applied during the history building process.
	// prevGroupData is the value returned for the last processed log in the same group.
	ModifyChangeSetFromLog(ctx context.Context, l *log.Log, cs *history.ChangeSet, builder *history.Builder, prevGroupData T) (T, error)
}
HistoryModifer defines the interface for modifying the History with change sets based on log entries. Implementations of this interface can be used to customize how log data is transformed into structured history. To carry over data produced while processing the previous log in the same group, the method ModifyChangeSetFromLog receives and returns a value of type T.
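How a value of type T is threaded through the logs of one group can be sketched as below. This is a simplification under stated assumptions: `logEntry`, `processGroup`, and `countRestarts` are hypothetical, the real method also receives a context, a `*history.ChangeSet`, and a `*history.Builder`, and starting from the zero value of T is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// logEntry is a hypothetical stand-in for *log.Log.
type logEntry struct {
	Message string
}

// processGroup calls modify once per log, in order, passing the value
// returned for the previous log of the same group.
// Assumption: the first call receives the zero value of T.
func processGroup[T any](logs []*logEntry, modify func(l *logEntry, prev T) (T, error)) (T, error) {
	var state T
	for _, l := range logs {
		next, err := modify(l, state)
		if err != nil {
			return state, err
		}
		state = next
	}
	return state, nil
}

// countRestarts is a hypothetical, simplified modifier that counts how many
// logs in the group mention a restart.
func countRestarts(l *logEntry, prev int) (int, error) {
	if strings.Contains(l.Message, "restart") {
		return prev + 1, nil
	}
	return prev, nil
}

func main() {
	group := []*logEntry{
		{Message: "container restart"},
		{Message: "probe ok"},
		{Message: "container restart"},
	}
	total, err := processGroup(group, countRestarts)
	fmt.Println(total, err)
}
```

Because each call depends on the result of the previous one, logs inside a single group are processed sequentially, while separate groups remain independent of each other.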
type InspectionTaskFunc ¶
type InspectionTaskFunc[T any] = func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType) (T, error)
InspectionTaskFunc is a type for basic inspection task functions.
type LogFilterFunc ¶
LogFilterFunc defines the function signature for filtering logs. It returns true if the log should be kept.
type LogGroupMap ¶
LogGroupMap is a map of log groups, where the key is the group identifier.
type LogGrouper ¶
LogGrouper defines a function that returns a group key for a given log.
type ProgressReportableInspectionTaskFunc ¶
type ProgressReportableInspectionTaskFunc[T any] = func(ctx context.Context, taskMode inspectioncore_contract.InspectionTaskModeType, progress *inspectionmetadata.TaskProgressMetadata) (T, error)
ProgressReportableInspectionTaskFunc is a type for inspection task functions with progress reporting capabilities.
type RelationshipMergerTaskSetting ¶
type RelationshipMergerTaskSetting[T any] interface {
	// IDSource returns the pointer to the RelationshipTaskIDSource that manages the task IDs.
	IDSource() *RelationshipTaskIDSource[T]
	// Merge defines the logic to combine multiple results from various RelationshipDiscoveryTasks
	// into a single, consolidated result.
	Merge(results []T) (T, error)
}
RelationshipMergerTaskSetting defines the settings for a RelationshipMergerTask. It provides the ID source and the logic to merge results from multiple discovery tasks.
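One possible shape for the Merge logic, when T is a map from IP address to resource name, is sketched below. The `ipNameMap` type and the conflict policy (return an error when two sources disagree) are assumptions for illustration. The package leaves the merge policy to the implementer.

```go
package main

import "fmt"

// ipNameMap is a hypothetical relationship result: IP address -> resource name.
type ipNameMap map[string]string

// mergeIPNameMaps has the same shape as Merge(results []T) (T, error) with
// T = ipNameMap. Assumption: two sources naming the same IP differently is
// treated as an error rather than silently overwritten.
func mergeIPNameMaps(results []ipNameMap) (ipNameMap, error) {
	merged := ipNameMap{}
	for _, r := range results {
		for ip, name := range r {
			if existing, ok := merged[ip]; ok && existing != name {
				return nil, fmt.Errorf("conflicting names for %s: %q and %q", ip, existing, name)
			}
			merged[ip] = name
		}
	}
	return merged, nil
}

func main() {
	merged, err := mergeIPNameMaps([]ipNameMap{
		{"10.0.0.1": "vm-a"},
		{"10.0.0.2": "container-b", "10.0.0.1": "vm-a"},
	})
	fmt.Println(len(merged), err)

	_, err = mergeIPNameMaps([]ipNameMap{
		{"10.0.0.1": "vm-a"},
		{"10.0.0.1": "vm-z"},
	})
	fmt.Println(err != nil)
}
```

Other policies, such as preferring the first or the most recent source, fit the same signature.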
type RelationshipTaskIDSource ¶
type RelationshipTaskIDSource[T any] struct {
	// contains filtered or unexported fields
}
RelationshipTaskIDSource manages the task IDs for RelationshipMergerTask and RelationshipDiscoveryTasks, maintaining the references between them.
func NewRelationshipTaskIDSource ¶
func NewRelationshipTaskIDSource[T any](mergerTaskID taskid.TaskImplementationID[T]) *RelationshipTaskIDSource[T]
func (*RelationshipTaskIDSource[T]) GenerateDefaultRelationshipDiscoveryTaskID ¶
func (s *RelationshipTaskIDSource[T]) GenerateDefaultRelationshipDiscoveryTaskID(taskReferenceID string) taskid.TaskImplementationID[T]
func (*RelationshipTaskIDSource[T]) MergerTaskRef ¶
func (s *RelationshipTaskIDSource[T]) MergerTaskRef() taskid.TaskReference[T]