Documentation
¶
Overview ¶
Package operations dispatches and executes definition-scoped operations for integrations
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Code generated by entx integration mapping generator. DO NOT EDIT.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Generated by entx integration mapping generator as a starting point. Modify as needed.
Index ¶
- Variables
- func CompleteRun(ctx context.Context, db *ent.Client, runID string, startedAt time.Time, ...) error
- func ContextCodecs() []gala.ContextCodec
- func CreatePendingRun(ctx context.Context, db *ent.Client, installation *ent.Integration, ...) (*ent.IntegrationRun, error)
- func EmitPayloadSets(ctx context.Context, ic IngestContext, operationName string, ...) error
- func LastSuccessfulRunAt(ctx context.Context, db *ent.Client, integrationID, operationName string) (*time.Time, error)
- func MarkRunRunning(ctx context.Context, db *ent.Client, runID string) error
- func NextCampaignRunAt(from time.Time, frequency enums.Frequency, interval int, timezone string) time.Time
- func ProcessPayloadSets(ctx context.Context, ic IngestContext, operationName string, ...) error
- func RegisterIngestListeners(runtime *gala.Gala) error
- func RegisterReconcileListener(runtime *gala.Gala, reg *registry.Registry, handle ReconcileHandler, ...) error
- func RegisterRecurringCampaignListener(runtime *gala.Gala, handle RecurringCampaignHandler, schedule gala.Schedule) error
- func RegisterRuntimeListeners(runtime *gala.Gala, reg *registry.Registry, ...) error
- func RegisterScheduledListener[T any](cfg ScheduledListenerConfig[T]) error
- func ValidateConfig(schema json.RawMessage, value json.RawMessage) error
- type DispatchRequest
- type DispatchResult
- type Envelope
- type IngestContext
- type IngestOptions
- type ReconcileEnvelope
- type ReconcileHandler
- type RecurringCampaignEnvelope
- type RecurringCampaignHandler
- type RunResult
- type ScheduledListenerConfig
- type WebhookEnvelope
Constants ¶
This section is empty.
Variables ¶
var (
// ErrGalaRequired indicates the gala dependency is missing
ErrGalaRequired = errors.New("integrations/operations: gala required")
// ErrDispatchInputInvalid indicates the queued operation request failed caller-input validation
ErrDispatchInputInvalid = errors.New("integrations/operations: dispatch input invalid")
// ErrInstallationIDRequired indicates the installation identifier is missing
ErrInstallationIDRequired = errors.New("integrations/operations: installation id required")
// ErrOperationConfigInvalid indicates queued operation config failed caller-input validation
ErrOperationConfigInvalid = errors.New("integrations/operations: operation config invalid")
// ErrRunIDRequired indicates the run identifier is missing
ErrRunIDRequired = errors.New("integrations/operations: run id required")
// ErrIngestDefinitionNotFound indicates the operation definition could not be resolved for ingest
ErrIngestDefinitionNotFound = errors.New("integrations/operations: ingest definition not found")
// ErrIngestSchemaNotFound indicates the generated ingest schema contract was not found
ErrIngestSchemaNotFound = errors.New("integrations/operations: ingest schema not found")
// ErrIngestSchemaNotDeclared indicates the payload schema was not declared in the operation's ingest contracts
ErrIngestSchemaNotDeclared = errors.New("integrations/operations: ingest schema not declared in contracts")
// ErrIngestMappingNotFound indicates the definition does not provide a mapping for the emitted payload variant
ErrIngestMappingNotFound = errors.New("integrations/operations: ingest mapping not found")
// ErrIngestFilterFailed indicates the CEL filter evaluation failed
ErrIngestFilterFailed = errors.New("integrations/operations: ingest filter failed")
// ErrIngestInstallationFilterConfigInvalid indicates the installation filter configuration could not be decoded
ErrIngestInstallationFilterConfigInvalid = errors.New("integrations/operations: ingest installation filter config invalid")
// ErrIngestTransformFailed indicates the CEL map evaluation failed
ErrIngestTransformFailed = errors.New("integrations/operations: ingest transform failed")
// ErrIngestMappedDocumentInvalid indicates the mapped payload did not satisfy the generated schema contract
ErrIngestMappedDocumentInvalid = errors.New("integrations/operations: ingest mapped document invalid")
// ErrIngestUpsertKeyMissing indicates the mapped payload omitted every generated upsert key
ErrIngestUpsertKeyMissing = errors.New("integrations/operations: ingest upsert key missing")
// ErrIngestUpsertConflict indicates the generated upsert keys matched more than one record
ErrIngestUpsertConflict = errors.New("integrations/operations: ingest upsert conflict")
// ErrIngestUnsupportedSchema indicates the runtime does not yet support the requested generated ingest schema
ErrIngestUnsupportedSchema = errors.New("integrations/operations: ingest schema unsupported")
// ErrIngestPersistFailed indicates the mapped record could not be persisted
ErrIngestPersistFailed = errors.New("integrations/operations: ingest persistence failed")
// ErrOperationDisabled indicates the operation is disabled for this installation and the reconcile cycle should stop
ErrOperationDisabled = errors.New("integrations/operations: operation disabled")
)
var (
// ReconcileTopic is the Gala topic name for reconciliation envelopes
ReconcileTopic = gala.TopicName("integration." + reconcileSchemaName)
)
var (
// RecurringCampaignTopic is the Gala topic name for recurring campaign polling
RecurringCampaignTopic = gala.TopicName("campaign.recurring." + recurringCampaignSchemaName)
)
Functions ¶
func CompleteRun ¶
func CompleteRun(ctx context.Context, db *ent.Client, runID string, startedAt time.Time, result RunResult) error
CompleteRun writes the final run outcome
func ContextCodecs ¶
func ContextCodecs() []gala.ContextCodec
ContextCodecs returns the durable context codecs required by integration dispatch and ingest listeners
func CreatePendingRun ¶
func CreatePendingRun(ctx context.Context, db *ent.Client, installation *ent.Integration, req DispatchRequest) (*ent.IntegrationRun, error)
CreatePendingRun inserts one pending run record for a dispatch request
func EmitPayloadSets ¶
func EmitPayloadSets(ctx context.Context, ic IngestContext, operationName string, contracts []types.IngestContract, payloadSets []types.IngestPayloadSet, options IngestOptions) error
EmitPayloadSets transforms one batch of mapped payload sets and dispatches them through the appropriate ingest path
func LastSuccessfulRunAt ¶ added in v1.19.1
func LastSuccessfulRunAt(ctx context.Context, db *ent.Client, integrationID, operationName string) (*time.Time, error)
LastSuccessfulRunAt returns the finish time of the most recent successful run for the given integration and operation, or nil if no successful run exists yet
func MarkRunRunning ¶
func MarkRunRunning(ctx context.Context, db *ent.Client, runID string) error
MarkRunRunning transitions one run to running
func NextCampaignRunAt ¶ added in v1.20.0
func NextCampaignRunAt(from time.Time, frequency enums.Frequency, interval int, timezone string) time.Time
NextCampaignRunAt computes the next run time from the given base time using calendar-based frequency and interval arithmetic. All frequencies are calendar-relative (month boundaries, not fixed durations) so time.AddDate is used rather than time.Add
func ProcessPayloadSets ¶
func ProcessPayloadSets(ctx context.Context, ic IngestContext, operationName string, contracts []types.IngestContract, payloadSets []types.IngestPayloadSet, options IngestOptions) error
ProcessPayloadSets persists one batch of mapped payload sets synchronously
func RegisterIngestListeners ¶
func RegisterIngestListeners(runtime *gala.Gala) error
RegisterIngestListeners attaches second-stage ingest listeners for all supported generated ingest schemas
func RegisterReconcileListener ¶
func RegisterReconcileListener(runtime *gala.Gala, reg *registry.Registry, handle ReconcileHandler, schedule gala.Schedule) error
RegisterReconcileListener registers the Gala listener for integration reconciliation
func RegisterRecurringCampaignListener ¶ added in v1.20.0
func RegisterRecurringCampaignListener(runtime *gala.Gala, handle RecurringCampaignHandler, schedule gala.Schedule) error
RegisterRecurringCampaignListener registers the Gala listener for recurring campaign polling
func RegisterRuntimeListeners ¶
func RegisterRuntimeListeners(runtime *gala.Gala, reg *registry.Registry, operationHandle func(context.Context, Envelope) error, webhookHandle func(context.Context, WebhookEnvelope) error, reconcileHandle ReconcileHandler, reconcileSchedule gala.Schedule, recurringCampaignHandle RecurringCampaignHandler, recurringCampaignSchedule gala.Schedule) error
RegisterRuntimeListeners registers all Gala listeners needed by the integration runtime
func RegisterScheduledListener ¶ added in v1.20.0
func RegisterScheduledListener[T any](cfg ScheduledListenerConfig[T]) error
RegisterScheduledListener registers a self-sustaining Gala listener that processes one cycle, computes the next adaptive interval, and re-emits
func ValidateConfig ¶
func ValidateConfig(schema json.RawMessage, value json.RawMessage) error
ValidateConfig validates one raw configuration payload against the operation schema
Types ¶
type DispatchRequest ¶
type DispatchRequest struct {
// IntegrationID is the target integration identifier; required on the customer path, empty on the runtime path
IntegrationID string
// DefinitionID is the definition identifier carried as metadata on the runtime path
DefinitionID string
// OwnerID is the owning organization carried on the runtime path; derived from the DB record for customer dispatch
OwnerID string
// Operation is the definition-local operation identifier
Operation string
// Config is the operation configuration payload
Config json.RawMessage
// ForceClientRebuild requests client cache bypass
ForceClientRebuild bool
// RunType is the integration run type recorded for the dispatch
RunType enums.IntegrationRunType
// Workflow carries optional workflow linkage for workflow-triggered operations
Workflow *types.WorkflowMeta
// ScheduledAt defers execution until the specified time; nil means immediate
ScheduledAt *time.Time
// Runtime signals that this dispatch should use the runtime provider path
Runtime bool
}
DispatchRequest describes one requested operation dispatch
type DispatchResult ¶
type DispatchResult struct {
// RunID is the persisted run identifier
RunID string
// EventID is the emitted Gala event identifier
EventID string
// Status is the run status at dispatch time
Status enums.IntegrationRunStatus
}
DispatchResult captures the queued run metadata
func Dispatch ¶
func Dispatch(ctx context.Context, reg *registry.Registry, db *ent.Client, runtime *gala.Gala, req DispatchRequest) (DispatchResult, error)
Dispatch validates and enqueues one operation execution request. When DispatchRequest.Runtime is true, no DB integration lookup is performed and the client is resolved from the registry at execution time
type Envelope ¶
type Envelope struct {
types.ExecutionMetadata
// Config is the operation configuration payload
Config json.RawMessage `json:"config,omitempty"`
// ForceClientRebuild requests client cache bypass
ForceClientRebuild bool `json:"forceClientRebuild,omitempty"`
}
Envelope is the payload emitted to the operation topic
type IngestContext ¶
type IngestContext struct {
// Registry is the integration definition registry used to resolve mappings and definitions
Registry *registry.Registry
// DB is the ent client used for persistence
DB *ent.Client
// Runtime is the Gala instance used for async emit; nil on the synchronous persist path
Runtime *gala.Gala
// Integration is the integration record being ingested into
Integration *ent.Integration
}
IngestContext holds the stable per-integration dependencies shared across all ingest call paths
type IngestOptions ¶
type IngestOptions struct {
// DirectorySyncRunID groups all directory-related ingest records from one sync batch
DirectorySyncRunID string
// SkipDirectorySyncRunFinalization instructs the processor not to finalize the directory sync run after processing
SkipDirectorySyncRunFinalization bool
// Source identifies the mechanism that produced the ingest data (e.g. webhook, poll, manual)
Source integrationgenerated.IntegrationIngestSource
// RunID is a caller-supplied correlation identifier for the overall operation run
RunID string
// Webhook is the webhook name or identifier that triggered this ingest
Webhook string
// WebhookEvent is the event type reported by the webhook provider
WebhookEvent string
// DeliveryID is the provider-assigned delivery identifier; used for deduplication
DeliveryID string
// WorkflowMeta carries workflow instance context
WorkflowMeta *types.WorkflowMeta
}
IngestOptions carries the minimal ingest-time metadata needed by persistence
func IngestOptionsFromMetadata ¶
func IngestOptionsFromMetadata(source integrationgenerated.IntegrationIngestSource, m types.ExecutionMetadata) IngestOptions
IngestOptionsFromMetadata derives ingest options from execution metadata
type ReconcileEnvelope ¶
type ReconcileEnvelope struct {
types.ExecutionMetadata
// Schedule is the adaptive scheduling state carried across cycles
Schedule gala.ScheduleState `json:"schedule"`
}
ReconcileEnvelope is the durable payload for a scheduled reconciliation cycle
type ReconcileHandler ¶
type ReconcileHandler func(context.Context, ReconcileEnvelope) (int, error)
ReconcileHandler processes one reconciliation envelope and returns the number of operations dispatched (used as the delta for adaptive scheduling)
type RecurringCampaignEnvelope ¶ added in v1.20.0
type RecurringCampaignEnvelope struct {
// Schedule is the adaptive scheduling state carried across polling cycles
Schedule gala.ScheduleState `json:"schedule"`
}
RecurringCampaignEnvelope is the durable payload for a recurring campaign polling cycle
type RecurringCampaignHandler ¶ added in v1.20.0
type RecurringCampaignHandler func(context.Context, RecurringCampaignEnvelope) (int, error)
RecurringCampaignHandler processes one polling cycle and returns the number of campaigns dispatched (used as the delta for adaptive scheduling)
type RunResult ¶
type RunResult struct {
// Status is the terminal run status
Status enums.IntegrationRunStatus
// Summary is the optional summary text stored on the run
Summary string
// Error is the optional terminal error text stored on the run
Error string
// Metrics is the structured metrics payload stored on the run
Metrics map[string]any
}
RunResult captures the terminal state of one run
type ScheduledListenerConfig ¶ added in v1.20.0
type ScheduledListenerConfig[T any] struct {
// Runtime is the Gala instance to register on
Runtime *gala.Gala
// Topic is the Gala topic name
Topic gala.TopicName
// Name is the stable listener name
Name string
// Schedule controls adaptive interval computation
Schedule gala.Schedule
// Handle is the handler invoked each cycle, returning the delta for scheduling
Handle func(context.Context, T) (int, error)
// State extracts the ScheduleState from the envelope
State func(T) gala.ScheduleState
// Wrap builds a new envelope carrying the updated ScheduleState
Wrap func(T, gala.ScheduleState) T
// PrepareEmit optionally enriches the context and headers before re-emitting
PrepareEmit func(context.Context, T) (context.Context, gala.Headers)
// ShouldCancel optionally classifies an execution error; when it returns
// true the cycle is cancelled immediately without scheduling a retry
ShouldCancel func(context.Context, T, error) bool
// ScheduleOverride optionally returns a per-envelope schedule that
// overrides the default; returning nil falls back to the config-level Schedule
ScheduleOverride func(T) *gala.Schedule
}
ScheduledListenerConfig defines the registration parameters for a self-sustaining Gala listener with adaptive scheduling
type WebhookEnvelope ¶
type WebhookEnvelope struct {
types.ExecutionMetadata
// Payload is the raw webhook request body
Payload json.RawMessage `json:"payload"`
// Headers contains the inbound HTTP request headers
Headers map[string]string `json:"headers,omitempty"`
}
WebhookEnvelope is the durable payload emitted for one inbound integration webhook event
Source Files
¶
- context.go
- dispatcher.go
- doc.go
- errors.go
- executor.go
- ingest.go
- ingest_asset_persist.go
- ingest_checkresult_persist.go
- ingest_contact_persist.go
- ingest_directoryaccount_persist.go
- ingest_directorygroup_persist.go
- ingest_directorymembership_persist.go
- ingest_entity_persist.go
- ingest_finding_persist.go
- ingest_generated.go
- ingest_risk_persist.go
- ingest_vulnerability_persist.go
- reconcile.go
- recurring_campaign.go
- run_store.go
- scheduled_listener.go
- textnormalize.go
- types.go
- upsert.go