update: Rewrite update script #372
Conversation
Cannot reproduce good performance. I compared the original update.py versus my own PoC (called
Here is what it looks like:
@tleb On how many tags did you test? I decreased chunksize in my script to 100 (the "1000" argument in

@Daniil159x I'm not sure if async would help much here. It might if threads were blocked on I/O most of the time, but both my and @tleb's scripts do database I/O on a single thread. There is also I/O related to reading from other processes (a lot of the processing happens in script.sh/ctags/git), but again, it's not clear to me whether that's the bottleneck. async also has some overhead AFAIK. On the other hand, I do see that neither of the scripts achieves 100% CPU utilization, it always hovers around 99%, so maybe?
With chunksize calculation from @tleb's script (already on my
self.refs_lock = Lock()
self.docs_lock = Lock()
self.comps_lock = Lock()
self.comps_docs_lock = Lock()
Why are those locks required? I thought a single thread was writing into the database? If it is because you are indexing multiple versions at the same time, why do that? My PoC did one version after the other, which simplifies code without losing performance.
I think the locks are just defensive programming/leftover from an older design. I will remove them. I wish there was a simple way to ensure that UpdatePartialState does not end up in another thread somehow.
if hash in self.hash_to_idx:
    return self.hash_to_idx[hash]
else:
    return self.db.blob.get(hash)
Why do you have to look both locally and in the DB? Why not do only one or the other?
Some data, like the hash -> idx and idx -> hash/filename mappings, and whatever was in vers.db, is not saved into the database until the update process finishes. I was hoping to make interrupting the update process a bit safer thanks to that (although I'm actually not 100% sure anymore that the default Berkeley DB configuration can handle interruptions without breaking).
The idea is pretty basic: refs/defs added in an interrupted update won't have entries in the hash/filename/blob databases.
numBlobs is updated first, reserving id space for the currently processed blobs forever. An interrupted update might leave entries with unknown blob ids, but AFAIK this is handled gracefully by the backend (and definitely could be if it's not). The unknown entries could be garbage collected later.
Can we do processing on a version-per-version basis? That requires storing all key-value pairs to be updated somewhere, either in memory or in an append-only file. Then, once we are done indexing the version, we do an "update database" step that does all the writes.
That would avoid any database issue caused by indexing raising an error. It also removes all DB operations from the indexing functions, which stay purely focused on calling ctags or whatever.
So pseudocode would be like:
for version_name in versions_todo:
    all_blobs = ...
    new_blobs = ...
    new_defs = compute_new_defs_multithreaded(version_name, new_blobs)
    list_of_defs_in_version = find_all_defs_in_all_blobs(all_blobs)
    new_refs = find_new_refs_multithreaded(version_name, list_of_defs_in_version)
    # same thing for all types of values we have

    # OK, we are done, we can update the database with new_defs, new_refs, etc.
    save_defs(new_defs)
    save_refs(new_refs)
    # ...
for idx, path in buf:
    obj.append(idx, path)

state.db.vers.put(state.tag, obj, sync=True)
Why is part of the "add to databases" done in UpdatePartialState and another part is done here?
I separate parts that can have garbage entries left by an unfinished update, from parts that cannot have garbage entries. vers is used to tell if a tag was already indexed. That's also (partially) why using a database while it's being updated was such a problem.
state.db.blob.put(hash, idx)

# Update versions
blobs = scriptLines('list-blobs', '-p', state.tag)
This is done twice, can we cache the value?
Good point, I think it could be cached.
# NOTE: it is assumed that update_refs and update_defs are not running
# concurrently. hence, defs are not locked
# defs database MUSNT be updated while get_refs is running

# Get references for a file
What are update_refs and update_defs? No functions are named that way in the file. Also, why mustn't they run at the same time? Why is a comment required to make this explicit? It looks hard to see from the code.
That comment wouldn't be required if the code were more readable, something like:
defs = get_defs(...)
refs = get_refs(..., defs)
Why is the code calling get_refs() and get_defs() more complex than the above?
What are update_refs and update_defs? No functions are named that way in the file

I think the names are out of date. Maybe I meant get_defs and get_refs.

Also why mustn't they run at the same time? Why is a comment required to make this explicit? It looks hard to see from the code

References update requires computed definitions.

Why is the code calling get_refs() and get_defs() more complex than the above?

It's submitted as a job to the futures pool. I can't, for example, pass a closure to another thread. This is why the awkward batch_* functions exist.
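A minimal sketch of the limitation (function names here are made up, not the actual ones from the script): ProcessPoolExecutor pickles the callable it receives, so lambdas and closures fail, while module-level batch_* wrappers work:

from concurrent.futures import ProcessPoolExecutor

def get_defs(path):
    # stand-in for the real per-file work
    return (path, len(path))

def batch_defs(chunk):
    # module-level wrapper: picklable, processes a whole chunk per task
    return [get_defs(path) for path in chunk]

if __name__ == "__main__":
    chunks = [["a.c", "b.c"], ["c.c"]]
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(batch_defs, ch) for ch in chunks]
        # pool.submit(lambda ch: [get_defs(p) for p in ch], chunks[0])
        # would fail with a pickling error, which is why the wrappers are
        # plain module-level functions.
        results = [f.result() for f in futures]
    print(results)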
def batch(job):
    def f(chunk, **kwargs):
        return [job(*args, **kwargs) for args in chunk]
    return f
Why is that required when the pool's *map*() methods all have a chunksize argument?
I opted for manually creating jobs this weird way because I wanted to be able to see, from update_version, how many are finished. map just gives you an iterable with the results.
    return f

# NOTE: some of the following functions are kind of redundant, and could sometimes be
# higher-order functions, but that's not supported by multiprocessing
The question is: why are those required? Not how we could abstract them into a single function, but why we can't fit them into the niceties provided by the pools from Python's stdlib.
I'm not sure what you would replace it with, although there probably are better options. The main issue was that, from what I remember, I couldn't pass callables returned by higher-order functions to the executor; only named functions could be passed. That made some of this code awkward.
comps = BsdDB(getDataDir() + '/compatibledts.db', True, DefList)
result = [get_comps_docs(*args, comps=comps, **kwargs) for args in chunk]
comps.close()
return result
Surprised to see a wild database opened here. Aren't they all already opened by the caller?
When batch_comps_docs is called, the comps database is already closed in the main thread. You shouldn't open the same database from two different threads with the current berkeleydb config (at least not if one handle is opened as read-write). I think you also cannot pass them between processes. Here it's only opened in read-only mode.
# Split list into sublists of chunk_size size
def split_into_chunks(list, chunk_size):
    return [list[i:i+chunk_size] for i in range(0, len(list), chunk_size)]
Again, we should use multiprocessing.pool.Pool.map(chunksize=) and not reimplement it ourselves.
Map only gives you an iterable that you can block on. I wanted to see how many tasks are finished from the main thread, to put progress logs in a single place, and I needed a list of futures for that.
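For illustration, one way to get that visibility from the main thread with a list of futures (a rough sketch, the names are placeholders):

from concurrent.futures import ProcessPoolExecutor, as_completed

def batch_refs(chunk):
    # stand-in for the real per-chunk work
    return [len(item) for item in chunk]

if __name__ == "__main__":
    chunks = [["a.c", "b.c"], ["c.c", "d.c"], ["e.c"]]
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(batch_refs, ch) for ch in chunks]
        done = 0
        for future in as_completed(futures):
            done += 1
            print(f"refs: {done}/{len(futures)} chunks done")
            results = future.result()  # the main thread would flush these into the DB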
# Start refs job
futures = [pool.submit(batch_refs, ch) for ch in chunks]
return ("refs", (futures, handle_refs_results(state), None))
Why is code not linear? Why do we need callbacks?
I maybe tried too hard to "engineer" this. Maybe it could be simplified a bit.
First, I wanted to avoid tracking progress all over the place. I wanted to hide progress bars in a single loop, without thinking about it in the functions that do the computation.
Second, I was hoping to model the relationships between parts of the update job. The references update needs the definitions update to finish first. comps_docs needs comps. But I don't want to wait for defs to finish to start comps_docs if comps finishes first.
I basically wanted the scheduling logic to be separate from the part that actually computes data, and all of that to be separate from the part that flushes stuff into the database.
I was also hoping to handle database safety better (do not share between threads, do not share between processes, do not read if it's open as read-write); that's why databases are closed sometimes. It's still not great, I feel a bit limited by the multiprocessing lib (you can't/shouldn't share state).
So I decided to represent the jobs in a declarative way (submit the part that does computation to the pool, call the part that flushes to the database on each result, call the part that schedules the next step when all parts finish), and have a loop handle these declarations without caring about what is actually computed.
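Roughly, a sketch of that driver-loop idea (illustrative only, with made-up names, not the exact code in this PR):

from concurrent.futures import wait, FIRST_COMPLETED

def drive(jobs):
    # jobs: name -> (futures, on_result, on_all_done)
    while jobs:
        pending = [f for futures, _, _ in jobs.values() for f in futures]
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        for name in list(jobs):
            futures, on_result, on_all_done = jobs[name]
            for future in [f for f in futures if f in done]:
                on_result(future.result())  # main thread flushes into the database
                futures.remove(future)
            if not futures:
                del jobs[name]
                if on_all_done is not None:
                    next_name, next_job = on_all_done()
                    jobs[next_name] = next_job  # e.g. start refs once defs finish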
But I don't want to wait for defs to finish to start comps_docs, if comps finishes first.
Why? I think the whole issue with the current solution is that it tries to do everything at the same time. It is fine to do defs, then refs, then docs, etc. Doing everything at the same time doesn't make things faster; doing all X at once and then all Y is as fast (or faster).
It's still not great, I feel limited by multiprocessing lib a bit (you can't/shouldn't share state).
We should make our processing fit into the abstractions it gives us: what we do is map from blobs to defs/refs inside of it. Not sharing state is a benefit to readability; it shouldn't be seen as something that hinders your solution.
# and bsddb cannot be shared between processes until/unless bsddb concurrent data store is implemented.
# Operations on closed databases raise exceptions that would in this case be indicative of a bug.
state.db.defs.sync()
state.db.defs.close()
Why do we only close this database? I would expect all databases to be closed once at the end.
Defs is reopened read-only in a different thread, in batch_refs. I want to close the read-write handle to make sure reading it later is safe.
to_track = {
    "defs": ([], handle_defs_results(state), after_all_defs_done),
    "docs": ([], handle_batch_results(state.add_docs), None),
}
That part (and the code below it) is really hard to make sense of. We should aim for more linear code. Probably build arrays of tasks to do and pass them to a pool. Or something else (?).
Explained in previous comment
Overall, my complaint is that the code is not linear. That makes it really hard to understand, and I've been reading and writing Elixir indexing code more or less recently. I can't imagine myself or others making sense of it in two years' time.
As said in some comments, I'd expect code to be more like
x = get_required_values()
y = compute_y(x)
z = compute_z(x, y)
cleanup_foobar()
If things need to be done in parallel, it should be a single call to pool.*map*() function(s). That makes execution flow (what happens when) and resource management (what is used/freed when) easy to understand. Futures & co are nice features, but they make code obtuse.
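For example, a minimal sketch of what I have in mind (the names are placeholders; the real get_refs would also take the defs):

from multiprocessing import Pool

def get_defs(blob):
    return (blob, f"defs of {blob}")   # stand-in for the ctags work

def get_refs(blob):
    return (blob, f"refs of {blob}")   # stand-in for the tokenizing work

if __name__ == "__main__":
    blobs = ["a.c", "b.c", "c.c"]
    with Pool() as pool:
        defs = pool.map(get_defs, blobs, chunksize=100)
        refs = pool.map(get_refs, blobs, chunksize=100)
    # single-threaded "update database" step afterwards:
    # save_defs(defs); save_refs(refs)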
Something else not touched on in the comments (but discussed in real life :-) ) is that the logs are not useful. I haven't run this version (please rebase on master, which has the utils/index script, to make testing easier), but from the code it prints not-so-useful things.
A good starting point for logs would be a summary at the end of each indexed tag. Something like (ideally aligned):
tag v2.6: N blobs, M defs, O refs, P compatibles, S seconds
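For instance, a tiny hypothetical helper that prints such an aligned line (the numbers below are made up):

def log_tag_summary(tag, blobs, defs, refs, compatibles, seconds):
    print(f"tag {tag:>8}: {blobs:>7} blobs, {defs:>8} defs, "
          f"{refs:>9} refs, {compatibles:>6} compatibles, {seconds:>7.1f} seconds")

log_tag_summary("v2.6", 23310, 151234, 912345, 4321, 83.2)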
One last thing about logging: make sure not to lose errors! There are try/except Exception blocks that ignore the exception and print a generic error message. That is not useful for debugging purposes. The script should do what it can (maybe stop all processing of the current version and try indexing the other versions).
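Something along these lines (a sketch, not this PR's actual code), so the traceback is kept and the remaining versions still get indexed:

import logging

logger = logging.getLogger("update")

def update_version(tag):
    ...  # indexing work for one tag would go here

def update_all(tags):
    for tag in tags:
        try:
            update_version(tag)
        except Exception:
            # logging.exception keeps the full traceback instead of a generic
            # message, so the failure can actually be debugged
            logger.exception("indexing %s failed, skipping this version", tag)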
Thanks for the review!
Rebased.
I explained why the code is not linear in one of the review comments. I thought I would maybe state the design goals; we should've discussed that earlier. I think we agree about most of this, but some of it is up for discussion. From the current script:
If not for the clunkiness of berkeleydb, the last two points would be unnecessary. But I'm assuming we are staying with berkeleydb for now.
Avoid calling the parse-docs script, which is expensive. This heuristic avoids running it on most files and is almost free. Signed-off-by: Théo Lebrun <[email protected]>
By default ctags sorts entries. This is not useful to the update script, but takes time. User time for `update.py 16` on musl v1.2.5 went from 1m21.613s to 1m11.849s.
The new update script uses futures to dynamically schedule many smaller tasks across a constant number of threads, instead of statically assigning a single long-running task to each thread. This results in better CPU saturation. Database handles are no longer shared between threads; instead, the main thread commits the results of the other threads into the database. This trades locking on database access for serialization costs: since multiprocessing is used, values returned from futures are pickled (although in practice that depends on the ProcessPool configuration).
Code by Théo Lebrun