operations

package
v1.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2026 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Overview

Package operations dispatches and executes definition-scoped operations for integrations

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Code generated by entx integration mapping generator. DO NOT EDIT.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Generated by entx integration mapping generator as a starting point. Modify as needed.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by this package. Each one is created with
// errors.New and its message carries the "integrations/operations: " prefix,
// so callers can match a specific failure with errors.Is
var (
	// Dependency and caller-input errors

	// ErrGalaRequired indicates the gala dependency is missing
	ErrGalaRequired = errors.New("integrations/operations: gala required")
	// ErrDispatchInputInvalid indicates the queued operation request failed caller-input validation
	ErrDispatchInputInvalid = errors.New("integrations/operations: dispatch input invalid")
	// ErrInstallationIDRequired indicates the installation identifier is missing
	ErrInstallationIDRequired = errors.New("integrations/operations: installation id required")
	// ErrOperationConfigInvalid indicates queued operation config failed caller-input validation
	ErrOperationConfigInvalid = errors.New("integrations/operations: operation config invalid")
	// ErrRunIDRequired indicates the run identifier is missing
	ErrRunIDRequired = errors.New("integrations/operations: run id required")

	// Ingest errors (all named ErrIngest*)

	// ErrIngestDefinitionNotFound indicates the operation definition could not be resolved for ingest
	ErrIngestDefinitionNotFound = errors.New("integrations/operations: ingest definition not found")
	// ErrIngestSchemaNotFound indicates the generated ingest schema contract was not found
	ErrIngestSchemaNotFound = errors.New("integrations/operations: ingest schema not found")
	// ErrIngestSchemaNotDeclared indicates the payload schema was not declared in the operation's ingest contracts
	ErrIngestSchemaNotDeclared = errors.New("integrations/operations: ingest schema not declared in contracts")
	// ErrIngestMappingNotFound indicates the definition does not provide a mapping for the emitted payload variant
	ErrIngestMappingNotFound = errors.New("integrations/operations: ingest mapping not found")
	// ErrIngestFilterFailed indicates the CEL filter evaluation failed
	ErrIngestFilterFailed = errors.New("integrations/operations: ingest filter failed")
	// ErrIngestInstallationFilterConfigInvalid indicates the installation filter configuration could not be decoded
	ErrIngestInstallationFilterConfigInvalid = errors.New("integrations/operations: ingest installation filter config invalid")
	// ErrIngestTransformFailed indicates the CEL map evaluation failed
	ErrIngestTransformFailed = errors.New("integrations/operations: ingest transform failed")
	// ErrIngestMappedDocumentInvalid indicates the mapped payload did not satisfy the generated schema contract
	ErrIngestMappedDocumentInvalid = errors.New("integrations/operations: ingest mapped document invalid")
	// ErrIngestUpsertKeyMissing indicates the mapped payload omitted every generated upsert key
	ErrIngestUpsertKeyMissing = errors.New("integrations/operations: ingest upsert key missing")
	// ErrIngestUpsertConflict indicates the generated upsert keys matched more than one record
	ErrIngestUpsertConflict = errors.New("integrations/operations: ingest upsert conflict")
	// ErrIngestUnsupportedSchema indicates the runtime does not yet support the requested generated ingest schema
	ErrIngestUnsupportedSchema = errors.New("integrations/operations: ingest schema unsupported")
	// ErrIngestPersistFailed indicates the mapped record could not be persisted
	ErrIngestPersistFailed = errors.New("integrations/operations: ingest persistence failed")
)
View Source
var (
	// ReconcileTopic is the Gala topic name for reconciliation envelopes;
	// it is built by passing "integration." + reconcileSchemaName to gala.TopicName
	// (reconcileSchemaName is an unexported identifier declared outside this listing)
	ReconcileTopic = gala.TopicName("integration." + reconcileSchemaName)
)

Functions

func CompleteRun

func CompleteRun(ctx context.Context, db *ent.Client, runID string, startedAt time.Time, result RunResult) error

CompleteRun writes the final run outcome

func ContextCodecs

func ContextCodecs() []gala.ContextCodec

ContextCodecs returns the durable context codecs required by integration dispatch and ingest listeners

