runs

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobStatusPending   = "pending"
	JobStatusClaimed   = "claimed"
	JobStatusRunning   = "running"
	JobStatusSucceeded = "succeeded"
	JobStatusFailed    = "failed"
	JobStatusCancelled = "cancelled"
)

Job run status constants

View Source
const (
	DefaultSchedulerInterval = 1 * time.Minute
	DefaultMaxWorkers        = 10
	DefaultLeaseExpiry       = 5 * time.Minute
	DefaultClaimExpiry       = 30 * time.Second
)
View Source
const (
	StatusCreated   = "created"
	StatusUpdated   = "updated"
	StatusUnchanged = "unchanged"
	StatusDeleted   = "deleted"
	StatusFailed    = "failed"
)

Variables

View Source
var (
	ErrScheduleNotFound      = errors.New("schedule not found")
	ErrScheduleNameExists    = errors.New("schedule name already exists")
	ErrJobRunNotFound        = errors.New("job run not found")
	ErrJobRunNotClaimable    = errors.New("job run not claimable")
	ErrInvalidJobStatus      = errors.New("invalid job status")
	ErrInvalidCronExpression = errors.New("invalid cron expression")
)
View Source
var (
	ErrRunNotFound   = errors.New("run not found")
	ErrInvalidInput  = errors.New("invalid input")
	ErrInvalidStatus = errors.New("invalid status transition")
)
View Source
var (
	ErrNotFound = errors.New("run not found")
	ErrConflict = errors.New("run already exists")
)
View Source
var (
	ErrEncryptionNotConfigured = errors.New("encryption not configured: MARMOT_SERVER_ENCRYPTION_KEY must be set")
)

Functions

func DecryptScheduleConfig added in v0.5.0

func DecryptScheduleConfig(schedule *Schedule, encryptor *crypto.Encryptor) error

DecryptScheduleConfig decrypts sensitive fields in a schedule's config

func EncryptScheduleConfig added in v0.5.0

func EncryptScheduleConfig(schedule *Schedule, encryptor *crypto.Encryptor) error

EncryptScheduleConfig encrypts sensitive fields in a schedule's config

func GetEncryptor added in v0.5.0

func GetEncryptor(cfg *config.Config) (*crypto.Encryptor, error)

GetEncryptor creates an encryptor from the config

func ValidJobStatus added in v0.5.0

func ValidJobStatus(status string) bool

ValidJobStatus checks if a job status is valid

Types

type AssetResult

type AssetResult struct {
	Name     string      `json:"name"`
	Type     string      `json:"type"`
	Provider string      `json:"provider"`
	MRN      string      `json:"mrn"`
	Asset    interface{} `json:"asset"`
	Status   string      `json:"status"`
	Error    string      `json:"error,omitempty"`
}

type CreateAssetInput

type CreateAssetInput struct {
	Name          string                 `json:"name"`
	MRN           *string                `json:"mrn,omitempty"`
	Type          string                 `json:"type"`
	Providers     []string               `json:"providers"`
	Description   *string                `json:"description"`
	Metadata      map[string]interface{} `json:"metadata"`
	Schema        map[string]interface{} `json:"schema"`
	Tags          []string               `json:"tags"`
	Sources       []string               `json:"sources"`
	ExternalLinks []map[string]string    `json:"external_links"`
	Query         *string                `json:"query,omitempty"`
	QueryLanguage *string                `json:"query_language,omitempty"`
}

type DestroyRunResponse

type DestroyRunResponse struct {
	AssetsDeleted        int      `json:"assets_deleted"`
	LineageDeleted       int      `json:"lineage_deleted"`
	DocumentationDeleted int      `json:"documentation_deleted"`
	DeletedEntityMRNs    []string `json:"deleted_entity_mrns"`
}

type DocumentationInput

type DocumentationInput struct {
	AssetMRN string `json:"asset_mrn"`
	Content  string `json:"content"`
	Type     string `json:"type"`
}

type DocumentationResult

type DocumentationResult struct {
	AssetMRN string `json:"asset_mrn"`
	Type     string `json:"type"`
	Status   string `json:"status"`
	Error    string `json:"error,omitempty"`
}

type EventBroadcaster added in v0.5.0

type EventBroadcaster interface {
	BroadcastJobRunCreated(run *JobRun)
	BroadcastJobRunClaimed(run *JobRun)
	BroadcastJobRunStarted(run *JobRun)
	BroadcastJobRunProgress(run *JobRun)
	BroadcastJobRunCompleted(run *JobRun)
	BroadcastJobRunCancelled(run *JobRun)
}

EventBroadcaster defines the interface for broadcasting job run events

type JobRun added in v0.5.0

type JobRun struct {
	ID                 string                 `json:"id"`
	ScheduleID         *string                `json:"schedule_id,omitempty"`
	PluginRunID        *string                `json:"plugin_run_id,omitempty"`
	PipelineName       string                 `json:"pipeline_name"`
	SourceName         string                 `json:"source_name"`
	RunID              string                 `json:"run_id"`
	Status             string                 `json:"status"`
	ClaimedBy          *string                `json:"claimed_by,omitempty"`
	ClaimedAt          *time.Time             `json:"claimed_at,omitempty"`
	StartedAt          *time.Time             `json:"started_at,omitempty"`
	FinishedAt         *time.Time             `json:"finished_at,omitempty"`
	Log                *string                `json:"log,omitempty"`
	ErrorMessage       *string                `json:"error_message,omitempty"`
	AssetsCreated      int                    `json:"assets_created"`
	AssetsUpdated      int                    `json:"assets_updated"`
	AssetsDeleted      int                    `json:"assets_deleted"`
	LineageCreated     int                    `json:"lineage_created"`
	DocumentationAdded int                    `json:"documentation_added"`
	Config             map[string]interface{} `json:"config,omitempty"`
	CreatedBy          string                 `json:"created_by"`
	CreatedAt          time.Time              `json:"created_at"`
	UpdatedAt          time.Time              `json:"updated_at"`
}

type LineageInput

type LineageInput struct {
	Source string `json:"source"`
	Target string `json:"target"`
	Type   string `json:"type"`
}

type LineageResult

type LineageResult struct {
	Source string `json:"source"`
	Target string `json:"target"`
	Type   string `json:"type"`
	Status string `json:"status"`
	Error  string `json:"error,omitempty"`
}

type NoopBroadcaster added in v0.5.0

type NoopBroadcaster struct{}

NoopBroadcaster is a broadcaster that does nothing (for when websockets are disabled)

func (*NoopBroadcaster) BroadcastJobRunCancelled added in v0.5.0

func (n *NoopBroadcaster) BroadcastJobRunCancelled(run *JobRun)

func (*NoopBroadcaster) BroadcastJobRunClaimed added in v0.5.0

func (n *NoopBroadcaster) BroadcastJobRunClaimed(run *JobRun)

func (*NoopBroadcaster) BroadcastJobRunCompleted added in v0.5.0

func (n *NoopBroadcaster) BroadcastJobRunCompleted(run *JobRun)

func (*NoopBroadcaster) BroadcastJobRunCreated added in v0.5.0

func (n *NoopBroadcaster) BroadcastJobRunCreated(run *JobRun)

func (*NoopBroadcaster) BroadcastJobRunProgress added in v0.5.0

func (n *NoopBroadcaster) BroadcastJobRunProgress(run *JobRun)

func (*NoopBroadcaster) BroadcastJobRunStarted added in v0.5.0

func (n *NoopBroadcaster) BroadcastJobRunStarted(run *JobRun)

type PostgresRepository

type PostgresRepository struct {
	// contains filtered or unexported fields
}

func NewPostgresRepository

func NewPostgresRepository(db *pgxpool.Pool) *PostgresRepository

func (*PostgresRepository) AddCheckpoint

func (r *PostgresRepository) AddCheckpoint(ctx context.Context, runDBID string, checkpoint *plugin.RunCheckpoint) error

func (*PostgresRepository) AddRunEntity

func (r *PostgresRepository) AddRunEntity(ctx context.Context, runDBID string, entity *RunEntity) error

func (*PostgresRepository) CleanupStaleRuns

func (r *PostgresRepository) CleanupStaleRuns(ctx context.Context, timeout time.Duration) (int, error)

func (*PostgresRepository) Create

func (r *PostgresRepository) Create(ctx context.Context, run *plugin.Run) error

func (*PostgresRepository) DeleteCheckpoints

func (r *PostgresRepository) DeleteCheckpoints(ctx context.Context, pipelineName, sourceName string) error

func (*PostgresRepository) Get

