Documentation ¶
Index ¶
- Constants
- type Dataset
- type Job
- type LineageChangeObserver
- type LineageEdge
- type LineageNode
- type LineageResponse
- type Logger
- type MetricsClient
- type PostgresRepository
- func (r *PostgresRepository) CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string, lineageType string) (string, error)
- func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error
- func (r *PostgresRepository) EdgeExists(ctx context.Context, source, target string) (bool, error)
- func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
- func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
- func (r *PostgresRepository) GetImmediateNeighbors(ctx context.Context, assetMRN string, direction string) ([]string, error)
- func (r *PostgresRepository) StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
- type Repository
- type Run
- type RunEvent
- type RunHistoryEntry
- type Service
- type ServiceOption
Constants ¶
View Source
// Event type values.
//
// NOTE(review): these look like the values carried in RunEvent.EventType and
// RunHistoryEntry.EventType — confirm against the package source.
const (
	EventTypeStart    = "START"
	EventTypeRunning  = "RUNNING"
	EventTypeComplete = "COMPLETE"
	EventTypeFail     = "FAIL"
	EventTypeAbort    = "ABORT"
	EventTypeOther    = "OTHER"
)
View Source
// Asset type names.
//
// Note that AssetTypeDAG spells its value "Dag" (mixed case), unlike the
// all-caps identifier.
const (
	AssetTypeQuery   = "Query"
	AssetTypeCommand = "Command"
	AssetTypeDAG     = "Dag"
	AssetTypeTask    = "Task"
	AssetTypeJob     = "Job"
	AssetTypeModel   = "Model"
	AssetTypeTable   = "Table"
	AssetTypeTopic   = "Topic"
	AssetTypeFile    = "File"
	AssetTypeBucket  = "Bucket"
	AssetTypeDataset = "Dataset"
	AssetTypeProject = "Project"
)
View Source
// Provider names.
//
// NOTE(review): presumably these identify the system an asset or lineage
// event originates from — confirm against the package source.
const (
	ProviderDBT         = "DBT"
	ProviderAirflow     = "Airflow"
	ProviderSpark       = "Spark"
	ProviderBigQuery    = "BigQuery"
	ProviderPostgreSQL  = "PostgreSQL"
	ProviderMySQL       = "MySQL"
	ProviderSQLServer   = "SQLServer"
	ProviderKafka       = "Kafka"
	ProviderS3          = "S3"
	ProviderGCS         = "GCS"
	ProviderAzure       = "Azure"
	ProviderOpenLineage = "OpenLineage"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type LineageChangeObserver ¶ added in v0.6.0
// LineageChangeObserver receives a callback whenever a lineage edge is
// created or deleted. An observer is attached to a Service through
// SetLineageChangeObserver.
type LineageChangeObserver interface {
	// OnEdgeCreated is the creation callback. It is handed the source MRN,
	// the target MRN and the edge type of the new edge.
	OnEdgeCreated(ctx context.Context, sourceMRN string, targetMRN string, edgeType string)

	// OnEdgeDeleted is the deletion callback. It is handed the source MRN and
	// the target MRN of the removed edge; no edge type is supplied.
	OnEdgeDeleted(ctx context.Context, sourceMRN string, targetMRN string)
}
LineageChangeObserver is notified when lineage edges are created or deleted. Observers must be registered via SetLineageChangeObserver before any lineage mutations occur (i.e., during server initialization, before Start is called).
type LineageEdge ¶
type LineageNode ¶
type LineageResponse ¶
// LineageResponse is the result type of GetAssetLineage: a list of nodes
// together with a list of edges.
type LineageResponse struct {
	// Nodes is encoded under the JSON key "nodes".
	Nodes []LineageNode `json:"nodes"`

	// Edges is encoded under the JSON key "edges".
	Edges []LineageEdge `json:"edges"`
}
type MetricsClient ¶
type PostgresRepository ¶
// PostgresRepository exposes the same seven methods that the Repository
// interface declares (see the method listing for this type).
//
// NOTE(review): presumably this is the value returned by
// NewPostgresRepository, backed by the *pgxpool.Pool passed to it — confirm
// against the package source.
type PostgresRepository struct {
// contains filtered or unexported fields
}
func (*PostgresRepository) CreateDirectLineage ¶
func (*PostgresRepository) DeleteDirectLineage ¶
func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error
func (*PostgresRepository) EdgeExists ¶
func (*PostgresRepository) GetAssetLineage ¶
func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
func (*PostgresRepository) GetDirectLineage ¶
func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
func (*PostgresRepository) GetImmediateNeighbors ¶ added in v0.6.0
func (*PostgresRepository) StoreRunHistory ¶ added in v0.3.0
func (r *PostgresRepository) StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
type Repository ¶
// Repository is the interface accepted by NewService and returned by
// NewPostgresRepository. Every method takes a context as its first argument
// and reports failure through its trailing error result.
type Repository interface {
	// GetAssetLineage returns a LineageResponse for assetID, given a limit
	// and a direction.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)

	// CreateDirectLineage takes a source MRN, a target MRN and a lineage type
	// and returns a string.
	// NOTE(review): the returned string is presumably the new edge ID — confirm.
	CreateDirectLineage(ctx context.Context, sourceMRN, targetMRN, lineageType string) (string, error)

	// EdgeExists reports a boolean for the given source/target pair.
	EdgeExists(ctx context.Context, source, target string) (bool, error)

	// DeleteDirectLineage takes an edge ID and returns only an error.
	DeleteDirectLineage(ctx context.Context, edgeID string) error

	// GetDirectLineage returns the LineageEdge for an edge ID.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)

	// GetImmediateNeighbors returns a slice of strings for an asset MRN and
	// a direction.
	// NOTE(review): the strings are presumably neighbor MRNs — confirm.
	GetImmediateNeighbors(ctx context.Context, assetMRN string, direction string) ([]string, error)

	// StoreRunHistory takes a RunHistoryEntry and returns only an error.
	StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
}
func NewPostgresRepository ¶
func NewPostgresRepository(db *pgxpool.Pool) Repository
type RunEvent ¶
// RunEvent is the event type accepted by Service.ProcessOpenLineageEvent.
// Its JSON keys are camelCase.
type RunEvent struct {
	// Event type string and event timestamp.
	EventType string    `json:"eventType"`
	EventTime time.Time `json:"eventTime"`

	// Run and Job payloads; both types are declared elsewhere in this package.
	Run Run `json:"run"`
	Job Job `json:"job"`

	// Dataset lists; each is left out of the JSON encoding when empty.
	Inputs  []Dataset `json:"inputs,omitempty"`
	Outputs []Dataset `json:"outputs,omitempty"`

	// Optional strings; each is left out of the JSON encoding when empty.
	Producer  string `json:"producer,omitempty"`
	SchemaURL string `json:"schemaURL,omitempty"`
}
type RunHistoryEntry ¶ added in v0.3.0
// RunHistoryEntry is the record type passed to StoreRunHistory. Its JSON keys
// are snake_case, unlike the camelCase keys used by RunEvent.
type RunHistoryEntry struct {
	// String identifiers; always present in the JSON encoding.
	ID           string `json:"id"`
	AssetID      string `json:"asset_id"`
	RunID        string `json:"run_id"`
	JobNamespace string `json:"job_namespace"`
	JobName      string `json:"job_name"`

	// Event type, event timestamp and producer. Only Producer is left out of
	// the JSON encoding when empty.
	EventType string    `json:"event_type"`
	EventTime time.Time `json:"event_time"`
	Producer  string    `json:"producer,omitempty"`

	// Facet maps with string keys and arbitrary values; left out when empty.
	RunFacets map[string]interface{} `json:"run_facets,omitempty"`
	JobFacets map[string]interface{} `json:"job_facets,omitempty"`

	// Dataset lists; left out of the JSON encoding when empty.
	Inputs  []Dataset `json:"inputs,omitempty"`
	Outputs []Dataset `json:"outputs,omitempty"`

	// CreatedAt is a timestamp that is always present in the JSON encoding.
	CreatedAt time.Time `json:"created_at"`
}
type Service ¶
// Service is the interface returned by NewService. It declares every method
// that Repository declares, with identical signatures, plus
// SetLineageChangeObserver and ProcessOpenLineageEvent.
type Service interface {
	// GetAssetLineage returns a LineageResponse for assetID, given a limit
	// and a direction.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)

	// CreateDirectLineage takes a source MRN, a target MRN and a lineage type
	// and returns a string.
	// NOTE(review): the returned string is presumably the new edge ID — confirm.
	CreateDirectLineage(ctx context.Context, sourceMRN, targetMRN, lineageType string) (string, error)

	// EdgeExists reports a boolean for the given source/target pair.
	EdgeExists(ctx context.Context, source, target string) (bool, error)

	// DeleteDirectLineage takes an edge ID and returns only an error.
	DeleteDirectLineage(ctx context.Context, edgeID string) error

	// GetDirectLineage returns the LineageEdge for an edge ID.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)

	// GetImmediateNeighbors returns a slice of strings for an asset MRN and
	// a direction.
	// NOTE(review): the strings are presumably neighbor MRNs — confirm.
	GetImmediateNeighbors(ctx context.Context, assetMRN string, direction string) ([]string, error)

	// SetLineageChangeObserver takes a LineageChangeObserver and returns
	// nothing.
	SetLineageChangeObserver(observer LineageChangeObserver)

	// ProcessOpenLineageEvent takes a RunEvent plus a createdBy string and
	// returns only an error.
	ProcessOpenLineageEvent(ctx context.Context, event *RunEvent, createdBy string) error

	// StoreRunHistory takes a RunHistoryEntry and returns only an error.
	StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
}
func NewService ¶
func NewService(repo Repository, assetSvc asset.Service, opts ...ServiceOption) Service
type ServiceOption ¶
// ServiceOption is a function that receives the package's unexported *service.
// NewService accepts any number of them, and WithMetrics returns one.
type ServiceOption func(*service)
func WithMetrics ¶
func WithMetrics(metrics MetricsClient) ServiceOption
Click to show internal directories.
Click to hide internal directories.