Skip to content

Commit

Permalink
[AIRFLOW-2800] Remove low-hanging linting errors
Browse files Browse the repository at this point in the history
  • Loading branch information
andscoop authored and ashb committed Oct 22, 2018
1 parent 9c1030e commit 0f67553
Show file tree
Hide file tree
Showing 75 changed files with 226 additions and 209 deletions.
13 changes: 7 additions & 6 deletions airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -80,11 +80,12 @@ class AirflowMacroPlugin(object):
def __init__(self, namespace):
self.namespace = namespace

from airflow import operators

from airflow import operators # noqa: E402
from airflow import sensors # noqa: E402
from airflow import hooks
from airflow import executors
from airflow import macros
from airflow import hooks # noqa: E402
from airflow import executors # noqa: E402
from airflow import macros # noqa: E402

operators._integrate_plugins()
sensors._integrate_plugins() # noqa: E402
Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/auth/backends/ldap_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def get_ldap_connection(dn=None, password=None):
cacert = configuration.conf.get("ldap", "cacert")
tls_configuration = Tls(validate=ssl.CERT_REQUIRED, ca_certs_file=cacert)
use_ssl = True
except:
except Exception:
pass

server = Server(configuration.conf.get("ldap", "uri"), use_ssl, tls_configuration)
Expand Down Expand Up @@ -94,7 +94,7 @@ def groups_user(conn, search_base, user_filter, user_name_att, username):
search_filter = "(&({0})({1}={2}))".format(user_filter, user_name_att, username)
try:
memberof_attr = configuration.conf.get("ldap", "group_member_attr")
except:
except Exception:
memberof_attr = "memberOf"
res = conn.search(native(search_base), native(search_filter),
attributes=[native(memberof_attr)])
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/aws_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None):
try:
access_key = config.get(cred_section, key_id_option)
secret_key = config.get(cred_section, secret_key_option)
except:
except Exception:
logging.warning("Option Error in parsing s3 config file")
raise
return access_key, secret_key
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/awsbatch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _wait_for_task_ended(self):
if response['jobs'][-1]['status'] in ['SUCCEEDED', 'FAILED']:
retry = False

sleep( 1 + pow(retries * 0.1, 2))
sleep(1 + pow(retries * 0.1, 2))
retries += 1

def _check_success_task(self):
Expand Down
14 changes: 7 additions & 7 deletions airflow/contrib/operators/mlengine_prediction_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ def decode(self, x):
@beam.ptransform_fn
def MakeSummary(pcoll, metric_fn, metric_keys): # pylint: disable=invalid-name
return (
pcoll
| "ApplyMetricFnPerInstance" >> beam.Map(metric_fn)
| "PairWith1" >> beam.Map(lambda tup: tup + (1,))
| "SumTuple" >> beam.CombineGlobally(beam.combiners.TupleCombineFn(
*([sum] * (len(metric_keys) + 1))))
| "AverageAndMakeDict" >> beam.Map(
pcoll |
"ApplyMetricFnPerInstance" >> beam.Map(metric_fn) |
"PairWith1" >> beam.Map(lambda tup: tup + (1,)) |
"SumTuple" >> beam.CombineGlobally(beam.combiners.TupleCombineFn(
*([sum] * (len(metric_keys) + 1)))) |
"AverageAndMakeDict" >> beam.Map(
lambda tup: dict(
[(name, tup[i]/tup[-1]) for i, name in enumerate(metric_keys)] +
[(name, tup[i] / tup[-1]) for i, name in enumerate(metric_keys)] +
[("count", tup[-1])])))


Expand Down
50 changes: 33 additions & 17 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ def heartbeat(self):
if job.latest_heartbeat:
sleep_for = max(
0,
self.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds())
self.heartrate - (
timezone.utcnow() - job.latest_heartbeat).total_seconds())

sleep(sleep_for)

