Skip to content

Commit

Permalink
submit multiple jobs per instance
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel.asztalos committed Feb 19, 2019
1 parent 10c1a4f commit 1787958
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions pyblish_deadline/plugins/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import uuid
import json
from collections import defaultdict

import pyblish.api

Expand All @@ -17,9 +18,9 @@ class IntegrateDeadline(pyblish.api.ContextPlugin):

def process(self, context):

instances = {}
instances_order = []
instances_no_order = []
self.orders = []
jobs_instances_by_order = defaultdict(list)
jobs_instances_no_order = []

for instance in context:

Expand All @@ -33,33 +34,38 @@ def process(self, context):
self.log.info(msg)
continue

if "order" in instance.data("deadlineData"):
order = instance.data("deadlineData")["order"]
instances_order.append(order)
if order in instances:
instances[order].append(instance)
jobs = instance.data("deadlineData")
# maintain backward compatibility, where only job could be submitted per instance
if not isinstance(jobs, list):
jobs = [jobs]

for job in jobs:
if "order" in job:
order = job["order"]
self.orders.append(order)
jobs_instances_by_order[order].append((job, instance))
else:
instances[order] = [instance]
else:
instances_no_order.append(instance)
jobs_instances_no_order.append((job, instance))

instances_order = list(set(instances_order))
instances_order.sort()
self.orders = list(set(self.orders))
self.orders.sort()

new_context = []
for order in instances_order:
for instance in instances[order]:
new_context.append(instance)
jobs_instances_sorted = []
for order in self.orders:
for job_instance in jobs_instances_by_order[order]:
jobs_instances_sorted.append(job_instance)

if not instances_order:
new_context = instances_no_order
jobs_instances_sorted.extend(jobs_instances_no_order)

for instance in new_context:
self.job_ids = [[] for i in self.orders]
for job, instance in jobs_instances_sorted:
self._process_job(job, instance)

def _process_job(self, job, instance):
submission_id = uuid.uuid4()

# getting job data
job_data = instance.data("deadlineData")["job"]
job_data = job["job"]

# setting instance data
data = {}
Expand Down Expand Up @@ -100,14 +106,15 @@ def process(self, context):
job_data["ExtraInfoKeyValue"]["PyblishContextData"] = data

# setting up dependencies
if "order" in instance.data("deadlineData"):
order = instance.data("deadlineData")["order"]
if instances_order.index(order) != 0:
index = instances_order.index(order) - 1
dependencies = instances[instances_order[index]]
for count in range(0, len(dependencies)):
name = "JobDependency%s" % count
job_data[name] = dependencies[count].data("jobId")
if "order" in job:
order = job["order"]
current_order_index = self.orders.index(order)
if current_order_index != 0:
index = current_order_index - 1
dependencies = self.job_ids[index]
for i, job_id in enumerate(dependencies):
name = "JobDependency%s" % i
job_data[name] = job_id

# writing job data
data = ""
Expand Down Expand Up @@ -149,7 +156,7 @@ def process(self, context):
self.log.info("job data:\n\n%s" % data)

# writing plugin data
plugin_data = instance.data("deadlineData")["plugin"]
plugin_data = job["plugin"]
data = ""
for entry in plugin_data:
data += "%s=%s\n" % (entry, plugin_data[entry])
Expand All @@ -167,7 +174,7 @@ def process(self, context):
args = [job_path, plugin_path]

if "auxiliaryFiles" in instance.data["deadlineData"]:
aux_files = instance.data("deadlineData")["auxiliaryFiles"]
aux_files = job["auxiliaryFiles"]
if isinstance(aux_files, list):
args.extend(aux_files)
else:
Expand All @@ -180,7 +187,11 @@ def process(self, context):
self.log.info(result)

job_id = re.search(r"JobID=(.*)", result).groups()[0]
instance.set_data("jobId", value=job_id)

if "order" in job:
order = job["order"]
index = self.orders.index(order)
self.job_ids[index].append(job_id)
except:
raise ValueError(traceback.format_exc())

Expand Down

0 comments on commit 1787958

Please sign in to comment.