Commit

[Data Sources] Implement Apache Drill (#3188)
* Added support for the Apache Drill data source

* Improvements to the `Drill` query runner and minor refactoring

1. The Drill query runner now inherits from `BaseHTTPQueryRunner`, since the two share a lot of common code.
2. `BaseHTTPQueryRunner.get_response` now accepts an `http_method` argument (the original implementation could only send `GET` requests).
3. Added `order` to the `BaseHTTPQueryRunner` configuration schema so the UI renders the schema's fields in a fixed order.
4. Eliminated the duplicated `_guess_type` method in the `GoogleSpreadsheet`, `Results` and `Drill` query runners; the shared `guess_type` helper now lives in `redash.query_runner` (see the sketch after this list).
5. Removed the per-runner tests for `_guess_type`, merged them into a single test case, and moved it to `tests.query_runner.test_utils`.
6. Various minor changes (code style, imports, etc.).
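
For illustration, a minimal sketch of how the consolidated helper behaves (this is not part of the diff; the sample values are made up, but the mappings follow the `guess_type` implementation added to `redash/query_runner/__init__.py` below):

from redash.query_runner import (
    guess_type, TYPE_STRING, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN, TYPE_DATETIME
)

assert guess_type(None) == TYPE_STRING            # empty/None input falls back to string
assert guess_type('42') == TYPE_INTEGER
assert guess_type('3.14') == TYPE_FLOAT
assert guess_type('TRUE') == TYPE_BOOLEAN         # true/false matched case-insensitively
assert guess_type('2019-01-10') == TYPE_DATETIME  # anything dateutil can parse
assert guess_type('hello world') == TYPE_STRING
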
break-pointer authored and arikfr committed Jan 10, 2019
1 parent 445f8e5 commit 0b6f1fc
Showing 11 changed files with 321 additions and 116 deletions.
Binary file added client/app/assets/images/db-logos/drill.png
40 changes: 36 additions & 4 deletions redash/query_runner/__init__.py
@@ -1,4 +1,6 @@
import logging

from dateutil import parser
import requests

from redash import settings
@@ -20,7 +22,8 @@
'SUPPORTED_COLUMN_TYPES',
'register',
'get_query_runner',
'import_query_runners'
'import_query_runners',
'guess_type'
]

# Valid types of columns returned in results:
@@ -169,7 +172,8 @@ def configuration_schema(cls):
'title': cls.password_title,
},
},
'secret': ['password']
'secret': ['password'],
'order': ['url', 'username', 'password']
}

if cls.requires_url or cls.requires_authentication:
@@ -192,7 +196,7 @@ def get_auth(self):
else:
return None

def get_response(self, url, auth=None, **kwargs):
def get_response(self, url, auth=None, http_method='get', **kwargs):
# Get authentication values if not given
if auth is None:
auth = self.get_auth()
@@ -202,7 +206,7 @@ def get_response(self, url, auth=None, **kwargs):
error = None
response = None
try:
response = requests.get(url, auth=auth, **kwargs)
response = requests.request(http_method, url, auth=auth, **kwargs)
# Raise a requests HTTP exception with the appropriate reason
# for 4xx and 5xx response status codes which is later caught
# and passed back.
@@ -265,3 +269,31 @@ def get_configuration_schema_for_query_runner_type(query_runner_type):
def import_query_runners(query_runner_imports):
for runner_import in query_runner_imports:
__import__(runner_import)


def guess_type(string_value):
if string_value == '' or string_value is None:
return TYPE_STRING

try:
int(string_value)
return TYPE_INTEGER
except (ValueError, OverflowError):
pass

try:
float(string_value)
return TYPE_FLOAT
except (ValueError, OverflowError):
pass

if unicode(string_value).lower() in ('true', 'false'):
return TYPE_BOOLEAN

try:
parser.parse(string_value)
return TYPE_DATETIME
except (ValueError, OverflowError):
pass

return TYPE_STRING
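
As an editorial aside (a sketch, not output from the repository), the new `order` key is what lets a subclass control how the UI lays out its fields: the base schema orders `url`, `username`, `password`, and a subclass such as `Drill` appends its own keys. Assuming the hidden property definitions are plain string fields, a Drill data source ends up exposing roughly:

# Abridged, assumed shape -- the exact property definitions and any 'required'
# list depend on base-class attributes not shown in this hunk.
drill_schema = {
    'type': 'object',
    'properties': {
        'url': {'type': 'string', 'title': 'Drill URL'},
        'username': {'type': 'string', 'title': 'Username'},
        'password': {'type': 'string', 'title': 'Password'},
        'allowed_schemas': {
            'type': 'string',
            'title': 'List of schemas to use in schema browser (comma separated)',
        },
    },
    'secret': ['password'],
    'order': ['url', 'username', 'password', 'allowed_schemas'],
}
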
143 changes: 143 additions & 0 deletions redash/query_runner/drill.py
@@ -0,0 +1,143 @@
import os
import logging
import re

from dateutil import parser

from redash.query_runner import (
BaseHTTPQueryRunner, register,
TYPE_DATETIME, TYPE_INTEGER, TYPE_FLOAT, TYPE_BOOLEAN,
guess_type
)
from redash.utils import json_dumps, json_loads

logger = logging.getLogger(__name__)


# Convert Drill string value to actual type
def convert_type(string_value, actual_type):
if string_value is None or string_value == '':
return ''

if actual_type == TYPE_INTEGER:
return int(string_value)

if actual_type == TYPE_FLOAT:
return float(string_value)

if actual_type == TYPE_BOOLEAN:
return unicode(string_value).lower() == 'true'

if actual_type == TYPE_DATETIME:
return parser.parse(string_value)

return unicode(string_value)


# Parse Drill API response and translate it to accepted format
def parse_response(data):
cols = data['columns']
rows = data['rows']

if len(cols) == 0:
return {'columns': [], 'rows': []}

first_row = rows[0]
columns = []
types = {}

for c in cols:
columns.append({'name': c, 'type': guess_type(first_row[c]), 'friendly_name': c})

for col in columns:
types[col['name']] = col['type']

for row in rows:
for key, value in row.iteritems():
row[key] = convert_type(value, types[key])

return {'columns': columns, 'rows': rows}


class Drill(BaseHTTPQueryRunner):
noop_query = 'select version from sys.version'
response_error = "Drill API returned unexpected status code"
requires_authentication = False
requires_url = True
url_title = 'Drill URL'
username_title = 'Username'
password_title = 'Password'

@classmethod
def name(cls):
return 'Apache Drill'

@classmethod
def configuration_schema(cls):
schema = super(Drill, cls).configuration_schema()
# Since Drill itself can act as aggregator of various datasources,
# it can contain quite a lot of schemas in `INFORMATION_SCHEMA`
# We added this to improve user experience and let users focus only on desired schemas.
schema['properties']['allowed_schemas'] = {
'type': 'string',
'title': 'List of schemas to use in schema browser (comma separated)'
}
schema['order'] += ['allowed_schemas']
return schema

def run_query(self, query, user):
drill_url = os.path.join(self.configuration['url'], 'query.json')

try:
payload = {'queryType': 'SQL', 'query': query}

response, error = self.get_response(drill_url, http_method='post', json=payload)
if error is not None:
return None, error

results = parse_response(response.json())

return json_dumps(results), None
except KeyboardInterrupt:
return None, 'Query cancelled by user.'

def get_schema(self, get_stats=False):

query = """
SELECT DISTINCT
TABLE_SCHEMA,
TABLE_NAME,
COLUMN_NAME
FROM
INFORMATION_SCHEMA.`COLUMNS`
WHERE
TABLE_SCHEMA not in ('INFORMATION_SCHEMA', 'information_schema', 'sys')
and TABLE_SCHEMA not like '%.information_schema'
and TABLE_SCHEMA not like '%.INFORMATION_SCHEMA'
"""
allowed_schemas = self.configuration.get('allowed_schemas')
if allowed_schemas:
query += "and TABLE_SCHEMA in ({})".format(', '.join(map(lambda x: "'{}'".format(re.sub('[^a-zA-Z0-9_.`]', '', x)), allowed_schemas.split(','))))

results, error = self.run_query(query, None)

if error is not None:
raise Exception("Failed getting schema.")

results = json_loads(results)

schema = {}

for row in results['rows']:
table_name = u'{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])

if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}

schema[table_name]['columns'].append(row['COLUMN_NAME'])

return schema.values()


register(Drill)
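
A quick illustration of the translation `parse_response` performs (an editorial sketch, not part of the commit; the endpoint path comes from `run_query` above, the values are invented). The `/query.json` response is treated here as column names plus rows of string values, which is why the runner guesses and then converts types:

# Hypothetical response body from POST <drill url>/query.json
data = {
    'columns': ['id', 'amount', 'created_at'],
    'rows': [
        {'id': '1', 'amount': '9.99', 'created_at': '2019-01-10 12:34:56'},
        {'id': '2', 'amount': '12.50', 'created_at': '2019-01-11 08:00:00'},
    ],
}

# parse_response(data) guesses each column's type from the first row, converts
# every row value accordingly, and returns Redash's standard result structure:
# {
#     'columns': [
#         {'name': 'id',         'type': TYPE_INTEGER,  'friendly_name': 'id'},
#         {'name': 'amount',     'type': TYPE_FLOAT,    'friendly_name': 'amount'},
#         {'name': 'created_at', 'type': TYPE_DATETIME, 'friendly_name': 'created_at'},
#     ],
#     'rows': [{'id': 1, 'amount': 9.99, 'created_at': datetime(2019, 1, 10, 12, 34, 56)}, ...]
# }
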
25 changes: 1 addition & 24 deletions redash/query_runner/google_spreadsheets.py
@@ -48,29 +48,6 @@ def _get_columns_and_column_names(row):
return columns, column_names


def _guess_type(value):
if value == '':
return TYPE_STRING
try:
val = int(value)
return TYPE_INTEGER
except ValueError:
pass
try:
val = float(value)
return TYPE_FLOAT
except ValueError:
pass
if unicode(value).lower() in ('true', 'false'):
return TYPE_BOOLEAN
try:
val = parser.parse(value)
return TYPE_DATETIME
except (ValueError, OverflowError):
pass
return TYPE_STRING


def _value_eval_list(row_values, col_types):
value_list = []
raw_values = zip(col_types, row_values)
@@ -120,7 +97,7 @@ def parse_worksheet(worksheet):

if len(worksheet) > 1:
for j, value in enumerate(worksheet[HEADER_INDEX + 1]):
columns[j]['type'] = _guess_type(value)
columns[j]['type'] = guess_type(value)

column_types = [c['type'] for c in columns]
rows = [dict(zip(column_names, _value_eval_list(row, column_types))) for row in worksheet[HEADER_INDEX + 1:]]
28 changes: 2 additions & 26 deletions redash/query_runner/query_results.py
@@ -8,9 +8,7 @@

from redash import models
from redash.permissions import has_access, not_view_only
from redash.query_runner import (TYPE_BOOLEAN, TYPE_DATETIME, TYPE_FLOAT,
TYPE_INTEGER, TYPE_STRING, BaseQueryRunner,
register)
from redash.query_runner import guess_type, TYPE_STRING, BaseQueryRunner, register
from redash.utils import json_dumps, json_loads

logger = logging.getLogger(__name__)
@@ -24,28 +22,6 @@ class CreateTableError(Exception):
pass


def _guess_type(value):
if value == '' or value is None:
return TYPE_STRING

if isinstance(value, numbers.Integral):
return TYPE_INTEGER

if isinstance(value, float):
return TYPE_FLOAT

if text_type(value).lower() in ('true', 'false'):
return TYPE_BOOLEAN

try:
parser.parse(value)
return TYPE_DATETIME
except (ValueError, OverflowError):
pass

return TYPE_STRING


def extract_query_ids(query):
queries = re.findall(r'(?:join|from)\s+query_(\d+)', query, re.IGNORECASE)
return [int(q) for q in queries]
@@ -164,7 +140,7 @@ def run_query(self, query, user):

for i, row in enumerate(cursor):
for j, col in enumerate(row):
guess = _guess_type(col)
guess = guess_type(col)

if columns[j]['type'] is None:
columns[j]['type'] = guess
3 changes: 2 additions & 1 deletion redash/settings/__init__.py
@@ -188,7 +188,8 @@ def all_settings():
'redash.query_runner.qubole',
'redash.query_runner.db2',
'redash.query_runner.druid',
'redash.query_runner.kylin'
'redash.query_runner.kylin',
'redash.query_runner.drill',
]

enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners)))