Documentation ¶
Index ¶
- Constants
- Variables
- func DecryptScheduleConfig(schedule *Schedule, encryptor *crypto.Encryptor) error
- func EncryptScheduleConfig(schedule *Schedule, encryptor *crypto.Encryptor) error
- func GetEncryptor(cfg *config.Config) (*crypto.Encryptor, error)
- func ValidJobStatus(status string) bool
- type AssetResult
- type CreateAssetInput
- type DestroyRunResponse
- type DocumentationInput
- type DocumentationResult
- type EventBroadcaster
- type JobRun
- type LineageInput
- type LineageResult
- type NoopBroadcaster
- func (n *NoopBroadcaster) BroadcastJobRunCancelled(run *JobRun)
- func (n *NoopBroadcaster) BroadcastJobRunClaimed(run *JobRun)
- func (n *NoopBroadcaster) BroadcastJobRunCompleted(run *JobRun)
- func (n *NoopBroadcaster) BroadcastJobRunCreated(run *JobRun)
- func (n *NoopBroadcaster) BroadcastJobRunProgress(run *JobRun)
- func (n *NoopBroadcaster) BroadcastJobRunStarted(run *JobRun)
- type PostgresRepository
- func (r *PostgresRepository) AddCheckpoint(ctx context.Context, runDBID string, checkpoint *plugin.RunCheckpoint) error
- func (r *PostgresRepository) AddRunEntity(ctx context.Context, runDBID string, entity *RunEntity) error
- func (r *PostgresRepository) CleanupStaleRuns(ctx context.Context, timeout time.Duration) (int, error)
- func (r *PostgresRepository) Create(ctx context.Context, run *plugin.Run) error
- func (r *PostgresRepository) DeleteCheckpoints(ctx context.Context, pipelineName, sourceName string) error
- func (r *PostgresRepository) Get(ctx context.Context, id string) (*plugin.Run, error)
- func (r *PostgresRepository) GetByRunID(ctx context.Context, runID string) (*plugin.Run, error)
- func (r *PostgresRepository) GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)
- func (r *PostgresRepository) GetPipelines(ctx context.Context) ([]string, error)
- func (r *PostgresRepository) List(ctx context.Context, pipelineName string, limit, offset int) ([]*plugin.Run, int, error)
- func (r *PostgresRepository) ListRunEntities(ctx context.Context, runDBID, entityType, status string, limit, offset int) ([]*RunEntity, int, error)
- func (r *PostgresRepository) ListWithFilters(ctx context.Context, pipelines, statuses []string, limit, offset int) ([]*plugin.Run, int, []string, error)
- func (r *PostgresRepository) Update(ctx context.Context, run *plugin.Run) error
- type ProcessAssetsResponse
- type Repository
- type RunCompletionObserver
- type RunEntity
- type RunHistoryInput
- type Schedule
- type SchedulePostgresRepository
- func (r *SchedulePostgresRepository) CancelJobRun(ctx context.Context, id string) error
- func (r *SchedulePostgresRepository) ClaimJobRun(ctx context.Context, id, workerID string) (*JobRun, error)
- func (r *SchedulePostgresRepository) CompleteJobRun(ctx context.Context, id string, status string, errorMessage *string, ...) error
- func (r *SchedulePostgresRepository) CreateJobRun(ctx context.Context, run *JobRun) error
- func (r *SchedulePostgresRepository) CreateSchedule(ctx context.Context, schedule *Schedule) error
- func (r *SchedulePostgresRepository) DeleteSchedule(ctx context.Context, id string) error
- func (r *SchedulePostgresRepository) GetJobRun(ctx context.Context, id string) (*JobRun, error)
- func (r *SchedulePostgresRepository) GetSchedule(ctx context.Context, id string) (*Schedule, error)
- func (r *SchedulePostgresRepository) GetScheduleByName(ctx context.Context, name string) (*Schedule, error)
- func (r *SchedulePostgresRepository) GetSchedulesDueForRun(ctx context.Context, limit int) ([]*Schedule, error)
- func (r *SchedulePostgresRepository) ListJobRuns(ctx context.Context, scheduleID *string, status *string, limit, offset int) ([]*JobRun, int, error)
- func (r *SchedulePostgresRepository) ListSchedules(ctx context.Context, enabled *bool, limit, offset int) ([]*Schedule, int, error)
- func (r *SchedulePostgresRepository) ReleaseExpiredClaims(ctx context.Context, expiry time.Duration) (int, error)
- func (r *SchedulePostgresRepository) SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error
- func (r *SchedulePostgresRepository) UpdateJobRun(ctx context.Context, run *JobRun) error
- func (r *SchedulePostgresRepository) UpdateJobRunProgress(ctx context.Context, id string, ...) error
- func (r *SchedulePostgresRepository) UpdateJobRunStatus(ctx context.Context, id, status string) error
- func (r *SchedulePostgresRepository) UpdateSchedule(ctx context.Context, schedule *Schedule) error
- func (r *SchedulePostgresRepository) UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error
- func (r *SchedulePostgresRepository) UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error
- type ScheduleRepository
- type ScheduleService
- func (s *ScheduleService) CalculateNextRun(cronExpression string, fromTime time.Time) (time.Time, error)
- func (s *ScheduleService) CancelJobRun(ctx context.Context, id string) error
- func (s *ScheduleService) ClaimJobRun(ctx context.Context, id, workerID string) (*JobRun, error)
- func (s *ScheduleService) CompleteJobRun(ctx context.Context, id string, success bool, errorMessage *string, ...) error
- func (s *ScheduleService) CreateJobRun(ctx context.Context, scheduleID *string, triggeredBy string) (*JobRun, error)
- func (s *ScheduleService) CreateSchedule(ctx context.Context, name, pluginID string, config map[string]interface{}, ...) (*Schedule, error)
- func (s *ScheduleService) DeleteSchedule(ctx context.Context, id string) error
- func (s *ScheduleService) GetJobRun(ctx context.Context, id string) (*JobRun, error)
- func (s *ScheduleService) GetJobRunPluginRunID(ctx context.Context, jobRunID string) (*string, error)
- func (s *ScheduleService) GetSchedule(ctx context.Context, id string) (*Schedule, error)
- func (s *ScheduleService) GetScheduleByName(ctx context.Context, name string) (*Schedule, error)
- func (s *ScheduleService) GetSchedulesDueForRun(ctx context.Context, limit int) ([]*Schedule, error)
- func (s *ScheduleService) ListJobRuns(ctx context.Context, scheduleID *string, status *string, limit, offset int) ([]*JobRun, int, error)
- func (s *ScheduleService) ListSchedules(ctx context.Context, enabled *bool, limit, offset int) ([]*Schedule, int, error)
- func (s *ScheduleService) ReleaseExpiredClaims(ctx context.Context, expiry time.Duration) (int, error)
- func (s *ScheduleService) SetBroadcaster(broadcaster EventBroadcaster)
- func (s *ScheduleService) SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error
- func (s *ScheduleService) StartJobRun(ctx context.Context, id string) error
- func (s *ScheduleService) UpdateJobRunProgress(ctx context.Context, id string, ...) error
- func (s *ScheduleService) UpdateSchedule(ctx context.Context, id string, name, pluginID string, ...) (*Schedule, error)
- func (s *ScheduleService) UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error
- func (s *ScheduleService) UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error
- type Scheduler
- type SchedulerConfig
- type Service
- type StatisticInput
Constants ¶
const (
JobStatusPending = "pending"
JobStatusClaimed = "claimed"
JobStatusRunning = "running"
JobStatusSucceeded = "succeeded"
JobStatusFailed = "failed"
JobStatusCancelled = "cancelled"
)
Job run status constants
const (
DefaultSchedulerInterval = 1 * time.Minute
DefaultMaxWorkers = 10
DefaultLeaseExpiry = 5 * time.Minute
DefaultClaimExpiry = 30 * time.Second
)
const (
StatusCreated = "created"
StatusUpdated = "updated"
StatusUnchanged = "unchanged"
StatusDeleted = "deleted"
StatusFailed = "failed"
)
Variables ¶
var (
ErrScheduleNotFound = errors.New("schedule not found")
ErrScheduleNameExists = errors.New("schedule name already exists")
ErrJobRunNotFound = errors.New("job run not found")
ErrJobRunNotClaimable = errors.New("job run not claimable")
ErrInvalidJobStatus = errors.New("invalid job status")
ErrInvalidCronExpression = errors.New("invalid cron expression")
)
var (
ErrRunNotFound = errors.New("run not found")
ErrInvalidInput = errors.New("invalid input")
ErrInvalidStatus = errors.New("invalid status transition")
)
var (
ErrNotFound = errors.New("run not found")
ErrConflict = errors.New("run already exists")
)
var (
ErrEncryptionNotConfigured = errors.New("encryption not configured: MARMOT_SERVER_ENCRYPTION_KEY must be set")
)
Functions ¶
func DecryptScheduleConfig ¶ added in v0.5.0
DecryptScheduleConfig decrypts sensitive fields in a schedule's config
func EncryptScheduleConfig ¶ added in v0.5.0
EncryptScheduleConfig encrypts sensitive fields in a schedule's config
func GetEncryptor ¶ added in v0.5.0
GetEncryptor creates an encryptor from the config
func ValidJobStatus ¶ added in v0.5.0
ValidJobStatus checks if a job status is valid
Types ¶
type AssetResult ¶
type CreateAssetInput ¶
type CreateAssetInput struct {
Name string `json:"name"`
MRN *string `json:"mrn,omitempty"`
Type string `json:"type"`
Providers []string `json:"providers"`
Description *string `json:"description"`
Metadata map[string]interface{} `json:"metadata"`
Schema map[string]interface{} `json:"schema"`
Tags []string `json:"tags"`
Sources []string `json:"sources"`
ExternalLinks []map[string]string `json:"external_links"`
Query *string `json:"query,omitempty"`
QueryLanguage *string `json:"query_language,omitempty"`
}
type DestroyRunResponse ¶
type DocumentationInput ¶
type DocumentationResult ¶
type EventBroadcaster ¶ added in v0.5.0
type EventBroadcaster interface {
BroadcastJobRunCreated(run *JobRun)
BroadcastJobRunClaimed(run *JobRun)
BroadcastJobRunStarted(run *JobRun)
BroadcastJobRunProgress(run *JobRun)
BroadcastJobRunCompleted(run *JobRun)
BroadcastJobRunCancelled(run *JobRun)
}
EventBroadcaster defines the interface for broadcasting job run events
type JobRun ¶ added in v0.5.0
type JobRun struct {
ID string `json:"id"`
ScheduleID *string `json:"schedule_id,omitempty"`
PluginRunID *string `json:"plugin_run_id,omitempty"`
PipelineName string `json:"pipeline_name"`
SourceName string `json:"source_name"`
RunID string `json:"run_id"`
Status string `json:"status"`
ClaimedBy *string `json:"claimed_by,omitempty"`
ClaimedAt *time.Time `json:"claimed_at,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
Log *string `json:"log,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
AssetsCreated int `json:"assets_created"`
AssetsUpdated int `json:"assets_updated"`
AssetsDeleted int `json:"assets_deleted"`
LineageCreated int `json:"lineage_created"`
DocumentationAdded int `json:"documentation_added"`
Config map[string]interface{} `json:"config,omitempty"`
CreatedBy string `json:"created_by"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type LineageInput ¶
type LineageResult ¶
type NoopBroadcaster ¶ added in v0.5.0
type NoopBroadcaster struct{}
NoopBroadcaster is a broadcaster that does nothing (for when websockets are disabled)
func (*NoopBroadcaster) BroadcastJobRunCancelled ¶ added in v0.5.0
func (n *NoopBroadcaster) BroadcastJobRunCancelled(run *JobRun)
func (*NoopBroadcaster) BroadcastJobRunClaimed ¶ added in v0.5.0
func (n *NoopBroadcaster) BroadcastJobRunClaimed(run *JobRun)
func (*NoopBroadcaster) BroadcastJobRunCompleted ¶ added in v0.5.0
func (n *NoopBroadcaster) BroadcastJobRunCompleted(run *JobRun)
func (*NoopBroadcaster) BroadcastJobRunCreated ¶ added in v0.5.0
func (n *NoopBroadcaster) BroadcastJobRunCreated(run *JobRun)
func (*NoopBroadcaster) BroadcastJobRunProgress ¶ added in v0.5.0
func (n *NoopBroadcaster) BroadcastJobRunProgress(run *JobRun)
func (*NoopBroadcaster) BroadcastJobRunStarted ¶ added in v0.5.0
func (n *NoopBroadcaster) BroadcastJobRunStarted(run *JobRun)
type PostgresRepository ¶
type PostgresRepository struct {
// contains filtered or unexported fields
}
func NewPostgresRepository ¶
func NewPostgresRepository(db *pgxpool.Pool) *PostgresRepository
func (*PostgresRepository) AddCheckpoint ¶
func (r *PostgresRepository) AddCheckpoint(ctx context.Context, runDBID string, checkpoint *plugin.RunCheckpoint) error
func (*PostgresRepository) AddRunEntity ¶
func (*PostgresRepository) CleanupStaleRuns ¶
func (*PostgresRepository) DeleteCheckpoints ¶
func (r *PostgresRepository) DeleteCheckpoints(ctx context.Context, pipelineName, sourceName string) error
func (*PostgresRepository) GetByRunID ¶
func (*PostgresRepository) GetLastRunCheckpoints ¶
func (r *PostgresRepository) GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)
func (*PostgresRepository) GetPipelines ¶
func (r *PostgresRepository) GetPipelines(ctx context.Context) ([]string, error)
func (*PostgresRepository) ListRunEntities ¶
func (*PostgresRepository) ListWithFilters ¶
type ProcessAssetsResponse ¶
type ProcessAssetsResponse struct {
Assets []AssetResult `json:"assets"`
Lineage []LineageResult `json:"lineage"`
Documentation []DocumentationResult `json:"documentation"`
StaleEntitiesRemoved []string `json:"stale_entities_removed,omitempty"`
}
type Repository ¶
type Repository interface {
Create(ctx context.Context, run *plugin.Run) error
Get(ctx context.Context, id string) (*plugin.Run, error)
GetByRunID(ctx context.Context, runID string) (*plugin.Run, error)
Update(ctx context.Context, run *plugin.Run) error
List(ctx context.Context, pipelineName string, limit, offset int) ([]*plugin.Run, int, error)
ListWithFilters(ctx context.Context, pipelines, statuses []string, limit, offset int) ([]*plugin.Run, int, []string, error)
AddCheckpoint(ctx context.Context, runDBID string, checkpoint *plugin.RunCheckpoint) error
DeleteCheckpoints(ctx context.Context, pipelineName, sourceName string) error
GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)
CleanupStaleRuns(ctx context.Context, timeout time.Duration) (int, error)
AddRunEntity(ctx context.Context, runDBID string, entity *RunEntity) error
ListRunEntities(ctx context.Context, runDBID, entityType, status string, limit, offset int) ([]*RunEntity, int, error)
}
type RunCompletionObserver ¶ added in v0.6.0
RunCompletionObserver is notified when runs complete.
type RunEntity ¶
type RunEntity struct {
ID string `json:"id"`
RunID string `json:"run_id"`
EntityType string `json:"entity_type"`
EntityMRN string `json:"entity_mrn"`
EntityName string `json:"entity_name,omitempty"`
Status string `json:"status"`
ErrorMessage string `json:"error_message,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type RunHistoryInput ¶ added in v0.5.0
type RunHistoryInput struct {
AssetMRN string `json:"asset_mrn"`
RunID string `json:"run_id"`
JobNamespace string `json:"job_namespace"`
JobName string `json:"job_name"`
EventType string `json:"event_type"`
EventTime time.Time `json:"event_time"`
RunFacets map[string]interface{} `json:"run_facets,omitempty"`
JobFacets map[string]interface{} `json:"job_facets,omitempty"`
}
type Schedule ¶ added in v0.5.0
type Schedule struct {
ID string `json:"id"`
Name string `json:"name"`
PluginID string `json:"plugin_id"`
Config map[string]interface{} `json:"config"`
CronExpression string `json:"cron_expression"`
Enabled bool `json:"enabled"`
LastRunAt *time.Time `json:"last_run_at,omitempty"`
LastRunStatus *string `json:"last_run_status,omitempty"`
NextRunAt *time.Time `json:"next_run_at,omitempty"`
CreatedBy *string `json:"created_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type SchedulePostgresRepository ¶ added in v0.5.0
type SchedulePostgresRepository struct {
// contains filtered or unexported fields
}
func (*SchedulePostgresRepository) CancelJobRun ¶ added in v0.5.0
func (r *SchedulePostgresRepository) CancelJobRun(ctx context.Context, id string) error
func (*SchedulePostgresRepository) ClaimJobRun ¶ added in v0.5.0
func (*SchedulePostgresRepository) CompleteJobRun ¶ added in v0.5.0
func (*SchedulePostgresRepository) CreateJobRun ¶ added in v0.5.0
func (r *SchedulePostgresRepository) CreateJobRun(ctx context.Context, run *JobRun) error
func (*SchedulePostgresRepository) CreateSchedule ¶ added in v0.5.0
func (r *SchedulePostgresRepository) CreateSchedule(ctx context.Context, schedule *Schedule) error
func (*SchedulePostgresRepository) DeleteSchedule ¶ added in v0.5.0
func (r *SchedulePostgresRepository) DeleteSchedule(ctx context.Context, id string) error
func (*SchedulePostgresRepository) GetSchedule ¶ added in v0.5.0
func (*SchedulePostgresRepository) GetScheduleByName ¶ added in v0.5.0
func (*SchedulePostgresRepository) GetSchedulesDueForRun ¶ added in v0.5.0
func (*SchedulePostgresRepository) ListJobRuns ¶ added in v0.5.0
func (*SchedulePostgresRepository) ListSchedules ¶ added in v0.5.0
func (*SchedulePostgresRepository) ReleaseExpiredClaims ¶ added in v0.5.0
func (*SchedulePostgresRepository) SetJobRunPluginRunID ¶ added in v0.5.0
func (r *SchedulePostgresRepository) SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error
func (*SchedulePostgresRepository) UpdateJobRun ¶ added in v0.5.0
func (r *SchedulePostgresRepository) UpdateJobRun(ctx context.Context, run *JobRun) error
func (*SchedulePostgresRepository) UpdateJobRunProgress ¶ added in v0.5.0
func (*SchedulePostgresRepository) UpdateJobRunStatus ¶ added in v0.5.0
func (r *SchedulePostgresRepository) UpdateJobRunStatus(ctx context.Context, id, status string) error
func (*SchedulePostgresRepository) UpdateSchedule ¶ added in v0.5.0
func (r *SchedulePostgresRepository) UpdateSchedule(ctx context.Context, schedule *Schedule) error
func (*SchedulePostgresRepository) UpdateScheduleLastRun ¶ added in v0.5.0
func (*SchedulePostgresRepository) UpdateScheduleNextRun ¶ added in v0.5.0
type ScheduleRepository ¶ added in v0.5.0
type ScheduleRepository interface {
// Schedule operations
CreateSchedule(ctx context.Context, schedule *Schedule) error
GetSchedule(ctx context.Context, id string) (*Schedule, error)
GetScheduleByName(ctx context.Context, name string) (*Schedule, error)
UpdateSchedule(ctx context.Context, schedule *Schedule) error
DeleteSchedule(ctx context.Context, id string) error
ListSchedules(ctx context.Context, enabled *bool, limit, offset int) ([]*Schedule, int, error)
UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error
UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error
GetSchedulesDueForRun(ctx context.Context, limit int) ([]*Schedule, error)
// Job run operations
CreateJobRun(ctx context.Context, run *JobRun) error
GetJobRun(ctx context.Context, id string) (*JobRun, error)
UpdateJobRun(ctx context.Context, run *JobRun) error
ListJobRuns(ctx context.Context, scheduleID *string, status *string, limit, offset int) ([]*JobRun, int, error)
ClaimJobRun(ctx context.Context, id, workerID string) (*JobRun, error)
UpdateJobRunStatus(ctx context.Context, id, status string) error
UpdateJobRunProgress(ctx context.Context, id string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error
SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error
CompleteJobRun(ctx context.Context, id string, status string, errorMessage *string, assetsCreated, assetsUpdated, assetsDeleted, lineageCreated, documentationAdded int) error
ReleaseExpiredClaims(ctx context.Context, expiry time.Duration) (int, error)
CancelJobRun(ctx context.Context, id string) error
}
ScheduleRepository defines the interface for schedule data access
func NewSchedulePostgresRepository ¶ added in v0.5.0
func NewSchedulePostgresRepository(db *pgxpool.Pool) ScheduleRepository
type ScheduleService ¶ added in v0.5.0
type ScheduleService struct {
// contains filtered or unexported fields
}
func NewScheduleService ¶ added in v0.5.0
func NewScheduleService(repo ScheduleRepository) *ScheduleService
func (*ScheduleService) CalculateNextRun ¶ added in v0.5.0
func (s *ScheduleService) CalculateNextRun(cronExpression string, fromTime time.Time) (time.Time, error)
CalculateNextRun calculates the next run time for a schedule
func (*ScheduleService) CancelJobRun ¶ added in v0.5.0
func (s *ScheduleService) CancelJobRun(ctx context.Context, id string) error
func (*ScheduleService) ClaimJobRun ¶ added in v0.5.0
func (*ScheduleService) CompleteJobRun ¶ added in v0.5.0
func (*ScheduleService) CreateJobRun ¶ added in v0.5.0
func (*ScheduleService) CreateSchedule ¶ added in v0.5.0
func (*ScheduleService) DeleteSchedule ¶ added in v0.5.0
func (s *ScheduleService) DeleteSchedule(ctx context.Context, id string) error
func (*ScheduleService) GetJobRunPluginRunID ¶ added in v0.5.0
func (s *ScheduleService) GetJobRunPluginRunID(ctx context.Context, jobRunID string) (*string, error)
GetJobRunPluginRunID gets the plugin run ID for a job run
func (*ScheduleService) GetSchedule ¶ added in v0.5.0
func (*ScheduleService) GetScheduleByName ¶ added in v0.5.0
func (*ScheduleService) GetSchedulesDueForRun ¶ added in v0.5.0
func (*ScheduleService) ListJobRuns ¶ added in v0.5.0
func (*ScheduleService) ListSchedules ¶ added in v0.5.0
func (*ScheduleService) ReleaseExpiredClaims ¶ added in v0.5.0
func (*ScheduleService) SetBroadcaster ¶ added in v0.5.0
func (s *ScheduleService) SetBroadcaster(broadcaster EventBroadcaster)
SetBroadcaster sets the event broadcaster for this service
func (*ScheduleService) SetJobRunPluginRunID ¶ added in v0.5.0
func (s *ScheduleService) SetJobRunPluginRunID(ctx context.Context, jobRunID, pluginRunID string) error
SetJobRunPluginRunID sets the plugin run ID for a job run
func (*ScheduleService) StartJobRun ¶ added in v0.5.0
func (s *ScheduleService) StartJobRun(ctx context.Context, id string) error
func (*ScheduleService) UpdateJobRunProgress ¶ added in v0.5.0
func (*ScheduleService) UpdateSchedule ¶ added in v0.5.0
func (*ScheduleService) UpdateScheduleLastRun ¶ added in v0.5.0
func (s *ScheduleService) UpdateScheduleLastRun(ctx context.Context, id string, lastRunAt time.Time) error
UpdateScheduleLastRun updates the last_run_at timestamp for a schedule
func (*ScheduleService) UpdateScheduleNextRun ¶ added in v0.5.0
func (s *ScheduleService) UpdateScheduleNextRun(ctx context.Context, id string, nextRunAt time.Time) error
UpdateScheduleNextRun updates the next_run_at timestamp for a schedule
type Scheduler ¶ added in v0.5.0
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶ added in v0.5.0
func NewScheduler(service *ScheduleService, runsService Service, encryptor *crypto.Encryptor, registry *plugin.Registry, config *SchedulerConfig) *Scheduler
type SchedulerConfig ¶ added in v0.5.0
type Service ¶
type Service interface {
StartRun(ctx context.Context, pipelineName, sourceName, createdBy string, config plugin.RawPluginConfig) (*plugin.Run, error)
CompleteRun(ctx context.Context, runID string, status plugin.RunStatus, summary *plugin.RunSummary, errorMessage string) error
ProcessAssets(ctx context.Context, runID string, assets []CreateAssetInput, pipelineName, sourceName string) (*ProcessAssetsResponse, error)
ProcessEntities(ctx context.Context, runID string, assets []CreateAssetInput, lineage []LineageInput, docs []DocumentationInput, stats []StatisticInput, pipelineName, sourceName string) (*ProcessAssetsResponse, error)
ProcessRunHistory(ctx context.Context, runHistory []RunHistoryInput) (int, error)
AddCheckpoint(ctx context.Context, runID, entityType, entityMRN, operation string, sourceFields []string) error
GetLastRunCheckpoints(ctx context.Context, pipelineName, sourceName string) (map[string]*plugin.RunCheckpoint, error)
GetStaleEntities(ctx context.Context, lastCheckpoints map[string]*plugin.RunCheckpoint, currentEntityMRNs []string) []string
DestroyPipeline(ctx context.Context, pipelineName string) (*DestroyRunResponse, error)
CleanupStaleRuns(ctx context.Context, timeout time.Duration) (int, error)
ListRuns(ctx context.Context, pipelineName string, limit, offset int) ([]*plugin.Run, int, error)
ListRunsWithFilters(ctx context.Context, pipelines, statuses []string, limit, offset int) ([]*plugin.Run, int, []string, error)
GetRun(ctx context.Context, id string) (*plugin.Run, error)
ListRunEntities(ctx context.Context, runID, entityType, status string, limit, offset int) ([]*RunEntity, int, error)
SetCompletionObserver(observer RunCompletionObserver)
}