From 54e89ce74b70ade742edc77d4e76b9fc8241d8ab Mon Sep 17 00:00:00 2001 From: Dorian Johnson <2020@dorianj.net> Date: Wed, 10 Mar 2021 11:49:51 -0800 Subject: [PATCH] docs: minor fixes to README (#457) * README: tabs -> spaces Just to match rest of codebase Signed-off-by: Dorian Johnson <2020@dorianj.net> * README: minor fixes to some examples Signed-off-by: Dorian Johnson <2020@dorianj.net> --- README.md | 480 +++++++++++++++++++++++++++--------------------------- 1 file changed, 239 insertions(+), 241 deletions(-) diff --git a/README.md b/README.md index 46ee68076..a3ae80315 100644 --- a/README.md +++ b/README.md @@ -56,16 +56,16 @@ An extractor that uses [Python Database API](https://www.python.org/dev/peps/pep ```python job_config = ConfigFactory.from_dict({ - 'extractor.dbapi{}'.format(DBAPIExtractor.CONNECTION_CONFIG_KEY): db_api_conn, - 'extractor.dbapi.{}'.format(DBAPIExtractor.SQL_CONFIG_KEY ): select_sql_stmt, - 'extractor.dbapi.model_class': 'package.module_name.class_name' - }) + 'extractor.dbapi{}'.format(DBAPIExtractor.CONNECTION_CONFIG_KEY): db_api_conn, + 'extractor.dbapi.{}'.format(DBAPIExtractor.SQL_CONFIG_KEY ): select_sql_stmt, + 'extractor.dbapi.model_class': 'package.module_name.class_name' + }) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=DBAPIExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=DBAPIExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -82,19 +82,19 @@ As getting metadata from files could be time consuming there're several features ```python job_config = ConfigFactory.from_dict({ - 'extractor.hive_table_last_updated.partitioned_table_where_clause_suffix': partitioned_table_where_clause, - 'extractor.hive_table_last_updated.non_partitioned_table_where_clause_suffix'): non_partitioned_table_where_clause, - 'extractor.hive_table_last_updated.extractor.sqlalchemy.{}'.format( + 'extractor.hive_table_last_updated.partitioned_table_where_clause_suffix': partitioned_table_where_clause, + 'extractor.hive_table_last_updated.non_partitioned_table_where_clause_suffix'): non_partitioned_table_where_clause, + 'extractor.hive_table_last_updated.extractor.sqlalchemy.{}'.format( SQLAlchemyExtractor.CONN_STRING): connection_string, - 'extractor.hive_table_last_updated.extractor.fs_worker_pool_size': pool_size, - 'extractor.hive_table_last_updated.filesystem.{}'.format(FileSystem.DASK_FILE_SYSTEM): s3fs.S3FileSystem( - anon=False, - config_kwargs={'max_pool_connections': pool_size})}) + 'extractor.hive_table_last_updated.extractor.fs_worker_pool_size': pool_size, + 'extractor.hive_table_last_updated.filesystem.{}'.format(FileSystem.DASK_FILE_SYSTEM): s3fs.S3FileSystem( + anon=False, + config_kwargs={'max_pool_connections': pool_size})}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=HiveTableLastUpdatedExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=HiveTableLastUpdatedExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -102,13 +102,13 @@ job.launch() An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from Hive metastore database. 
```python job_config = ConfigFactory.from_dict({ - 'extractor.hive_table_metadata.{}'.format(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, - 'extractor.hive_table_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.hive_table_metadata.{}'.format(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.hive_table_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=HiveTableMetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=HiveTableMetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -117,17 +117,17 @@ An extractor that extracts table and column metadata including keyspace, table n ```python job_config = ConfigFactory.from_dict({ - 'extractor.cassandra.{}'.format(CassandraExtractor.CLUSTER_KEY): cluster_identifier_string, - 'extractor.cassandra.{}'.format(CassandraExtractor.IPS_KEY): [127.0.0.1], - 'extractor.cassandra.{}'.format(CassandraExtractor.KWARGS_KEY): {}, - 'extractor.cassandra.{}'.format(CassandraExtractor.FILTER_FUNCTION_KEY): my_filter_function, + 'extractor.cassandra.{}'.format(CassandraExtractor.CLUSTER_KEY): cluster_identifier_string, + 'extractor.cassandra.{}'.format(CassandraExtractor.IPS_KEY): [127.0.0.1], + 'extractor.cassandra.{}'.format(CassandraExtractor.KWARGS_KEY): {}, + 'extractor.cassandra.{}'.format(CassandraExtractor.FILTER_FUNCTION_KEY): my_filter_function, }) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=CassandraExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=CassandraExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -141,8 +141,8 @@ def filter(keytab, table): If needed to define more args on the cassandra cluster you can pass through kwargs args ```python config = ConfigFactory.from_dict({ - 'extractor.cassandra.{}'.format(CassandraExtractor.IPS_KEY): [127.0.0.1], - 'extractor.cassandra.{}'.format(CassandraExtractor.KWARGS_KEY): {'port': 9042} + 'extractor.cassandra.{}'.format(CassandraExtractor.IPS_KEY): [127.0.0.1], + 'extractor.cassandra.{}'.format(CassandraExtractor.KWARGS_KEY): {'port': 9042} }) # it will call the cluster constructor like this Cluster([127.0.0.1], **kwargs) @@ -154,13 +154,13 @@ An extractor that extracts table and column metadata including database, schema, Before running make sure you have a working AWS profile configured and have access to search tables on Glue ```python job_config = ConfigFactory.from_dict({ - 'extractor.glue.{}'.format(GlueExtractor.CLUSTER_KEY): cluster_identifier_string, - 'extractor.glue.{}'.format(GlueExtractor.FILTER_KEY): []}) + 'extractor.glue.{}'.format(GlueExtractor.CLUSTER_KEY): cluster_identifier_string, + 'extractor.glue.{}'.format(GlueExtractor.FILTER_KEY): []}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=GlueExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=GlueExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -177,13 +177,13 @@ If using the filters option here is the input format ``` #### [Delta-Lake-MetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/delta_lake_metadata_extractor.py) -An extractor that runs on a spark cluster and obtains delta-lake metadata using spark sql commands. 
+An extractor that runs on a spark cluster and obtains delta-lake metadata using spark sql commands. This custom solution is currently necessary because the hive metastore does not contain all metadata information for delta-lake tables. -For simplicity, this extractor can also be used for all hive tables as well. +For simplicity, this extractor can also be used for all hive tables as well. -Because it must run on a spark cluster, +Because it must run on a spark cluster, it is required that you have an operator (for example a [databricks submit run operator](https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/databricks_operator.html)) -that calls the configuration code on a spark cluster. +that calls the configuration code on a spark cluster. ```python spark = SparkSession.builder.appName("Amundsen Delta Lake Metadata Extraction").getOrCreate() job_config = create_delta_lake_job_config() @@ -200,17 +200,17 @@ You can check out the sample deltalake metadata script for a full example. #### [DremioMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/dremio_metadata_extractor.py) An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from [Dremio](https://www.dremio.com). -Before running make sure that you have the Dremio ODBC driver installed. Default config values assume the default driver name for the [MacBook install](https://docs.dremio.com/drivers/mac-odbc.html). +Before running make sure that you have the Dremio ODBC driver installed. Default config values assume the default driver name for the [MacBook install](https://docs.dremio.com/drivers/mac-odbc.html). ```python job_config = ConfigFactory.from_dict({ 'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_USER_KEY): DREMIO_USER, 'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_PASSWORD_KEY): DREMIO_PASSWORD, 'extractor.dremio.{}'.format(DremioMetadataExtractor.DREMIO_HOST_KEY): DREMIO_HOST}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=DremioMetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=DremioMetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -222,17 +222,17 @@ The `where_clause_suffix` could be defined, normally you would like to filter ou You could specify the following job config ```python conn_string = "druid+https://{host}:{port}/druid/v2/sql/".format( - host=druid_broker_host, - port=443 + host=druid_broker_host, + port=443 ) job_config = ConfigFactory.from_dict({ - 'extractor.druid_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.druid_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.druid_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): conn_string()}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=DruidMetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=DruidMetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -248,14 +248,14 @@ The SQL query driving the extraction is defined [here](https://github.com/amunds ```python job_config = ConfigFactory.from_dict({ - 'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 
'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, - 'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=PostgresMetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=PostgresMetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -272,14 +272,14 @@ The SQL query driving the extraction is defined [here](https://github.com/amunds This extractor is highly derived from [PostgresMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/postgres_metadata_extractor.py "PostgresMetadataExtractor"). ```python job_config = ConfigFactory.from_dict({ - 'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, 'extractor.mssql_metadata.{}'.format(MSSQLMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, - 'extractor.mssql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.mssql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=MSSQLMetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=MSSQLMetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -295,12 +295,12 @@ The SQL query driving the extraction is defined [here](https://github.com/amunds ```python job_config = ConfigFactory.from_dict({ - 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, - 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, - 'extractor.mysql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.mysql_metadata.{}'.format(MysqlMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, + 'extractor.mysql_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob(conf=job_config, - task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()), - publisher=Neo4jCsvPublisher()) + task=DefaultTask(extractor=MysqlMetadataExtractor(), loader=FsNeo4jCSVLoader()), + publisher=Neo4jCsvPublisher()) job.launch() ``` @@ -313,13 +313,13 @@ The SQL query driving the extraction is defined [here](https://github.com/amunds ```python job_config = ConfigFactory.from_dict({ - 'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, - 'extractor.db2_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.db2_metadata.{}'.format(Db2MetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.db2_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = 
DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=Db2MetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=Db2MetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -333,7 +333,7 @@ By default, the Snowflake database is set to `PROD`. To override this, set `DATA to `WhateverNameOfYourDb`. By default, the Snowflake schema is set to `INFORMATION_SCHEMA`. To override this, set `SCHEMA_KEY` -to `WhateverNameOfYourSchema`. +to `WhateverNameOfYourSchema`. Note that `ACCOUNT_USAGE` is a separate schema which allows users to query a wider set of data at the cost of latency. Differences are defined [here](https://docs.snowflake.com/en/sql-reference/account-usage.html#differences-between-account-usage-and-information-schema) @@ -349,10 +349,10 @@ job_config = ConfigFactory.from_dict({ 'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, 'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=SnowflakeMetadataExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=SnowflakeMetadataExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -365,15 +365,15 @@ The SQL query driving the extraction is defined [here](https://github.com/amunds ```python job_config = ConfigFactory.from_dict({ - 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY): 'YourDbName', - 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, - 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, - 'extractor.snowflake_table_last_updated.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) + 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.SNOWFLAKE_DATABASE_KEY): 'YourDbName', + 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix, + 'extractor.snowflake_table_last_updated.{}'.format(SnowflakeTableLastUpdatedExtractor.USE_CATALOG_AS_CLUSTER_NAME): True, + 'extractor.snowflake_table_last_updated.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=SnowflakeTableLastUpdatedExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=SnowflakeTableLastUpdatedExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -414,33 +414,33 @@ An extractor that basically get current timestamp and passes it GenericExtractor An extractor that extracts records from Neo4j based on provided [Cypher query](https://neo4j.com/developer/cypher/ "Cypher query"). One example is to extract data from Neo4j so that it can transform and publish to Elasticsearch. 
```python
job_config = ConfigFactory.from_dict({
-    'extractor.neo4j.{}'.format(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY): cypher_query,
-    'extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
-    'extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'package.module.class_name',
-    'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
-    'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password},
-    'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED): True})
+    'extractor.neo4j.{}'.format(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY): cypher_query,
+    'extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
+    'extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'package.module.class_name',
+    'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
+    'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
+    'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED): True})

job = DefaultJob(
-    conf=job_config,
-    task=DefaultTask(
-        extractor=Neo4jExtractor(),
-        loader=AnyLoader()))
+    conf=job_config,
+    task=DefaultTask(
+        extractor=Neo4jExtractor(),
+        loader=AnyLoader()))
job.launch()
```

#### [Neo4jSearchDataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/neo4j_search_data_extractor.py "Neo4jSearchDataExtractor")
An extractor that extracts data from Neo4j using Neo4jExtractor, with the Cypher query already embedded in it.
```python
job_config = ConfigFactory.from_dict({
-    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
-    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'databuilder.models.neo4j_data.Neo4jDataResult',
-    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
-    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password},
-    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED): False})
+    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
+    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'databuilder.models.neo4j_data.Neo4jDataResult',
+    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
+    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password,
+    'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_ENCRYPTED): False})

job = DefaultJob(
-    conf=job_config,
-    task=DefaultTask(
-        extractor=Neo4jSearchDataExtractor(),
-        loader=AnyLoader()))
+    conf=job_config,
+    task=DefaultTask(
+        extractor=Neo4jSearchDataExtractor(),
+        loader=AnyLoader()))
job.launch()
```

@@ -457,14 +457,14 @@ By default, the Vertica database name is used as the cluster name. The `where_cl
An extractor that utilizes [SQLAlchemy](https://www.sqlalchemy.org/ "SQLAlchemy") to extract records from any database that supports SQLAlchemy.
```python job_config = ConfigFactory.from_dict({ - 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(), - 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.EXTRACT_SQL): sql, - 'extractor.sqlalchemy.model_class': 'package.module.class_name'}) + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(), + 'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.EXTRACT_SQL): sql, + 'extractor.sqlalchemy.model_class': 'package.module.class_name'}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=SQLAlchemyExtractor(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=SQLAlchemyExtractor(), + loader=AnyLoader())) job.launch() ``` @@ -477,8 +477,8 @@ Here are extractors that extracts metadata information from Mode via Mode's REST Prerequisite: 1. You will need to [create API access token](https://mode.com/developer/api-reference/authentication/) that has admin privilege. - 2. You will need organization code. This is something you can easily get by looking at one of Mode report's URL. - `https://app.mode.com//reports/report_token` + 2. You will need organization code. This is something you can easily get by looking at one of Mode report's URL. + `https://app.mode.com//reports/report_token` #### [ModeDashboardExtractor](./databuilder/extractor/dashboard/mode_analytics/mode_dashboard_extractor.py) A Extractor that extracts core metadata on Mode dashboard. https://app.mode.com/ @@ -498,29 +498,28 @@ It calls two APIs ([spaces API](https://mode.com/developer/api-reference/managem You can create Databuilder job config like this. ```python task = DefaultTask(extractor=ModeDashboardExtractor(), - loader=FsNeo4jCSVLoader(), ) + loader=FsNeo4jCSVLoader(), ) tmp_folder = '/var/tmp/amundsen/mode_dashboard_metadata' - node_files_folder = '{tmp_folder}/nodes'.format(tmp_folder=tmp_folder) relationship_files_folder = '{tmp_folder}/relationships'.format(tmp_folder=tmp_folder) job_config = ConfigFactory.from_dict({ - 'extractor.mode_dashboard.{}'.format(ORGANIZATION): organization, - 'extractor.mode_dashboard.{}'.format(MODE_ACCESS_TOKEN): mode_token, - 'extractor.mode_dashboard.{}'.format(MODE_PASSWORD_TOKEN): mode_password, - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder, - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True, - 'task.progress_report_frequency': 100, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_ENCRYPTED): True, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES): [DESCRIPTION_NODE_LABEL], - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): job_publish_tag + 'extractor.mode_dashboard.{}'.format(ORGANIZATION): organization, + 'extractor.mode_dashboard.{}'.format(MODE_ACCESS_TOKEN): mode_token, + 'extractor.mode_dashboard.{}'.format(MODE_PASSWORD_TOKEN): mode_password, + 
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR): True, + 'task.progress_report_frequency': 100, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_ENCRYPTED): True, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_CREATE_ONLY_NODES): [DESCRIPTION_NODE_LABEL], + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG): job_publish_tag }) job = DefaultJob(conf=job_config, @@ -538,12 +537,12 @@ You can create Databuilder job config like this. (configuration related to loade ```python extractor = ModeDashboardOwnerExtractor() task = DefaultTask(extractor=extractor, - loader=FsNeo4jCSVLoader(), ) + loader=FsNeo4jCSVLoader(), ) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -559,13 +558,12 @@ You can create Databuilder job config like this. (configuration related to loade ```python extractor = ModeDashboardLastSuccessfulExecutionExtractor() -task = DefaultTask(extractor=extractor, - loader=FsNeo4jCSVLoader(), ) +task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -581,13 +579,12 @@ You can create Databuilder job config like this. 
(configuration related to loade ```python extractor = ModeDashboardExecutionsExtractor() -task = DefaultTask(extractor=extractor, - loader=FsNeo4jCSVLoader(), ) +task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -606,9 +603,9 @@ extractor = ModeDashboardLastModifiedTimestampExtractor() task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -627,9 +624,9 @@ extractor = ModeDashboardQueriesExtractor() task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -648,9 +645,9 @@ extractor = ModeDashboardChartsExtractor() task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -666,10 +663,10 @@ extractor = ModeDashboardChartsBatchExtractor() task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, - '{}.{}'.format(extractor.get_scope(), MODE_BEARER_TOKEN): mode_bearer_token, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), MODE_BEARER_TOKEN): mode_bearer_token, }) job = DefaultJob(conf=job_config, @@ -688,9 +685,9 @@ extractor = ModeDashboardUserExtractor() 
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password, }) job = DefaultJob(conf=job_config, @@ -721,11 +718,11 @@ extractor = RedashDashboardExtractor() task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader()) job_config = ConfigFactory.from_dict({ - 'extractor.redash_dashboard.redash_base_url': redash_base_url, # ex: https://redash.example.org - 'extractor.redash_dashboard.api_base_url': api_base_url, # ex: https://redash.example.org/api - 'extractor.redash_dashboard.api_key': api_key, # ex: abc1234 - 'extractor.redash_dashboard.table_parser': table_parser, # ex: my_library.module.parse_tables - 'extractor.redash_dashboard.redash_version': redash_version # ex: 8. optional, default=9 + 'extractor.redash_dashboard.redash_base_url': redash_base_url, # ex: https://redash.example.org + 'extractor.redash_dashboard.api_base_url': api_base_url, # ex: https://redash.example.org/api + 'extractor.redash_dashboard.api_key': api_key, # ex: abc1234 + 'extractor.redash_dashboard.table_parser': table_parser, # ex: my_library.module.parse_tables + 'extractor.redash_dashboard.redash_version': redash_version # ex: 8. optional, default=9 }) job = DefaultJob(conf=job_config, @@ -740,13 +737,13 @@ The `RedashDashboardExtractor` extracts raw queries from each dashboard. You may ```python def parse_tables(viz_widget: RedashVisualizationWidget) -> Iterator[TableRelationData]: - # Each viz_widget corresponds to one query. - # viz_widget.data_source_id is the ID of the target DB in Redash. - # viz_widget.raw_query is the raw query (e.g., SQL). - if viz_widget.data_source_id == 123: - table_names = some_sql_parser(viz_widget.raw_query) - return [TableRelationData('some_db', 'prod', 'some_schema', tbl) for tbl in table_names] - return [] + # Each viz_widget corresponds to one query. + # viz_widget.data_source_id is the ID of the target DB in Redash. + # viz_widget.raw_query is the raw query (e.g., SQL). + if viz_widget.data_source_id == 123: + table_names = some_sql_parser(viz_widget.raw_query) + return [TableRelationData('some_db', 'prod', 'some_schema', tbl) for tbl in table_names] + return [] ``` ### [TableauDashboardExtractor](./databuilder/extractor/dashboard/tableau/tableau_dashboard_extractor.py) @@ -962,15 +959,15 @@ A chanined transformer that can take multiple transformers, passing each record Generic string replacement transformer using REGEX. User can pass list of tuples where tuple contains regex and replacement pair. 
```python job_config = ConfigFactory.from_dict({ - 'transformer.regex_str_replace.{}'.format(REGEX_REPLACE_TUPLE_LIST): [(',', ' '), ('"', '')], - 'transformer.regex_str_replace.{}'.format(ATTRIBUTE_NAME): 'instance_field_name',}) + 'transformer.regex_str_replace.{}'.format(REGEX_REPLACE_TUPLE_LIST): [(',', ' '), ('"', '')], + 'transformer.regex_str_replace.{}'.format(ATTRIBUTE_NAME): 'instance_field_name',}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=AnyExtractor(), - transformer=RegexStrReplaceTransformer(), - loader=AnyLoader())) + conf=job_config, + task=DefaultTask( + extractor=AnyExtractor(), + transformer=RegexStrReplaceTransformer(), + loader=AnyLoader())) job.launch() ``` @@ -998,15 +995,15 @@ Write node and relationship CSV file(s) that can be consumed by Neo4jCsvPublishe ```python job_config = ConfigFactory.from_dict({ - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder},) + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder},) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=AnyExtractor(), - loader=FsNeo4jCSVLoader()), - publisher=Neo4jCsvPublisher()) + conf=job_config, + task=DefaultTask( + extractor=AnyExtractor(), + loader=FsNeo4jCSVLoader()), + publisher=Neo4jCsvPublisher()) job.launch() ``` @@ -1021,10 +1018,10 @@ task = DefaultTask(extractor=extractor, loader=GenericLoader(), ) job_config = ConfigFactory.from_dict({ - '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, - '{}.{}'.format(MODE_ACCESS_TOKEN): mode_token, - '{}.{}'.format(MODE_PASSWORD_TOKEN): mode_password, - 'loader.generic.callback_function': callback_function + '{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization, + '{}.{}'.format(MODE_ACCESS_TOKEN): mode_token, + '{}.{}'.format(MODE_PASSWORD_TOKEN): mode_password, + 'loader.generic.callback_function': callback_function }) job = DefaultJob(conf=job_config, task=task) @@ -1037,20 +1034,18 @@ job.launch() Write Elasticsearch document in JSON format which can be consumed by ElasticsearchPublisher. It assumes that the record it consumes is instance of ElasticsearchDocument. 
```python -tmp_folder = '/var/tmp/amundsen/dummy_metadata' -node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder) -relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder) +data_file_path = '/var/tmp/amundsen/search_data.json' job_config = ConfigFactory.from_dict({ - 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): data_file_path, - 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',}) + 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): data_file_path, + 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=AnyExtractor(), - loader=FSElasticsearchJSONLoader()), - publisher=ElasticsearchPublisher()) + conf=job_config, + task=DefaultTask( + extractor=AnyExtractor(), + loader=FSElasticsearchJSONLoader()), + publisher=ElasticsearchPublisher()) job.launch() ``` @@ -1060,22 +1055,26 @@ A Publisher takes two folders for input and publishes to Neo4j. One folder will contain CSV file(s) for Node where the other folder will contain CSV file(s) for Relationship. Neo4j follows Label Node properties Graph and refer to [here](https://neo4j.com/docs/developer-manual/current/introduction/graphdb-concepts/ "here") for more information ```python +node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder) +relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder) + job_config = ConfigFactory.from_dict({ - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, - 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password, - 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_ENCRYPTED): True}) + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder, + 'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password, + 'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_ENCRYPTED): True, +}) job = DefaultJob( - conf=job_config, - task=DefaultTask( - extractor=AnyExtractor(), - loader=FsNeo4jCSVLoader()), - publisher=Neo4jCsvPublisher()) + conf=job_config, + task=DefaultTask( + extractor=AnyExtractor(), + loader=FsNeo4jCSVLoader()), + publisher=Neo4jCsvPublisher()) job.launch() ``` @@ -1083,26 +1082,25 @@ job.launch() Elasticsearch Publisher uses Bulk API to load data from JSON file. Elasticsearch publisher supports atomic operation by utilizing alias in Elasticsearch. 
A new index is created and data is uploaded into it. After the upload is complete, the index alias is swapped to point to the new index instead of the old one, and traffic is routed to the new index.
```python
-tmp_folder = '/var/tmp/amundsen/dummy_metadata'
-node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
-relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)
+data_file_path = '/var/tmp/amundsen/search_data.json'

job_config = ConfigFactory.from_dict({
-    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): data_file_path,
-    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
-    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): data_file_path,
-    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
-    'publisher.elasticsearch{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): elasticsearch_client,
-    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): elasticsearch_new_index,
-    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): elasticsearch_doc_type,
-    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): elasticsearch_index_alias,)
+    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): data_file_path,
+    'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): data_file_path,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): elasticsearch_client,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): elasticsearch_new_index,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): elasticsearch_doc_type,
+    'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): elasticsearch_index_alias,
+})

job = DefaultJob(
-    conf=job_config,
-    task=DefaultTask(
-        extractor=AnyExtractor(),
-        loader=FSElasticsearchJSONLoader()),
-    publisher=ElasticsearchPublisher())
+    conf=job_config,
+    task=DefaultTask(
+        extractor=AnyExtractor(),
+        loader=FSElasticsearchJSONLoader()),
+    publisher=ElasticsearchPublisher())
job.launch()
```

@@ -1124,16 +1122,16 @@ The challenges come with REST API is that:

To solve these challenges, we introduce [RestApiQuery](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/rest_api/rest_api_query.py)

-RestAPIQuery is: 
+RestAPIQuery is:
1. Assuming that the REST API is called over HTTP(S) with the GET method -- RestApiQuery's intent is to **read**, not write -- basic HTTP auth is supported out of the box. There are extension points for other authentication schemes such as OAuth, as well as for pagination, etc. (See [ModePaginatedRestApiQuery](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/rest_api/mode_analytics/mode_paginated_rest_api_query.py) for pagination.)
2. Usually you only want a subset of the response you get from the REST API call -- value extraction. To extract the values you want, RestApiQuery uses [JSONPath](https://goessner.net/articles/JsonPath/), which is a similar product to XPath for XML.
3. You can JOIN multiple RestApiQuery together.

-More detail on JOIN operation in RestApiQuery: 
+More detail on the JOIN operation in RestApiQuery:
1. It joins multiple RestApiQuery together by accepting the prior RestApiQuery as a constructor argument -- a [Decorator pattern](https://en.wikipedia.org/wiki/Decorator_pattern).
2. In a REST API, the URL is what locates the resource we want. Here, JOIN simply means we need to find the resource **based on an identifier present in the other query's result**. In other words, when RestApiQuery forms the URL, it uses the previous query's result to compute it. `e.g: Previous record: {"dashboard_id": "foo"}, URL before: http://foo.bar/dashboard/{dashboard_id} URL after compute: http://foo.bar/dashboard/foo`

-With this pattern RestApiQuery supports 1:1 and 1:N JOIN relationship. 
-(GROUP BY or any other aggregation, sub-query join is not supported) 
+With this pattern RestApiQuery supports 1:1 and 1:N JOIN relationships.
+(GROUP BY, any other aggregation, and sub-query joins are not supported.)

To see it in action, take a peek at [ModeDashboardExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/dashboard/mode_analytics/mode_dashboard_extractor.py).
Also, take a look at how it extends to support pagination at [ModePaginatedRestApiQuery](./databuilder/rest_api/mode_analytics/mode_paginated_rest_api_query.py).
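
To make the JOIN mechanics concrete, here is a minimal, self-contained sketch of the idea. It is deliberately *not* the actual databuilder API -- the `SeedQuery` and `JoinedRestQuery` classes, their parameter names, and the example endpoint URLs are made up for illustration; the real implementation lives in the [RestApiQuery](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/rest_api/rest_api_query.py) linked above.

```python
# Illustrative sketch only -- NOT the real databuilder classes. The class names,
# parameter names, and endpoint URLs below are hypothetical.
from typing import Any, Dict, Iterator, List

import requests
from jsonpath_rw import parse


class SeedQuery:
    """Emits a fixed list of seed records (e.g. just the organization name)."""
    def __init__(self, seed_records: List[Dict[str, Any]]) -> None:
        self._seed_records = seed_records

    def execute(self) -> Iterator[Dict[str, Any]]:
        return iter(self._seed_records)


class JoinedRestQuery:
    """Decorates a prior query: for each record the prior query yields, format the
    URL template with that record, issue a GET, extract values with JSONPath, and
    yield the prior record merged with the extracted fields (1:N fan-out)."""
    def __init__(self, query_to_join: Any, url_template: str,
                 json_path: str, field_names: List[str]) -> None:
        self._query_to_join = query_to_join
        self._url_template = url_template
        self._json_path = parse(json_path)
        self._field_names = field_names

    def execute(self) -> Iterator[Dict[str, Any]]:
        for record in self._query_to_join.execute():
            # JOIN happens here: the URL is computed from the previous query's record.
            url = self._url_template.format(**record)
            response = requests.get(url).json()
            values = [match.value for match in self._json_path.find(response)]
            width = len(self._field_names)
            for i in range(0, len(values), width):
                # One output record per extracted match group (1:N relationship).
                joined = dict(record)
                joined.update(zip(self._field_names, values[i:i + width]))
                yield joined


# Hypothetical usage: list spaces per organization, then reports per space.
seed = SeedQuery([{'organization': 'my_org'}])
spaces = JoinedRestQuery(seed, 'https://api.example.com/{organization}/spaces',
                         'spaces[*].token', ['space_token'])
reports = JoinedRestQuery(spaces, 'https://api.example.com/{organization}/spaces/{space_token}/reports',
                          'reports[*].name', ['report_name'])
# Iterating reports.execute() would yield records like
# {'organization': 'my_org', 'space_token': '...', 'report_name': '...'}.
```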