operations

package
v1.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Overview

Package operations dispatches and executes definition-scoped operations for integrations

Portions of this package are generated by the entx integration mapping generator. Most generated portions are a starting point and may be modified as needed.

One generated portion is marked: Code generated by entx integration mapping generator. DO NOT EDIT.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors declared by this package. Every value is created with
// errors.New and every message carries the "integrations/operations: " prefix
var (
	// ErrGalaRequired indicates the gala dependency is missing
	ErrGalaRequired = errors.New("integrations/operations: gala required")
	// ErrDispatchInputInvalid indicates the queued operation request failed caller-input validation
	ErrDispatchInputInvalid = errors.New("integrations/operations: dispatch input invalid")
	// ErrInstallationIDRequired indicates the installation identifier is missing
	ErrInstallationIDRequired = errors.New("integrations/operations: installation id required")
	// ErrOperationConfigInvalid indicates queued operation config failed caller-input validation
	ErrOperationConfigInvalid = errors.New("integrations/operations: operation config invalid")
	// ErrRunIDRequired indicates the run identifier is missing
	ErrRunIDRequired = errors.New("integrations/operations: run id required")
	// ErrIngestDefinitionNotFound indicates the operation definition could not be resolved for ingest
	ErrIngestDefinitionNotFound = errors.New("integrations/operations: ingest definition not found")
	// ErrIngestSchemaNotFound indicates the generated ingest schema contract was not found
	ErrIngestSchemaNotFound = errors.New("integrations/operations: ingest schema not found")
	// ErrIngestSchemaNotDeclared indicates the payload schema was not declared in the operation's ingest contracts
	ErrIngestSchemaNotDeclared = errors.New("integrations/operations: ingest schema not declared in contracts")
	// ErrIngestMappingNotFound indicates the definition does not provide a mapping for the emitted payload variant
	ErrIngestMappingNotFound = errors.New("integrations/operations: ingest mapping not found")
	// ErrIngestFilterFailed indicates the CEL filter evaluation failed
	ErrIngestFilterFailed = errors.New("integrations/operations: ingest filter failed")
	// ErrIngestInstallationFilterConfigInvalid indicates the installation filter configuration could not be decoded
	ErrIngestInstallationFilterConfigInvalid = errors.New("integrations/operations: ingest installation filter config invalid")
	// ErrIngestTransformFailed indicates the CEL map evaluation failed
	ErrIngestTransformFailed = errors.New("integrations/operations: ingest transform failed")
	// ErrIngestMappedDocumentInvalid indicates the mapped payload did not satisfy the generated schema contract
	ErrIngestMappedDocumentInvalid = errors.New("integrations/operations: ingest mapped document invalid")
	// ErrIngestUpsertKeyMissing indicates the mapped payload omitted every generated upsert key
	ErrIngestUpsertKeyMissing = errors.New("integrations/operations: ingest upsert key missing")
	// ErrIngestUpsertConflict indicates the generated upsert keys matched more than one record
	ErrIngestUpsertConflict = errors.New("integrations/operations: ingest upsert conflict")
	// ErrIngestUnsupportedSchema indicates the runtime does not yet support the requested generated ingest schema
	ErrIngestUnsupportedSchema = errors.New("integrations/operations: ingest schema unsupported")
	// ErrIngestPersistFailed indicates the mapped record could not be persisted
	ErrIngestPersistFailed = errors.New("integrations/operations: ingest persistence failed")
	// ErrOperationDisabled indicates the operation is disabled for this installation and the reconcile cycle should stop
	ErrOperationDisabled = errors.New("integrations/operations: operation disabled")
)
View Source
var (
	// ReconcileTopic is the Gala topic name for reconciliation envelopes; the
	// name is "integration." followed by reconcileSchemaName, which is declared
	// outside this excerpt
	ReconcileTopic = gala.TopicName("integration." + reconcileSchemaName)
)
View Source
var (
	// RecurringCampaignTopic is the Gala topic name for recurring campaign polling;
	// the name is "campaign.recurring." followed by recurringCampaignSchemaName,
	// which is declared outside this excerpt
	RecurringCampaignTopic = gala.TopicName("campaign.recurring." + recurringCampaignSchemaName)
)

