From 4d495aa16cf3526dac929b1661c61fc1b4f86ffd Mon Sep 17 00:00:00 2001
From: Ashutosh Sanzgiri
Date: Wed, 3 Feb 2021 11:07:57 -0800
Subject: [PATCH] add docs for vertica extractor

Signed-off-by: Ashutosh Sanzgiri
---
 README.md                                |   9 ++
 example/scripts/sample_vertica_loader.py | 185 +++++++++++++++++++++++
 2 files changed, 194 insertions(+)
 create mode 100644 example/scripts/sample_vertica_loader.py

diff --git a/README.md b/README.md
index 87d297cbe..baad3eb4d 100644
--- a/README.md
+++ b/README.md
@@ -444,6 +444,15 @@ job = DefaultJob(
 job.launch()
 ```
 
+#### [VerticaMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/vertica_metadata_extractor.py "VerticaMetadataExtractor")
+An extractor that extracts table and column metadata (database, schema, table name, column name and column datatype) from a Vertica database.
+
+A sample loading script for Vertica is provided [here](https://github.com/amundsen-io/amundsendatabuilder/blob/master/example/scripts/sample_vertica_loader.py).
+
+By default, the Vertica database name is used as the cluster name. The `where_clause_suffix` in the example can be used to define which schemas you would like to query.
+
+
+
 #### [SQLAlchemyExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/sql_alchemy_extractor.py "SQLAlchemyExtractor")
 An extractor that utilizes [SQLAlchemy](https://www.sqlalchemy.org/ "SQLAlchemy") to extract records from any database that supports SQLAlchemy.
 ```python
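For quick reference, here is a minimal sketch of the extractor configuration the README section above describes, condensed from the sample script added below; the connection string and cluster name are placeholders:

```python
from pyhocon import ConfigFactory

from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
from databuilder.extractor.vertica_metadata_extractor import VerticaMetadataExtractor

# restrict extraction to the schemas you want to query
where_clause_suffix = "where c.table_schema = 'public'"

job_config = ConfigFactory.from_dict({
    'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
        where_clause_suffix,
    # set False to supply an explicit cluster name instead of the database name
    'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
        False,
    'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.CLUSTER_KEY):
        'vertica_budget',
    'extractor.vertica_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
        'vertica+vertica_python://username:password@host:5433/vertica',  # placeholder
})
```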
diff --git a/example/scripts/sample_vertica_loader.py b/example/scripts/sample_vertica_loader.py
new file mode 100644
index 000000000..bb491b175
--- /dev/null
+++ b/example/scripts/sample_vertica_loader.py
@@ -0,0 +1,185 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+This is an example script which demonstrates how to load data
+into Neo4j and Elasticsearch without using an Airflow DAG.
+"""
+
+import logging
+import os
+import sys
+import textwrap
+import uuid
+
+from elasticsearch import Elasticsearch
+from pyhocon import ConfigFactory
+from sqlalchemy.ext.declarative import declarative_base
+
+from databuilder.extractor.vertica_metadata_extractor import VerticaMetadataExtractor
+from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor
+from databuilder.extractor.neo4j_extractor import Neo4jExtractor
+from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor
+from databuilder.job.job import DefaultJob
+from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader
+from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader
+from databuilder.publisher import neo4j_csv_publisher
+from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher
+from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher
+from databuilder.task.task import DefaultTask
+from databuilder.transformer.base_transformer import NoopTransformer
+
+es_host = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_HOST', 'localhost')
+neo_host = os.getenv('CREDENTIALS_NEO4J_PROXY_HOST', 'localhost')
+
+es_port = os.getenv('CREDENTIALS_ELASTICSEARCH_PROXY_PORT', 9200)
+neo_port = os.getenv('CREDENTIALS_NEO4J_PROXY_PORT', 7687)
+
+if len(sys.argv) > 1:
+    es_host = sys.argv[1]
+if len(sys.argv) > 2:
+    neo_host = sys.argv[2]
+
+es = Elasticsearch([
+    {'host': es_host, 'port': es_port},
+])
+
+DB_FILE = '/tmp/test.db'
+SQLITE_CONN_STRING = 'sqlite:////tmp/test.db'
+Base = declarative_base()
+
+NEO4J_ENDPOINT = 'bolt://{}:{}'.format(neo_host, neo_port)
+
+neo4j_endpoint = NEO4J_ENDPOINT
+
+neo4j_user = 'neo4j'
+neo4j_password = 'test'
+
+
+# specify Vertica access credentials, host server, port (default 5433),
+# database name (default 'vertica')
+def connection_string():
+    user = 'username'
+    password = 'password'
+    host = 'vertica-budget.host'
+    port = '5433'
+    db = 'vertica'
+    return "vertica+vertica_python://%s:%s@%s:%s/%s" % (user, password, host, port, db)
+
+
+# provide schemas to run extraction on (default 'public')
+def run_vertica_job():
+    where_clause_suffix = textwrap.dedent("""
+        where c.table_schema = 'public'
+    """)
+
+    tmp_folder = '/var/tmp/amundsen/table_metadata'
+    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
+    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)
+
+    job_config = ConfigFactory.from_dict({
+        'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
+            where_clause_suffix,
+        'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME):
+            False,
+        'extractor.vertica_metadata.{}'.format(VerticaMetadataExtractor.CLUSTER_KEY):
+            'vertica_budget',
+        'extractor.vertica_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
+            connection_string(),
+        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
+            node_files_folder,
+        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
+            relationship_files_folder,
+        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
+            node_files_folder,
+        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
+            relationship_files_folder,
+        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
+            neo4j_endpoint,
+        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
+            neo4j_user,
+        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
+            neo4j_password,
+        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
+            'unique_tag',  # should use a unique tag here like {ds}
+    })
+    job = DefaultJob(conf=job_config,
+                     task=DefaultTask(extractor=VerticaMetadataExtractor(), loader=FsNeo4jCSVLoader()),
+                     publisher=Neo4jCsvPublisher())
+    return job
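The hard-coded `where_clause_suffix` and publish tag in `run_vertica_job()` are the two values most users will change. A minimal sketch of both, assuming hypothetical schema names; the date-stamped tag is just one way to satisfy the "unique tag" comment:

```python
import textwrap
from datetime import date

# Hypothetical schemas; 'c.table_schema' matches the alias used in
# run_vertica_job() above.
where_clause_suffix = textwrap.dedent("""
    where c.table_schema in ('public', 'finance', 'marketing')
""")

# One way to generate a per-run publish tag (akin to Airflow's {ds}).
publish_tag = 'vertica_metadata_{}'.format(date.today().isoformat())
```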
+
+
+def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
+                                   elasticsearch_doc_type_key='table',
+                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
+                                   cypher_query=None,
+                                   elasticsearch_mapping=None):
+    """
+    :param elasticsearch_index_alias: alias for Elasticsearch used in
+        amundsensearchlibrary/search_service/config.py as an index
+    :param elasticsearch_doc_type_key: name the Elasticsearch index is prepended with. Defaults to `table`,
+        resulting in `table_search_index`
+    :param model_name: the Databuilder model class used in transporting between Extractor and Loader
+    :param cypher_query: query handed to the `Neo4jSearchDataExtractor` class; if None is given (default),
+        it uses the `Table` query baked into the Extractor
+    :param elasticsearch_mapping: Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class;
+        if None is given (default), it uses the `Table` mapping baked into the Publisher
+    """
+    # loader saves data to this location and publisher reads it from here
+    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
+
+    task = DefaultTask(loader=FSElasticsearchJSONLoader(),
+                       extractor=Neo4jSearchDataExtractor(),
+                       transformer=NoopTransformer())
+
+    # Elasticsearch client instance
+    elasticsearch_client = es
+    # unique name of new index in Elasticsearch
+    elasticsearch_new_index_key = 'tables' + str(uuid.uuid4())
+
+    job_config = ConfigFactory.from_dict({
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): model_name,
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
+        'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
+        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
+            extracted_search_data_path,
+        'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
+            extracted_search_data_path,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
+            elasticsearch_client,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
+            elasticsearch_new_index_key,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
+            elasticsearch_doc_type_key,
+        'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
+            elasticsearch_index_alias,
+    })
+
+    # only optionally add these keys, so need to dynamically `put` them
+    if cypher_query:
+        job_config.put('extractor.search_data.{}'.format(Neo4jSearchDataExtractor.CYPHER_QUERY_CONFIG_KEY),
+                       cypher_query)
+    if elasticsearch_mapping:
+        job_config.put('publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
+                       elasticsearch_mapping)
+
+    job = DefaultJob(conf=job_config,
+                     task=task,
+                     publisher=ElasticsearchPublisher())
+    return job
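Per the docstring, `cypher_query` and `elasticsearch_mapping` are optional overrides; when omitted, the defaults baked into `Neo4jSearchDataExtractor` and `ElasticsearchPublisher` are used. A sketch of the call shape only; both values below are placeholders, not the real defaults:

```python
custom_query = 'MATCH (t:Table) RETURN t'  # placeholder Cypher, not the real Table query
custom_mapping = '{"mappings": {}}'        # placeholder mapping "DDL"

job_es = create_es_publisher_sample_job(
    elasticsearch_index_alias='table_search_index',
    cypher_query=custom_query,
    elasticsearch_mapping=custom_mapping)
```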
+
+
+if __name__ == "__main__":
+    # Uncomment next line to get INFO level logging
+    # logging.basicConfig(level=logging.INFO)
+
+    loading_job = run_vertica_job()
+    loading_job.launch()
+
+    job_es_table = create_es_publisher_sample_job(
+        elasticsearch_index_alias='table_search_index',
+        elasticsearch_doc_type_key='table',
+        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
+    job_es_table.launch()
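Note the ordering in the main block: `run_vertica_job()` must complete before the Elasticsearch job launches, since `Neo4jSearchDataExtractor` reads from the Neo4j graph that the first job populates. As the argv handling at the top of the script shows, the hosts can also be passed positionally, e.g. `python sample_vertica_loader.py <es_host> <neo4j_host>`.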