lineage

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Event type identifiers, declared as untyped string constants with
// upper-case values.
// NOTE(review): these look like the values carried by RunEvent.EventType and
// RunHistoryEntry.EventType (both plain strings) — confirm against callers.
const (
	EventTypeStart    = "START"
	EventTypeRunning  = "RUNNING"
	EventTypeComplete = "COMPLETE"
	EventTypeFail     = "FAIL"
	EventTypeAbort    = "ABORT"
	EventTypeOther    = "OTHER"
)
View Source
// Asset type labels, declared as untyped string constants.
// NOTE(review): where these labels are consumed is not visible here
// (presumably as the type of an asset or lineage node) — confirm.
const (
	AssetTypeQuery   = "Query"
	AssetTypeCommand = "Command"
	// The value is "Dag" (mixed case), unlike the all-caps identifier suffix.
	AssetTypeDAG     = "Dag"
	AssetTypeTask    = "Task"
	AssetTypeJob     = "Job"
	AssetTypeModel   = "Model"
	AssetTypeTable   = "Table"
	AssetTypeTopic   = "Topic"
	AssetTypeFile    = "File"
	AssetTypeBucket  = "Bucket"
	AssetTypeDataset = "Dataset"
	AssetTypeProject = "Project"
)
View Source
// Provider names, declared as untyped string constants.
// NOTE(review): presumably these identify the system an asset or event
// originates from; the consuming code is not visible here — confirm.
const (
	ProviderDBT         = "DBT"
	ProviderAirflow     = "Airflow"
	ProviderSpark       = "Spark"
	ProviderBigQuery    = "BigQuery"
	ProviderPostgreSQL  = "PostgreSQL"
	ProviderMySQL       = "MySQL"
	ProviderSQLServer   = "SQLServer"
	ProviderKafka       = "Kafka"
	ProviderS3          = "S3"
	ProviderGCS         = "GCS"
	ProviderAzure       = "Azure"
	ProviderOpenLineage = "OpenLineage"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Dataset added in v0.3.0

// Dataset names a dataset by Namespace and Name and carries three optional
// facet maps. It is the element type of the Inputs and Outputs slices on
// RunEvent and RunHistoryEntry.
//
// The facet maps are free-form (string keys, arbitrary values) and are left
// out of the JSON encoding when empty.
type Dataset struct {
	Namespace    string                 `json:"namespace"`
	Name         string                 `json:"name"`
	Facets       map[string]interface{} `json:"facets,omitempty"`
	InputFacets  map[string]interface{} `json:"inputFacets,omitempty"`
	OutputFacets map[string]interface{} `json:"outputFacets,omitempty"`
}

type Job

// Job names a job by Namespace and Name and carries an optional free-form
// facet map, which is left out of the JSON encoding when empty. It is the
// type of RunEvent.Job.
type Job struct {
	Namespace string                 `json:"namespace"`
	Name      string                 `json:"name"`
	Facets    map[string]interface{} `json:"facets,omitempty"`
}

type LineageChangeObserver added in v0.6.0

// LineageChangeObserver is notified when lineage edges are created or
// deleted. An observer is registered through
// Service.SetLineageChangeObserver.
type LineageChangeObserver interface {
	// OnEdgeCreated receives the source and target MRNs of the new edge
	// together with its edge type.
	OnEdgeCreated(ctx context.Context, sourceMRN, targetMRN, edgeType string)
	// OnEdgeDeleted receives the source and target MRNs of the removed edge;
	// no edge type is passed.
	OnEdgeDeleted(ctx context.Context, sourceMRN, targetMRN string)
}

LineageChangeObserver is notified when lineage edges are created or deleted. Observers must be registered via SetLineageChangeObserver before any lineage mutations occur (i.e., during server initialization, before Start is called).

type LineageEdge

// LineageEdge describes one lineage edge: its ID, the Source and Target it
// connects, and its Type. It is returned by GetDirectLineage and is the
// element type of LineageResponse.Edges.
//
// JobMRN is optional and is left out of the JSON encoding when empty.
// NOTE(review): Source and Target are presumably asset MRNs — confirm.
type LineageEdge struct {
	ID     string `json:"id"`
	Source string `json:"source"`
	Target string `json:"target"`
	Type   string `json:"type"`
	JobMRN string `json:"job_mrn,omitempty"`
}

type LineageNode

// LineageNode is one node of a lineage graph and the element type of
// LineageResponse.Nodes. It pairs an ID and Type with a pointer to the
// corresponding asset.Asset.
//
// Asset has no omitempty option, so a nil pointer encodes as JSON null.
// NOTE(review): Depth is presumably the node's distance from the queried
// asset; its sign convention is not visible here — confirm.
type LineageNode struct {
	ID    string       `json:"id"`
	Type  string       `json:"type"`
	Asset *asset.Asset `json:"asset"`
	Depth int          `json:"depth"`
}

type LineageResponse

// LineageResponse is the result type of GetAssetLineage: a set of nodes and
// the edges between them.
//
// Neither slice has an omitempty option, so a nil slice encodes as JSON
// null rather than an empty array.
type LineageResponse struct {
	Nodes []LineageNode `json:"nodes"`
	Edges []LineageEdge `json:"edges"`
}

type Logger

// Logger is a minimal logging interface with an informational and an error
// method. Both accept a message and a variadic list of extra fields.
// NOTE(review): the expected layout of fields (e.g. alternating key/value
// pairs) is not visible here — confirm against the implementation.
type Logger interface {
	Info(msg string, fields ...interface{})
	// Error additionally takes the error being reported.
	Error(msg string, err error, fields ...interface{})
}

type MetricsClient

// MetricsClient is the metrics sink accepted by WithMetrics. It records
// named counters and named durations, each with optional string tags.
// NOTE(review): the tag format (e.g. "key:value") is not visible here —
// confirm against the implementation.
type MetricsClient interface {
	Count(name string, value int64, tags ...string)
	Timing(name string, value time.Duration, tags ...string)
}

type PostgresRepository

// PostgresRepository declares methods matching every method of the
// Repository interface. NewPostgresRepository takes a *pgxpool.Pool and
// returns a Repository.
// NOTE(review): presumably NewPostgresRepository returns a
// *PostgresRepository backed by that pool — confirm in the source.
type PostgresRepository struct {
	// contains filtered or unexported fields
}

func (*PostgresRepository) CreateDirectLineage

func (r *PostgresRepository) CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string, lineageType string) (string, error)

func (*PostgresRepository) DeleteDirectLineage

func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error

func (*PostgresRepository) EdgeExists

func (r *PostgresRepository) EdgeExists(ctx context.Context, source, target string) (bool, error)

func (*PostgresRepository) GetAssetLineage

func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)

func (*PostgresRepository) GetDirectLineage

func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)

func (*PostgresRepository) GetImmediateNeighbors added in v0.6.0

func (r *PostgresRepository) GetImmediateNeighbors(ctx context.Context, assetMRN string, direction string) ([]string, error)

func (*PostgresRepository) StoreRunHistory added in v0.3.0

func (r *PostgresRepository) StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error

type Repository

// Repository is the storage interface passed to NewService. It covers
// lineage graph queries, direct edge management and run-history storage.
type Repository interface {
	// GetAssetLineage returns the lineage nodes and edges for assetID.
	// NOTE(review): the accepted direction values and the meaning of limit
	// are not visible here — confirm.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
	// CreateDirectLineage creates an edge of lineageType from sourceMRN to
	// targetMRN and returns a string (presumably the new edge ID — confirm).
	CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string, lineageType string) (string, error)
	// EdgeExists reports whether an edge between source and target exists.
	EdgeExists(ctx context.Context, source, target string) (bool, error)
	// DeleteDirectLineage deletes the edge identified by edgeID.
	DeleteDirectLineage(ctx context.Context, edgeID string) error
	// GetDirectLineage returns the edge identified by edgeID.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
	// GetImmediateNeighbors returns strings for the neighbors of assetMRN
	// in the given direction (presumably their MRNs — confirm).
	GetImmediateNeighbors(ctx context.Context, assetMRN string, direction string) ([]string, error)
	// StoreRunHistory stores one run-history entry.
	StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
}

func NewPostgresRepository

func NewPostgresRepository(db *pgxpool.Pool) Repository

type Run

// Run identifies a single run by RunID and carries an optional free-form
// facet map, which is left out of the JSON encoding when empty. It is the
// type of RunEvent.Run.
type Run struct {
	RunID  string                 `json:"runId"`
	Facets map[string]interface{} `json:"facets,omitempty"`
}