Functions

func CompleteRun

func CompleteRun(ctx context.Context, db *ent.Client, runID string, startedAt time.Time, result RunResult) error

CompleteRun writes the final run outcome

func ContextCodecs

func ContextCodecs() []gala.ContextCodec

ContextCodecs returns the durable context codecs required by integration dispatch and ingest listeners

func CreatePendingRun

func CreatePendingRun(ctx context.Context, db *ent.Client, installation *ent.Integration, req DispatchRequest) (*ent.IntegrationRun, error)

CreatePendingRun inserts one pending run record for a dispatch request

func EmitPayloadSets

func EmitPayloadSets(ctx context.Context, ic IngestContext, operationName string, contracts []types.IngestContract, payloadSets []types.IngestPayloadSet, options IngestOptions) error

EmitPayloadSets transforms one batch of mapped payload sets and dispatches them through the appropriate ingest path

func LastSuccessfulRunAt added in v1.19.1

func LastSuccessfulRunAt(ctx context.Context, db *ent.Client, integrationID, operationName string) (*time.Time, error)

LastSuccessfulRunAt returns the finish time of the most recent successful run for the given integration and operation, or nil if no successful run exists yet

func MarkRunRunning

func MarkRunRunning(ctx context.Context, db *ent.Client, runID string) error

MarkRunRunning transitions one run to running

func NextCampaignRunAt added in v1.20.0

func NextCampaignRunAt(from time.Time, frequency enums.Frequency, interval int, timezone string) time.Time

NextCampaignRunAt computes the next run time from the given base time using calendar-based frequency and interval arithmetic. All frequencies are calendar-relative (month boundaries, not fixed durations), so Time.AddDate is used rather than Time.Add

func ProcessPayloadSets

func ProcessPayloadSets(ctx context.Context, ic IngestContext, operationName string, contracts []types.IngestContract, payloadSets []types.IngestPayloadSet, options IngestOptions) error

ProcessPayloadSets persists one batch of mapped payload sets synchronously

func RegisterIngestListeners

func RegisterIngestListeners(runtime *gala.Gala) error

RegisterIngestListeners attaches second-stage ingest listeners for all supported generated ingest schemas

func RegisterReconcileListener

func RegisterReconcileListener(runtime *gala.Gala, reg *registry.Registry, handle ReconcileHandler, schedule gala.Schedule) error

RegisterReconcileListener registers the Gala listener for integration reconciliation

func RegisterRecurringCampaignListener added in v1.20.0

func RegisterRecurringCampaignListener(runtime *gala.Gala, handle RecurringCampaignHandler, schedule gala.Schedule) error

RegisterRecurringCampaignListener registers the Gala listener for recurring campaign polling

func RegisterRuntimeListeners

func RegisterRuntimeListeners(runtime *gala.Gala, reg *registry.Registry, operationHandle func(context.Context, Envelope) error, webhookHandle func(context.Context, WebhookEnvelope) error, reconcileHandle ReconcileHandler, reconcileSchedule gala.Schedule, recurringCampaignHandle RecurringCampaignHandler, recurringCampaignSchedule gala.Schedule) error

RegisterRuntimeListeners registers all Gala listeners needed by the integration runtime

func RegisterScheduledListener added in v1.20.0

func RegisterScheduledListener[T any](cfg ScheduledListenerConfig[T]) error

RegisterScheduledListener registers a self-sustaining Gala listener that processes one cycle, computes the next adaptive interval, and re-emits the envelope carrying the updated schedule state

func ValidateConfig

func ValidateConfig(schema json.RawMessage, value json.RawMessage) error

ValidateConfig validates one raw configuration payload against the operation schema

Types

type DispatchRequest

