ingest

package
v1.9.8 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

README

Integrations Ingest

This package is a provider-agnostic ingestion pipeline built on the CEL expression library. Instead of defining intermediary structs, or loading upstream data types directly into our own data types, it converts provider alert payloads into normalized Vulnerability records using declarative mapping rules, so provider-specific transforms are not hard-coded in each integration.

Why This Approach?

  1. Decouples provider operations from persistence logic. Providers only emit alert envelopes; ingestion owns normalization and storage.
  2. Enables customization without code changes. Mappings and overrides live in integration config so teams can tune field mapping or filtering per provider, alert type, or environment.
  3. Scales to new providers. Adding a provider becomes “produce alert envelopes + ship default mappings,” not “write a new persistence pipeline.” (ideally...)
  4. Enforces consistent validation. A single schema-driven validation step ensures required fields are present before persistence.

Data Flow

  1. Provider operation collects raw alerts and wraps them in types.AlertEnvelope (alert type, resource, payload)
  2. Ingest builds a mapping context with integration config, provider state, and operation inputs
  3. CEL filter expressions decide whether each envelope should be ingested
  4. CEL map expressions produce a normalized output map
  5. Output is validated against the schema and persisted via upsert

Key Concepts

  • AlertEnvelope: provider-agnostic wrapper for alert payloads and metadata
  • Mapping schemas: canonical field sets defined in integrationgenerated.IntegrationMappingSchemas
  • Mapping overrides: per-integration overrides that select a mapping by provider, schema, and alert type
  • Default mappings: built-in mappings (for example GitHub) in defaults_github.go
  • Retention policy: governs whether raw payloads are stored (the intent is to store them through an object storage provider)

This structure keeps ingestion consistent, testable, and configurable while avoiding one-off provider logic scattered across integrations. Hopefully.

Documentation

Overview

Package ingest provides mapping and persistence for integration operation payloads. It transforms provider-specific alert data into normalized domain objects using CEL expressions.

Constants

const (
	// TopicIntegrationIngestRequested is emitted when webhook payloads should be ingested.
	TopicIntegrationIngestRequested = "integration.ingest.requested"
)

Variables

var ErrDBClientRequired = errors.New("ingest: db client required")

ErrDBClientRequired is returned when the database client is missing

var ErrExternalIDRequired = errors.New("ingest: external id required")

ErrExternalIDRequired is returned when an external ID is missing for persistence

var ErrIngestEmitterRequired = errors.New("ingest: event emitter required")

ErrIngestEmitterRequired is returned when the ingest event bus is missing

var ErrIngestIntegrationRequired = errors.New("ingest: integration id required")

ErrIngestIntegrationRequired is returned when the integration id is missing

var ErrIngestOrgIDRequired = errors.New("ingest: org id required")

ErrIngestOrgIDRequired is returned when the organization id is missing

var ErrIngestProviderUnknown = errors.New("ingest: provider unknown")

ErrIngestProviderUnknown is returned when provider type is unknown

var ErrIngestSchemaRequired = errors.New("ingest: schema required")

ErrIngestSchemaRequired is returned when the ingest schema is missing

var ErrIngestSchemaUnsupported = errors.New("ingest: schema unsupported")

ErrIngestSchemaUnsupported is returned when the ingest schema is unsupported

var ErrMappingFilterType = errors.New("mapping filter did not return boolean")

ErrMappingFilterType is returned when a filter expression does not return a boolean

var ErrMappingNotFound = errors.New("integration mapping not found")

ErrMappingNotFound is returned when no mapping expression is available

var ErrMappingOutputEmpty = errors.New("mapping output was empty")

ErrMappingOutputEmpty is returned when a mapping expression returns nil

var ErrMappingRequiredField = errors.New("mapping output missing required field")

ErrMappingRequiredField is returned when a required mapping field is missing

var ErrMappingSchemaNotFound = errors.New("ingest: mapping schema not found")

ErrMappingSchemaNotFound is returned when the required mapping schema is missing

var IntegrationIngestRequestedTopic = soiree.NewTypedTopic[RequestedPayload](TopicIntegrationIngestRequested,
	soiree.WithObservability(soiree.ObservabilitySpec[RequestedPayload]{
		Operation: "handle_integration_ingest_requested",
		Origin:    "listeners",
	}),
)

IntegrationIngestRequestedTopic is emitted when webhook payloads should be ingested.

Functions

func DecodeAlertEnvelopes

func DecodeAlertEnvelopes(value any) ([]integrationtypes.AlertEnvelope, error)

DecodeAlertEnvelopes converts an arbitrary value into alert envelopes

func RegisterIngestListeners

func RegisterIngestListeners(bus *soiree.EventBus, db *ent.Client) error

RegisterIngestListeners registers ingest listeners on the supplied event bus.

func SupportsVulnerabilityIngest

func SupportsVulnerabilityIngest(provider integrationtypes.ProviderType, config openapi.IntegrationConfig) bool

SupportsVulnerabilityIngest reports whether default or configured mappings exist

Types

type Evaluator

type Evaluator interface {
	// EvaluateFilter evaluates a CEL filter expression and returns a boolean
	EvaluateFilter(ctx context.Context, expression string, vars map[string]any) (bool, error)
	// EvaluateMap evaluates a CEL expression and returns a JSON object map
	EvaluateMap(ctx context.Context, expression string, vars map[string]any) (map[string]any, error)
}

Evaluator defines the interface for mapping expression evaluation

type MappingEvaluator

type MappingEvaluator struct {
	// contains filtered or unexported fields
}

MappingEvaluator runs CEL expressions against integration payloads. It is intentionally small to keep evaluation consistent across integrations.

func NewMappingEvaluator

func NewMappingEvaluator() (*MappingEvaluator, error)

NewMappingEvaluator creates a CEL evaluator for integration mappings

func (*MappingEvaluator) EvaluateFilter

func (m *MappingEvaluator) EvaluateFilter(ctx context.Context, expression string, vars map[string]any) (bool, error)

EvaluateFilter evaluates a CEL filter expression and returns a boolean

func (*MappingEvaluator) EvaluateMap

func (m *MappingEvaluator) EvaluateMap(ctx context.Context, expression string, vars map[string]any) (map[string]any, error)

EvaluateMap evaluates a CEL expression and returns a JSON object map

type MappingVars

type MappingVars struct {
	// Payload holds the raw provider payload for mapping
	Payload map[string]any
	// Resource identifies the upstream resource associated with the payload
	Resource string
	// AlertType identifies the alert type for the payload
	AlertType string
	// Provider identifies the integration provider
	Provider integrationtypes.ProviderType
	// Operation identifies the operation that produced the payload
	Operation integrationtypes.OperationName
	// OrgID identifies the organization that owns the integration
	OrgID string
	// IntegrationID identifies the integration record
	IntegrationID string
	// Config holds operation configuration values
	Config map[string]any
	// IntegrationConfig holds integration-level configuration values
	IntegrationConfig map[string]any
	// ProviderState holds provider state captured during activation
	ProviderState map[string]any
}

MappingVars holds CEL variables for integration mappings

func (MappingVars) Map

func (m MappingVars) Map() map[string]any

Map converts MappingVars into the CEL variable map
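As a rough illustration of what such a conversion might look like, the sketch below flattens a trimmed struct into a variable map. The documentation does not show the actual key names or nil handling, so the keys (`payload`, `resource`, `alert_type`, `provider`, `config`), the `mappingVars` type, and the `orEmpty` helper are all assumptions made for this example.

```go
package main

import "fmt"

// mappingVars is a trimmed, hypothetical stand-in for MappingVars.
type mappingVars struct {
	Payload   map[string]any
	Resource  string
	AlertType string
	Provider  string
	Config    map[string]any
}

// orEmpty replaces a nil map with an empty one so expressions always
// receive a map value rather than nil.
func orEmpty(m map[string]any) map[string]any {
	if m == nil {
		return map[string]any{}
	}
	return m
}

// Map flattens the struct into a variable map. The key names are assumptions.
func (m mappingVars) Map() map[string]any {
	return map[string]any{
		"payload":    orEmpty(m.Payload),
		"resource":   m.Resource,
		"alert_type": m.AlertType,
		"provider":   m.Provider,
		"config":     orEmpty(m.Config),
	}
}

func main() {
	vars := mappingVars{
		Payload:   map[string]any{"id": "A1"},
		Resource:  "org/repo",
		AlertType: "example_alert",
		Provider:  "example",
	}.Map()
	fmt.Println(len(vars), vars["config"] != nil) // 5 true
}
```

Building the map in one place means every mapping expression sees the same variable names, regardless of which provider produced the payload.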

type RequestedPayload

type RequestedPayload struct {
	// IntegrationID identifies the integration that owns the payload.
	IntegrationID string `json:"integration_id"`
	// Schema identifies the ingest mapping schema (vulnerability, asset, etc).
	Schema string `json:"schema"`
	// Envelopes holds provider alert payloads for ingestion.
	Envelopes []types.AlertEnvelope `json:"envelopes"`
}

RequestedPayload captures webhook alert envelopes for ingestion.

type VulnerabilityIngestRequest

type VulnerabilityIngestRequest struct {
	// OrgID identifies the organization that owns the vulnerabilities
	OrgID string
	// IntegrationID identifies the integration record
	IntegrationID string
	// Provider identifies the integration provider
	Provider integrationtypes.ProviderType
	// Operation identifies the operation that produced the alerts
	Operation integrationtypes.OperationName
	// IntegrationConfig supplies integration-level configuration for mapping
	IntegrationConfig openapi.IntegrationConfig
	// ProviderState carries provider-specific state for mapping
	ProviderState any
	// OperationConfig supplies operation-level configuration for mapping
	OperationConfig map[string]any
	// Envelopes holds the alert payloads to ingest
	Envelopes []integrationtypes.AlertEnvelope
	// DB provides access to the persistence layer
	DB *generated.Client
}

VulnerabilityIngestRequest defines the inputs required for vulnerability ingestion. It expects alert envelopes produced by integration operations.

func (*VulnerabilityIngestRequest) Validate

func (r *VulnerabilityIngestRequest) Validate() error

Validate checks that required fields are present

type VulnerabilityIngestResult

type VulnerabilityIngestResult struct {
	// Summary aggregates ingestion totals
	Summary VulnerabilityIngestSummary
	// Errors captures per-alert error messages
	Errors []string
}

VulnerabilityIngestResult captures the results of an ingestion run

func VulnerabilityAlerts

VulnerabilityAlerts maps provider alerts into vulnerability inputs and persists them

type VulnerabilityIngestSummary

type VulnerabilityIngestSummary struct {
	// Total counts total alerts processed
	Total int
	// Mapped counts alerts that produced mapped output
	Mapped int
	// Persisted counts alerts that were persisted
	Persisted int
	// Skipped counts alerts filtered out by mapping
	Skipped int
	// Failed counts alerts that failed mapping or persistence
	Failed int
	// Created counts new vulnerabilities created
	Created int
	// Updated counts existing vulnerabilities updated
	Updated int
}

VulnerabilityIngestSummary reports mapping and persistence stats
