Documentation
¶
Index ¶
- Constants
- type Dataset
- type Job
- type LineageEdge
- type LineageNode
- type LineageResponse
- type Logger
- type MetricsClient
- type PostgresRepository
- func (r *PostgresRepository) CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
- func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error
- func (r *PostgresRepository) EdgeExists(ctx context.Context, source, target string) (bool, error)
- func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
- func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
- func (r *PostgresRepository) StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
- type Repository
- type Run
- type RunEvent
- type RunHistoryEntry
- type Service
- type ServiceOption
Constants ¶
View Source
const ( EventTypeStart = "START" EventTypeRunning = "RUNNING" EventTypeComplete = "COMPLETE" EventTypeFail = "FAIL" EventTypeAbort = "ABORT" EventTypeOther = "OTHER" )
View Source
const ( AssetTypeQuery = "Query" AssetTypeCommand = "Command" AssetTypeDAG = "Dag" AssetTypeTask = "Task" AssetTypeJob = "Job" AssetTypeModel = "Model" AssetTypeTable = "Table" AssetTypeTopic = "Topic" AssetTypeFile = "File" AssetTypeBucket = "Bucket" AssetTypeDataset = "Dataset" AssetTypeProject = "Project" )
View Source
const ( ProviderDBT = "DBT" ProviderAirflow = "Airflow" ProviderSpark = "Spark" ProviderBigQuery = "BigQuery" ProviderPostgreSQL = "PostgreSQL" ProviderMySQL = "MySQL" ProviderSQLServer = "SQLServer" ProviderKafka = "Kafka" ProviderS3 = "S3" ProviderGCS = "GCS" ProviderAzure = "Azure" ProviderOpenLineage = "OpenLineage" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type LineageEdge ¶
type LineageNode ¶
type LineageResponse ¶
type LineageResponse struct {
Nodes []LineageNode `json:"nodes"`
Edges []LineageEdge `json:"edges"`
}
type MetricsClient ¶
type PostgresRepository ¶
type PostgresRepository struct {
// contains filtered or unexported fields
}
func (*PostgresRepository) CreateDirectLineage ¶
func (*PostgresRepository) DeleteDirectLineage ¶
func (r *PostgresRepository) DeleteDirectLineage(ctx context.Context, edgeID string) error
func (*PostgresRepository) EdgeExists ¶
func (*PostgresRepository) GetAssetLineage ¶
func (r *PostgresRepository) GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
func (*PostgresRepository) GetDirectLineage ¶
func (r *PostgresRepository) GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
func (*PostgresRepository) StoreRunHistory ¶ added in v0.3.0
func (r *PostgresRepository) StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
type Repository ¶
type Repository interface {
GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
EdgeExists(ctx context.Context, source, target string) (bool, error)
DeleteDirectLineage(ctx context.Context, edgeID string) error
GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
StoreRunHistory(ctx context.Context, entry *RunHistoryEntry) error
}
func NewPostgresRepository ¶
func NewPostgresRepository(db *pgxpool.Pool) Repository
type RunEvent ¶
type RunEvent struct {
EventType string `json:"eventType"`
EventTime time.Time `json:"eventTime"`
Run Run `json:"run"`
Job Job `json:"job"`
Inputs []Dataset `json:"inputs,omitempty"`
Outputs []Dataset `json:"outputs,omitempty"`
Producer string `json:"producer,omitempty"`
SchemaURL string `json:"schemaURL,omitempty"`
}
type RunHistoryEntry ¶ added in v0.3.0
type RunHistoryEntry struct {
ID string `json:"id"`
AssetID string `json:"asset_id"`
RunID string `json:"run_id"`
JobNamespace string `json:"job_namespace"`
JobName string `json:"job_name"`
EventType string `json:"event_type"`
EventTime time.Time `json:"event_time"`
Producer string `json:"producer,omitempty"`
RunFacets map[string]interface{} `json:"run_facets,omitempty"`
JobFacets map[string]interface{} `json:"job_facets,omitempty"`
Inputs []Dataset `json:"inputs,omitempty"`
Outputs []Dataset `json:"outputs,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
type Service ¶
type Service interface {
GetAssetLineage(ctx context.Context, assetID string, limit int, direction string) (*LineageResponse, error)
CreateDirectLineage(ctx context.Context, sourceMRN string, targetMRN string) (string, error)
EdgeExists(ctx context.Context, source, target string) (bool, error)
DeleteDirectLineage(ctx context.Context, edgeID string) error
GetDirectLineage(ctx context.Context, edgeID string) (*LineageEdge, error)
ProcessOpenLineageEvent(ctx context.Context, event *RunEvent, createdBy string) error
}
func NewService ¶
func NewService(repo Repository, assetSvc asset.Service, opts ...ServiceOption) Service
type ServiceOption ¶
type ServiceOption func(*service)
func WithMetrics ¶
func WithMetrics(metrics MetricsClient) ServiceOption
Click to show internal directories.
Click to hide internal directories.