func CreatePendingRun

func CreatePendingRun(ctx context.Context, db *ent.Client, installation *ent.Integration, req DispatchRequest) (*ent.IntegrationRun, error)

CreatePendingRun inserts one pending run record for a dispatch request

func EmitPayloadSets

func EmitPayloadSets(ctx context.Context, ic IngestContext, operationName string, contracts []types.IngestContract, payloadSets []types.IngestPayloadSet, options IngestOptions) error

EmitPayloadSets transforms one batch of mapped payload sets and dispatches them through the appropriate ingest path

func MarkRunRunning

func MarkRunRunning(ctx context.Context, db *ent.Client, runID string) error

MarkRunRunning transitions one run to running

func ProcessPayloadSets

func ProcessPayloadSets(ctx context.Context, ic IngestContext, contracts []types.IngestContract, payloadSets []types.IngestPayloadSet, options IngestOptions) error

ProcessPayloadSets persists one batch of mapped payload sets synchronously

func RegisterIngestListeners

func RegisterIngestListeners(runtime *gala.Gala) error

RegisterIngestListeners attaches second-stage ingest listeners for all supported generated ingest schemas

func RegisterReconcileListener

func RegisterReconcileListener(runtime *gala.Gala, handle ReconcileHandler, schedule gala.Schedule) error

RegisterReconcileListener registers the Gala listener for integration reconciliation

func RegisterRuntimeListeners

func RegisterRuntimeListeners(runtime *gala.Gala, reg *registry.Registry, operationHandle func(context.Context, Envelope) error, webhookHandle func(context.Context, WebhookEnvelope) error, reconcileHandle ReconcileHandler, reconcileSchedule gala.Schedule) error

RegisterRuntimeListeners registers all Gala listeners needed by the integration runtime

func ValidateConfig

func ValidateConfig(schema json.RawMessage, value json.RawMessage) error

ValidateConfig validates one raw configuration payload against the operation schema

Types

type DispatchRequest

// DispatchRequest describes one requested operation dispatch; it is the req
// argument accepted by Dispatch and CreatePendingRun
type DispatchRequest struct {
	// IntegrationID is the target installation identifier
	IntegrationID string
	// Operation is the definition-local operation identifier
	Operation string
	// Config is the operation configuration payload, carried as raw JSON
	Config json.RawMessage
	// ForceClientRebuild requests client cache bypass
	ForceClientRebuild bool
	// RunType is the integration run type recorded for the dispatch
	RunType enums.IntegrationRunType
	// Workflow carries optional workflow linkage for workflow-triggered operations;
	// it is a pointer, so it may be nil
	Workflow *types.WorkflowMeta
}

DispatchRequest describes one requested operation dispatch

type DispatchResult

// DispatchResult captures the queued run metadata; it is the non-error value
// returned by Dispatch
type DispatchResult struct {
	// RunID is the persisted run identifier
	RunID string
	// EventID is the emitted Gala event identifier
	EventID string
	// Status is the run status at dispatch time
	Status enums.IntegrationRunStatus
}

DispatchResult captures the queued run metadata

func Dispatch

func Dispatch(ctx context.Context, reg *registry.Registry, db *ent.Client, runtime *gala.Gala, req DispatchRequest) (DispatchResult, error)

Dispatch validates and enqueues one operation execution request

type Envelope

// Envelope is the payload emitted to the operation topic; it is the value
// handed to the operationHandle callback of RegisterRuntimeListeners
type Envelope struct {
	// ExecutionMetadata is embedded without a JSON tag; its fields are declared
	// in the types package and are not shown in this listing
	types.ExecutionMetadata
	// Config is the operation configuration payload; serialized under "config"
	// and dropped from JSON output when empty (omitempty)
	Config json.RawMessage `json:"config,omitempty"`
	// ForceClientRebuild requests client cache bypass; serialized under
	// "forceClientRebuild" and dropped from JSON output when false (omitempty)
	ForceClientRebuild bool `json:"forceClientRebuild,omitempty"`
}

Envelope is the payload emitted to the operation topic

type IngestContext

// IngestContext holds the stable per-integration dependencies shared across all
// ingest call paths; it is the ic argument of EmitPayloadSets and ProcessPayloadSets
type IngestContext struct {
	// Registry is the integration definition registry used to resolve mappings and definitions
	Registry *registry.Registry
	// DB is the ent client used for persistence
	DB *ent.Client
	// Runtime is the Gala instance used for async emit; nil on the synchronous persist path
	Runtime *gala.Gala
	// Integration is the integration record being ingested into
	Integration *ent.Integration
}

IngestContext holds the stable per-integration dependencies shared across all ingest call paths

type IngestOptions

// IngestOptions carries the minimal ingest-time metadata needed by persistence;
// it is the options argument of EmitPayloadSets and ProcessPayloadSets
type IngestOptions struct {
	// DirectorySyncRunID groups all directory-related ingest records from one sync batch
	DirectorySyncRunID string
	// SkipDirectorySyncRunFinalization instructs the processor not to finalize the directory sync run after processing
	SkipDirectorySyncRunFinalization bool
	// Source identifies the mechanism that produced the ingest data (e.g. webhook, poll, manual)
	Source integrationgenerated.IntegrationIngestSource
	// RunID is a caller-supplied correlation identifier for the overall operation run
	RunID string
	// Webhook is the webhook name or identifier that triggered this ingest
	Webhook string
	// WebhookEvent is the event type reported by the webhook provider
	WebhookEvent string
	// DeliveryID is the provider-assigned delivery identifier; used for deduplication
	DeliveryID string
	// WorkflowMeta carries workflow instance context; it is a pointer, so it may be nil
	WorkflowMeta *types.WorkflowMeta
}

IngestOptions carries the minimal ingest-time metadata needed by persistence

func IngestOptionsFromMetadata

IngestOptionsFromMetadata derives ingest options from execution metadata

type ReconcileEnvelope

// ReconcileEnvelope is the durable payload for a scheduled reconciliation cycle;
// it is the value passed to a ReconcileHandler
type ReconcileEnvelope struct {
	// ExecutionMetadata is embedded without a JSON tag; its fields are declared
	// in the types package and are not shown in this listing
	types.ExecutionMetadata
	// Schedule is the adaptive scheduling state carried across cycles;
	// serialized under "schedule" with no omitempty option
	Schedule gala.ScheduleState `json:"schedule"`
}

ReconcileEnvelope is the durable payload for a scheduled reconciliation cycle

type ReconcileHandler

// ReconcileHandler processes one reconciliation envelope; the int result is the
// number of operations dispatched (used as the delta for adaptive scheduling)
// and the error result reports a failed cycle. Handlers of this type are
// accepted by RegisterReconcileListener and RegisterRuntimeListeners
type ReconcileHandler func(context.Context, ReconcileEnvelope) (int, error)

ReconcileHandler processes one reconciliation envelope and returns the number of operations dispatched (used as the delta for adaptive scheduling)

type RunResult

// RunResult captures the terminal state of one run; it is the result argument
// of CompleteRun
type RunResult struct {
	// Status is the terminal run status
	Status enums.IntegrationRunStatus
	// Summary is the optional summary text stored on the run
	Summary string
	// Error is the optional terminal error text stored on the run;
	// note it is a plain string, not an error value
	Error string
	// Metrics is the structured metrics payload stored on the run
	Metrics map[string]any
}

RunResult captures the terminal state of one run

type WebhookEnvelope

// WebhookEnvelope is the durable payload emitted for one inbound integration
// webhook event; it is the value handed to the webhookHandle callback of
// RegisterRuntimeListeners
type WebhookEnvelope struct {
	// ExecutionMetadata is embedded without a JSON tag; its fields are declared
	// in the types package and are not shown in this listing
	types.ExecutionMetadata
	// Payload is the raw webhook request body; serialized under "payload"
	// with no omitempty option
	Payload json.RawMessage `json:"payload"`
	// Headers contains the inbound HTTP request headers, one string value per
	// name; serialized under "headers" and dropped from JSON output when empty (omitempty)
	Headers map[string]string `json:"headers,omitempty"`
}

WebhookEnvelope is the durable payload emitted for one inbound integration webhook event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL