This page is about running the Ingestion Framework externally! There are two main ways of running the ingestion:
- Internally, by managing the workflows from OpenMetadata.
- Externally, by using any other tool capable of running Python code.
If you are looking for how to manage the ingestion process from OpenMetadata, you can follow this doc.
Run the ingestion from your Airflow
OpenMetadata integrates with Airflow to orchestrate ingestion workflows. You can use Airflow to extract metadata and [deploy workflows](/deployment/ingestion/openmetadata) directly. This guide explains how to run ingestion workflows in Airflow using three different operators:
- Python Operator
- Docker Operator
- Python Virtualenv Operator
Using the Python Operator
Prerequisites
Install the `openmetadata-ingestion` package in your Airflow environment. This approach works best if you have access to the Airflow host and can manage dependencies.

Installation command:

```shell
pip3 install "openmetadata-ingestion[<plugin>]==x.y.z"
```

- Replace `<plugin>` with the sources to ingest, such as `mysql`, `snowflake`, or `s3`.
- Replace `x.y.z` with the OpenMetadata version matching your server (e.g., 1.6.1).
Example
```shell
pip3 install "openmetadata-ingestion[mysql,snowflake,s3]==1.6.1"
```
Example DAG
```python
import yaml
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Older Airflow versions expose the operator under this path
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.workflow.metadata import MetadataWorkflow

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval="*/5 * * * *",
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
Key Notes
- Function setup: The `python_callable` argument of the `PythonOperator` executes the `metadata_ingestion_workflow` function, which instantiates the workflow and runs the ingestion process.
- Drawback: This method requires dependencies pre-installed on the Airflow host, which may not always be feasible. Consider the `DockerOperator` or `PythonVirtualenvOperator` as alternatives.
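To see the shape of what `MetadataWorkflow.create` receives, you can parse a recipe string the same way the callable does; the recipe fields below are purely illustrative:

```python
import yaml

# Minimal illustrative recipe, embedded as a string like in the DAG above
config = """
source:
  type: mysql
  serviceName: local_mysql
sink:
  type: metadata-rest
  config: {}
"""

# yaml.safe_load turns the recipe into the plain dict the workflow consumes
workflow_config = yaml.safe_load(config)
print(workflow_config["source"]["type"])  # → mysql
```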
Next Steps
Docker & Virtualenv Operators
Run ingestion using the Docker Operator or Python Virtualenv Operator for isolated, dependency-free execution.