Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename self.blocks and self.block_mapping for human clarity #3308

Merged
merged 2 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def create_monitoring_info(self, status):
d['status'] = s.status_name
d['timestamp'] = datetime.datetime.now()
d['executor_label'] = self.label
d['job_id'] = self.blocks.get(bid, None)
d['job_id'] = self.blocks_to_job_id.get(bid, None)
d['block_id'] = bid
msg.append(d)
return msg
Expand Down Expand Up @@ -767,14 +767,14 @@ class BlockInfo:

# Now kill via provider
# Potential issue with multiple threads trying to remove the same blocks
to_kill = [self.blocks[bid] for bid in block_ids_to_kill if bid in self.blocks]
to_kill = [self.blocks_to_job_id[bid] for bid in block_ids_to_kill if bid in self.blocks_to_job_id]

r = self.provider.cancel(to_kill)
job_ids = self._filter_scale_in_ids(to_kill, r)

# to_kill block_ids are fetched from self.blocks
# If a block_id is in self.block, it must exist in self.block_mapping
block_ids_killed = [self.block_mapping[jid] for jid in job_ids]
# to_kill block_ids are fetched from self.blocks_to_job_id
# If a block_id is in self.blocks_to_job_id, it must exist in self.job_ids_to_block
block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids]

return block_ids_killed

Expand Down
12 changes: 6 additions & 6 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def __init__(self, *,
self._block_id_counter = AtomicIDCounter()

self._tasks = {} # type: Dict[object, Future]
self.blocks = {} # type: Dict[str, str]
self.block_mapping = {} # type: Dict[str, str]
self.blocks_to_job_id = {} # type: Dict[str, str]
self.job_ids_to_block = {} # type: Dict[str, str]

def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]:
"""Given a list of block ids and a list of corresponding status strings,
Expand Down Expand Up @@ -194,8 +194,8 @@ def scale_out(self, blocks: int = 1) -> List[str]:
logger.info(f"Allocated block ID {block_id}")
try:
job_id = self._launch_block(block_id)
self.blocks[block_id] = job_id
self.block_mapping[job_id] = block_id
self.blocks_to_job_id[block_id] = job_id
self.job_ids_to_block[job_id] = block_id
block_ids.append(block_id)
except Exception as ex:
self._fail_job_async(block_id,
Expand Down Expand Up @@ -232,10 +232,10 @@ def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
# Not using self.blocks.keys() and self.blocks.values() simultaneously
# The dictionary may be changed during invoking this function
# As scale_in and scale_out are invoked in multiple threads
block_ids = list(self.blocks.keys())
block_ids = list(self.blocks_to_job_id.keys())
job_ids = [] # types: List[Any]
for bid in block_ids:
job_ids.append(self.blocks[bid])
job_ids.append(self.blocks_to_job_id[bid])
return block_ids, job_ids

@abstractproperty
Expand Down
6 changes: 3 additions & 3 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ def scale_in(self, count):
"""Scale in method. Cancel a given number of blocks
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks.keys())[:count]
kill_ids = [self.blocks[block] for block in to_kill]
to_kill = list(self.blocks_to_job_id.keys())[:count]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

# Cancel the blocks provisioned
if self.provider:
Expand All @@ -625,7 +625,7 @@ def shutdown(self, *args, **kwargs):
self._should_stop.set()

# Remove the workers that are still going
kill_ids = [self.blocks[block] for block in self.blocks.keys()]
kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()]
if self.provider:
logger.debug("Cancelling blocks")
self.provider.cancel(kill_ids)
Expand Down
6 changes: 3 additions & 3 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ def scale_in(self, count):
"""Scale in method.
"""
# Obtain list of blocks to kill
to_kill = list(self.blocks.keys())[:count]
kill_ids = [self.blocks[block] for block in to_kill]
to_kill = list(self.blocks_to_job_id.keys())[:count]
kill_ids = [self.blocks_to_job_id[block] for block in to_kill]

# Cancel the blocks provisioned
if self.provider:
Expand All @@ -720,7 +720,7 @@ def shutdown(self, *args, **kwargs):
self.should_stop.value = True

# Remove the workers that are still going
kill_ids = [self.blocks[block] for block in self.blocks.keys()]
kill_ids = [self.blocks_to_job_id[block] for block in self.blocks_to_job_id.keys()]
if self.provider:
logger.debug("Cancelling blocks")
self.provider.cancel(kill_ids)
Expand Down
2 changes: 1 addition & 1 deletion parsl/tests/site_tests/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_provider():
logger.info("Job in terminal state")

_, current_jobs = executor._get_block_and_job_ids()
# PR 1952 stoped removing scale_in blocks from self.blocks
# PR 1952 stoped removing scale_in blocks from self.blocks_to_job_id
# A new PR will handle removing blocks from self.block
# this includes failed/completed/canceled blocks
assert len(current_jobs) == 1, "Expected current_jobs == 1"
Expand Down
Loading