Expand Down Expand Up @@ -391,7 +392,7 @@ def helper():
log.info(
"Processing %s took %.3f seconds", file_path, end_time - start_time
)
except:
except Exception:
# Log exceptions through the logging framework.
log.exception("Got an exception! Propagating...")
raise
Expand Down Expand Up @@ -1087,17 +1088,18 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
session
.query(TI)
.filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
.outerjoin(DR,
and_(DR.dag_id == TI.dag_id,
DR.execution_date == TI.execution_date))
.filter(or_(DR.run_id == None,
.outerjoin(
DR,
and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
)
.filter(or_(DR.run_id == None, # noqa E711
not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
.outerjoin(DM, DM.dag_id==TI.dag_id)
.filter(or_(DM.dag_id == None,
.outerjoin(DM, DM.dag_id == TI.dag_id)
.filter(or_(DM.dag_id == None, # noqa E711
not_(DM.is_paused)))
)
if None in states:
ti_query = ti_query.filter(or_(TI.state == None, TI.state.in_(states)))
ti_query = ti_query.filter(or_(TI.state == None, TI.state.in_(states))) # noqa E711
else:
ti_query = ti_query.filter(TI.state.in_(states))

Expand All @@ -1119,7 +1121,8 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
for task_instance in task_instances_to_examine:
pool_to_task_instances[task_instance.pool].append(task_instance)

task_concurrency_map = self.__get_task_concurrency_map(states=states_to_count_as_running, session=session)
task_concurrency_map = self.__get_task_concurrency_map(
states=states_to_count_as_running, session=session)

# Go through each pool, and queue up a task for execution if there are
# any open slots in the pool.
Expand Down Expand Up @@ -1190,9 +1193,14 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
)
continue

task_concurrency = simple_dag.get_task_special_arg(task_instance.task_id, 'task_concurrency')
task_concurrency = simple_dag.get_task_special_arg(
task_instance.task_id,
'task_concurrency')
if task_concurrency is not None:
num_running = task_concurrency_map[((task_instance.dag_id, task_instance.task_id))]
num_running = task_concurrency_map[
((task_instance.dag_id, task_instance.task_id))
]

if num_running >= task_concurrency:
self.log.info("Not executing %s since the task concurrency for"
" this task has been reached.", task_instance)
Expand All @@ -1212,7 +1220,8 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):

task_instance_str = "\n\t".join(
["{}".format(x) for x in executable_tis])
self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
self.log.info(
"Setting the follow tasks to queued state:\n\t%s", task_instance_str)
# so these dont expire on commit
for ti in executable_tis:
copy_dag_id = ti.dag_id
Expand Down Expand Up @@ -1254,7 +1263,9 @@ def _change_state_for_executable_task_instances(self, task_instances,
.filter(or_(*filter_for_ti_state_change)))

if None in acceptable_states:
ti_query = ti_query.filter(or_(TI.state == None, TI.state.in_(acceptable_states)))
ti_query = ti_query.filter(
or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa E711
)
else:
ti_query = ti_query.filter(TI.state.in_(acceptable_states))

Expand Down Expand Up @@ -1600,7 +1611,8 @@ def processor_factory(file_path):
child.terminate()
# TODO: Remove magic number
timeout = 5
self.log.info("Waiting up to %s seconds for processes to exit...", timeout)
self.log.info(
"Waiting up to %s seconds for processes to exit...", timeout)
try:
psutil.wait_procs(
child_processes, timeout=timeout,
Expand Down Expand Up @@ -1657,7 +1669,9 @@ def _execute_helper(self, processor_manager):
self.log.info("Searching for files in %s", self.subdir)
known_file_paths = list_py_file_paths(self.subdir)
last_dag_dir_refresh_time = timezone.utcnow()
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
self.log.info(
"There are %s files in %s", len(known_file_paths), self.subdir)

processor_manager.set_file_paths(known_file_paths)

self.log.debug("Removing old import errors")
Expand All @@ -1670,7 +1684,9 @@ def _execute_helper(self, processor_manager):
if self.using_sqlite:
# For the sqlite case w/ 1 thread, wait until the processor
# is finished to avoid concurrent access to the DB.
self.log.debug("Waiting for processors to finish since we're using sqlite")
self.log.debug(
"Waiting for processors to finish since we're using sqlite")

processor_manager.wait_until_finished()

# Send tasks for execution if available
Expand Down
5 changes: 2 additions & 3 deletions airflow/migrations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

4 changes: 2 additions & 2 deletions airflow/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -39,4 +39,3 @@ def upgrade():

def downgrade():
op.drop_index('dag_id_state', table_name='dag_run')

4 changes: 2 additions & 2 deletions airflow/migrations/versions/13eb55f81627_for_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
4 changes: 2 additions & 2 deletions airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
4 changes: 2 additions & 2 deletions airflow/migrations/versions/2e541a1dcfed_task_duration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
5 changes: 2 additions & 3 deletions airflow/migrations/versions/2e82aab8ef20_rename_user_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down Expand Up @@ -41,4 +41,3 @@ def upgrade():

def downgrade():
op.rename_table('users', 'user')

Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
4 changes: 2 additions & 2 deletions airflow/migrations/versions/40e67319e3a9_dagrun_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
4 changes: 2 additions & 2 deletions airflow/migrations/versions/4446e08588_dagrun_start_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand Down
Loading

0 comments on commit 0f67553

Please sign in to comment.