// DispatchRequest describes one requested operation dispatch; it is the
// request argument accepted by Dispatch and CreatePendingRun
type DispatchRequest struct {
	// IntegrationID is the target integration identifier; required on the customer path, empty on the runtime path
	IntegrationID string
	// DefinitionID is the definition identifier carried as metadata on the runtime path
	DefinitionID string
	// OwnerID is the owning organization carried on the runtime path; derived from the DB record for customer dispatch
	OwnerID string
	// Operation is the definition-local operation identifier
	Operation string
	// Config is the operation configuration payload
	Config json.RawMessage
	// ForceClientRebuild requests client cache bypass
	ForceClientRebuild bool
	// RunType is the integration run type recorded for the dispatch
	RunType enums.IntegrationRunType
	// Workflow carries optional workflow linkage for workflow-triggered operations
	Workflow *types.WorkflowMeta
	// ScheduledAt defers execution until the specified time; nil means immediate
	ScheduledAt *time.Time
	// Runtime signals that this dispatch should use the runtime provider path;
	// per the Dispatch documentation, when it is true no DB integration lookup
	// is performed and the client is resolved from the registry at execution time
	Runtime bool
}

DispatchRequest describes one requested operation dispatch

type DispatchResult

// DispatchResult captures the queued run metadata; it is the non-error
// result type of Dispatch
type DispatchResult struct {
	// RunID is the persisted run identifier
	RunID string
	// EventID is the emitted Gala event identifier
	EventID string
	// Status is the run status at dispatch time
	Status enums.IntegrationRunStatus
}

DispatchResult captures the queued run metadata

func Dispatch

func Dispatch(ctx context.Context, reg *registry.Registry, db *ent.Client, runtime *gala.Gala, req DispatchRequest) (DispatchResult, error)

Dispatch validates and enqueues one operation execution request. When DispatchRequest.Runtime is true, no DB integration lookup is performed and the client is resolved from the registry at execution time

type Envelope

// Envelope is the payload emitted to the operation topic; it embeds
// types.ExecutionMetadata and adds two JSON fields that are omitted when empty
type Envelope struct {
	types.ExecutionMetadata
	// Config is the operation configuration payload
	// NOTE(review): presumably copied from DispatchRequest.Config — confirm against Dispatch
	Config json.RawMessage `json:"config,omitempty"`
	// ForceClientRebuild requests client cache bypass
	ForceClientRebuild bool `json:"forceClientRebuild,omitempty"`
}

Envelope is the payload emitted to the operation topic

type IngestContext

// IngestContext holds the stable per-integration dependencies shared across
// all ingest call paths; EmitPayloadSets and ProcessPayloadSets both take it
// as their ic argument
type IngestContext struct {
	// Registry is the integration definition registry used to resolve mappings and definitions
	Registry *registry.Registry
	// DB is the ent client used for persistence
	DB *ent.Client
	// Runtime is the Gala instance used for async emit; nil on the synchronous persist path
	Runtime *gala.Gala
	// Integration is the integration record being ingested into
	Integration *ent.Integration
}

IngestContext holds the stable per-integration dependencies shared across all ingest call paths

type IngestOptions

// IngestOptions carries the minimal ingest-time metadata needed by
// persistence; EmitPayloadSets and ProcessPayloadSets both take it as their
// options argument
type IngestOptions struct {
	// DirectorySyncRunID groups all directory-related ingest records from one sync batch
	DirectorySyncRunID string
	// SkipDirectorySyncRunFinalization instructs the processor not to finalize the directory sync run after processing
	SkipDirectorySyncRunFinalization bool
	// Source identifies the mechanism that produced the ingest data (e.g. webhook, poll, manual)
	Source integrationgenerated.IntegrationIngestSource
	// RunID is a caller-supplied correlation identifier for the overall operation run
	RunID string
	// Webhook is the webhook name or identifier that triggered this ingest
	Webhook string
	// WebhookEvent is the event type reported by the webhook provider
	WebhookEvent string
	// DeliveryID is the provider-assigned delivery identifier; used for deduplication
	DeliveryID string
	// WorkflowMeta carries workflow instance context
	WorkflowMeta *types.WorkflowMeta
}

IngestOptions carries the minimal ingest-time metadata needed by persistence

func IngestOptionsFromMetadata

IngestOptionsFromMetadata derives ingest options from execution metadata

type ReconcileEnvelope

// ReconcileEnvelope is the durable payload for a scheduled reconciliation
// cycle; it embeds types.ExecutionMetadata alongside the scheduling state
type ReconcileEnvelope struct {
	types.ExecutionMetadata
	// Schedule is the adaptive scheduling state carried across cycles; it is
	// always serialized under the "schedule" key (no omitempty)
	Schedule gala.ScheduleState `json:"schedule"`
}

ReconcileEnvelope is the durable payload for a scheduled reconciliation cycle

type ReconcileHandler

// ReconcileHandler processes one reconciliation envelope; the int result is
// the number of operations dispatched, used as the delta for adaptive scheduling
type ReconcileHandler func(context.Context, ReconcileEnvelope) (int, error)

ReconcileHandler processes one reconciliation envelope and returns the number of operations dispatched (used as the delta for adaptive scheduling)

type RecurringCampaignEnvelope added in v1.20.0

// RecurringCampaignEnvelope is the durable payload for a recurring campaign
// polling cycle; the scheduling state is its only field
type RecurringCampaignEnvelope struct {
	// Schedule is the adaptive scheduling state carried across polling cycles;
	// it is always serialized under the "schedule" key (no omitempty)
	Schedule gala.ScheduleState `json:"schedule"`
}

RecurringCampaignEnvelope is the durable payload for a recurring campaign polling cycle

type RecurringCampaignHandler added in v1.20.0

// RecurringCampaignHandler processes one polling cycle; the int result is the
// number of campaigns dispatched, used as the delta for adaptive scheduling
type RecurringCampaignHandler func(context.Context, RecurringCampaignEnvelope) (int, error)

RecurringCampaignHandler processes one polling cycle and returns the number of campaigns dispatched (used as the delta for adaptive scheduling)

type RunResult

// RunResult captures the terminal state of one run; CompleteRun takes it as
// its result argument
type RunResult struct {
	// Status is the terminal run status
	Status enums.IntegrationRunStatus
	// Summary is the optional summary text stored on the run
	Summary string
	// Error is the optional terminal error text stored on the run
	Error string
	// Metrics is the structured metrics payload stored on the run
	Metrics map[string]any
}

RunResult captures the terminal state of one run

type ScheduledListenerConfig added in v1.20.0

// ScheduledListenerConfig defines the registration parameters for a
// self-sustaining Gala listener with adaptive scheduling; it is the sole
// argument of RegisterScheduledListener. T is the envelope type passed to the
// callbacks. PrepareEmit, ShouldCancel and ScheduleOverride are documented as
// optional
type ScheduledListenerConfig[T any] struct {
	// Runtime is the Gala instance to register on
	Runtime *gala.Gala
	// Topic is the Gala topic name
	Topic gala.TopicName
	// Name is the stable listener name
	Name string
	// Schedule controls adaptive interval computation
	Schedule gala.Schedule
	// Handle is the handler invoked each cycle, returning the delta for scheduling
	Handle func(context.Context, T) (int, error)
	// State extracts the ScheduleState from the envelope
	State func(T) gala.ScheduleState
	// Wrap builds a new envelope carrying the updated ScheduleState
	Wrap func(T, gala.ScheduleState) T
	// PrepareEmit optionally enriches the context and headers before re-emitting
	PrepareEmit func(context.Context, T) (context.Context, gala.Headers)
	// ShouldCancel optionally classifies an execution error; when it returns
	// true the cycle is cancelled immediately without scheduling a retry
	ShouldCancel func(context.Context, T, error) bool
	// ScheduleOverride optionally returns a per-envelope schedule that
	// overrides the default; returning nil falls back to the config-level Schedule
	ScheduleOverride func(T) *gala.Schedule
}

ScheduledListenerConfig defines the registration parameters for a self-sustaining Gala listener with adaptive scheduling

type WebhookEnvelope

// WebhookEnvelope is the durable payload emitted for one inbound integration
// webhook event; it embeds types.ExecutionMetadata alongside the raw request data
type WebhookEnvelope struct {
	types.ExecutionMetadata
	// Payload is the raw webhook request body; it is always serialized under
	// the "payload" key (no omitempty)
	Payload json.RawMessage `json:"payload"`
	// Headers contains the inbound HTTP request headers; omitted from JSON when empty
	Headers map[string]string `json:"headers,omitempty"`
}

WebhookEnvelope is the durable payload emitted for one inbound integration webhook event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL