lineage

package
v0.3.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// String constants naming event types. All values are upper-case.
// NOTE(review): these look like the OpenLineage run-state names and are
// presumably the values carried in RunEvent.EventType (a plain string) —
// confirm against the event-processing code.
const (
	EventTypeStart    = "START"
	EventTypeRunning  = "RUNNING"
	EventTypeComplete = "COMPLETE"
	EventTypeFail     = "FAIL"
	EventTypeAbort    = "ABORT"
	EventTypeOther    = "OTHER"
)
View Source
// String constants naming asset types. Values are capitalised words; note
// that AssetTypeDAG has the value "Dag", not "DAG".
// NOTE(review): where these are consumed is not visible here — LineageNode.Type
// is a plain string and may or may not be restricted to these values.
const (
	AssetTypeQuery   = "Query"
	AssetTypeCommand = "Command"
	AssetTypeDAG     = "Dag"
	AssetTypeTask    = "Task"
	AssetTypeJob     = "Job"
	AssetTypeModel   = "Model"
	AssetTypeTable   = "Table"
	AssetTypeTopic   = "Topic"
	AssetTypeFile    = "File"
	AssetTypeBucket  = "Bucket"
	AssetTypeDataset = "Dataset"
	AssetTypeProject = "Project"
)
View Source
// String constants naming providers. The values keep each product's own
// casing (for example "DBT", "BigQuery", "PostgreSQL"), so comparisons against
// them are case-sensitive.
// NOTE(review): how a provider is derived (e.g. from an event's producer or
// namespace) is not visible here — confirm in the service implementation.
const (
	ProviderDBT         = "DBT"
	ProviderAirflow     = "Airflow"
	ProviderSpark       = "Spark"
	ProviderBigQuery    = "BigQuery"
	ProviderPostgreSQL  = "PostgreSQL"
	ProviderMySQL       = "MySQL"
	ProviderSQLServer   = "SQLServer"
	ProviderKafka       = "Kafka"
	ProviderS3          = "S3"
	ProviderGCS         = "GCS"
	ProviderAzure       = "Azure"
	ProviderOpenLineage = "OpenLineage"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Dataset added in v0.3.0

// Dataset identifies a dataset by Namespace and Name and carries three
// optional facet maps. It is the element type of the Inputs and Outputs
// slices on RunEvent and RunHistoryEntry.
//
// Namespace and Name are always written to JSON; Facets, InputFacets and
// OutputFacets are left out when the map is nil or empty.
type Dataset struct {
	Namespace    string         `json:"namespace"`
	Name         string         `json:"name"`
	Facets       map[string]any `json:"facets,omitempty"`
	InputFacets  map[string]any `json:"inputFacets,omitempty"`
	OutputFacets map[string]any `json:"outputFacets,omitempty"`
}

type Job

// Job identifies a job by Namespace and Name, with an optional Facets map.
// It is the type of the RunEvent.Job field.
//
// Facets is left out of the JSON encoding when the map is nil or empty.
type Job struct {
	Namespace string         `json:"namespace"`
	Name      string         `json:"name"`
	Facets    map[string]any `json:"facets,omitempty"`
}

type LineageEdge

// LineageEdge is one edge of a lineage graph: it is the element type of
// LineageResponse.Edges and the result type of GetDirectLineage.
//
// ID, Source, Target and Type are always written to JSON; JobMRN is written
// under the key "job_mrn" and omitted when empty.
// NOTE(review): Source and Target presumably hold the same kind of identifier
// as LineageNode.ID, and Type's allowed values are not visible here — confirm.
type LineageEdge struct {
	ID     string `json:"id"`
	Source string `json:"source"`
	Target string `json:"target"`
	Type   string `json:"type"`
	JobMRN string `json:"job_mrn,omitempty"`
}

type LineageNode

// LineageNode is one node of a lineage graph; it is the element type of
// LineageResponse.Nodes.
//
// Asset is a pointer without omitempty, so a nil Asset is encoded as JSON
// null rather than being dropped.
// NOTE(review): Depth presumably counts hops from the asset that was queried,
// and its sign convention for upstream/downstream is not visible here — confirm.
type LineageNode struct {
	ID    string       `json:"id"`
	Type  string       `json:"type"`
	Asset *asset.Asset `json:"asset"`
	Depth int          `json:"depth"`
}

type LineageResponse

// LineageResponse is the result type of GetAssetLineage: a list of nodes and
// a list of edges.
//
// Neither field uses omitempty, so a nil slice is encoded as JSON null and an
// empty non-nil slice as [].
type LineageResponse struct {
	Nodes []LineageNode `json:"nodes"`
	Edges []LineageEdge `json:"edges"`
}

type Logger

// Logger is a two-method logging interface.
//
// Info takes a message followed by any number of extra fields. Error takes a
// message, the error being reported, and then the same kind of extra fields.
// NOTE(review): the expected layout of fields (for example alternating
// key/value pairs) is not specified here — confirm with the implementation
// that is passed in.
type Logger interface {
	Info(msg string, fields ...any)
	Error(msg string, err error, fields ...any)
}

type MetricsClient

// MetricsClient is the metrics sink accepted by WithMetrics.
//
// Count records an int64 value under name; Timing records a time.Duration
// under name. Both accept any number of string tags.
// NOTE(review): the tag format (for example "key:value") is not specified
// here — confirm with the concrete client in use.
type MetricsClient interface {
	Count(name string, value int64, tags ...string)
	Timing(name string, value time.Duration, tags ...string)
}

type PostgresRepository

// PostgresRepository has pointer-receiver methods matching every method of
// the Repository interface. Its fields are not shown on this page.
// NOTE(review): NewPostgresRepository takes a *pgxpool.Pool and returns a
// Repository; presumably the value it returns is a *PostgresRepository —
// confirm in the source.
type PostgresRepository struct {
	// contains filtered or unexported fields
}

func (*PostgresRepository) CreateDirectLineage

// CreateDirectLineage takes a source MRN and a target MRN and returns a
// string and an error.
// NOTE(review): the returned string is presumably the ID of the created edge
// (the value later accepted by GetDirectLineage/DeleteDirectLineage) — confirm.
func (r *PostgresRepository) CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)

func (*PostgresRepository) DeleteDirectLineage

// DeleteDirectLineage takes an edge ID and returns only an error.
// NOTE(review): whether deleting an unknown edgeID is an error is not visible
// here — confirm in the source.
func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error

func (*PostgresRepository) EdgeExists

// EdgeExists takes a source and a target identifier and returns a bool and an
// error.
// NOTE(review): presumably true means an edge from source to target is
// already stored; whether direction matters is not visible here — confirm.
func (r *PostgresRepository) EdgeExists(ctx context.Context, source, target string) (bool, error)

func (*PostgresRepository) GetAssetLineage

// GetAssetLineage returns a *LineageResponse (nodes and edges) for assetID.
// NOTE(review): what limit bounds (depth or result count) and which direction
// strings are accepted are not visible here — confirm in the source.
func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)

func (*PostgresRepository) GetDirectLineage

// GetDirectLineage takes an edge ID and returns a *LineageEdge and an error.
// NOTE(review): the result for an unknown edgeID (nil edge, sentinel error,
// or both) is not visible here — confirm in the source.
func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)

func (*PostgresRepository) StoreRunHistory added in v0.3.0

// StoreRunHistory takes a *RunHistoryEntry and returns only an error.
// NOTE(review): whether entry.ID and entry.CreatedAt must be set by the
// caller or are filled in on insert is not visible here — confirm.
func (r *PostgresRepository) StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error

type Repository

// Repository is the storage interface that NewService depends on and that
// NewPostgresRepository returns. Every method takes a context.Context first.
type Repository interface {
	// GetAssetLineage returns the nodes and edges for assetID.
	// NOTE(review): the meaning of limit and the accepted direction values
	// are not visible here — confirm with the implementation.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
	// CreateDirectLineage takes a source and target MRN; the returned string
	// is presumably the new edge's ID — confirm.
	CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
	// EdgeExists returns a bool for a source/target pair (per its name,
	// whether such an edge exists; matching rules are not visible here).
	EdgeExists(ctx context.Context, source, target string) (bool, error)
	// DeleteDirectLineage takes the ID of an edge and returns only an error.
	DeleteDirectLineage(ctx context.Context, edgeID string) error
	// GetDirectLineage takes the ID of an edge and returns that edge.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
	// StoreRunHistory takes one RunHistoryEntry and returns only an error.
	StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
}

func NewPostgresRepository

// NewPostgresRepository takes a pgx connection pool and returns a Repository.
// NOTE(review): behaviour for a nil pool and ownership of the pool (who
// closes it) are not visible here — confirm in the source.
func NewPostgresRepository(db *pgxpool.Pool) Repository

type Run

// Run carries a run identifier and an optional Facets map. It is the type of
// the RunEvent.Run field.
//
// RunID is encoded under the camelCase key "runId"; Facets is left out of the
// JSON encoding when the map is nil or empty.
type Run struct {
	RunID  string         `json:"runId"`
	Facets map[string]any `json:"facets,omitempty"`
}

type RunEvent

// RunEvent is the event type accepted by Service.ProcessOpenLineageEvent.
//
// JSON keys are camelCase ("eventType", "eventTime", "schemaURL"). EventType,
// EventTime, Run and Job are always written; Inputs, Outputs, Producer and
// SchemaURL are omitted when empty.
// NOTE(review): EventType is a plain string; presumably it holds one of the
// EventType* constants — confirm whether other values are rejected.
type RunEvent struct {
	EventType string    `json:"eventType"`
	EventTime time.Time `json:"eventTime"`
	Run       Run       `json:"run"`
	Job       Job       `json:"job"`
	Inputs    []Dataset `json:"inputs,omitempty"`
	Outputs   []Dataset `json:"outputs,omitempty"`
	Producer  string    `json:"producer,omitempty"`
	SchemaURL string    `json:"schemaURL,omitempty"`
}

type RunHistoryEntry added in v0.3.0

// RunHistoryEntry is the record type accepted by StoreRunHistory.
//
// Unlike RunEvent, its JSON keys are snake_case ("asset_id", "run_id",
// "job_namespace", ...). Producer, RunFacets, JobFacets, Inputs and Outputs
// are omitted from JSON when empty; every other field is always written.
// NOTE(review): the fields look like a flattened RunEvent (run ID, job
// namespace/name, facets, inputs, outputs) attached to one asset — confirm
// how entries are built in the service.
type RunHistoryEntry struct {
	ID           string         `json:"id"`
	AssetID      string         `json:"asset_id"`
	RunID        string         `json:"run_id"`
	JobNamespace string         `json:"job_namespace"`
	JobName      string         `json:"job_name"`
	EventType    string         `json:"event_type"`
	EventTime    time.Time      `json:"event_time"`
	Producer     string         `json:"producer,omitempty"`
	RunFacets    map[string]any `json:"run_facets,omitempty"`
	JobFacets    map[string]any `json:"job_facets,omitempty"`
	Inputs       []Dataset      `json:"inputs,omitempty"`
	Outputs      []Dataset      `json:"outputs,omitempty"`
	CreatedAt    time.Time      `json:"created_at"`
}

type Service

// Service is the interface returned by NewService. It declares the same five
// lineage methods as Repository (with identical signatures) and, instead of
// StoreRunHistory, exposes ProcessOpenLineageEvent.
type Service interface {
	// GetAssetLineage returns the nodes and edges for assetID.
	// NOTE(review): the meaning of limit and the accepted direction values
	// are not visible here — confirm with the implementation.
	GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
	// CreateDirectLineage takes a source and target MRN; the returned string
	// is presumably the new edge's ID — confirm.
	CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
	// EdgeExists returns a bool for a source/target pair (per its name,
	// whether such an edge exists; matching rules are not visible here).
	EdgeExists(ctx context.Context, source, target string) (bool, error)
	// DeleteDirectLineage takes the ID of an edge and returns only an error.
	DeleteDirectLineage(ctx context.Context, edgeID string) error
	// GetDirectLineage takes the ID of an edge and returns that edge.
	GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
	// ProcessOpenLineageEvent takes a RunEvent and a createdBy string and
	// returns only an error.
	// NOTE(review): what is persisted (edges, run history, assets) and how
	// createdBy is recorded are not visible here — confirm.
	ProcessOpenLineageEvent(ctx context.Context, event *RunEvent, createdBy string) error
}

func NewService

// NewService takes a Repository, an asset.Service and zero or more
// ServiceOption values, and returns a Service.
// NOTE(review): defaults used when no option is passed (for example when
// WithMetrics is omitted) are not visible here — confirm in the source.
func NewService(repo Repository, assetSvc asset.Service, opts ...ServiceOption) Service

type ServiceOption

// ServiceOption is a function that receives a pointer to the unexported
// service type. NewService accepts a variadic list of them, and WithMetrics
// returns one.
type ServiceOption func(*service)

func WithMetrics

// WithMetrics takes a MetricsClient and returns a ServiceOption for use with
// NewService.
// NOTE(review): behaviour when metrics is nil is not visible here — confirm.
func WithMetrics(metrics MetricsClient) ServiceOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL