Added `console backfill pause`, `console backfill stop` archives working state #1140
Conversation
Signed-off-by: Chris Helma <[email protected]>
Codecov Report

```diff
@@             Coverage Diff              @@
##               main    #1140      +/-   ##
============================================
+ Coverage     80.63%   80.72%    +0.08%
- Complexity     2910     2947       +37
============================================
  Files           399      399
  Lines         14829    14965      +136
  Branches       1007     1017       +10
============================================
+ Hits          11958    12080      +122
- Misses         2260     2274       +14
  Partials        611      611
```
```diff
@@ -111,6 +128,23 @@ def stop(self, *args, **kwargs) -> CommandResult:
     def scale(self, units: int, *args, **kwargs) -> CommandResult:
         logger.info(f"Scaling RFS backfill by setting desired count to {units} instances")
         return self.ecs_client.set_desired_count(units)
 
+    def archive(self, *args, archive_dir_path: str = None, **kwargs) -> CommandResult:
```
What does this function do if the index doesn't exist?
Line 145 seems like it would be OK, but line 140 would be a problem.
The user-facing version of this question is: what happens if a user runs it twice? If it were idempotent, it would just print the path again, so maybe the outer message can be tweaked slightly and another check added in here.
I updated the code to gracefully skip the archive process if the index doesn't exist.
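As a rough sketch of that behavior (the helper methods on the cluster object and the `CommandResult(success, value)` shape are assumptions here, not necessarily what the PR ships):

```python
from typing import Any, NamedTuple

class CommandResult(NamedTuple):
    # Assumed shape: a boolean success flag plus a value. The console
    # library's real CommandResult may differ.
    success: bool
    value: Any

WORKING_STATE_INDEX = ".migrations_working_state"

def archive_working_state(cluster: Any, archive_dir_path: str = None) -> CommandResult:
    """Succeed with an explanatory message when there is nothing to archive,
    so running the command a second time is harmless."""
    # `index_exists`, `backup_index_to_disk`, and `delete_index` are assumed
    # helper methods on the cluster object.
    if not cluster.index_exists(WORKING_STATE_INDEX):
        return CommandResult(True, "No working state index found; nothing to archive")
    backup_path = cluster.backup_index_to_disk(WORKING_STATE_INDEX, archive_dir_path)
    cluster.delete_index(WORKING_STATE_INDEX)
    return CommandResult(True, backup_path)
```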
```diff
@@ -17,6 +19,8 @@
 
 logger = logging.getLogger(__name__)
 
+WORKING_STATE_INDEX = ".migrations_working_state"
```
Eventually this should be configurable, but I don't think that needs to happen today. Let's do it at the same time we make it configurable across the board.
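If and when that happens, one possible shape (purely illustrative, not part of this PR) is an environment-variable override that defaults to today's value:

```python
import os

# Illustrative only: keep the current hard-coded name as the default unless a
# (hypothetical) MIGRATIONS_WORKING_STATE_INDEX variable overrides it.
WORKING_STATE_INDEX = os.environ.get("MIGRATIONS_WORKING_STATE_INDEX",
                                     ".migrations_working_state")
```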
Thanks for picking this up. I have a couple of questions/suggestions on the handling of the archives and some other minor stuff.
```diff
@@ -291,6 +293,14 @@ def start_backfill_cmd(ctx, pipeline_name):
         raise click.ClickException(message)
     click.echo(message)
 
+@backfill_group.command(name="pause")
+@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
```
Pipeline names could be confusing. If I knew nothing about OSI, I might think that I have named RFS pipelines that I could do something with. Other than removing OSI, I'm not sure it's worth making any other changes; for that, I'd recommend opening a Jira and we'll discuss in the coming weeks.
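For readers following along, a self-contained sketch of what a Click command like this pause alias could look like (the group wiring and context shape are stand-ins, not the PR's actual code):

```python
import click

@click.group()
def backfill_group():
    """Toy stand-in for the console's backfill command group."""

@backfill_group.command(name="pause")
@click.option('--pipeline-name', default=None, help='Optionally specify a pipeline name')
@click.pass_obj
def pause_backfill_cmd(env, pipeline_name):
    # Hypothetical body mirroring the alias described in this PR's
    # description: pausing is just scaling the RFS workers to zero.
    exitcode, message = env.backfill.scale(0)
    if not exitcode:
        raise click.ClickException(message)
    click.echo(message)
```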
```diff
@@ -301,6 +311,17 @@ def stop_backfill_cmd(ctx, pipeline_name):
         raise click.ClickException(message)
     click.echo(message)
 
+    click.echo("Archiving the working state of the backfill operation...")
+    exitcode, message = backfill_.archive(ctx.env.backfill)
```
Consider renaming `exitcode`. It seemed like it was supposed to be the integer exit code for the process, but it looks like it's a boolean.
It makes enough sense that I think I'm personally fine w/ it.
```diff
 @abstractmethod
 def stop(self, *args, **kwargs) -> CommandResult[str]:
-    """Stop or pause the backfill. This does not make guarantees about resumeability."""
+    """Stop the backfill. This does not make guarantees about resumeability."""
```
There is a guarantee if it completes successfully, right? That would be a fair thing to say and give the user a better understanding of what to expect.
The intuition is that you use `pause` when you want the ability to resume, and `stop` if you want to clean things up.
Why can we not guarantee that `start` can pick up again if these two commands completed successfully?
```python
@abstractmethod
def archive(self, *args, **kwargs) -> CommandResult[str]:
    """Archive the backfill operation. Should return the information required to resume the backfill operations.
    Should fail if there are currently running operations."""
```
Suggest "currently running backfill operations." The user could otherwise think that some independent activity (or, who knows, ECS tasks?!) could cause failure here.
This text is not surfaced to users at all, AFAIK.
```python
if archive_dir_path:
    backup_dir = archive_dir_path
elif shared_logs_dir is None:
    backup_dir = "./working_state"
```
Make this more specific, like `backfill_working_state`.
Done
```python
os.makedirs(backup_dir, exist_ok=True)

# Write the backup
with open(backup_path, "w") as f:
```
This overwrites an existing file. It looks like we always write to the same file. I'd recommend opening w/ "a" and putting a header in that can clearly show the beginning so that the segments can be demarcated.
The other option is to open the file exclusively for create ("x") w/ a timestamp (if there is a collision, the user could see it and just try again a second or ms later - though there really shouldn't be in 99.9999% of real world use cases). If we think that files could be very big, this is probably a much better option than the previous one.
Made individual, timestamped files
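A minimal sketch of that approach, assuming the `backup_dir` naming from the snippet above (the PR's actual file-name format may differ):

```python
import datetime
import json
import os

def write_archive(backup_dir: str, documents: dict) -> str:
    # Sketch of the timestamped-file approach: one file per archive run.
    os.makedirs(backup_dir, exist_ok=True)
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    backup_path = os.path.join(backup_dir, f"working_state_backup_{timestamp}.json")
    # Mode "x" creates the file exclusively and raises FileExistsError on a
    # name collision, so an earlier archive can never be silently overwritten.
    with open(backup_path, "x") as f:
        json.dump(documents, f, indent=2)
    return backup_path
```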
```python
for hit in hits:
    documents[hit['_id']] = hit['_source']
```
You could put this at the beginning of the while loop; that makes the code easier to maintain whether you're accumulating this stuff in memory or streaming it.
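For illustration, the restructuring being suggested, with a stand-in callable in place of the scroll request:

```python
from typing import Any, Callable, Dict, List, Optional

def collect_documents(fetch_next_batch: Callable[[], Optional[List[Dict[str, Any]]]]
                      ) -> Dict[str, Any]:
    # fetch_next_batch is a stand-in for the scroll call. Consuming the batch
    # at the top of the loop keeps per-batch handling in one place, so
    # switching from an in-memory dict to a stream writer touches one spot.
    documents: Dict[str, Any] = {}
    hits = fetch_next_batch()
    while hits:
        for hit in hits:
            documents[hit["_id"]] = hit["_source"]
        hits = fetch_next_batch()
    return documents
```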
```diff
@@ -198,3 +199,63 @@ def execute_benchmark_workload(self, workload: str,
         display_command = command.replace(f"basic_auth_password:{password_to_censor}", "basic_auth_password:********")
         logger.info(f"Executing command: {display_command}")
         subprocess.run(command, shell=True)
 
+    def fetch_all_documents(self, index_name: str, batch_size: int = 100) -> Dict[str, Any]:
```
If the results here are big enough to warrant a scroll (and good catch, I think they are), does it also make sense to write the contents to a file rather than accumulate them in memory? If a customer has 20K shards and we divide those into 20 work units each, and each document is ~200 bytes of data plus pointers for strings, you're looking at 80 MB. A visitor pattern would look nice here.
Per discussion, turned this into a generator.
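A sketch of what that generator might look like, assuming a `call_api`-style helper that returns a requests-style response (the PR's actual request plumbing may differ):

```python
from typing import Any, Dict, Generator

def fetch_all_documents(cluster: Any, index_name: str,
                        batch_size: int = 100) -> Generator[Dict[str, Any], None, None]:
    """Yield documents one scroll batch at a time instead of accumulating
    them, so memory stays flat regardless of index size."""
    # `call_api` is an assumed helper: call_api(path, method, json_body=...).
    response = cluster.call_api(f"/{index_name}/_search?scroll=1m", "POST",
                                json_body={"size": batch_size}).json()
    scroll_id = response.get("_scroll_id")
    hits = response.get("hits", {}).get("hits", [])
    try:
        while hits:
            # Handle the batch at the top of the loop (per the review note
            # above), then fetch the next one.
            yield {hit["_id"]: hit["_source"] for hit in hits}
            response = cluster.call_api("/_search/scroll", "POST",
                                        json_body={"scroll": "1m",
                                                   "scroll_id": scroll_id}).json()
            scroll_id = response.get("_scroll_id")
            hits = response.get("hits", {}).get("hits", [])
    finally:
        if scroll_id:
            # Release the server-side scroll context when iteration ends.
            cluster.call_api("/_search/scroll", "DELETE",
                             json_body={"scroll_id": scroll_id})
```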
Signed-off-by: Chris Helma <[email protected]>
Description

- `console backfill stop` now scales the RFS workers down to 0, backs up the working state to disk, and deletes the working state index. To support this, we created a new operation, `backfill archive`, at the middleware layer. This new operation is not surfaced in the CLI.
- Added `console backfill pause` as an alias for `console backfill scale 0` so that users can "stop" a migration without `stop`'ing it.

Issues Resolved
Testing
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.