Skip to main content

Documentation Index

Fetch the complete documentation index at: https://openmetadata-feat-feat-2mbfixtestexui.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Develop the Ingestion Code

We recommend you to take some time to understand how the Ingestion Framework works by reading this small article. The main takes for developing a new connector are:
  • To understand that each of our Source Types (Databases, Dashboards, etc) have a Topology attached.
  • To understand that the process flow is implemented as a generator chain, going through each step.

Service Spec

When developing a new database ingestion connector in OpenMetadata, ensure all necessary components are correctly configured. This guide outlines the steps required to define the connector’s ingestion capabilities using a service_spec.py file.

Why Use service_spec.py?

The service_spec.py file centralizes the definitions of sources, profilers, lineage, and other ingestion-related components for a connector. This approach helps standardize implementations across connectors, making it easier to manage ingestion workflows.

Steps to Develop a New Connector

1. Create the service_spec.py File

Add a service_spec.py file within the connector’s directory. This file will define the components needed for ingestion, such as metadata sources, lineage sources, profilers, and samplers.

2. Use the DefaultDatabaseSpec Class

The DefaultDatabaseSpec class simplifies the definition of connectors by bundling the required components. Import the DefaultDatabaseSpec and reference the appropriate classes for your connector.

3. Define the ServiceSpec

Customize the ServiceSpec object based on the features of your connector. Below is an example configuration:
from metadata.ingestion.source.database.bigquery.lineage import BigqueryLineageSource
from metadata.ingestion.source.database.bigquery.metadata import BigquerySource
from metadata.ingestion.source.database.bigquery.profiler.profiler import (
    BigQueryProfiler,
)
from metadata.ingestion.source.database.bigquery.usage import BigqueryUsageSource
from metadata.sampler.sqlalchemy.bigquery.sampler import BigQuerySampler
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(
    metadata_source_class=BigquerySource,
    lineage_source_class=BigqueryLineageSource,
    usage_source_class=BigqueryUsageSource,
    profiler_class=BigQueryProfiler,
    sampler_class=BigQuerySampler,
)

4. Adjust Classes for Your Connector

Replace the example classes (e.g., BigquerySource, BigqueryLineageSource, etc.) with those specific to your connector. Depending on the connector’s features, you may include or exclude certain components like usage or profiling.

Components of service_spec.py

  • metadata_source_class: Defines the class for metadata ingestion.
  • lineage_source_class: Defines the class for lineage extraction.
  • usage_source_class: Tracks data usage patterns.
  • profiler_class: Profiles data for quality and insights.
  • sampler_class: Samples data for efficient ingestion.

Example Workflow

Step 1: Add service_spec.py

Place the file in the connector’s directory.

Step 2: Configure Components

Define the ServiceSpec using the required classes, adjusting for your connector’s capabilities.

Step 3: Verify Integration

Run the ingestion workflow to test the connector and ensure all components are functioning correctly.

Service Topology

The Topology defines a series of Nodes and Stages that get executed in a hierarchical way and describe how we extract the needed data from the sources. Starting from the Root node we process the entities in a depth first approach, following the topology tree through the node’s children. From the Service Topology you can understand what methods you need to implement:
  • producer: Methods that will fetch the entities we need to process
  • processor: Methods that will yield a given Entity
  • post_process: Methods that will yield a given Entity but are ran after all entities from that node were processed.

Example - DatabaseServiceTopology

Can be found in ingestion/src/metadata/ingestion/source/database/database_service.py

OpenMetadata 1.6.0 or later

Starting from 1.6.0 the OpenMetadata Ingestion Framewotk is using a ServiceSpec specificaiton in order to define the entrypoints for the ingestion process.
class DatabaseServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in Database Services.
    service -> db -> schema -> table.

    We could have a topology validator. We can only consume
    data that has been produced by any parent node.
    """

    root = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=DatabaseService,
                context="database_service",
                processor="yield_create_request_database_service",
                overwrite=False,
                must_return=True,
                cache_entities=True,
            ),
        ],
        children=["database"],
    )
    database = TopologyNode(
        producer="get_database_names",
        stages=[
            NodeStage(
                type_=OMetaTagAndClassification,
                context="tags",
                processor="yield_database_tag_details",
                nullable=True,
                store_all_in_context=True,
            ),
            NodeStage(
                type_=Database,
                context="database",
                processor="yield_database",
                consumer=["database_service"],
                cache_entities=True,
                use_cache=True,
            ),
        ],
        children=["databaseSchema"],
    )
    databaseSchema = TopologyNode(
        producer="get_database_schema_names",
        stages=[
            NodeStage(
                type_=OMetaTagAndClassification,
                context="tags",
                processor="yield_database_schema_tag_details",
                nullable=True,
                store_all_in_context=True,
            ),
            NodeStage(
                type_=DatabaseSchema,
                context="database_schema",
                processor="yield_database_schema",
                consumer=["database_service", "database"],
                cache_entities=True,
                use_cache=True,
            ),
        ],
        children=["table", "stored_procedure"],
        post_process=["mark_tables_as_deleted", "mark_stored_procedures_as_deleted"],
    )
    table = TopologyNode(
        producer="get_tables_name_and_type",
        stages=[
            NodeStage(
                type_=OMetaTagAndClassification,
                context="tags",
                processor="yield_table_tag_details",
                nullable=True,
                store_all_in_context=True,
            ),
            NodeStage(
                type_=Table,
                context="table",
                processor="yield_table",
                consumer=["database_service", "database", "database_schema"],
                use_cache=True,
            ),
            NodeStage(
                type_=OMetaLifeCycleData,
                processor="yield_life_cycle_data",
                nullable=True,
            ),
        ],
    )
    stored_procedure = TopologyNode(
        producer="get_stored_procedures",
        stages=[
            NodeStage(
                type_=StoredProcedure,
                context="stored_procedures",
                processor="yield_stored_procedure",
                consumer=["database_service", "database", "database_schema"],
                store_all_in_context=True,
                store_fqn=True,
                use_cache=True,
            ),
        ],
    )

Next Step

Service Source

Understand the Service Source abstract class and implement the required methods for your connector.