Skip to content

Commit

Permalink
Merge pull request #6 from amundsen-io/master
Browse files Browse the repository at this point in the history
Sync with upstream
  • Loading branch information
Spacerat authored Aug 20, 2020
2 parents 271ffe1 + 030ef49 commit ee1b6bc
Show file tree
Hide file tree
Showing 186 changed files with 2,067 additions and 2,653 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

on: pull_request
jobs:
pre-commit:
runs-on: ubuntu-18.04
steps:
- name: Checkout
uses: actions/checkout@v1
- name: Setup python 3.6
uses: actions/setup-python@v1
with:
python-version: 3.6
test-unit:
runs-on: ubuntu-18.04
strategy:
matrix:
python-version: ['3.6.x', '3.7.x']
steps:
- name: Checkout
uses: actions/checkout@v1
- name: Setup python ${{ matrix.python-version }}
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip3 install -r requirements.txt && pip3 install .[all] && pip3 install codecov
- name: Run python unit tests
run: make test
29 changes: 29 additions & 0 deletions .github/workflows/pypipublish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

name: Build and Deploy
on:
push:
branches:
- master
tags:
- '*'
jobs:
build-and-publish-python-module:
name: Build and publish python module to pypi
runs-on: ubuntu-18.04
steps:
- name: Checkout
uses: actions/checkout@v1
- name: Setup python 3.6
uses: actions/setup-python@v1
with:
python-version: 3.6
- name: Add wheel dependency
run: pip install wheel
- name: Generate dist
run: python setup.py sdist bdist_wheel
- name: Publish to PyPI
if: startsWith(github.event.ref, 'refs/tags')
uses: pypa/gh-action-pypi-publish@master
with:
user: __token__
password: ${{ secrets.pypi_password }}
21 changes: 0 additions & 21 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
This project is governed by [Lyft's code of conduct](https://github.com/lyft/code-of-conduct).
This project is governed by [Amundsen's code of conduct](https://github.com/amundsen-io/amundsen/blob/master/CODE_OF_CONDUCT.md).
All contributors and participants agree to abide by its terms.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ test_unit:
lint:
flake8 .

.PHONY: mypy
mypy:
mypy .

.PHONY: test
test: test_unit lint
test: test_unit lint mypy

113 changes: 56 additions & 57 deletions README.md

Large diffs are not rendered by default.

14 changes: 5 additions & 9 deletions databuilder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import abc

from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from pyhocon import ConfigTree, ConfigFactory


class Scoped(object, metaclass=abc.ABCMeta):
Expand All @@ -29,8 +29,7 @@ class Scoped(object, metaclass=abc.ABCMeta):
"""

@abc.abstractmethod
def init(self, conf):
# type: (ConfigTree) -> None
def init(self, conf: ConfigTree) -> None:
"""
All scoped instance is expected to be lazily initialized. Means that
__init__ should not have any heavy operation such as service call.
Expand All @@ -46,26 +45,23 @@ def init(self, conf):
pass

@abc.abstractmethod
def get_scope(self):
# type: () -> str
def get_scope(self) -> str:
"""
A scope for the config. Typesafe config supports nested config.
Scope, string, is used to basically peel off nested config
:return:
"""
return ''

def close(self):
# type: () -> None
def close(self) -> None:
"""
Anything that needs to be cleaned up after the use of the instance.
:return: None
"""
pass

@classmethod
def get_scoped_conf(cls, conf, scope):
# type: (ConfigTree, str) -> ConfigTree
def get_scoped_conf(cls, conf: ConfigTree, scope: str) -> ConfigTree:
"""
Convenient method to provide scoped method.
Expand Down
13 changes: 5 additions & 8 deletions databuilder/callback/call_back.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import abc
import logging

from typing import List, Optional # noqa: F401
from typing import List, Optional

LOGGER = logging.getLogger(__name__)

Expand All @@ -16,25 +16,23 @@ class Callback(object, metaclass=abc.ABCMeta):
"""

@abc.abstractmethod
def on_success(self):
# type: () -> None
def on_success(self) -> None:
"""
A call back method that will be called when operation is successful
:return: None
"""
pass

@abc.abstractmethod
def on_failure(self):
# type: () -> None
def on_failure(self) -> None:
"""
A call back method that will be called when operation failed
:return: None
"""
pass


def notify_callbacks(callbacks, is_success):
def notify_callbacks(callbacks: List[Callback], is_success: bool) -> None:
"""
A Utility method that notifies callback. If any callback fails it will still go through all the callbacks,
and raise the last exception it experienced.
Expand All @@ -43,15 +41,14 @@ def notify_callbacks(callbacks, is_success):
:param is_success:
:return:
"""
# type: (List[Callback], bool) -> None

if not callbacks:
LOGGER.info('No callbacks to notify')
return

LOGGER.info('Notifying callbacks')

last_exception = None # type: Optional[Exception]
last_exception: Optional[Exception] = None
for callback in callbacks:
try:
if is_success:
Expand Down
24 changes: 9 additions & 15 deletions databuilder/extractor/athena_metadata_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging
from collections import namedtuple

from pyhocon import ConfigFactory, ConfigTree # noqa: F401
from typing import Iterator, Union, Dict, Any # noqa: F401
from pyhocon import ConfigFactory, ConfigTree
from typing import Iterator, Union, Dict, Any

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
Expand Down Expand Up @@ -43,8 +43,7 @@ class AthenaMetadataExtractor(Extractor):
{WHERE_CLAUSE_SUFFIX_KEY: ' ', CATALOG_KEY: DEFAULT_CLUSTER_NAME}
)

def init(self, conf):
# type: (ConfigTree) -> None
def init(self, conf: ConfigTree) -> None:
conf = conf.with_fallback(AthenaMetadataExtractor.DEFAULT_CONFIG)
self._cluster = '{}'.format(conf.get_string(AthenaMetadataExtractor.CATALOG_KEY))

Expand All @@ -60,23 +59,20 @@ def init(self, conf):
.with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

self._alchemy_extractor.init(sql_alch_conf)
self._extract_iter = None # type: Union[None, Iterator]
self._extract_iter: Union[None, Iterator] = None

def extract(self):
# type: () -> Union[TableMetadata, None]
def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None

def get_scope(self):
# type: () -> str
def get_scope(self) -> str:
return 'extractor.athena_metadata'

def _get_extract_iter(self):
# type: () -> Iterator[TableMetadata]
def _get_extract_iter(self) -> Iterator[TableMetadata]:
"""
Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
:return:
Expand All @@ -97,8 +93,7 @@ def _get_extract_iter(self):
'',
columns)

def _get_raw_extract_iter(self):
# type: () -> Iterator[Dict[str, Any]]
def _get_raw_extract_iter(self) -> Iterator[Dict[str, Any]]:
"""
Provides iterator of result row from SQLAlchemy extractor
:return:
Expand All @@ -108,8 +103,7 @@ def _get_raw_extract_iter(self):
yield row
row = self._alchemy_extractor.extract()

def _get_table_key(self, row):
# type: (Dict[str, Any]) -> Union[TableKey, None]
def _get_table_key(self, row: Dict[str, Any]) -> Union[TableKey, None]:
"""
Table key consists of schema and table name
:param row:
Expand Down
42 changes: 21 additions & 21 deletions databuilder/extractor/base_bigquery_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import google_auth_httplib2
from googleapiclient.discovery import build
import httplib2
from pyhocon import ConfigTree # noqa: F401
from typing import List, Any # noqa: F401
from pyhocon import ConfigTree
from typing import Any, Dict, Iterator, List

from databuilder.extractor.base_extractor import Extractor

Expand All @@ -28,13 +28,12 @@ class BaseBigQueryExtractor(Extractor):
CRED_KEY = 'project_cred'
PAGE_SIZE_KEY = 'page_size'
FILTER_KEY = 'filter'
_DEFAULT_SCOPES = ['https://www.googleapis.com/auth/bigquery.readonly', ]
_DEFAULT_SCOPES = ['https://www.googleapis.com/auth/bigquery.readonly']
DEFAULT_PAGE_SIZE = 300
NUM_RETRIES = 3
DATE_LENGTH = 8

def init(self, conf):
# type: (ConfigTree) -> None
def init(self, conf: ConfigTree) -> None:
# should use key_path, or cred_key if the former doesn't exist
self.key_path = conf.get_string(BaseBigQueryExtractor.KEY_PATH_KEY, None)
self.cred_key = conf.get_string(BaseBigQueryExtractor.CRED_KEY, None)
Expand All @@ -55,33 +54,37 @@ def init(self, conf):
google.oauth2.service_account.Credentials.from_service_account_info(
service_account_info, scopes=self._DEFAULT_SCOPES))
else:
credentials, _ = google.auth.default(scopes=self._DEFAULT_SCOPES)
# FIXME: mypy can't find this attribute
google_auth: Any = getattr(google, 'auth')
credentials, _ = google_auth.default(scopes=self._DEFAULT_SCOPES)

http = httplib2.Http()
authed_http = google_auth_httplib2.AuthorizedHttp(credentials, http=http)
self.bigquery_service = build('bigquery', 'v2', http=authed_http, cache_discovery=False)
self.logging_service = build('logging', 'v2', http=authed_http, cache_discovery=False)
self.iter = iter(self._iterate_over_tables())
self.iter: Iterator[Any] = iter([])

def extract(self):
# type: () -> Any
def extract(self) -> Any:
try:
return next(self.iter)
except StopIteration:
return None

def _is_sharded_table(self, table_id):
def _is_sharded_table(self, table_id: str) -> bool:
suffix = table_id[-BaseBigQueryExtractor.DATE_LENGTH:]
return suffix.isdigit()

def _iterate_over_tables(self):
# type: () -> Any
def _iterate_over_tables(self) -> Any:
for dataset in self._retrieve_datasets():
for entry in self._retrieve_tables(dataset):
yield(entry)
yield entry

def _retrieve_datasets(self):
# type: () -> List[DatasetRef]
# TRICKY: this function has different return types between different subclasses,
# so type as Any. Should probably refactor to remove this unclear sharing.
def _retrieve_tables(self, dataset: DatasetRef) -> Any:
pass

def _retrieve_datasets(self) -> List[DatasetRef]:
datasets = []
for page in self._page_dataset_list_results():
if 'datasets' not in page:
Expand All @@ -94,8 +97,7 @@ def _retrieve_datasets(self):

return datasets

def _page_dataset_list_results(self):
# type: () -> Any
def _page_dataset_list_results(self) -> Iterator[Any]:
response = self.bigquery_service.datasets().list(
projectId=self.project_id,
all=False, # Do not return hidden datasets
Expand All @@ -116,8 +118,7 @@ def _page_dataset_list_results(self):
else:
response = None

def _page_table_list_results(self, dataset):
# type: (DatasetRef) -> Any
def _page_table_list_results(self, dataset: DatasetRef) -> Iterator[Dict[str, Any]]:
response = self.bigquery_service.tables().list(
projectId=dataset.projectId,
datasetId=dataset.datasetId,
Expand All @@ -137,6 +138,5 @@ def _page_table_list_results(self, dataset):
else:
response = None

def get_scope(self):
# type: () -> str
def get_scope(self) -> str:
return 'extractor.bigquery_table_metadata'
Loading

0 comments on commit ee1b6bc

Please sign in to comment.