-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Data Sources] Implement Apache Drill #3188
Changes from 2 commits
a62d54f
8606ff7
040fc79
a3e2b38
945d700
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
import os | ||
import logging | ||
import requests | ||
import re | ||
|
||
from dateutil import parser | ||
|
||
from redash.query_runner import BaseQueryRunner, register | ||
from redash.query_runner import TYPE_STRING, TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind putting the imports in one import call? from redash.query_runner import (
BaseQueryRunner, register,
TYPE_STRING, TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN
) |
||
from redash.utils import json_dumps, json_loads | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# Drill returns request result as strings, so we have to guess the actual column type
def guess_type(string_value):
    """Infer the redash column type of a single Drill cell value.

    Drill's REST API serializes every cell as a string, so the real type
    must be guessed from the textual representation. Candidate types are
    tried from most to least specific: integer, float, boolean, datetime,
    falling back to string.
    """
    if string_value == '' or string_value is None:
        return TYPE_STRING

    try:
        int(string_value)
        return TYPE_INTEGER
    except (ValueError, OverflowError):
        pass

    try:
        float(string_value)
        return TYPE_FLOAT
    except (ValueError, OverflowError):
        pass

    # str() instead of the Python 2-only unicode() builtin, which raises
    # NameError on Python 3.
    if str(string_value).lower() in ('true', 'false'):
        return TYPE_BOOLEAN

    try:
        parser.parse(string_value)
        return TYPE_DATETIME
    except (ValueError, OverflowError):
        pass

    return TYPE_STRING
|
||
|
||
# Convert Drill string value to actual type
def convert_type(string_value, actual_type):
    """Convert a Drill string cell to the Python value for *actual_type*.

    Empty/None cells normalize to the empty string regardless of type.
    Unknown types fall through and are returned as strings.
    """
    if string_value is None or string_value == '':
        return ''

    if actual_type == TYPE_INTEGER:
        return int(string_value)

    if actual_type == TYPE_FLOAT:
        return float(string_value)

    if actual_type == TYPE_BOOLEAN:
        # str() replaces the Python 2-only unicode() builtin.
        return str(string_value).lower() == 'true'

    if actual_type == TYPE_DATETIME:
        return parser.parse(string_value)

    return str(string_value)
|
||
|
||
# Parse Drill API response and translate it to accepted format
def parse_response(data):
    """Translate a Drill REST API response into redash's result format.

    Column types are guessed from the first row's values, then every cell
    is converted in place to its guessed type.

    :param data: dict with ``columns`` (list of names) and ``rows``
                 (list of dicts keyed by column name), as returned by
                 Drill's ``query.json`` endpoint.
    :return: dict with ``columns`` and ``rows`` in redash's format.
    """
    cols = data['columns']
    rows = data['rows']

    if len(cols) == 0:
        return {'columns': [], 'rows': []}

    # Guard against a result set with columns but no rows: the original
    # code indexed rows[0] unconditionally and raised IndexError here.
    if len(rows) == 0:
        columns = [{'name': c, 'type': TYPE_STRING, 'friendly_name': c}
                   for c in cols]
        return {'columns': columns, 'rows': []}

    first_row = rows[0]
    columns = []
    types = {}

    # Guess each column's type once from the first row.
    for c in cols:
        col_type = guess_type(first_row[c])
        types[c] = col_type
        columns.append({'name': c, 'type': col_type, 'friendly_name': c})

    # Convert every cell in place; .items() replaces the Python 2-only
    # dict.iteritems().
    for row in rows:
        for key, value in row.items():
            row[key] = convert_type(value, types[key])

    return {'columns': columns, 'rows': rows}
|
||
|
||
class Drill(BaseQueryRunner):
    """Query runner for Apache Drill's REST API (``query.json`` endpoint)."""

    noop_query = 'select version from sys.version'

    @classmethod
    def name(cls):
        return 'Apache Drill'

    @classmethod
    def type(cls):
        return 'drill'

    @classmethod
    def enabled(cls):
        return True

    @classmethod
    def configuration_schema(cls):
        schema = {
            'type': 'object',
            'properties': {
                'username': {
                    'type': 'string',
                    'title': 'Username',
                },
                'password': {
                    'type': 'string',
                    'title': 'Password',
                },
                'url': {
                    'type': 'string',
                    'title': 'Drill URL',
                },
                # Since Drill itself can act as aggregator of various datasources,
                # it can contain quite a lot of schemas in `INFORMATION_SCHEMA`
                # We added this to improve user experience and let users focus only on desired schemas.
                'allowed_schemas': {
                    'type': 'string',
                    'title': 'List of schemas to use in schema browser (comma separated)'
                }
            },
            'order': ['url', 'username', 'password', 'allowed_schemas'],
            'required': ['url'],
            'secret': ['password']
        }
        return schema

    def get_auth(self):
        """Return a (username, password) tuple for HTTP basic auth,
        or None when either credential is missing."""
        username = self.configuration.get('username')
        password = self.configuration.get('password')
        if username and password:
            return (username, password)
        else:
            return None

    def get_response(self, url, auth=None, **kwargs):
        """POST to *url* and return ``(response, error)``.

        *error* is None on success, otherwise a human-readable message;
        *response* may be None when the request itself failed.
        """
        # Get authentication values if not given
        if auth is None:
            auth = self.get_auth()

        # Then call requests to get the response from the given endpoint
        # URL optionally, with the additional requests parameters.
        error = None
        response = None
        try:
            response = requests.post(url, auth=auth, **kwargs)
            # Raise a requests HTTP exception with the appropriate reason
            # for 4xx and 5xx response status codes which is later caught
            # and passed back.
            response.raise_for_status()

            # Any other responses (e.g. 2xx and 3xx):
            if response.status_code != 200:
                error = '{} ({}).'.format(
                    'Drill returned unexpected status code',
                    response.status_code,
                )

        except requests.HTTPError as exc:
            logger.exception(exc)
            error = (
                'Failed to execute query. '
                'Return Code: {} Reason: {}'.format(
                    response.status_code,
                    response.text
                )
            )
        except requests.RequestException as exc:
            # Catch all other requests exceptions and return the error.
            logger.exception(exc)
            error = str(exc)

        # Return response and error.
        return response, error

    def run_query(self, query, user):
        """Execute *query* against Drill and return ``(json_results, error)``."""
        # Build the endpoint URL with string formatting: os.path.join is a
        # filesystem-path helper and would use backslashes on Windows.
        drill_url = '{}/query.json'.format(self.configuration['url'].rstrip('/'))

        try:
            payload = {'queryType': 'SQL', 'query': query}

            response, error = self.get_response(drill_url, json=payload)
            if error is not None:
                return None, error

            results = parse_response(response.json())

            return json_dumps(results), None
        except KeyboardInterrupt:
            return None, 'Query cancelled by user.'

    def get_schema(self, get_stats=False):
        """Return the schema browser listing: one entry per table with its columns.

        System/information schemas are excluded; when ``allowed_schemas`` is
        configured, only those schemas are listed.
        """
        query = """
        SELECT DISTINCT
            TABLE_SCHEMA,
            TABLE_NAME,
            COLUMN_NAME
        FROM
            INFORMATION_SCHEMA.`COLUMNS`
        WHERE
            TABLE_SCHEMA not in ('INFORMATION_SCHEMA', 'information_schema', 'sys')
            and TABLE_SCHEMA not like '%.information_schema'
            and TABLE_SCHEMA not like '%.INFORMATION_SCHEMA'

        """
        allowed_schemas = self.configuration.get('allowed_schemas')
        if allowed_schemas:
            # Strip anything that is not a valid schema-name character to
            # avoid injecting into the IN (...) clause.
            query += "and TABLE_SCHEMA in ({})".format(', '.join(map(lambda x: "'{}'".format(re.sub('[^a-zA-Z0-9_.`]', '', x)), allowed_schemas.split(','))))

        results, error = self.run_query(query, None)

        if error is not None:
            # Surface the underlying error instead of discarding it.
            raise Exception("Failed getting schema: {}".format(error))

        results = json_loads(results)

        schema = {}

        for row in results['rows']:
            table_name = u'{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])

            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}

            schema[table_name]['columns'].append(row['COLUMN_NAME'])

        return schema.values()


# Make the Drill runner discoverable by redash's query-runner registry.
register(Drill)
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -188,7 +188,8 @@ def all_settings(): | |
'redash.query_runner.qubole', | ||
'redash.query_runner.db2', | ||
'redash.query_runner.druid', | ||
'redash.query_runner.kylin' | ||
'redash.query_runner.kylin', | ||
'redash.query_runner.drill' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind adding a trailing comma here? |
||
] | ||
|
||
enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners))) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
# -*- coding: utf-8 -*- | ||
import datetime | ||
from unittest import TestCase | ||
|
||
from mock import MagicMock | ||
|
||
from redash.query_runner import TYPE_DATETIME, TYPE_FLOAT, TYPE_INTEGER, TYPE_BOOLEAN, TYPE_STRING | ||
from redash.query_runner.drill import guess_type, convert_type, parse_response, Drill | ||
|
||
|
||
class TestGuessType(TestCase):
    """Unit tests for drill.guess_type."""

    def test_handles_unicode(self):
        self.assertEqual(guess_type(u'Текст'), TYPE_STRING)

    def test_detects_booleans(self):
        for literal in ('true', 'True', 'TRUE', 'false', 'False', 'FALSE'):
            self.assertEqual(guess_type(literal), TYPE_BOOLEAN)

    def test_detects_strings(self):
        for value in (None, '', 'redash'):
            self.assertEqual(guess_type(value), TYPE_STRING)

    def test_detects_integer(self):
        self.assertEqual(guess_type('42'), TYPE_INTEGER)

    def test_detects_float(self):
        self.assertEqual(guess_type('3.14'), TYPE_FLOAT)

    def test_detects_date(self):
        self.assertEqual(guess_type('2018-10-31'), TYPE_DATETIME)
|
||
|
||
class TestConvertType(TestCase):
    """Unit tests for drill.convert_type."""

    def test_converts_booleans(self):
        for literal in ('true', 'True', 'TRUE'):
            self.assertEqual(convert_type(literal, TYPE_BOOLEAN), True)
        for literal in ('false', 'False', 'FALSE'):
            self.assertEqual(convert_type(literal, TYPE_BOOLEAN), False)

    def test_converts_strings(self):
        self.assertEqual(convert_type(u'Текст', TYPE_STRING), u'Текст')
        self.assertEqual(convert_type(None, TYPE_STRING), '')
        self.assertEqual(convert_type('', TYPE_STRING), '')
        self.assertEqual(convert_type('redash', TYPE_STRING), 'redash')

    def test_converts_integer(self):
        self.assertEqual(convert_type('42', TYPE_INTEGER), 42)

    def test_converts_float(self):
        self.assertAlmostEqual(convert_type('3.14', TYPE_FLOAT), 3.14, 2)

    def test_converts_date(self):
        expected = datetime.datetime(2018, 10, 31, 0, 0)
        self.assertEqual(convert_type('2018-10-31', TYPE_DATETIME), expected)
|
||
# Canned Drill API responses used as fixtures by TestParseResponse.
# `empty_response` mimics a result with no columns; `regular_response`
# carries one column of each guessable type (string, date, int, float).
empty_response = {'columns': [], 'rows': [{}]}

regular_response = {
    'columns': ['key', 'date', 'count', 'avg'],
    'rows': [
        {'key': 'Alpha', 'date': '2018-01-01', 'count': '10', 'avg': '3.14'},
        {'key': 'Beta', 'date': '2018-02-01', 'count': '20', 'avg': '6.28'},
    ],
}
|
||
class TestParseResponse(TestCase):
    """Unit tests for drill.parse_response using the module-level fixtures."""

    # Fixed typo in the method name: "reponse" -> "response".
    def test_parse_empty_response(self):
        parsed = parse_response(empty_response)

        self.assertIsInstance(parsed, dict)
        self.assertIsNotNone(parsed['columns'])
        self.assertIsNotNone(parsed['rows'])
        self.assertEqual(len(parsed['columns']), 0)
        self.assertEqual(len(parsed['rows']), 0)

    def test_parse_regular_response(self):
        parsed = parse_response(regular_response)

        self.assertIsInstance(parsed, dict)
        self.assertIsNotNone(parsed['columns'])
        self.assertIsNotNone(parsed['rows'])
        self.assertEqual(len(parsed['columns']), 4)
        self.assertEqual(len(parsed['rows']), 2)

        # Column order and guessed types must match the fixture's layout.
        expected_columns = [
            ('key', TYPE_STRING),
            ('date', TYPE_DATETIME),
            ('count', TYPE_INTEGER),
            ('avg', TYPE_FLOAT),
        ]
        for col, (name, col_type) in zip(parsed['columns'], expected_columns):
            self.assertEqual(col['name'], name)
            self.assertEqual(col['type'], col_type)

        row_0 = parsed['rows'][0]
        self.assertEqual(row_0['key'], 'Alpha')
        self.assertEqual(row_0['date'], datetime.datetime(2018, 1, 1, 0, 0))
        self.assertEqual(row_0['count'], 10)
        self.assertAlmostEqual(row_0['avg'], 3.14, 2)

        row_1 = parsed['rows'][1]
        self.assertEqual(row_1['key'], 'Beta')
        self.assertEqual(row_1['date'], datetime.datetime(2018, 2, 1, 0, 0))
        self.assertEqual(row_1['count'], 20)
        self.assertAlmostEqual(row_1['avg'], 6.28, 2)
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please move the requests import down to the dateutil import to separate it from the stdlib imports.