func (r *PostgresRepository) Get(ctx context.Context, id string) (*plugin.Run, error)

func (*PostgresRepository) GetByRunID

func (r *PostgresRepository) GetByRunID(ctx context.Context, runID string) (*plugin.Run, error)

func (*PostgresRepository) GetLastRunCheckpoints

func (r *PostgresRepository) GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)

func (*PostgresRepository) GetPipelines

func (r *PostgresRepository) GetPipelines(ctx context.Context) ([]string, error)

func (*PostgresRepository) List

func (r *PostgresRepository) List(ctx context.Context, pipelineName string, limit, offset int) ([]*plugin.Run, int, error)

func (*PostgresRepository) ListRunEntities

func (r *PostgresRepository) ListRunEntities(ctx context.Context, runDBID, entityType, status string, limit, offset int) ([]*RunEntity, int, error)

func (*PostgresRepository) ListWithFilters

func (r *PostgresRepository) ListWithFilters(ctx context.Context, pipelines, statuses []string, limit, offset int) ([]*plugin.Run, int, []string, error)

func (*PostgresRepository) Update

func (r *PostgresRepository) Update(ctx context.Context, run *plugin.Run) error

type ProcessAssetsResponse

type ProcessAssetsResponse struct {
	Assets               []AssetResult         `json:"assets"`
	Lineage              []LineageResult       `json:"lineage"`
	Documentation        []DocumentationResult `json:"documentation"`
	StaleEntitiesRemoved []string              `json:"stale_entities_removed,omitempty"`
}

type Repository

type Repository interface {
	Create(ctx context.Context, run *plugin.Run) error
	Get(ctx context.Context, id string) (*plugin.Run, error)
	GetByRunID(ctx context.Context, runID string) (*plugin.Run, error)
	Update(ctx context.Context, run *plugin.Run) error
	List(ctx context.Context, pipelineName string, limit, offset int) ([]*plugin.Run, int, error)
	ListWithFilters(ctx context.Context, pipelines, statuses []string, limit, offset int) ([]*plugin.Run, int, []string, error)
	AddCheckpoint(ctx context.Context, runDBID string, checkpoint *plugin.RunCheckpoint) error
	DeleteCheckpoints(ctx context.Context, pipelineName, sourceName string) error
	GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)
	CleanupStaleRuns(ctx context.Context, timeout time.Duration) (int, error)
	AddRunEntity(ctx context.Context, runDBID string, entity *RunEntity) error
	ListRunEntities(ctx context.Context, runDBID, entityType, status string, limit, offset int) ([]*RunEntity, int, error)
}

type RunCompletionObserver added in v0.6.0

type RunCompletionObserver interface {
	OnRunCompleted(ctx context.Context, run *plugin.Run)
}

RunCompletionObserver is notified when runs complete.

type RunEntity

type RunEntity struct {
	ID           string    `json:"id"`
	RunID        string    `json:"run_id"`
	EntityType   string    `json:"entity_type"`
	EntityMRN    string    `json:"entity_mrn"`
	EntityName   string    `json:"entity_name,omitempty"`
	Status       string    `json:"status"`
	ErrorMessage string    `json:"error_message,omitempty"`
	CreatedAt    time.Time `json:"created_at"`
}

type RunHistoryInput added in v0.5.0

type RunHistoryInput struct {
	AssetMRN     string                 `json:"asset_mrn"`
	RunID        string                 `json:"run_id"`
	JobNamespace string                 `json:"job_namespace"`
	JobName      string                 `json:"job_name"`
	EventType    string                 `json:"event_type"`
	EventTime    time.Time              `json:"event_time"`
	RunFacets    map[string]interface{} `json:"run_facets,omitempty"`
	JobFacets    map[string]interface{} `json:"job_facets,omitempty"`
}

type Schedule added in v0.5.0

type Schedule struct {
	ID             string                 `json:"id"`
	Name           string                 `json:"name"`
	PluginID       string                 `json:"plugin_id"`
	Config         map[string]interface{} `json:"config"`
	CronExpression string                 `json:"cron_expression"`
	Enabled        bool                   `json:"enabled"`
	LastRunAt      *time.Time             `json:"last_run_at,omitempty"`
	LastRunStatus  *string                `json:"last_run_status,omitempty"`
	NextRunAt      *time.Time             `json:"next_run_at,omitempty"`
	CreatedBy      *string                `json:"created_by,omitempty"`
	CreatedAt      time.Time              `json:"created_at"`
	UpdatedAt      time.Time              `json:"updated_at"`
}

type SchedulePostgresRepository added in v0.5.0

type SchedulePostgresRepository struct {
	// contains filtered or unexported fields
}

func (*SchedulePostgresRepository) CancelJobRun added in v0.5.0

func (r *SchedulePostgresRepository) CancelJobRun(ctx context.Context, id string) error

func (*SchedulePostgresRepository) ClaimJobRun added in v0.5.0

func (r *SchedulePostgresRepository) ClaimJobRun(ctx context.Context, id, workerID string) (*JobRun, error)

func (*SchedulePostgresRepository) CompleteJobRun added in v0.5.0

func (r *SchedulePostgresRepository) CompleteJobRun(ctx context.Context, id string, status string, errorMessage *string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error

func (*SchedulePostgresRepository) CreateJobRun added in v0.5.0

func (r *SchedulePostgresRepository) CreateJobRun(ctx context.Context, run *JobRun) error

func (*SchedulePostgresRepository) CreateSchedule added in v0.5.0

func (r *SchedulePostgresRepository) CreateSchedule(ctx context.Context, schedule *Schedule) error

func (*SchedulePostgresRepository) DeleteSchedule added in v0.5.0

func (r *SchedulePostgresRepository) DeleteSchedule(ctx context.Context, id string) error

func (*SchedulePostgresRepository) GetJobRun added in v0.5.0

func (r *SchedulePostgresRepository) GetJobRun(ctx context.Context, id string) (*JobRun, error)

func (*SchedulePostgresRepository) GetSchedule added in v0.5.0

func (r *SchedulePostgresRepository) GetSchedule(ctx context.Context, id string) (*Schedule, error)

func (*SchedulePostgresRepository) GetScheduleByName added in v0.5.0

func (r *SchedulePostgresRepository) GetScheduleByName(ctx context.Context, name string) (*Schedule, error)

func (*SchedulePostgresRepository) GetSchedulesDueForRun added in v0.5.0

func (r *SchedulePostgresRepository) GetSchedulesDueForRun(ctx context.Context, limit int) ([]*Schedule, error)

func (*SchedulePostgresRepository) ListJobRuns added in v0.5.0

func (r *SchedulePostgresRepository) ListJobRuns(ctx context.Context, scheduleID *string, status *string, limit, offset int) ([]*JobRun, int, error)

func (*SchedulePostgresRepository) ListSchedules added in v0.5.0

func (r *SchedulePostgresRepository) ListSchedules(ctx context.Context, enabled *bool, limit, offset int) ([]*Schedule, int, error)

func (*SchedulePostgresRepository) ReleaseExpiredClaims added in v0.5.0

func (r *SchedulePostgresRepository) ReleaseExpiredClaims(ctx context.Context, expiry time.Duration) (int, error)

func (*SchedulePostgresRepository) SetJobRunPluginRunID added in v0.5.0

func (r *SchedulePostgresRepository) SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error

func (*SchedulePostgresRepository) UpdateJobRun added in v0.5.0

func (r *SchedulePostgresRepository) UpdateJobRun(ctx context.Context, run *JobRun) error

func (*SchedulePostgresRepository) UpdateJobRunProgress added in v0.5.0

func (r *SchedulePostgresRepository) UpdateJobRunProgress(ctx context.Context, id string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error

func (*SchedulePostgresRepository) UpdateJobRunStatus added in v0.5.0

func (r *SchedulePostgresRepository) UpdateJobRunStatus(ctx context.Context, id, status string) error

func (*SchedulePostgresRepository) UpdateSchedule added in v0.5.0

func (r *SchedulePostgresRepository) UpdateSchedule(ctx context.Context, schedule *Schedule) error

func (*SchedulePostgresRepository) UpdateScheduleLastRun added in v0.5.0

func (r *SchedulePostgresRepository) UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error

func (*SchedulePostgresRepository) UpdateScheduleNextRun added in v0.5.0

func (r *SchedulePostgresRepository) UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error

type ScheduleRepository added in v0.5.0

type ScheduleRepository interface {
	// Schedule operations
	CreateSchedule(ctx context.Context, schedule *Schedule) error
	GetSchedule(ctx context.Context, id string) (*Schedule, error)
	GetScheduleByName(ctx context.Context, name string) (*Schedule, error)
	UpdateSchedule(ctx context.Context, schedule *Schedule) error
	DeleteSchedule(ctx context.Context, id string) error
	ListSchedules(ctx context.Context, enabled *bool, limit, offset int) ([]*Schedule, int, error)
	UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error
	UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error
	GetSchedulesDueForRun(ctx context.Context, limit int) ([]*Schedule, error)

	// Job run operations
	CreateJobRun(ctx context.Context, run *JobRun) error
	GetJobRun(ctx context.Context, id string) (*JobRun, error)
	UpdateJobRun(ctx context.Context, run *JobRun) error
	ListJobRuns(ctx context.Context, scheduleID *string, status *string, limit, offset int) ([]*JobRun, int, error)
	ClaimJobRun(ctx context.Context, id, workerID string) (*JobRun, error)
	UpdateJobRunStatus(ctx context.Context, id, status string) error
	UpdateJobRunProgress(ctx context.Context, id string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error
	SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error
	CompleteJobRun(ctx context.Context, id string, status string, errorMessage *string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error
	ReleaseExpiredClaims(ctx context.Context, expiry time.Duration) (int, error)
	CancelJobRun(ctx context.Context, id string) error
}

ScheduleRepository defines the interface for schedule data access

func NewSchedulePostgresRepository added in v0.5.0

func NewSchedulePostgresRepository(db *pgxpool.Pool) ScheduleRepository

type ScheduleService added in v0.5.0

type ScheduleService struct {
	// contains filtered or unexported fields
}

func NewScheduleService added in v0.5.0

func NewScheduleService(repo ScheduleRepository) *ScheduleService

func (*ScheduleService) CalculateNextRun added in v0.5.0

func (s *ScheduleService) CalculateNextRun(cronExpression string, fromTime time.Time) (time.Time, error)

CalculateNextRun calculates the next run time for a schedule

func (*ScheduleService) CancelJobRun added in v0.5.0

func (s *ScheduleService) CancelJobRun(ctx context.Context, id string) error

func (*ScheduleService) ClaimJobRun added in v0.5.0

func (s *ScheduleService) ClaimJobRun(ctx context.Context, id, workerID string) (*JobRun, error)

func (*ScheduleService) CompleteJobRun added in v0.5.0

func (s *ScheduleService) CompleteJobRun(ctx context.Context, id string, success bool, errorMessage *string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error

func (*ScheduleService) CreateJobRun added in v0.5.0

func (s *ScheduleService) CreateJobRun(ctx context.Context, scheduleID *string, triggeredBy string) (*JobRun, error)

func (*ScheduleService) CreateSchedule added in v0.5.0

func (s *ScheduleService) CreateSchedule(ctx context.Context, name, pluginID string, config map[string]interface{}, cronExpression string, enabled bool, createdBy *string) (*Schedule, error)

func (*ScheduleService) DeleteSchedule added in v0.5.0

func (s *ScheduleService) DeleteSchedule(ctx context.Context, id string) error

func (*ScheduleService) GetJobRun added in v0.5.0

func (s *ScheduleService) GetJobRun(ctx context.Context, id string) (*JobRun, error)

func (*ScheduleService) GetJobRunPluginRunID added in v0.5.0

func (s *ScheduleService) GetJobRunPluginRunID(ctx context.Context, jobRunID string) (*string, error)

GetJobRunPluginRunID gets the plugin run ID for a job run

func (*ScheduleService) GetSchedule added in v0.5.0

func (s *ScheduleService) GetSchedule(ctx context.Context, id string) (*Schedule, error)

func (*ScheduleService) GetScheduleByName added in v0.5.0

func (s *ScheduleService) GetScheduleByName(ctx context.Context, name string) (*Schedule, error)

func (*ScheduleService) GetSchedulesDueForRun added in v0.5.0

func (s *ScheduleService) GetSchedulesDueForRun(ctx context.Context, limit int) ([]*Schedule, error)

func (*ScheduleService) ListJobRuns added in v0.5.0

func (s *ScheduleService) ListJobRuns(ctx context.Context, scheduleID *string, status *string, limit, offset int) ([]*JobRun, int, error)

func (*ScheduleService) ListSchedules added in v0.5.0

func (s *ScheduleService) ListSchedules(ctx context.Context, enabled *bool, limit, offset int) ([]*Schedule, int, error)

func (*ScheduleService) ReleaseExpiredClaims added in v0.5.0

func (s *ScheduleService) ReleaseExpiredClaims(ctx context.Context, expiry time.Duration) (int, error)

func (*ScheduleService) SetBroadcaster added in v0.5.0

func (s *ScheduleService) SetBroadcaster(broadcaster EventBroadcaster)

SetBroadcaster sets the event broadcaster for this service

func (*ScheduleService) SetJobRunPluginRunID added in v0.5.0

func (s *ScheduleService) SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error

SetJobRunPluginRunID sets the plugin run ID for a job run

func (*ScheduleService) StartJobRun added in v0.5.0

func (s *ScheduleService) StartJobRun(ctx context.Context, id string) error

func (*ScheduleService) UpdateJobRunProgress added in v0.5.0

func (s *ScheduleService) UpdateJobRunProgress(ctx context.Context, id string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error

func (*ScheduleService) UpdateSchedule added in v0.5.0

func (s *ScheduleService) UpdateSchedule(ctx context.Context, id string, name, pluginID string, config map[string]interface{}, cronExpression string, enabled bool) (*Schedule, error)

func (*ScheduleService) UpdateScheduleLastRun added in v0.5.0

func (s *ScheduleService) UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error

UpdateScheduleLastRun updates the last_run_at timestamp for a schedule

func (*ScheduleService) UpdateScheduleNextRun added in v0.5.0

func (s *ScheduleService) UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error

UpdateScheduleNextRun updates the next_run_at timestamp for a schedule

type Scheduler added in v0.5.0

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler added in v0.5.0

func NewScheduler(service *ScheduleService, runsService Service, encryptor *crypto.Encryptor, registry *plugin.Registry, config *SchedulerConfig) *Scheduler

func (*Scheduler) Start added in v0.5.0

func (s *Scheduler) Start(ctx context.Context) error

func (*Scheduler) Stop added in v0.5.0

func (s *Scheduler) Stop()

type SchedulerConfig added in v0.5.0

type SchedulerConfig struct {
	MaxWorkers        int
	SchedulerInterval time.Duration
	LeaseExpiry       time.Duration
	ClaimExpiry       time.Duration
	DB                *pgxpool.Pool
}

type Service

type Service interface {
	StartRun(ctx context.Context, pipelineName, sourceName, createdBy string, config plugin.RawPluginConfig) (*plugin.Run, error)
	CompleteRun(ctx context.Context, runID string, status plugin.RunStatus, summary *plugin.RunSummary, errorMessage string) error
	ProcessAssets(ctx context.Context, runID string, assets []CreateAssetInput, pipelineName, sourceName string) (*ProcessAssetsResponse, error)
	ProcessEntities(ctx context.Context, runID string, assets []CreateAssetInput, lineage []LineageInput, docs []DocumentationInput, stats []StatisticInput, pipelineName, sourceName string) (*ProcessAssetsResponse, error)
	ProcessRunHistory(ctx context.Context, runHistory []RunHistoryInput) (int, error)
	AddCheckpoint(ctx context.Context, runID, entityType, entityMRN, operation string, sourceFields []string) error
	GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)
	GetStaleEntities(ctx context.Context, lastCheckpoints map[string]*plugin.RunCheckpoint, currentEntityMRNs []string) []string
	DestroyPipeline(ctx context.Context, pipelineName string) (*DestroyRunResponse, error)
	CleanupStaleRuns(ctx context.Context, timeout time.Duration) (int, error)
	ListRuns(ctx context.Context, pipelineName string, limit, offset int) ([]*plugin.Run, int, error)
	ListRunsWithFilters(ctx context.Context, pipelines, statuses []string, limit, offset int) ([]*plugin.Run, int, []string, error)
	GetRun(ctx context.Context, id string) (*plugin.Run, error)
	ListRunEntities(ctx context.Context, runID, entityType, status string, limit, offset int) ([]*RunEntity, int, error)
	SetCompletionObserver(observer RunCompletionObserver)
}

func NewService

func NewService(repo Repository, assetService asset.Service, lineageService lineage.Service, metricsRecorder metrics.Recorder) Service

type StatisticInput added in v0.3.1

type StatisticInput struct {
	AssetMRN   string  `json:"asset_mrn"`
	MetricName string  `json:"metric_name"`
	Value      float64 `json:"value"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL