Documentation
¶
Overview ¶
+marmot:name=Airflow +marmot:description=Ingests metadata from Apache Airflow including DAGs, tasks, and dataset lineage. +marmot:status=experimental +marmot:features=Assets, Lineage, Run History
Index ¶
- type APIError
- type AirflowDAGFields
- type AirflowDAGRunFields
- type AirflowDatasetFields
- type AirflowTaskFields
- type Client
- func NewClient(config ClientConfig) *Client
- func (c *Client) GetDAG(ctx context.Context, dagID string) (*DAG, error)
- func (c *Client) GetDatasetEvents(ctx context.Context, datasetURI string) ([]DatasetEvent, error)
- func (c *Client) ListDAGRuns(ctx context.Context, dagID string, days int) ([]DAGRun, error)
- func (c *Client) ListDAGs(ctx context.Context, onlyActive bool) ([]DAG, error)
- func (c *Client) ListDatasets(ctx context.Context) ([]Dataset, error)
- func (c *Client) ListTasks(ctx context.Context, dagID string) ([]Task, error)
- type ClientConfig
- type Config
- type DAG
- type DAGCollection
- type DAGRun
- type DAGRunCollection
- type DagRef
- type Dataset
- type DatasetCollection
- type DatasetEvent
- type DatasetEventCollection
- type ExtraLink
- type RetryDelay
- type ScheduleInterval
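- type Source
- func (s *Source) Discover(ctx context.Context, rawConfig plugin.RawPluginConfig) (*plugin.DiscoveryResult, error)
- func (s *Source) Validate(rawConfig plugin.RawPluginConfig) (plugin.RawPluginConfig, error)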
- type SubDAG
- type Tag
- type Task
- type TaskCollection
- type TaskRef
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type APIError ¶
type APIError struct {
// Fields mirror the JSON error body returned by the Airflow API
// (keys "detail", "status", "title", "type").
// NOTE(review): the package index lists no methods for APIError, so despite
// its name it does not appear to implement the error interface; consider
// adding an Error() string method so it can be returned as an error.
// Detail is the human-readable explanation from the error body.
Detail string `json:"detail"`
// Status is the numeric "status" field (presumably the HTTP status code — confirm).
Status int `json:"status"`
// Title is the "title" field (presumably a short summary of the error).
Title string `json:"title"`
// Type is the "type" field of the error body.
Type string `json:"type"`
}
APIError represents an error response from the Airflow API.
type AirflowDAGFields ¶
type AirflowDAGFields struct {
// Each field carries a `metadata` tag (its metadata key) and a `description`
// tag documenting it; these are presumably read by the marmot metadata tooling
// (see the +marmot:metadata directive) — TODO confirm.
// Values are flat scalars: Owners is a comma-separated string and all dates are
// plain strings, unlike the []string / *string fields on DAG.
DagID string `json:"dag_id" metadata:"dag_id" description:"Unique DAG identifier"`
Description string `json:"description" metadata:"description" description:"DAG description"`
FilePath string `json:"file_path" metadata:"file_path" description:"Path to DAG definition file"`
ScheduleInterval string `json:"schedule_interval" metadata:"schedule_interval" description:"DAG schedule (cron expression or preset)"`
IsPaused bool `json:"is_paused" metadata:"is_paused" description:"Whether DAG is paused"`
IsActive bool `json:"is_active" metadata:"is_active" description:"Whether DAG is active"`
Owners string `json:"owners" metadata:"owners" description:"DAG owners (comma-separated)"`
// The LastRun* fields describe the most recent DAG run.
LastRunState string `json:"last_run_state" metadata:"last_run_state" description:"State of the last DAG run (success, failed, running)"`
LastRunID string `json:"last_run_id" metadata:"last_run_id" description:"ID of the last DAG run"`
LastRunDate string `json:"last_run_date" metadata:"last_run_date" description:"Execution date of the last DAG run"`
NextRunDate string `json:"next_run_date" metadata:"next_run_date" description:"Next scheduled run date"`
LastParsedTime string `json:"last_parsed_time" metadata:"last_parsed_time" description:"Last time the DAG file was parsed"`
// SuccessRate (a percentage) and RunCount cover the run-history lookback
// period, presumably Config.RunHistoryDays — confirm.
SuccessRate float64 `json:"success_rate" metadata:"success_rate" description:"Success rate percentage over the lookback period"`
RunCount int `json:"run_count" metadata:"run_count" description:"Number of runs in the lookback period"`
}
AirflowDAGFields represents Airflow DAG-specific metadata fields. +marmot:metadata
type AirflowDAGRunFields ¶
type AirflowDAGRunFields struct {
// Flat, string-typed view of a single DAG run; the `description` tags document
// each field. StartDate and EndDate are plain strings here, unlike the *string
// fields on DAGRun, so a run without a start/end time presumably maps to "" —
// confirm.
DagRunID string `json:"dag_run_id" metadata:"dag_run_id" description:"Unique identifier for the DAG run"`
State string `json:"state" metadata:"state" description:"Run state (queued, running, success, failed)"`
ExecutionDate string `json:"execution_date" metadata:"execution_date" description:"Logical execution date"`
StartDate string `json:"start_date" metadata:"start_date" description:"Actual start time of the run"`
EndDate string `json:"end_date" metadata:"end_date" description:"End time of the run"`
RunType string `json:"run_type" metadata:"run_type" description:"Type of run (scheduled, manual, backfill)"`
}
AirflowDAGRunFields represents Airflow DAG run-specific metadata fields. +marmot:metadata
type AirflowDatasetFields ¶
type AirflowDatasetFields struct {
// Metadata for one Airflow Dataset; the `description` tags document each field.
URI string `json:"uri" metadata:"uri" description:"Dataset URI identifier"`
CreatedAt string `json:"created_at" metadata:"created_at" description:"Dataset creation timestamp"`
UpdatedAt string `json:"updated_at" metadata:"updated_at" description:"Dataset last update timestamp"`
// Note the asymmetry: producers are counted as tasks, consumers as DAGs
// (presumably len(Dataset.ProducingTasks) and len(Dataset.ConsumingDags) — confirm).
ProducerCount int `json:"producer_count" metadata:"producer_count" description:"Number of tasks that produce this dataset"`
ConsumerCount int `json:"consumer_count" metadata:"consumer_count" description:"Number of DAGs that consume this dataset"`
}
AirflowDatasetFields represents Airflow Dataset-specific metadata fields. +marmot:metadata
type AirflowTaskFields ¶
type AirflowTaskFields struct {
// Metadata for one task; the `description` tags document each field.
TaskID string `json:"task_id" metadata:"task_id" description:"Task identifier within the DAG"`
DagID string `json:"dag_id" metadata:"dag_id" description:"Parent DAG ID"`
OperatorName string `json:"operator_name" metadata:"operator_name" description:"Airflow operator class name (e.g., BashOperator, PythonOperator)"`
TriggerRule string `json:"trigger_rule" metadata:"trigger_rule" description:"Task trigger rule (e.g., all_success, one_success)"`
Retries int `json:"retries" metadata:"retries" description:"Number of retries configured for the task"`
Pool string `json:"pool" metadata:"pool" description:"Execution pool for the task"`
// Only downstream edges are kept; Task.UpstreamTaskIDs has no counterpart here.
// Presumably populated from Task.DownstreamTaskIDs — confirm.
DownstreamTasks []string `json:"downstream_tasks" metadata:"downstream_tasks" description:"List of downstream task IDs"`
}
AirflowTaskFields represents Airflow task-specific metadata fields. +marmot:metadata
type Client ¶
type Client struct {
// Construct with NewClient(ClientConfig); whether the zero value is usable is
// not documented.
// contains filtered or unexported fields
}
Client is an Airflow REST API client.
func NewClient ¶
func NewClient(config ClientConfig) *Client
NewClient creates a new Airflow API client from the given configuration.
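func (*Client) GetDAG ¶
func (c *Client) GetDAG(ctx context.Context, dagID string) (*DAG, error)
func (*Client) GetDatasetEvents ¶
func (c *Client) GetDatasetEvents(ctx context.Context, datasetURI string) ([]DatasetEvent, error)
GetDatasetEvents returns events for the dataset identified by datasetURI.
func (*Client) ListDAGRuns ¶
func (c *Client) ListDAGRuns(ctx context.Context, dagID string, days int) ([]DAGRun, error)
ListDAGRuns returns the runs of the DAG identified by dagID within the number of days given by days.
func (*Client) ListDAGs ¶
func (c *Client) ListDAGs(ctx context.Context, onlyActive bool) ([]DAG, error)
func (*Client) ListDatasets ¶
func (c *Client) ListDatasets(ctx context.Context) ([]Dataset, error)
ListDatasets returns all datasets from Airflow (requires Airflow 2.4+).
func (*Client) ListTasks ¶
func (c *Client) ListTasks(ctx context.Context, dagID string) ([]Task, error)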
type ClientConfig ¶
type ClientConfig struct {
// BaseURL is the root URL of the Airflow webserver (presumably taken from
// Config.Host — confirm).
BaseURL string
// Username and Password are basic-authentication credentials
// (see Config.Username / Config.Password).
Username string
Password string
// APIToken is an alternative to basic auth (see Config.APIToken). Which one
// wins when both are set is not documented — confirm.
APIToken string
// Timeout is presumably the per-request HTTP timeout. Behaviour for the zero
// value is not documented — confirm.
Timeout time.Duration
}
ClientConfig holds configuration for the Airflow API client.
type Config ¶
type Config struct {
// NOTE(review): encoding/json has no "inline" tag option. The embedded fields
// are promoted only because the tag's name part is empty. ",inline" presumably
// targets another decoder (e.g. yaml/mapstructure) — confirm.
plugin.BaseConfig `json:",inline"`
Host string `json:"host" description:"Airflow webserver URL (e.g., http://localhost:8080)" validate:"required,url"`
// Authentication: either Username/Password (basic auth) or APIToken.
// Password and APIToken are tagged sensitive.
Username string `json:"username,omitempty" description:"Username for basic authentication"`
Password string `json:"password,omitempty" description:"Password for basic authentication" sensitive:"true"`
APIToken string `json:"api_token,omitempty" description:"API token for authentication (alternative to basic auth)" sensitive:"true"`
DiscoverDAGs bool `json:"discover_dags" description:"Discover Airflow DAGs as Pipeline assets" default:"true"`
DiscoverTasks bool `json:"discover_tasks" description:"Discover tasks within DAGs" default:"true"`
DiscoverDatasets bool `json:"discover_datasets" description:"Discover Airflow Datasets for lineage (requires Airflow 2.4+)" default:"true"`
IncludeRunHistory bool `json:"include_run_history" description:"Include DAG run history in metadata" default:"true"`
// RunHistoryDays presumably only matters when IncludeRunHistory is true — confirm.
RunHistoryDays int `json:"run_history_days" description:"Number of days of run history to fetch" default:"7"`
// DAGFilter is a pointer; a nil filter presumably means "no filtering" — confirm.
DAGFilter *plugin.Filter `json:"dag_filter,omitempty" description:"Filter DAGs by ID pattern (include/exclude regex)"`
// NOTE(review): the description treats "active" as "unpaused", but DAG has
// separate IsActive and IsPaused flags. Confirm which one
// Client.ListDAGs(onlyActive) filters on and align the description.
OnlyActive bool `json:"only_active" description:"Only discover active (unpaused) DAGs" default:"true"`
}
Config holds the configuration for the Airflow plugin. +marmot:config
type DAG ¶
type DAG struct {
// Decoded from the Airflow REST API DAG object. Pointer-typed fields decode to
// nil when the JSON value is null or absent.
DagID string `json:"dag_id"`
DagDisplayName string `json:"dag_display_name,omitempty"`
Description *string `json:"description"`
FileToken string `json:"file_token"`
Fileloc string `json:"fileloc"`
// IsPaused and IsActive are independent flags in the response.
IsPaused bool `json:"is_paused"`
IsActive bool `json:"is_active"`
IsSubdag bool `json:"is_subdag"`
LastParsedTime *string `json:"last_parsed_time"`
LastPickled *string `json:"last_pickled"`
LastExpiredTime *string `json:"last_expired"`
SchedulerLock *string `json:"scheduler_lock"`
PickleID *string `json:"pickle_id"`
DefaultView string `json:"default_view"`
Owners []string `json:"owners"`
Tags []Tag `json:"tags"`
ScheduleInterval *ScheduleInterval `json:"schedule_interval"`
TimetableDesc *string `json:"timetable_description"`
NextDagRun *string `json:"next_dagrun"`
// NOTE(review): despite its name, NextDagRunTime decodes
// next_dagrun_data_interval_start (the next run's data-interval start), not
// the next run time; NextDagRun holds next_dagrun.
NextDagRunTime *string `json:"next_dagrun_data_interval_start"`
MaxActiveRuns int `json:"max_active_runs"`
MaxActiveTasks int `json:"max_active_tasks"`
// HasTaskConcur decodes has_task_concurrency_limits.
HasTaskConcur bool `json:"has_task_concurrency_limits"`
HasImportErrors bool `json:"has_import_errors"`
}
DAG represents an Airflow DAG from the REST API.
type DAGCollection ¶
DAGCollection represents the API response for listing DAGs.
type DAGRun ¶
type DAGRun struct {
// Decoded from the Airflow REST API DAG-run object. Timestamps are kept as raw
// strings (not time.Time). Pointer fields decode to nil when the JSON value is
// null or absent (e.g. presumably a queued run has no start_date yet — confirm).
DagRunID string `json:"dag_run_id"`
DagID string `json:"dag_id"`
LogicalDate string `json:"logical_date"`
ExecutionDate string `json:"execution_date"`
StartDate *string `json:"start_date"`
EndDate *string `json:"end_date"`
DataIntervalStart *string `json:"data_interval_start"`
DataIntervalEnd *string `json:"data_interval_end"`
// LastSchedulingDec decodes last_scheduling_decision.
LastSchedulingDec *string `json:"last_scheduling_decision"`
RunType string `json:"run_type"`
State string `json:"state"`
ExternalTrigger bool `json:"external_trigger"`
// Conf holds the run's free-form "conf" object as decoded by encoding/json.
Conf map[string]interface{} `json:"conf"`
Note *string `json:"note"`
}
DAGRun represents an Airflow DAG run from the REST API.
type DAGRunCollection ¶
type DAGRunCollection struct {
DagRuns []DAGRun `json:"dag_runs"`
// TotalCount decodes total_entries. It is presumably the server-side total,
// which can exceed len(DagRuns) when results are paginated — confirm.
TotalCount int `json:"total_entries"`
}
DAGRunCollection represents the API response for listing DAG runs.
type DagRef ¶
type DagRef struct {
// Only dag_id is decoded; encoding/json ignores any other keys in the
// referencing object by default.
DagID string `json:"dag_id"`
}
DagRef represents a reference to a DAG.
type Dataset ¶
type Dataset struct {
// Decoded from the Airflow REST API dataset object (Airflow 2.4+).
ID int `json:"id"`
// URI is the dataset's identifying URI.
URI string `json:"uri"`
// Extra holds the free-form "extra" object as decoded by encoding/json.
Extra map[string]interface{} `json:"extra"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
// ConsumingDags and ProducingTasks reference the DAGs that consume and the
// tasks that produce this dataset (the lineage edges).
ConsumingDags []DagRef `json:"consuming_dags"`
ProducingTasks []TaskRef `json:"producing_tasks"`
}
Dataset represents an Airflow Dataset from the REST API (Airflow 2.4+).
type DatasetCollection ¶
type DatasetCollection struct {
Datasets []Dataset `json:"datasets"`
// TotalCount decodes total_entries. It is presumably the server-side total,
// which can exceed len(Datasets) when results are paginated — confirm.
TotalCount int `json:"total_entries"`
}
DatasetCollection represents the API response for listing datasets.
type DatasetEvent ¶
type DatasetEvent struct {
ID int `json:"id"`
DatasetID int `json:"dataset_id"`
DatasetURI string `json:"dataset_uri"`
// The Source* pointer fields decode to nil when the JSON value is null or
// absent (presumably an event with no recorded producing task — confirm).
SourceDagID *string `json:"source_dag_id"`
SourceTaskID *string `json:"source_task_id"`
SourceRunID *string `json:"source_run_id"`
SourceMapIndex int `json:"source_map_index"`
// NOTE(review): each created_dagruns element is decoded into DagRef, so only
// dag_id is kept and every other key in those objects is dropped.
CreatedDagruns []DagRef `json:"created_dagruns"`
Timestamp string `json:"timestamp"`
// Extra holds the event's free-form "extra" object as decoded by encoding/json.
Extra map[string]interface{} `json:"extra"`
}
DatasetEvent represents a dataset event from the REST API.
type DatasetEventCollection ¶
type DatasetEventCollection struct {
DatasetEvents []DatasetEvent `json:"dataset_events"`
// TotalCount decodes total_entries. It is presumably the server-side total,
// which can exceed len(DatasetEvents) when results are paginated — confirm.
TotalCount int `json:"total_entries"`
}
DatasetEventCollection represents the API response for listing dataset events.
type ExtraLink ¶
type ExtraLink struct {
// NOTE(review): class_ref is decoded as a plain string, but the Airflow REST API
// reference documents class_ref as an object (module_path, class_name).
// encoding/json returns an error when unmarshalling an object into a string.
// Confirm against the target Airflow version.
ClassRef string `json:"class_ref"`
}
ExtraLink represents an extra link on a task.
type RetryDelay ¶
type RetryDelay struct {
// Type decodes the "__type" discriminator of the serialized duration object.
Type string `json:"__type"`
// The total delay is presumably Days + Seconds + Micros (microseconds);
// this struct does not combine them — confirm.
Days int `json:"days"`
Seconds int `json:"seconds"`
Micros int `json:"microseconds"`
}
RetryDelay represents a task's retry delay configuration.
type ScheduleInterval ¶
ScheduleInterval represents a DAG's schedule interval.
type Source ¶
type Source struct {
// Entry points are Validate (checks and normalizes the raw config) and
// Discover (discovers DAGs, tasks and datasets).
// contains filtered or unexported fields
}
Source implements the Airflow plugin.
func (*Source) Discover ¶
func (s *Source) Discover(ctx context.Context, rawConfig plugin.RawPluginConfig) (*plugin.DiscoveryResult, error)
Discover discovers Airflow DAGs, tasks, and datasets.
func (*Source) Validate ¶
func (s *Source) Validate(rawConfig plugin.RawPluginConfig) (plugin.RawPluginConfig, error)
Validate validates and normalizes the plugin configuration.
type SubDAG ¶
type SubDAG struct {
// Only dag_id is decoded; encoding/json ignores any other keys in the
// sub_dag object by default.
DagID string `json:"dag_id"`
}
SubDAG represents a sub-DAG reference.
type Task ¶
type Task struct {
// Decoded from the Airflow REST API task object. Pointer fields decode to nil
// when the JSON value is null or absent.
// NOTE(review): ClassName decodes class_ref as a string, and ExecutionTimeout
// decodes execution_timeout as *string. The Airflow REST API reference documents
// class_ref as an object (module_path, class_name) and execution_timeout as the
// same duration-object shape that RetryDelay models. encoding/json fails on such
// type mismatches, so confirm these against the target Airflow version.
TaskID string `json:"task_id"`
TaskDisplayName string `json:"task_display_name,omitempty"`
OperatorName string `json:"operator_name"`
ClassName string `json:"class_ref,omitempty"`
Pool string `json:"pool"`
PoolSlots int `json:"pool_slots"`
ExecutionTimeout *string `json:"execution_timeout"`
TriggerRule string `json:"trigger_rule"`
Retries int `json:"retries"`
RetryDelay *RetryDelay `json:"retry_delay"`
// RetryExponentialBac decodes retry_exponential_backoff.
RetryExponentialBac bool `json:"retry_exponential_backoff"`
PriorityWeight int `json:"priority_weight"`
// Weight decodes weight_rule (a rule name string); the numeric weight is PriorityWeight.
Weight string `json:"weight_rule"`
Queue string `json:"queue"`
// DownstreamTaskIDs and UpstreamTaskIDs are the task's edges within its DAG.
DownstreamTaskIDs []string `json:"downstream_task_ids"`
UpstreamTaskIDs []string `json:"upstream_task_ids"`
DependsOnPast bool `json:"depends_on_past"`
WaitForDownstream bool `json:"wait_for_downstream"`
StartDate *string `json:"start_date"`
EndDate *string `json:"end_date"`
UIColor string `json:"ui_color"`
UIFgcolor string `json:"ui_fgcolor"`
TemplateFields []string `json:"template_fields"`
ExtraLinks []ExtraLink `json:"extra_links"`
SubDag *SubDAG `json:"sub_dag,omitempty"`
// Params holds the free-form "params" object as decoded by encoding/json.
Params map[string]interface{} `json:"params,omitempty"`
}
Task represents an Airflow task from the REST API.
type TaskCollection ¶
TaskCollection represents the API response for listing tasks.