type RunEvent

// RunEvent is the event payload accepted by Service.ProcessOpenLineageEvent.
// It records an event type and time for a Run of a Job, together with the
// datasets read (Inputs) and written (Outputs).
//
// Inputs, Outputs, Producer and SchemaURL are left out of the JSON encoding
// when empty. JSON keys use camelCase (e.g. "eventType", "schemaURL").
// NOTE(review): EventType is presumably one of the EventType* constants —
// confirm whether other values are accepted.
type RunEvent struct {
	EventType string    `json:"eventType"`
	EventTime time.Time `json:"eventTime"`
	Run       Run       `json:"run"`
	Job       Job       `json:"job"`
	Inputs    []Dataset `json:"inputs,omitempty"`
	Outputs   []Dataset `json:"outputs,omitempty"`
	Producer  string    `json:"producer,omitempty"`
	SchemaURL string    `json:"schemaURL,omitempty"`
}

type RunHistoryEntry added in v0.3.0

// RunHistoryEntry is the record accepted by StoreRunHistory. It repeats the
// run, job, event and dataset information of a RunEvent in flattened form
// and adds an entry ID, an AssetID and a CreatedAt timestamp.
//
// JSON keys use snake_case (e.g. "asset_id", "run_facets"), unlike the
// camelCase keys of RunEvent. Producer, the facet maps, Inputs and Outputs
// are left out of the JSON encoding when empty.
// NOTE(review): whether ID and CreatedAt are set by the caller or by the
// repository is not visible here — confirm.
type RunHistoryEntry struct {
	ID           string                 `json:"id"`
	AssetID      string                 `json:"asset_id"`
	RunID        string                 `json:"run_id"`
	JobNamespace string                 `json:"job_namespace"`
	JobName      string                 `json:"job_name"`
	EventType    string                 `json:"event_type"`
	EventTime    time.Time              `json:"event_time"`
	Producer     string                 `json:"producer,omitempty"`
	RunFacets    map[string]interface{} `json:"run_facets,omitempty"`
	JobFacets    map[string]interface{} `json:"job_facets,omitempty"`
	Inputs       []Dataset              `json:"inputs,omitempty"`
	Outputs      []Dataset              `json:"outputs,omitempty"`
	CreatedAt    time.Time              `json:"created_at"`
}

type Service

// Service is the interface returned by NewService. It declares every method
// of Repository with identical signatures and adds observer registration
// and event ingestion.
type Service interface {
	// GetAssetLineage returns the lineage nodes and edges for assetID.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
	// CreateDirectLineage creates an edge of lineageType from sourceMRN to
	// targetMRN and returns a string (presumably the new edge ID — confirm).
	CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string, lineageType string) (string, error)
	// EdgeExists reports whether an edge between source and target exists.
	EdgeExists(ctx context.Context, source, target string) (bool, error)
	// DeleteDirectLineage deletes the edge identified by edgeID.
	DeleteDirectLineage(ctx context.Context, edgeID string) error
	// GetDirectLineage returns the edge identified by edgeID.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
	// GetImmediateNeighbors returns strings for the neighbors of assetMRN
	// in the given direction (presumably their MRNs — confirm).
	GetImmediateNeighbors(ctx context.Context, assetMRN string, direction string) ([]string, error)
	// SetLineageChangeObserver registers the observer to be notified of
	// edge creation and deletion. It takes a single observer and returns
	// nothing.
	SetLineageChangeObserver(observer LineageChangeObserver)
	// ProcessOpenLineageEvent ingests one RunEvent on behalf of createdBy.
	// NOTE(review): what createdBy identifies (user, API key, ...) is not
	// visible here — confirm.
	ProcessOpenLineageEvent(ctx context.Context, event *RunEvent, createdBy string) error
	// StoreRunHistory stores one run-history entry.
	StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
}

func NewService

func NewService(repo Repository, assetSvc asset.Service, opts ...ServiceOption) Service

type ServiceOption

// ServiceOption is a function that receives a pointer to the unexported
// service type. NewService accepts any number of options as its variadic
// opts parameter; WithMetrics returns one.
type ServiceOption func(*service)

func WithMetrics

func WithMetrics(metrics MetricsClient) ServiceOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL