speed to write files to s3 storage #39
Comments
I can check with CESNET after @guillaumeeb confirms issues. |
@tinaok could you point towards the code cell that you feel is slow? I have to admit I didn't go into detail about performance, but it seemed reasonable to me when I did those tests. If I remember correctly, it was between 0.5s and 2s per chunk, depending on chunk size. |
Thank you @guillaumeeb and @sebastian-luna-valero |
@guillaumeeb If you have a better example than that, could you please place it at https://github.com/pangeo-data/clivar-2022/blob/main/tutorial/examples/notebooks/ and make a pull request? |
So, I've looked at the notebook and tried to reproduce. This is something I've experienced during my tests: when writing a Zarr file to S3 on CESNET, there is an incompressible delay before the writes of chunks actually start. This delay is about 30s. From your example, we can see that it is not related to Dask, so it must be on the Zarr or Xarray side. I'm not sure what is happening between the Python command and the moment the task graph starts running in Dask. Maybe it's the time it takes to create the metadata? But this seems really high. You can verify this by using a LocalCluster and watching the task stream. Maybe it comes from CESNET, I don't know. Did you try the same code using another cloud infrastructure? What I can say is that once the chunk writes have started, the bandwidth seems reasonable, so the bandwidth and latency of CESNET storage look good when writing the chunks of data. We can try simple writes from the command line or with s3fs to see if it comes from CESNET, from something internal to Xarray or Zarr, or maybe from a combination of both... |
Hm, trying ds.to_zarr("/tmp/myfile", mode='w', consolidated=True) takes less than a second, so there is definitely something going on with Zarr and the object store. |
I've also tried to sync the dataset from local storage to the CESNET object store using:
time aws s3 --endpoint https://object-store.cloud.muni.cz --profile pangeoswift sync /tmp/myfile s3://tmp/guillaumeeb/
upload: ../../tmp/myfile/.zattrs to s3://tmp/guillaumeeb/.zattrs
upload: ../../tmp/myfile/lon/.zarray to s3://tmp/guillaumeeb/lon/.zarray
upload: ../../tmp/myfile/Tair/.zattrs to s3://tmp/guillaumeeb/Tair/.zattrs
upload: ../../tmp/myfile/Tair/.zarray to s3://tmp/guillaumeeb/Tair/.zarray
upload: ../../tmp/myfile/lat/0 to s3://tmp/guillaumeeb/lat/0
upload: ../../tmp/myfile/lat/.zarray to s3://tmp/guillaumeeb/lat/.zarray
upload: ../../tmp/myfile/.zgroup to s3://tmp/guillaumeeb/.zgroup
upload: ../../tmp/myfile/.zmetadata to s3://tmp/guillaumeeb/.zmetadata
upload: ../../tmp/myfile/lat/.zattrs to s3://tmp/guillaumeeb/lat/.zattrs
upload: ../../tmp/myfile/Tair/0.0.0 to s3://tmp/guillaumeeb/Tair/0.0.0
upload: ../../tmp/myfile/lon/0 to s3://tmp/guillaumeeb/lon/0
upload: ../../tmp/myfile/time/0 to s3://tmp/guillaumeeb/time/0
upload: ../../tmp/myfile/time/.zattrs to s3://tmp/guillaumeeb/time/.zattrs
upload: ../../tmp/myfile/time/.zarray to s3://tmp/guillaumeeb/time/.zarray
upload: ../../tmp/myfile/lon/.zattrs to s3://tmp/guillaumeeb/lon/.zattrs
real 0m1.989s
user 0m0.733s
sys 0m0.184s
Less than 2 seconds, so the problem does not come from CESNET per se. We're in the last situation: a combination of Zarr/Xarray and the object store. We should try the notebook on another system with S3 storage if possible. Otherwise, we could ask the Pangeo community for help. |
Thank you very much @guillaumeeb !! I would like to start letting students use the object storage for writing their Zarr files. Do you have a way to prevent one student from deleting another student's data? |
I have access to https://us-central1-b.gcp.pangeo.io/ but I do not have access to its S3 storage, so I cannot benchmark there. Maybe @rabernat can help us? |
Everyone on that hub has access to the scratch bucket, where you can write as much data as you want. I tried this example on the main cluster in the following way:
import xarray as xr
import os
ds = xr.tutorial.open_dataset("air_temperature.nc").load()
SCRATCH_BUCKET = os.environ['SCRATCH_BUCKET']
%time ds.to_zarr(f'{SCRATCH_BUCKET}/air_temperature.zarr')
It took 5s. Definitely not very fast! But not 30s either. This is using gcsfs. Just a thought...rather than using an There are some optimizations available in FSStore that are not there in S3Map, and this could be making a difference. See zarr-developers/zarr-python#911 for some relevant discussion. |
Thank you @rabernat, but I do not understand yet what I should change in this notebook to use zarr.storage.FSStore ... Also, we have one more question we would like to ask about how US Pangeo does things: #17 (comment) |
You want to construct your store as follows:
uri = f"s3://{s3_prefix}/{zarr_file_name}"
store = zarr.storage.FSStore(uri, **storage_kwargs)
where
|
To profile this, here is what I am doing. From my notebook, I run
Then I download the zarr.prof.gz (gzipped because GitHub doesn't allow just What I have learned from this is that an awful lot of time is being spent on calling I would be very interested to see the same profiling for your object store. Feel free to run the same command and post your results here. This seems like a good target for optimization within Zarr. I'm pinging @martindurant, an expert on all of these things, to see if he can spot any low-hanging fruit. |
I've tried switching from So I profiled the call, and here is what I get: The profiling file. I'm not sure how to analyze this; it seems that every step takes longer. I guess this is probably due to a higher latency of some commands? |
@rabernat @guillaumeeb Thank you. I'm still stuck with saving a Zarr file through zarr.storage.FSStore I tried
@guillaumeeb How did you specify |
I used:
uri = f"s3://{s3_prefix}/zarr-fsstore"
client_kwargs = {'endpoint_url': 'https://object-store.cloud.muni.cz'}
store = zarr.storage.FSStore(uri, client_kwargs=client_kwargs, key=access_key, secret=secret_key) |
Here's a useful breakdown of your profile. A huge amount of time is being spent on
|
I think the problem is that a directory listing is cached by the s3fs instance, which makes further lookups much faster. But exists() (used by A workaround would be to call
|
Do you mean before calling The thing here is that the store is cleared, either by doing it manually before writing the Zarr dataset, or by Zarr I guess? Anyway, clearing the store manually does give speedup. And on a store cleared, Sorry @martindurant, I'm not sure I've correctly understood your suggestion. In any case, as we are starting from a free bucket/uri, Zarr should not have to call |
That's a good point! In this case, zarr should know that there are no keys available at all. Actually, by writing to the directory, you invalidate the cache anyway, so my suggestion would only be true for reading, not writing. I wonder whether s3fs should attempt to update the relevant part of the directory cache when completing a write, rather than invalidating it. I have checked, and AWS doesn't quite give you the same information on finalizing a file as when querying it afterwards. |
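To make the invalidate-versus-update idea concrete, here is a toy sketch using only the standard library. It is not how s3fs is implemented: `ToyRemote`, `CachingClient` and `count_listings` are hypothetical names, and a "listing" is simply a counted call on an in-memory dict. It only illustrates why an exists-then-write loop triggers one listing per key when every write invalidates the cached listing.

```python
class ToyRemote:
    """Hypothetical stand-in for an object store; counts simulated LIST requests."""

    def __init__(self):
        self.objects = {}
        self.list_calls = 0

    def list_keys(self):
        self.list_calls += 1
        return set(self.objects)


class CachingClient:
    """Hypothetical client that caches the listing, as discussed above."""

    def __init__(self, remote, update_on_write):
        self.remote = remote
        self.update_on_write = update_on_write
        self._listing = None  # cached set of keys, or None when stale

    def exists(self, key):
        if self._listing is None:
            # Cache miss: one (slow) listing request to the remote.
            self._listing = self.remote.list_keys()
        return key in self._listing

    def put(self, key, value):
        self.remote.objects[key] = value
        if self.update_on_write and self._listing is not None:
            # Keep the cache and patch in the key we just wrote.
            self._listing.add(key)
        else:
            # Drop the cache; the next exists() must list again.
            self._listing = None


def count_listings(update_on_write, n_keys=10):
    """Run an exists-then-write loop and return how many listings it cost."""
    remote = ToyRemote()
    client = CachingClient(remote, update_on_write)
    for i in range(n_keys):
        key = f"Tair/{i}"
        if not client.exists(key):
            client.put(key, b"chunk")
    return remote.list_calls
```

In this toy model, `count_listings(False)` pays for one listing per key, while `count_listings(True)` pays for a single listing. Whether a real client can safely patch its cache depends on what the service returns when a write completes, which is the caveat raised in the comment above.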
@martindurant @rabernat |
No thanks! I am not going to log in your cluster. 🙃 But I am happy to interact here on GitHub around optimizing Zarr and fsspec to improve your use case. |
Thank you @rabernat ! |
Hi @martindurant and @guillaumeeb, I tried with FSStore and find (51 s) and with FSStore and without find (51.3 s). |
@tinaok this is also what I observed above: no speedup with these changes. So we've profiled the calls to try to understand why it takes so long on this object store. |
I think the question is, when zarr wants to write a chunk, why does it first need to check if that key already exists? We mean to overwrite anyway, so it shouldn't need to know. If we were instead updating part of a chunk, we would need to first read the values from the part of the chunk that is not being changed - but not in this case. |
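The distinction between a full overwrite and a partial update can be sketched in a few lines. This is an illustration only, not Zarr's actual write path: `CountingStore` and `write_region` are hypothetical names, chunks are plain byte strings, and the store is a dict that counts reads.

```python
class CountingStore(dict):
    """Hypothetical dict-backed store that counts read requests."""

    def __init__(self):
        super().__init__()
        self.reads = 0

    def __getitem__(self, key):
        self.reads += 1  # a failed lookup still costs a request
        return super().__getitem__(key)


def write_region(store, key, chunk_len, start, values):
    """Write `values` at offset `start` of the chunk stored under `key`."""
    if start == 0 and len(values) == chunk_len:
        # Full overwrite: nothing of the old chunk survives,
        # so there is no need to look at the store first.
        store[key] = bytes(values)
        return
    # Partial update: the untouched part must come from the old chunk
    # (or from a zero-filled chunk if the key does not exist yet).
    try:
        old = bytearray(store[key])
    except KeyError:
        old = bytearray(chunk_len)
    old[start:start + len(values)] = values
    store[key] = bytes(old)
```

Writing a whole chunk with `write_region(store, "Tair/0.0.0", 4, 0, b"abcd")` leaves `store.reads` at 0, while patching two bytes in the middle costs exactly one read.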
Now we are asking the right questions. 👌 @tinaok & @guillaumeeb - this is how Pangeo has made progress over the past years: carefully identifying performance bottlenecks and working systematically to fix them upstream in the open source libraries. If you have the time and interest, I really encourage you to dig into the zarr source code and see if you can optimize this a bit. It's very satisfying! On behalf of the zarr developers, I can say we will gladly welcome a PR on this topic. I also think there is something weird going on with your object store. Those list operations are going very slowly compared to GCS or AWS S3. So perhaps there is a chance to optimize on that side as well. If we can optimize both, I believe we can achieve a 10 - 100x speedup of this workflow. |
Thank you @rabernat @martindurant
CESNET is based on OpenStack. Are there other native protocols I should use to make it faster? (Or do we see a slowdown due to that in the trace file?) |
I went down a bit of a rabbit hole with this and learned some interesting things. To understand what was happening better, I created a metaclass that would help me log everything the
import logging
import sys
import os
from functools import wraps, partial
from types import FunctionType, MethodType, MethodWrapperType
from time import perf_counter
from zarr.storage import FSStore

def format_args(args):
    if not isinstance(args, tuple):
        args = (args,)
    return ', '.join((
        repr(a)[:32]
        if not isinstance(a, bytes)
        else f'{len(a)} BYTES'
        for a in args
    ))

def wrapper(method):
    @wraps(method)
    def _impl(self, *method_args, **method_kwargs):
        tic = perf_counter()
        method_output = method(self, *method_args, **method_kwargs)
        toc = perf_counter()
        elapsed = 1000*(toc - tic)
        arg_reprs = format_args(method_args)
        output_reprs = (' -> ' + format_args(method_output)) if method_output else ''
        self.logger.info(f"{method.__name__}({arg_reprs}){output_reprs} - {elapsed:2.5f} ms") # ({method_args}, {method_kwargs})")
        return method_output
    return _impl

class LoggingMetaclass(type):
    def __new__(cls, clsname, bases, attrs, methods_to_log=()):
        instance = super().__new__(cls, clsname, bases, attrs)
        #instance = type(clsname, bases, attrs)
        methods_to_overwrite = (
            method_name
            for method_name in dir(instance)
            if callable(getattr(instance, method_name, None))
            and method_name in methods_to_log
        )
        for method_name in methods_to_overwrite:
            setattr(instance, method_name, wrapper(getattr(instance, method_name)))
        logging.basicConfig(level=logging.INFO, stream=sys.stdout)
        instance.logger = logging.getLogger(clsname)
        instance.logger.info("initialized")
        return instance

class Foo:
    def __init__(self, a):
        self.a = a
    def get_a(self):
        return self.a
    def __getitem__(self, key):
        if key=="a":
            return self.a
        else:
            raise KeyError

class LoggingFoo(Foo, metaclass=LoggingMetaclass, methods_to_log=['get_a', '__getitem__']):
    pass

lf = LoggingFoo(a=1)
lf.get_a()
lf['a']

store_methods = [
    'getitems', '__getitem__', 'setitems', '__setitem__',
    'delitems', '__delitem__', '__contains__', 'keys'
]

class FinalMeta(type(FSStore), LoggingMetaclass):
    pass

class LoggedFSStore(FSStore, metaclass=FinalMeta, methods_to_log=store_methods):
    pass

With this I could watch the action as follows:
import xarray as xr
import gcsfs
fs = gcsfs.GCSFileSystem()
SCRATCH_BUCKET = os.environ['SCRATCH_BUCKET']
print(SCRATCH_BUCKET)
try:
    # make sure scratch dir is empty
    fs.rm(SCRATCH_BUCKET, recursive=True)
except FileNotFoundError:
    pass
ds = xr.tutorial.open_dataset("air_temperature.nc").load()
encoding = {var: {"chunks": ds[var].shape} for var in ds.variables}
store = LoggedFSStore(f'{SCRATCH_BUCKET}/air_temperature.zarr')
%time ds.to_zarr(store, encoding=encoding, mode='w')
which gave me the following logs
It is RIDICULOUS how much unnecessary listing of the same objects over and over is being done. I am confident that we can pretty easily get a 10x improvement here by refactoring Zarr. |
Out of curiosity, I tried a very naive implementation of storing the data which bypasses most of xarray. This allows us to see whether xarray itself is responsible for these inefficient calls:
import zarr
from xarray.backends.zarr import encode_zarr_attr_value

def naive_store(ds, store):
    group = zarr.open_group(store, mode="w")
    group.attrs.update(ds.attrs)
    for v in ds.variables:
        a = group.create(v, shape=ds[v].shape, chunks=ds[v].shape, dtype=ds[v].dtype)
        a[:] = ds[v].values
        attrs = {k: encode_zarr_attr_value(val) for k, val in ds[v].attrs.items()}
        attrs['_ARRAY_DIMENSIONS'] = ds[v].dims
        a.attrs.update(attrs)
    zarr.consolidate_metadata(store)

%time naive_store(ds, LoggedFSStore(f'{SCRATCH_BUCKET}/air_temperature_naive.zarr'))
This shows that Xarray is adding a few calls of its own, but mostly the problem exists at the Zarr level. |
What does this all mean? It boils down to this:
The difference between the 656ms of CPU time and the 5.31s of wall time is time spent blocking on I/O. If we put all of these sequential I/O calls into an async event loop, I'm pretty sure we could eliminate most of that difference. There is also absolutely no reason we need to be checking for We should try turning on the Zarr V3 spec and see if it behaves better. |
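As a rough illustration of the point about blocking I/O, the sketch below compares issuing simulated requests one after another with issuing them concurrently in an asyncio event loop. Everything here is an assumption for illustration: `fake_request` just sleeps for an assumed `LATENCY` of 20 ms and does not talk to any object store, and none of these names come from Zarr or fsspec.

```python
import asyncio
import time

LATENCY = 0.02  # assumed per-request round trip, in seconds


async def fake_request(key):
    """Pretend to talk to the object store: wait, then echo the key."""
    await asyncio.sleep(LATENCY)
    return key


async def sequential(keys):
    # Each request waits for the previous one: total is about len(keys) * LATENCY.
    return [await fake_request(k) for k in keys]


async def concurrent(keys):
    # All requests are in flight at once: total is about one LATENCY.
    return await asyncio.gather(*(fake_request(k) for k in keys))


def timed(coro_fn, keys):
    """Run one of the coroutines above and return (results, elapsed seconds)."""
    tic = time.perf_counter()
    result = asyncio.run(coro_fn(keys))
    return list(result), time.perf_counter() - tic
```

With ten keys, `timed(sequential, keys)` should take roughly ten times the assumed latency and `timed(concurrent, keys)` roughly one. Real stores add per-request overhead and rate limits, so this only shows the shape of the gain, not its size.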
Thanks so much @rabernat for all that. Just wanted to say that I hope to do that soon, however I won't be available next week. |
Something like the consolidated store knows about the differences between metadata keys and data chunks, and can cache the small pieces of JSON data. Separately, the remote store (s3fs) could also cache small writes or all writes LRU. I note that .zmetadata is being accessed, but as a separate step (as recommended). Maybe consolidated store already does the right thing? Zarr itself has no caching of state at all, so every time xarray asks for a new array or a write at all, it needs to check what is in the output location. I wonder if this is any different with write-clobber mode as opposed to write-append. |
I am thinking something like
--- a/zarr/storage.py
+++ b/zarr/storage.py
@@ -2858,7 +2858,11 @@ class ConsolidatedMetadataStore(Store):
         self.store = Store._ensure_store(store)
         # retrieve consolidated metadata
-        meta = json_loads(self.store[metadata_key])
+        try:
+            meta = json_loads(self.store[metadata_key])
+        except KeyError:
+            # new store for writing
+            meta = {'zarr_consolidated_format': 1}
         # check format of consolidated metadata
         consolidated_format = meta.get('zarr_consolidated_format', None)
@@ -2885,7 +2889,12 @@ class ConsolidatedMetadataStore(Store):
         raise ReadOnlyError()
     def __setitem__(self, key, value):
-        raise ReadOnlyError()
+        self.meta_store[key] = value
+
+    def commit(self):
+        out = {".zmetadata": self.meta_store}
+        out.update(self.meta_store)
+        del out["zarr_consolidated_format"]
+        self.store.setitems(out)
so that instead of calling consolidate_metadata, you use this store (for metadata only). |
That's one option. However, I see it as a bit of a workaround for the fundamental problem that the store has to be created in this very sequential, procedural way. What if Zarr had a fast path to create an entire hierarchy at once, i.e. zarr.create_hierarchy(big_dict, store=store, mode="w") and all the data were set using a single call to I feel like that would be a more elegant solution that would not require consolidated metadata. After all, the `aws s3` CLI test shows that it is possible to get all of this data into S3 (without consolidated metadata) in < 2s. Why shouldn't Zarr be able to do the same? |
Certainly. I don't have an immediate evaluation as to the merits of adding top-level functions to zarr versus making more versatile storage layers. They end up doing the same thing here. What does your big_dict contain? setitems is (for now) only making any difference on FSStore compared to serial.
Some of this was also discussed in the async-zarr idea. That was about data access, possibly across multiple arrays (which zarr also can't normally do). Note that fsspec instances also allow for "transactions" to defer finalising upload batches and make them semi-atomic. That wouldn't help with speed, but would with consistency, which any lazy/deferred collection of things to write would need to worry about. |
(ah, and my diff doesn't actually require consolidated metadata, I am merely hijacking the class, because it was already doing some of this work; any dict could be used as the metadata store for uploading in a batch) |
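A minimal sketch of the "any dict could be used as the metadata store" idea, under stated assumptions: `ToyBatchStore` and `write_metadata_batched` are hypothetical names, the remote is an in-memory dict, and one `setitems` call is counted as one batch. It does not reproduce the patch above or Zarr's consolidated metadata format; it only shows metadata being collected locally and handed over in a single call.

```python
import json


class ToyBatchStore:
    """Hypothetical remote store that accepts many keys in one call."""

    def __init__(self):
        self.objects = {}
        self.batches = 0

    def setitems(self, mapping):
        # Count one batch per call, however many keys it carries.
        self.batches += 1
        self.objects.update(mapping)


def write_metadata_batched(remote, metadata):
    """Encode every metadata document locally, then upload them together."""
    buffer = {}
    for key, doc in metadata.items():
        # Nothing touches the remote while the buffer is being filled.
        buffer[key] = json.dumps(doc).encode()
    remote.setitems(buffer)
    return len(buffer)
```

For example, passing `{".zgroup": {"zarr_format": 2}, "lat/.zattrs": {"units": "degrees_north"}}` results in a single batch containing both keys. As noted above, a deferred collection of writes like this still has to deal with consistency if the upload fails partway.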
https://github.com/pangeo-data/pangeo-eosc/blob/main/notebooks/SwiftXarrayTest.ipynb
@sebastian-luna-valero @guillaumeeb
I've tried to write the Zarr file following the example and I confirm it is very slow.
Is there any way to make it faster?