Skip to content

Commit

Permalink
[sqllab] improve Hive support (#3187)
Browse files Browse the repository at this point in the history
* [sqllab] improve Hive support

* Fix "Transport not open" bug
* Getting progress bar to show
* Bump pyhive to 0.4.0
* Getting [Track Job] button to show

* Fix tests
  • Loading branch information
mistercrunch authored Jul 27, 2017
1 parent 25c599d commit b888802
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 51 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def get_git_sha():
'pandas==0.20.2',
'parsedatetime==2.0.0',
'pydruid==0.3.1',
'PyHive>=0.3.0',
'PyHive>=0.4.0',
'python-dateutil==2.6.0',
'requests==2.17.3',
'simplejson==3.10.0',
Expand Down
14 changes: 14 additions & 0 deletions superset/assets/javascripts/SqlLab/components/ResultSet.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export default class ResultSet extends React.PureComponent {
}
if (['running', 'pending', 'fetching'].indexOf(query.state) > -1) {
let progressBar;
let trackingUrl;
if (query.progress > 0 && query.state === 'running') {
progressBar = (
<ProgressBar
Expand All @@ -163,11 +164,24 @@ export default class ResultSet extends React.PureComponent {
label={`${query.progress}%`}
/>);
}
if (query.trackingUrl) {
trackingUrl = (
<Button
bsSize="small"
onClick={() => { window.open(query.trackingUrl); }}
>
Track Job
</Button>
);
}
return (
<div>
<img className="loading" alt="Loading..." src="/static/assets/images/loading.gif" />
<QueryStateLabel query={query} />
{progressBar}
<div>
{trackingUrl}
</div>
</div>
);
} else if (query.state === 'failed') {
Expand Down
1 change: 1 addition & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class CeleryConfig(object):
CELERY_IMPORTS = ('superset.sql_lab', )
CELERY_RESULT_BACKEND = 'db+sqlite:///celery_results.sqlite'
CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
CELERYD_LOG_LEVEL = 'DEBUG'
CELERY_CONFIG = CeleryConfig
"""
CELERY_CONFIG = None
Expand Down
78 changes: 53 additions & 25 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,21 @@ class HiveEngineSpec(PrestoEngineSpec):
engine = 'hive'
cursor_execute_kwargs = {'async': True}

# Regexes for scraping progress/job information out of HiveServer2 logs.
# Compiled once at class scope to avoid recompiling on every log fetch.
# Example: 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
jobs_stats_r = re.compile(
    r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
# Example: 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5
launching_job_r = re.compile(
    r'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
    r'(?P<max_jobs>[0-9]+)')
# Example: 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152
# Stage-18 map = 0%, reduce = 0%
stage_progress_r = re.compile(
    r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
    r'map = (?P<map_progress>[0-9]+)%.*'
    r'reduce = (?P<reduce_progress>[0-9]+)%.*')

@classmethod
def patch(cls):
from pyhive import hive
Expand Down Expand Up @@ -666,38 +681,27 @@ def adjust_database_uri(cls, uri, selected_schema=None):
return uri

@classmethod
def progress(cls, logs):
# 17/02/07 19:36:38 INFO ql.Driver: Total jobs = 5
jobs_stats_r = re.compile(
r'.*INFO.*Total jobs = (?P<max_jobs>[0-9]+)')
# 17/02/07 19:37:08 INFO ql.Driver: Launching Job 2 out of 5
launching_job_r = re.compile(
'.*INFO.*Launching Job (?P<job_number>[0-9]+) out of '
'(?P<max_jobs>[0-9]+)')
# 17/02/07 19:36:58 INFO exec.Task: 2017-02-07 19:36:58,152 Stage-18
# map = 0%, reduce = 0%
stage_progress = re.compile(
r'.*INFO.*Stage-(?P<stage_number>[0-9]+).*'
r'map = (?P<map_progress>[0-9]+)%.*'
r'reduce = (?P<reduce_progress>[0-9]+)%.*')
total_jobs = None
def progress(cls, log_lines):
total_jobs = 1 # assuming there's at least 1 job
current_job = None
stages = {}
lines = logs.splitlines()
for line in lines:
match = jobs_stats_r.match(line)
for line in log_lines:
match = cls.jobs_stats_r.match(line)
if match:
total_jobs = int(match.groupdict()['max_jobs'])
match = launching_job_r.match(line)
total_jobs = int(match.groupdict()['max_jobs']) or 1
match = cls.launching_job_r.match(line)
if match:
current_job = int(match.groupdict()['job_number'])
stages = {}
match = stage_progress.match(line)
match = cls.stage_progress_r.match(line)
if match:
stage_number = int(match.groupdict()['stage_number'])
map_progress = int(match.groupdict()['map_progress'])
reduce_progress = int(match.groupdict()['reduce_progress'])
stages[stage_number] = (map_progress + reduce_progress) / 2
logging.info(
"Progress detail: {}, "
"total jobs: {}".format(stages, total_jobs))

if not total_jobs or not current_job:
return 0
Expand All @@ -709,6 +713,13 @@ def progress(cls, logs):
)
return int(progress)

@classmethod
def get_tracking_url(cls, log_lines):
lkp = "Tracking URL = "
for line in log_lines:
if lkp in line:
return line.split(lkp)[1]

@classmethod
def handle_cursor(cls, cursor, query, session):
"""Updates progress information"""
Expand All @@ -718,18 +729,35 @@ def handle_cursor(cls, cursor, query, session):
hive.ttypes.TOperationState.RUNNING_STATE,
)
polled = cursor.poll()
last_log_line = 0
tracking_url = None
while polled.operationState in unfinished_states:
query = session.query(type(query)).filter_by(id=query.id).one()
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

logs = cursor.fetch_logs()
if logs:
progress = cls.progress(logs)
resp = cursor.fetch_logs()
if resp and resp.log:
log = resp.log or ''
log_lines = resp.log.splitlines()
logging.info("\n".join(log_lines[last_log_line:]))
last_log_line = len(log_lines) - 1
progress = cls.progress(log_lines)
logging.info("Progress total: {}".format(progress))
needs_commit = False
if progress > query.progress:
query.progress = progress
session.commit()
needs_commit = True
if not tracking_url:
tracking_url = cls.get_tracking_url(log_lines)
if tracking_url:
logging.info(
"Found the tracking url: {}".format(tracking_url))
query.tracking_url = tracking_url
needs_commit = True
if needs_commit:
session.commit()
time.sleep(5)
polled = cursor.poll()

Expand Down
23 changes: 23 additions & 0 deletions superset/migrations/versions/ca69c70ec99b_tracking_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""tracking_url
Revision ID: ca69c70ec99b
Revises: a65458420354
Create Date: 2017-07-26 20:09:52.606416
"""

# revision identifiers, used by Alembic.
revision = 'ca69c70ec99b'
down_revision = 'a65458420354'

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql


def upgrade():
    """Add the nullable ``tracking_url`` text column to the ``query`` table."""
    tracking_url = sa.Column('tracking_url', sa.Text(), nullable=True)
    op.add_column('query', tracking_url)


def downgrade():
    """Drop the ``tracking_url`` column from the ``query`` table."""
    op.drop_column(table_name='query', column_name='tracking_url')
2 changes: 2 additions & 0 deletions superset/models/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Query(Model):
start_running_time = Column(Numeric(precision=20, scale=6))
end_time = Column(Numeric(precision=20, scale=6))
end_result_backend_time = Column(Numeric(precision=20, scale=6))
tracking_url = Column(Text)

changed_on = Column(
DateTime,
Expand Down Expand Up @@ -119,6 +120,7 @@ def to_dict(self):
'user': self.user.username,
'limit_reached': self.limit_reached,
'resultsKey': self.results_key,
'trackingUrl': self.tracking_url,
}

@property
Expand Down
5 changes: 4 additions & 1 deletion superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def handle_error(msg):
conn.close()
return handle_error(db_engine_spec.extract_error_message(e))

logging.info("Fetching cursor description")
cursor_description = cursor.description

conn.commit()
conn.close()

Expand All @@ -203,7 +206,7 @@ def handle_error(msg):
}, default=utils.json_iso_dttm_ser)

column_names = (
[col[0] for col in cursor.description] if cursor.description else [])
[col[0] for col in cursor_description] if cursor_description else [])
column_names = dedup(column_names)
cdf = dataframe.SupersetDataFrame(pd.DataFrame(
list(data), columns=column_names))
Expand Down
3 changes: 2 additions & 1 deletion superset/views/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ class SliceAddView(SliceModelView): # noqa

class DashboardModelView(SupersetModelView, DeleteMixin): # noqa
datamodel = SQLAInterface(models.Dashboard)

list_title = _('List Dashboards')
show_title = _('Show Dashboard')
add_title = _('Add Dashboard')
Expand Down Expand Up @@ -2030,6 +2030,7 @@ def sql_json(self):

# Async request.
if async:
logging.info("Running query on a Celery worker")
# Ignore the celery future object and the request may time out.
try:
sql_lab.get_sql_results.delay(
Expand Down
40 changes: 21 additions & 19 deletions tests/db_engine_specs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,55 @@

import unittest

from superset import db_engine_specs
from superset.db_engine_specs import HiveEngineSpec


class DbEngineSpecsTestCase(unittest.TestCase):
def test_0_progress(self):
    """A log with only PerfLogger lines (no jobs/stages) reports 0%.

    NOTE(review): the source span interleaved the pre- and post-commit
    diff lines; this is the reconstructed post-commit version, with the
    deprecated ``assertEquals`` alias replaced by ``assertEqual``.
    """
    log = """
        17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
        17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
    """.split('\n')
    self.assertEqual(0, HiveEngineSpec.progress(log))

def test_0_progress_no_job_lines(self):
    """A PerfLogger-only log must still report 0% progress.

    NOTE(review): this method was also named ``test_0_progress``,
    silently shadowing the earlier test of the same name so only one of
    them ever ran; renamed so the test runner collects both.
    """
    log = """
        17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>
        17/02/07 18:26:27 INFO log.PerfLogger: <PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>
    """.split('\n')
    self.assertEqual(0, HiveEngineSpec.progress(log))

def test_number_of_jobs_progress(self):
    """Knowing the total job count alone is not progress: still 0%.

    NOTE(review): reconstructed post-commit version of an interleaved
    diff span; ``assertEquals`` (deprecated alias) -> ``assertEqual``.
    """
    log = """
        17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
    """.split('\n')
    self.assertEqual(0, HiveEngineSpec.progress(log))

def test_job_1_launched_progress(self):
    """Job 1 launched but no stage output yet: progress stays at 0%.

    NOTE(review): reconstructed post-commit version of an interleaved
    diff span; ``assertEquals`` (deprecated alias) -> ``assertEqual``.
    """
    log = """
        17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
        17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
    """.split('\n')
    self.assertEqual(0, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_1_0_progress(self):
    """Stage-1 at map 0% / reduce 0% still yields overall 0%.

    NOTE(review): reconstructed post-commit version of an interleaved
    diff span; ``assertEquals`` (deprecated alias) -> ``assertEqual``.
    """
    log = """
        17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
        17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
        17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
    """.split('\n')
    self.assertEqual(0, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_1_map_40_progress(self):
    """Job 1 of 2, Stage-1 at map 40% / reduce 0% -> 10% overall.

    NOTE(review): reconstructed post-commit version of an interleaved
    diff span; ``assertEquals`` (deprecated alias) -> ``assertEqual``.
    """
    log = """
        17/02/07 19:15:55 INFO ql.Driver: Total jobs = 2
        17/02/07 19:15:55 INFO ql.Driver: Launching Job 1 out of 2
        17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
        17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
    """.split('\n')
    self.assertEqual(10, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_1_map_80_reduce_40_progress(self):
log = """
Expand All @@ -60,8 +62,8 @@ def test_job_1_launched_stage_1_map_80_reduce_40_progress(self):
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40%
"""
self.assertEquals(30, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(30, HiveEngineSpec.progress(log))

def test_job_1_launched_stage_2_stages_progress(self):
log = """
Expand All @@ -72,8 +74,8 @@ def test_job_1_launched_stage_2_stages_progress(self):
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 80%, reduce = 40%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-2 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 100%, reduce = 0%
"""
self.assertEquals(12, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(12, HiveEngineSpec.progress(log))

def test_job_2_launched_stage_2_stages_progress(self):
log = """
Expand All @@ -83,5 +85,5 @@ def test_job_2_launched_stage_2_stages_progress(self):
17/02/07 19:15:55 INFO ql.Driver: Launching Job 2 out of 2
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 0%, reduce = 0%
17/02/07 19:16:09 INFO exec.Task: 2017-02-07 19:16:09,173 Stage-1 map = 40%, reduce = 0%
"""
self.assertEquals(60, db_engine_specs.HiveEngineSpec.progress(log))
""".split('\n')
self.assertEquals(60, HiveEngineSpec.progress(log))
5 changes: 1 addition & 4 deletions tests/sqllab_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,9 @@ def test_search_query_on_time(self):
from_time = 'from={}'.format(int(first_query_time))
to_time = 'to={}'.format(int(second_query_time))
params = [from_time, to_time]
resp = self.get_resp('/superset/search_queries?'+'&'.join(params))
resp = self.get_resp('/superset/search_queries?' + '&'.join(params))
data = json.loads(resp)
self.assertEquals(2, len(data))
for k in data:
self.assertLess(int(first_query_time), k['startDttm'])
self.assertLess(k['startDttm'], int(second_query_time))

def test_alias_duplicate(self):
self.run_sql(
Expand Down

0 comments on commit b888802

Please sign in to comment.