Skip to content

Commit

Permalink
xcute: Allow to put on hold if lock is already used
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed May 27, 2021
1 parent 47e12da commit bca9a50
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 74 deletions.
12 changes: 11 additions & 1 deletion oio/cli/admin/xcute/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ class XcuteRdirCommand(XcuteCommand, ShowOne):
def get_parser(self, prog_name):
parser = super(XcuteRdirCommand, self).get_parser(prog_name)

parser.add_argument(
'--put-on-hold-if-locked',
default=False,
help="""
If the lock is already used,
put the job on hold until the lock is released.
""",
action='store_true')

parser.add_argument(
'--rdir-fetch-limit', type=int,
help='Maximum number of entries returned in each rdir response. '
Expand All @@ -57,6 +66,7 @@ def take_action(self, parsed_args):

job_config = self.get_job_config(parsed_args)
job_info = self.xcute.job_create(
self.JOB_CLASS.JOB_TYPE, job_config=job_config)
self.JOB_CLASS.JOB_TYPE, job_config=job_config,
put_on_hold_if_locked=parsed_args.put_on_hold_if_locked)
return zip(*sorted(
flat_dict_from_dict(parsed_args, job_info).items()))
30 changes: 24 additions & 6 deletions oio/cli/admin/xcute/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,13 @@ def _take_action(self, parsed_args):
for job_info in jobs:
job_main_info = job_info['job']
job_tasks = job_info['tasks']
progress = \
job_tasks['processed'] * 100. / (job_tasks['total'] or 0.00001)
try:
progress = job_tasks['processed'] * 100. / job_tasks['total']
except ZeroDivisionError:
if job_tasks['is_total_temp']:
progress = 0.
else:
progress = 100.
yield (job_main_info['id'], job_main_info['status'],
job_main_info['type'], job_main_info.get('lock'),
'%.2f%%' % progress)
Expand Down Expand Up @@ -130,12 +135,25 @@ def take_action(self, parsed_args):
job_main_info['duration'] = duration

job_tasks = job_info['tasks']
job_tasks['sent_percent'] = \
job_tasks['sent'] * 100. / (job_tasks['total'] or 0.00001)
try:
sent_percent = job_tasks['sent'] * 100. / job_tasks['total']
except ZeroDivisionError:
if job_tasks['is_total_temp']:
sent_percent = 0.
else:
sent_percent = 100.
job_tasks['sent_percent'] = sent_percent
job_tasks['processed_per_second'] = \
job_tasks['processed'] / (duration or 0.00001)
job_tasks['processed_percent'] = \
job_tasks['processed'] * 100. / (job_tasks['total'] or 0.00001)
try:
processed_percent = \
job_tasks['processed'] * 100. / job_tasks['total']
except ZeroDivisionError:
if job_tasks['is_total_temp']:
processed_percent = 0.
else:
processed_percent = 100.
job_tasks['processed_percent'] = processed_percent

if parsed_args.formatter == 'table':
if not job_tasks['all_sent']:
Expand Down
8 changes: 6 additions & 2 deletions oio/xcute/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ def job_list(self, limit=None, prefix=None, marker=None,
'lock': job_lock})
return data

def job_create(self, job_type, job_config=None):
def job_create(self, job_type, job_config=None,
put_on_hold_if_locked=False):
_, data = self.xcute_request(
'POST', '/job/create', params={'type': job_type}, json=job_config)
'POST', '/job/create',
params={'type': job_type,
'put_on_hold_if_locked': put_on_hold_if_locked},
json=job_config)
return data

def job_show(self, job_id):
Expand Down
Loading

0 comments on commit bca9a50

Please sign in to comment.