Debugging memory issues in IMERG #227

TomAugspurger opened this issue Nov 8, 2021 · 15 comments

@TomAugspurger
Contributor

TomAugspurger commented Nov 8, 2021

Dropping a few notes here for the day:

@sharkinsspatial and I are seeing some slowness and high memory usage on the client when running the imerg recipe, which has many steps along the time dimension.

I've traced something that looks like a memory leak to azure.storage.blob. This script just puts a bunch of 24-byte objects:

# file: async_mem.py
import asyncio
import os
import time
import tracemalloc

import azure.storage.blob.aio

N = 10_000

data = b"\x02\x013\x08\x08\x00\x00\x00\x08\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"  # a 24-byte payload (an encoded 0)

# Observation: we don't see a memory leak when just creating the blob client;
# we actually have to put data.


async def put(acc, i):
    async with acc.get_blob_client(f"async/{i}") as bc:
        await bc.upload_blob(data=data, overwrite=True, metadata={"is_directory": "false"})


async def main():
    conn_str = os.environ["AZURE_CONNECTION_STRING"]
    async with azure.storage.blob.aio.ContainerClient.from_connection_string(conn_str, "gpm-imerg-debug") as acc:
        tracemalloc.start()

        coros = (put(acc, i) for i in range(N))

        t0 = time.time()
        await asyncio.gather(*coros)
        t1 = time.time()

        print(f"processed {N} blobs in {t1 - t0:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())

[memory profile chart for the script above]

I don't see an easy workaround (I also don't really understand where the memory leak might be, and hopefully won't need to find out).


So making a bunch of small writes isn't great for performance (and, it turns out, for memory with the Azure SDK). Can we avoid the small writes in the first place?

One of the batches of small writes happens here:

# now explicitly write the sequence coordinate to avoid missing data
# when reopening
if concat_dim in zgroup:
    zgroup[concat_dim][:] = 0

That writes 0 for each chunk in the concat dim (time typically). In the case of IMERG, that's ~360,000 writes. @rabernat mentioned this was to avoid an issue with decode_times. For better or worse, the test suite passes with those lines removed...

The second place we interact with a bunch of small objects is when consolidating the dimension coordinates, here:

for dim in dims:
    arr = group[dim]
    attrs = dict(arr.attrs)
    new = group.array(
        dim,
        arr[:],
        chunks=arr.shape,
        dtype=arr.dtype,
        compressor=arr.compressor,
        fill_value=arr.fill_value,
        order=arr.order,
        filters=arr.filters,
        overwrite=True,
    )
    new.attrs.update(attrs)
That has one read per input. @sharkinsspatial can you confirm whether you saw memory pressure here too, or just slowness? We might be able to know those coordinate values "statically", based just on the recipe definition, at least for time. For imerg I think it's just some (encoded form of) recipe.file_pattern.combine_dims[0].keys, which is the DatetimeIndex we provided to the FilePattern. Then we'd have just one "big" write at the end, and no small reads.

@TomAugspurger
Contributor Author

For imerg I think it's just some (encoded form of) recipe.file_pattern.combine_dims[0].keys, which is the DatetimeIndex we provided to the FilePattern. Then we just have one "big" write at the end, and no small reads.

This optimization only works when nitems_per_file is 1. Otherwise, I'm not sure that the .keys will have the necessary information. I'm also guessing it's something of a coincidence that, in this case, the keys are the coordinate values for that dimension. So in short, I'm not sure what to do in general. I'm moving back to the idea that these many small writes / reads may be unavoidable, and that we should sort out why we're seeing memory usage grow.

@sharkinsspatial to work around the issues you were seeing in finalize_target, can you try setting recipe.consolidate_dimension_coordinates = False, and then before finalize_target add a function call like

import numpy as np
import zarr
from pangeo_forge_recipes.recipes.xarray_zarr import _gather_coordinate_dimensions


def consolidate_coordinate_dimensions(config):
    target_mapper = config.target.get_mapper()
    group = zarr.open(target_mapper, mode="a")
    # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/214
    # intersect the dims from the array metadata with the Zarr group
    # to handle coordinateless dimensions.
    dims = set(_gather_coordinate_dimensions(group)) & set(group)
    combine_dims = {x.name: x for x in config.file_pattern.combine_dims}
    for dim in dims:
        arr = group[dim]
        chunks = arr.shape
        dtype = arr.dtype
        compressor = arr.compressor
        fill_value = arr.fill_value
        order = arr.order
        filters = arr.filters

        # Performance optimization: get the values from combine_dims if possible.
        # This avoids many small reads from the non-consolidated coordinate dimension.
        # https://github.com/pangeo-forge/pangeo-forge-recipes/issues/227
        if dim in combine_dims and combine_dims[dim].nitems_per_file == 1:
            data = np.asarray(combine_dims[dim].keys, dtype=dtype)
        else:
            data = arr[:]  # this triggers reads from the target

        attrs = dict(arr.attrs)
        new = group.array(
            dim,
            data,
            chunks=chunks,
            dtype=dtype,
            compressor=compressor,
            fill_value=fill_value,
            order=order,
            filters=filters,
            overwrite=True,
        )
        new.attrs.update(attrs)

That should run pretty quickly. (Make sure to still run finalize_target so that we get the consolidated metadata.)
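
A hypothetical sketch of the wiring (the exact execution API for your runner may differ; `recipe` here stands in for whatever object is passed as config):

recipe.consolidate_dimension_coordinates = False  # skip the built-in consolidation pass

# ... execute the recipe through prepare_target / store_chunk as usual ...

consolidate_coordinate_dimensions(recipe)  # fast pass; uses combine_dims keys where possible
# Still run finalize_target afterwards so the consolidated metadata is written.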

@TomAugspurger
Contributor Author

Hmm, I've had a thought: why does the memory usage level off before we're done with all of the reads / writes? Running a smaller version of the earlier profile, we see that memory use peaks around 20s, while the job finishes at 30s.

[memory usage chart: peak around 20s, job finishes at 30s]

What if we don't really have a memory leak? What if we're trying to run all 360,000 write coroutines concurrently? Then we would see memory usage grow as each await moves on to the next coroutine, which allocates a bit more data, and so on. Eventually we start completing coroutines, which frees memory and levels off the usage (it might not drop yet, since we may still be starting new coroutines, and Python doesn't always release memory back to the OS when objects are deallocated).

It's just a guess, but if we limit the maximum concurrency, we might see a different chart. For this run, we use an asyncio.Semaphore to limit ourselves to 100 concurrent writes.

import os
import time
import asyncio

import azure.storage.blob.aio

N = 10_000

data = b"\x02\x013\x08\x08\x00\x00\x00\x08\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"  # 0

# Observation: we don't see a memory leak when just creating the blob client;
# we actually have to put data.

async def put(acc, i, sem):
    async with sem:
        async with acc.get_blob_client(f"async/{i}") as bc:
            await bc.upload_blob(data=data, overwrite=True, metadata={"is_directory": "false"})



async def main():
    conn_str = os.environ["AZURE_CONNECTION_STRING"]
    async with azure.storage.blob.aio.ContainerClient.from_connection_string(conn_str, "gpm-imerg-debug") as acc:
        sem = asyncio.Semaphore(100)

        coros = (put(acc, i, sem) for i in range(N))

        t0 = time.time()
        await asyncio.gather(*coros)
        t1 = time.time()

        print(f"processed {N} blobs in {t1 - t0:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())

[memory usage chart with concurrency limited to 100]

Interestingly, we aren't any slower with the limited-concurrency version. I think that's because we were CPU bound anyway (with event loop / Python overhead).

So next steps might be to look into fsspec backends to see if there's a way to limit concurrency, and design a way to set that from the recipe. I'll look into adlfs.

@TomAugspurger
Contributor Author

cc @martindurant, in case you see an obvious place where concurrency could be limited (context at #227 (comment), but the tl;dr is that it would be nice to have a way to limit the concurrency of AsyncFileSystem._pipe / _pipe_file when passing a ton of values).

A possible fix is in _pipe_file. For adlfs that's at https://github.com/dask/adlfs/blob/bc6e7498af806fe851b4c0157612033b46c03176/adlfs/spec.py#L1411-L1421. We could perhaps pass in an asyncio.Semaphore there and have it acquire a spot before getting the blob client or doing the write.

But that feels very ad-hoc and too narrow. Why just limit concurrency in pipe_file? So I might briefly look into doing this across adlfs for any operation that acquires a BlobClient.

I don't immediately see a way to do this maximum concurrency thing generically in fsspec, unless it were to define some protocol. So I think for now it'll have to be implemented independently in each of adlfs / s3fs / gcsfs.
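
For illustration, a minimal sketch of the idea (not the eventual adlfs change; the subclass name and max_concurrency keyword are made up here): gate _pipe_file on an asyncio.Semaphore so that only a bounded number of BlobClients exist at once.

import asyncio

from adlfs import AzureBlobFileSystem


class LimitedAzureBlobFileSystem(AzureBlobFileSystem):
    """Sketch: cap the number of concurrent BlobClient writes per instance."""

    def __init__(self, *args, max_concurrency=100, **kwargs):
        super().__init__(*args, **kwargs)
        # One semaphore per filesystem instance; it binds to the event loop on first use.
        self._write_semaphore = asyncio.Semaphore(max_concurrency)

    async def _pipe_file(self, path, value, **kwargs):
        # Acquire a slot before the blob client is created and the upload happens.
        async with self._write_semaphore:
            return await super()._pipe_file(path, value, **kwargs)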

@rabernat
Contributor

rabernat commented Nov 9, 2021

For better or worse, the test suite passes with those lines removed...

Drat. I know there are some examples that break without it, but I guess not the test suite; I think it was something from the recipe tutorials. This is good motivation to run the tutorials as part of the test suite.

@cisaacstern
Member

For better or worse, the test suite passes with those lines removed...

xref #173

TomAugspurger pushed a commit to TomAugspurger/adlfs that referenced this issue Nov 9, 2021
This adds a new keyword to AzureBlobFileSystem to limit the number of
concurrent connections. See pangeo-forge/pangeo-forge-recipes#227 (comment)
for some motivation. In that situation, we had a single FileSystem
instance that was generating many concurrent write requests through
`.pipe`; so many that we were seeing memory issues from creating all
the BlobClient connections simultaneously.

This adds an asyncio.Semaphore instance to the AzureBlobFileSystem that
controls the number of concurrent BlobClient connections. The default of
None is backwards-compatible (no limit).
@rabernat
Contributor

rabernat commented Nov 9, 2021

I agree that we should pursue an upstream fix, but there are also a few other possibilities:

Make the time coordinate contiguous from the beginning

What if we explicitly specify that the time coordinate should be contiguous (a single chunk) from the beginning? Then we are only doing a single write in prepare_target, and there is nothing to consolidate at the end.

Pangeo Forge should be able to handle this situation using locks. There will be some write contention between processes, which could be managed via our locking mechanism.

This is not possible with the current code but could be implemented fairly easily.

Fix handling of encoding to avoid the big write in prepare_target

This part:

# now explicitly write the sequence coordinate to avoid missing data
# when reopening
if concat_dim in zgroup:
    zgroup[concat_dim][:] = 0

was added to avoid CF time decoding errors, related to decoding an uninitialized time dimension, that can crop up when opening the target with xarray. However, we have not been opening the target with xarray in store_chunk since b603c98, so perhaps those lines can indeed be removed. I'm going to look into this.

@TomAugspurger
Contributor Author

Pangeo Forge should be able to handle this situation using locks. There will be some write contention between processes, which could be managed via our locking mechanism.

I've been assuming that the locking would essentially kill parallelism while storing data chunks. Is that not the case? Ah, maybe it's OK, because writing a data variable like tmax doesn't require taking the lock that guards the time coordinate?

@rabernat
Contributor

rabernat commented Nov 9, 2021

If the locks are fine-grained enough (e.g. only on the time variable) then it should not be too bad. Async within pangeo-forge would help a lot here too--we could write other variables while waiting on the lock.
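
As a rough sketch of the fine-grained locking idea (names are illustrative; the actual lock would come from pangeo-forge's executor, e.g. a distributed.Lock when running on Dask):

from distributed import Lock


def store_variable_region(zgroup, varname, region, values, concat_dim="time"):
    if varname == concat_dim:
        # Only writes to the single-chunk time coordinate contend for this lock.
        with Lock(f"pangeo-forge-write-{concat_dim}"):
            zgroup[varname][region] = values
    else:
        # Data variables like tmax write to disjoint regions and need no lock.
        zgroup[varname][region] = values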

@TomAugspurger
Contributor Author

Great. I'll look into doing the coordinate consolidation earlier (in expand_target_dim).

@rabernat
Contributor

rabernat commented Nov 9, 2021

I'll look into doing the coordinate consolidation earlier (in expand_target_dim).

You'll need to revisit this line:

write_region, conflicts = region_and_conflicts_for_chunk(config, chunk_key)

We are currently assuming that all variables with time (more generally, concat_dim) have the same target chunks (concat_dim_chunks). The target chunk grid is set up here:

assert config.concat_dim_chunks is not None
target_grid = ChunkGrid.from_uniform_grid(
    {config.concat_dim: (config.concat_dim_chunks, total_len)}
)

For this to work, you may need to add an argument to region_and_conflicts_for_chunk to specify the target chunks (which can now differ by variable).

@martindurant
Contributor

The async fsspec get and put methods allow a batch_size argument, which controls the number of coroutines submitted in one go. The code is simple, and this could be added to all batch methods. The reason it's only in those two is that the local open-files limit was more constraining than what you are facing here. The option should be made more visible and thoroughly documented.
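
The idea, sketched independently of fsspec's internals (the real helper may differ in details): submit coroutines in fixed-size batches instead of all at once, which bounds the number of live requests and their memory.

import asyncio


async def gather_in_batches(coros, batch_size=128):
    # Run at most `batch_size` coroutines concurrently per batch.
    results = []
    batch = []
    for coro in coros:
        batch.append(coro)
        if len(batch) == batch_size:
            results.extend(await asyncio.gather(*batch))
            batch = []
    if batch:
        results.extend(await asyncio.gather(*batch))
    return results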

@rabernat
Contributor

rabernat commented Nov 9, 2021

Perhaps there could also be a global config option to set batch_size?

@martindurant
Contributor

Note that you can also set batch_size on the filesystem instance, or use the config key "gather_batch_size" to set it for all async implementations. The default is 128, or the open-file limit / 8 if known. But, again, this only applies to put/get, and you are probably hitting pipe instead, which is unlimited.
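
For example (assuming current fsspec behavior; defaults and keyword handling may have changed since this thread):

import os

import fsspec
from fsspec.config import conf

# Per instance: the batch_size keyword is handled by fsspec's AsyncFileSystem base class.
fs = fsspec.filesystem("abfs", connection_string=os.environ["AZURE_CONNECTION_STRING"], batch_size=100)

# Or globally, for all async implementations:
conf["gather_batch_size"] = 100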

TomAugspurger pushed a commit to TomAugspurger/pangeo-forge that referenced this issue Nov 9, 2021
@sharkinsspatial
Contributor

@martindurant Is the expanded use of _run_coros_in_chunks across all gather operations, which you mentioned in fsspec/s3fs#537 (comment), still under consideration?

@martindurant
Contributor

Exactly that
