Bulk refresh #7

Closed
ben-manes opened this issue Feb 27, 2015 · 23 comments

Comments

@ben-manes
Owner

This Guava issue identified an expected optimization that is not implemented: when getAll finds entries that are due for refresh under refreshAfterWrite, it schedules each key as an independent asynchronous operation. Since the provided CacheLoader supports bulk loads, it is reasonable to expect the refresh to be performed as a single batch operation.

This optimization may be invasive and deals with complex interactions for both the synchronous and asynchronous cache implementations. Or, it could be as simple as using getAllPresent and enhancing it to support batch read post-processing.

@ben-manes
Owner Author

The API for this improvement is not very clear. Even if a batch refresh may predominantly delegate to loadAll(keys), the method should try to be symmetric with the reload(key, @Nonnull value). What would the batch version of that be?

A batch refresh using refreshAfterWrite and triggered by getAll would map directly to reloadAll(Map<K, V> entries). However, LoadingCache.refresh(key) will reload if the entry is present and load if it is not. How should refreshAll(keys) behave when some of the entries are present and some are not? Should it split the operation into two calls, reloadAll(present) and loadAll(missing), or should it delegate the decision to the user, e.g. reloadAll(Map<K, V> entries, Iterable<K> remainingKeys)? The latter seems wrong from an API consistency perspective, but correct from a performance one.
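The two-call option can be made concrete with a minimal sketch of how refreshAll(keys) might partition its input. Keys with a current value become the entries for reloadAll(Map), and the rest become the keys for loadAll. RefreshPartition and partition are hypothetical names, and a plain Map stands in for the cache's contents.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: splits a refreshAll(keys) request into present entries
// (candidates for one reloadAll(Map) call) and absent keys (candidates for one loadAll call).
final class RefreshPartition<K, V> {
  final Map<K, V> present = new LinkedHashMap<>(); // key -> old value
  final Set<K> missing = new LinkedHashSet<>();    // keys with no old value

  static <K, V> RefreshPartition<K, V> partition(Map<K, V> cache, Iterable<K> keys) {
    var result = new RefreshPartition<K, V>();
    for (K key : keys) {
      V oldValue = cache.get(key);
      if (oldValue == null) {
        result.missing.add(key);
      } else {
        result.present.put(key, oldValue);
      }
    }
    return result;
  }
}
```

Suppose the cache holds a and b, and the keys a, c, b, d are partitioned. The result is the present entries {a=1, b=2} and the missing keys [c, d], so at most one reloadAll call and one loadAll call would be needed.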

I'm favoring the reloadAll(Map) approach because it is more consistent and we can optimize the majority of the use cases. Here's my rationale:

  • A partially present batch is not the common case (explicit refresh). Most usages will be triggered by using refreshAfterWrite and a batch getAll.
  • Most users do not use the old values when computing the new one, relying on the default implementation (delegate to load).
  • Multiple operations are only required if the user overrides the default implementation (delegate to loadAll). This can be detected using reflection (at cache construction), allowing us to perform a single call instead of two since that will have the same user-visible behavior.
  • A refresh is performed asynchronously, so 2 calls is a better worst case than N.
  • Like getAll, we cannot ensure that parallel batch operations do not overlap and may load the same entries. In the common case of refreshAfterWrite this can be optimized to not occur, though.

On reflection, I like CacheLoader.reloadAll(Map) as providing the best power-to-weight ratio.

@abatkin @cjohnson2

@abatkin

abatkin commented Jan 21, 2016

To clarify: Would the solution reloadAll(Map) imply that refreshAll(keys) (where some are present and some are not) would end up with calls to both reloadAll(Map) and loadAll(keys) except where both are using the default implementation?

Or are you saying that the default implementation (in the interface) of reloadAll() will actually call loadAll() (even though the user may have provided their own implementation of loadAll()) and in that case you optimize to call loadAll() once? This sounds pretty slick and I'm imagining that it would be better than the extra Iterable - not only is it cleaner, but most people would probably end up pulling out all of the keys from the Map in reloadAll(Map,Iterable) and then adding all of the remainingKeys anyway, which is a ton of extra/unnecessary work.

@ben-manes
Owner Author

The default interface implementation of reloadAll(Map) would be to throw an UnsupportedOperationException. That would make it symmetric with loadAll, where the exception indicates a fallback to the iterative loop. But similarly, we don't actually have to throw and catch an exception, since reflection tells us ahead of time.
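Here is a minimal sketch of the "we know ahead of time" idea. It assumes a hypothetical BulkLoader interface, not Caffeine's CacheLoader, whose bulk methods default to throwing. The cache would inspect the loader once, at construction, to see which defaults were overridden, so it never has to throw and catch at runtime. Lookups use erased parameter types: Set.class, Map.class, or Object.class for generic parameters.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical loader mirroring the proposal: bulk methods throw by default,
// which signals "not implemented, fall back to the per-key path".
interface BulkLoader<K, V> {
  V load(K key);

  default Map<K, V> loadAll(Set<? extends K> keys) {
    throw new UnsupportedOperationException();
  }

  default V reload(K key, V oldValue) {
    return load(key);
  }

  default Map<K, V> reloadAll(Map<? extends K, ? extends V> entries) {
    throw new UnsupportedOperationException();
  }
}

final class LoaderIntrospection {
  private LoaderIntrospection() {}

  /** True if the loader's class overrides the named default method (checked once, up front). */
  static boolean overrides(BulkLoader<?, ?> loader, String name, Class<?>... erasedParameterTypes) {
    try {
      var method = loader.getClass().getMethod(name, erasedParameterTypes);
      return method.getDeclaringClass() != BulkLoader.class;
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(name, e);
    }
  }
}
```

A lambda loader only implements load, so every check reports false for it. An anonymous class that overrides loadAll reports true for loadAll and false for reloadAll.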

I think it's best to lay out each scenario separately based on which methods are implemented.

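load | loadAll | reload | reloadAll | description
---- | ------- | ------ | --------- | -----------
X    |         | ?      |           | N calls (L + R)
X    | X       |        |           | 1 call directly to loadAll
X    | X       | X      |           | 1 call to loadAll, R calls to reload
X    | X       | ?      | X         | R = 1: 1 call to loadAll, 1 call to reload
X    | X       | ?      | X         | R > 1: 1 call to loadAll, 1 call to reloadAll
X    |         | ?      | X         | R = 1: L calls to load, 1 call to reload
X    |         | ?      | X         | R > 1: L calls to load, 1 call to reloadAll

(X = implemented by the user, ? = may or may not be implemented, blank = default; L = keys to load because they are absent, R = keys to reload because they are present, N = L + R.)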
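The table can be read as a small decision procedure, and the sketch below encodes it. It assumes that L is the number of requested keys that are absent and R the number that are present. It also assumes that an empty group produces no call, which the table does not say. RefreshPlanner and its string call names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner reproducing the table: the flags say which optional methods
// the user overrode; `absent` is L and `present` is R.
final class RefreshPlanner {
  private RefreshPlanner() {}

  static List<String> plan(boolean loadAll, boolean reload, boolean reloadAll,
      int absent, int present) {
    var calls = new ArrayList<String>();
    if (reloadAll) {
      // Rows 4-7: absent keys via loadAll (or L loads); present keys batched,
      // except that a single present key uses reload.
      if (absent > 0) {
        addLoads(calls, loadAll, absent);
      }
      if (present == 1) {
        calls.add("reload");
      } else if (present > 1) {
        calls.add("reloadAll");
      }
    } else if (loadAll && !reload) {
      // Row 2: the default reload delegates to load, so one loadAll covers every key.
      if (absent + present > 0) {
        calls.add("loadAll");
      }
    } else {
      // Rows 1 and 3: absent keys are loaded, present keys are reloaded one by one.
      if (absent > 0) {
        addLoads(calls, loadAll, absent);
      }
      for (int i = 0; i < present; i++) {
        calls.add("reload");
      }
    }
    return calls;
  }

  private static void addLoads(List<String> calls, boolean loadAll, int absent) {
    if (loadAll) {
      calls.add("loadAll");
    } else {
      for (int i = 0; i < absent; i++) {
        calls.add("load");
      }
    }
  }
}
```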

The assumption is that most users only ever implement load and maybe loadAll. Very few will ever want to do anything smart with the old value. Those who want their reloads batched via reloadAll should be able to trust us to do it for them, rather than needing to delegate it themselves.

Does that sound right?

@abatkin

abatkin commented Jan 21, 2016

The table looks entirely reasonable.

The only thing of note is that existing code that has implemented loadAll() could suddenly experience a change in behavior where loadAll() is now called to reload items (instead of n calls to reload()). I'm guessing that in most cases this is fine (and may even be a pleasant surprise, since they wouldn't have implemented loadAll() if they didn't care).

@ben-manes
Owner Author

Only if reload was not implemented, which would have delegated to n calls to load. So they should be pleased because we make a single call to loadAll, which is what @cjohnson2 had expected in his Guava ticket.

@Dirk-c-Walter
Contributor

I have run into this issue too. Since the overhead for loading a single item is huge I just use an Async Loader and queue the reload requests until I have enough to run a batch. Not the cleanest solution but a workable one.
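Here is a rough sketch of that workaround, assuming only a size threshold. Requests are queued, and once batchSize of them have accumulated the bulk function runs once and completes every waiting future. SizeBatcher, load, and flush are hypothetical names. Without a timeout, a partial batch waits until more requests arrive or flush() is called. That is why the coalescing loaders discussed later in this thread also bound the delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical size-triggered batcher: queue requests, then run one bulk call
// per batchSize requests. The bulk call runs on the thread that fills the batch.
final class SizeBatcher<K, V> {
  private record Request<K, V>(K key, CompletableFuture<V> result) {}

  private final int batchSize;
  private final Function<List<K>, Map<K, V>> bulkFunction;
  private final List<Request<K, V>> pending = new ArrayList<>(); // guarded by this

  SizeBatcher(int batchSize, Function<List<K>, Map<K, V>> bulkFunction) {
    this.batchSize = batchSize;
    this.bulkFunction = bulkFunction;
  }

  CompletableFuture<V> load(K key) {
    var request = new Request<K, V>(key, new CompletableFuture<>());
    List<Request<K, V>> batch = null;
    synchronized (this) {
      pending.add(request);
      if (pending.size() >= batchSize) {
        batch = drain();
      }
    }
    if (batch != null) {
      run(batch); // outside the lock, so other callers can keep queueing
    }
    return request.result();
  }

  /** Runs whatever is queued, even if fewer than batchSize requests are waiting. */
  void flush() {
    List<Request<K, V>> batch;
    synchronized (this) {
      batch = drain();
    }
    if (!batch.isEmpty()) {
      run(batch);
    }
  }

  private List<Request<K, V>> drain() {
    List<Request<K, V>> batch = new ArrayList<>(pending);
    pending.clear();
    return batch;
  }

  private void run(List<Request<K, V>> batch) {
    try {
      Map<K, V> values = bulkFunction.apply(batch.stream().map(Request::key).toList());
      batch.forEach(r -> r.result().complete(values.get(r.key())));
    } catch (RuntimeException e) {
      batch.forEach(r -> r.result().completeExceptionally(e));
    }
  }
}
```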

@dalegaspi

@ben-manes I'm not sure if I'm understanding this feature or if there's a real issue. I'm using the loadAll() feature through the scaffeine wrapper (which I'm sure you're familiar with), which doesn't muck around with the loadAll feature (as shown here).

Here's my simple code to demonstrate this:

```scala
import com.github.blemale.scaffeine.Scaffeine
import scala.concurrent.duration._

val cache = Scaffeine()
  .refreshAfterWrite(100.millis)
  .build[String, String](
    loader = (key: String) => {
      println(s"key: $key")
      key
    },
    allLoader = Some((keys: Iterable[String]) => {
      println(s"keys: ${keys.mkString(",")}")
      keys.map(k => k -> k).toMap
    }))

val values = cache.getAll(Seq("1", "2", "3")).toList

Thread.sleep(200)

cache.getAll(Seq("1", "2", "3")).toList
```

Normally, since I have refreshAfterWrite() and am not explicitly calling refresh, I would expect loadAll() to be called twice, but that's not the case... instead, load() is called 3 times, resulting in:

keys: 1,2,3
key: 1
key: 2
key: 3

Replacing it with expireAfterWrite() behaves as expected, though, but with the obvious penalty of blocking.

thoughts?

@ben-manes
Owner Author

Currently, refreshing is performed on individual keys and calls CacheLoader.reload(key, oldValue). Ideally it would take advantage of your bulk loader, but neither Caffeine nor Guava does this. We would need to add a reloadAll method to delegate to, along with the logic to handle this scenario intelligently.

@dalegaspi

Ah...thanks for the quick answer @ben-manes, i shouldn't have assumed that this is implemented since the ticket is still open 😊. I guess for now i'm gonna see what i can do with what we have. @Dirk-c-Walter's solution is interesting but it wouldn't work for me.

ben-manes added a commit that referenced this issue Feb 21, 2021
This interface method provides a placeholder for future support of bulk
reloading (see #7). That is not implemented. Therefore this method
merely calls refresh(key) for each key and composes the result. If and
when bulk reloading is supported then this method may be optimized.
ben-manes changed the title from "refreshAfterWrite does not use loadAll" to "Bulk refresh support" Feb 21, 2021
ben-manes changed the title from "Bulk refresh support" to "Bulk refresh" Feb 21, 2021
@slovdahl

Just posting this here for reference. The issue I described in #323 (comment) and #323 (comment) would most likely not have occurred at all with bulk refresh, because we implement loadAll for most of the caches and support bulk load all the way down to Redis. So in that sense this would be a very nice addition 👍

@cruftex
Contributor

cruftex commented Nov 1, 2021

There is a CoalescingBulkLoader in the example section that joins multiple load requests into a bulk request by issuing a delay. When reviewing, I think I spotted at least one concurrency issue, so I would not recommend using it without prior vetting.

In cache2k I recently added bulk support and also provided a CoalescingBulkLoader. See the issue comment on how to configure it. I did some heavy concurrent testing on it, so I am quite confident it is production ready. The coalescing can happen for every load, which would always introduce a delay and therefore additional latency on user requests, or it can work within the refresh path only, so initial user requests are not delayed. The solution in Caffeine could be similar.

I was considering bulk support in the cache core, which would mean that timer events would be managed for a bunch of keys. However, timing is a cross-cutting concern throughout the cache implementation, and OTOH "bulk" operations are very useful and widespread but not a common mode of operation. Adding bulk support to the timing code of the core would add a lot of complexity. Also, the majority of the cache API still operates on individual keys, so entries initially loaded in a bulk request might not be refreshed together. The coalescing approach allows adding it as an extension without altering the core cache.

At the moment the implementation uses a separate thread scheduler, which is not optimal as a general solution. It should be enhanced to use a timer from the cache infrastructure without needing an additional thread.

Since coalescing introduces a tiny delay to bundle the requests, a more efficient solution could adjust the timer events by "rounding up" and the timer code could be extended to process bulks of events that happen at the same time.
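Here is a small sketch of the "rounding up" idea, assuming deadlines are plain long timestamps. Each refresh deadline is moved up to the next multiple of tick. Keys that fall due within the same tick then share one timer event and could be refreshed with one bulk request. DeadlineRounding and groupByTick are hypothetical names. The trade-off is that a refresh can be delayed by up to tick - 1 time units.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: round each deadline up to a tick boundary and group
// keys by the rounded value, so each group maps to one timer event / bulk request.
final class DeadlineRounding {
  private DeadlineRounding() {}

  static long roundUp(long deadline, long tick) {
    return Math.floorDiv(deadline + tick - 1, tick) * tick;
  }

  static <K> Map<Long, List<K>> groupByTick(Map<K, Long> deadlines, long tick) {
    Map<Long, List<K>> groups = new TreeMap<>(); // ordered by firing time
    deadlines.forEach((key, deadline) ->
        groups.computeIfAbsent(roundUp(deadline, tick), t -> new ArrayList<>()).add(key));
    return groups;
  }
}
```

For example, with tick = 100, the deadlines 101, 150, and 200 all round to 200 and form one group, while 201 rounds to 300.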

@dalegaspi

@cruftex wow, I did not even know this existed... but would you mind pointing out the concurrency issues (at least one or two of them)?

@cruftex
Contributor

cruftex commented Nov 1, 2021

...but would you mind pointing out the concurrency issues (at least one or two of them)?

@dalegaspi here you are:

asyncLoad is doing:

```java
// ...
if (size.incrementAndGet() >= maxLoadSize) {
    doLoad();
} else if (schedule == null || schedule.isDone()) {
    startWaiting();
}
// ...
```

startWaiting() is doing:

```java
synchronized private void startWaiting() {
    schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS);
}
```

This can schedule multiple timers. That probably causes no serious harm beyond smaller bulk batches under contention. However, the overall design implies that only one timer is scheduled, which is not guaranteed.

OTOH, it will also start multiple doLoad() calls once the threshold is reached.
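In the quoted asyncLoad, the `schedule == null || schedule.isDone()` check runs before entering the synchronized startWaiting(). Two threads can therefore both observe no pending timer and each schedule one. The minimal sketch below shows one way to close that window, assuming a single ScheduledExecutorService: perform the check and the scheduling under the same lock. SingleTimer and startWaitingIfIdle are hypothetical names, not the example's code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: "is a timer pending?" and "schedule one" happen atomically,
// so concurrent callers cannot each schedule their own flush.
final class SingleTimer {
  private final ScheduledExecutorService timer;
  private final Runnable flush;
  private final long maxDelayMillis;
  private ScheduledFuture<?> schedule; // guarded by this

  SingleTimer(ScheduledExecutorService timer, Runnable flush, long maxDelayMillis) {
    this.timer = timer;
    this.flush = flush;
    this.maxDelayMillis = maxDelayMillis;
  }

  /** Schedules a flush unless one is already pending; returns true if this call scheduled it. */
  synchronized boolean startWaitingIfIdle() {
    if (schedule == null || schedule.isDone()) {
      schedule = timer.schedule(flush, maxDelayMillis, TimeUnit.MILLISECONDS);
      return true;
    }
    return false;
  }
}
```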

Looking at the fields, for example:

```java
private volatile Queue<WaitingKey> waitingKeys = new ConcurrentLinkedQueue<>();
```

volatile seems odd; the field can be final.

Looking at doLoad it polls all queued elements and issues a bulk load, not respecting the max parameter.

That said I was happy for the idea, and I think the code is basically working. However, it needs a proper review. I am very suspicious when concurrent code is not using final and volatile in a sensible way.

@dalegaspi

dalegaspi commented Nov 1, 2021

@cruftex thanks for pointing those out... now, I'm not going to pretend I fully understand this code. I'm sure @sheepdreamofandroids and/or @ben-manes can explain better, since @sheepdreamofandroids wrote it, but I think this code:

```java
synchronized private void startWaiting() {
  schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS);
}
```

will just replace the previous schedule when multiple threads call the method (the last thread to enter wins), which I believe should not cause any harm at all... it just essentially resets the timer, which I think should be ok.
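One detail is worth checking against this reasoning. Assigning a new ScheduledFuture to schedule only changes the reference. It does not cancel the task the field previously pointed to, so the earlier timer still fires and runs doLoad(). The self-contained demonstration below schedules the same task twice through one variable, with and without an explicit cancel() of the first. ReassignDemo is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Shows that overwriting a ScheduledFuture reference does not cancel the earlier task.
final class ReassignDemo {
  private ReassignDemo() {}

  /** Schedules doLoad twice through one variable and returns how many times it ran. */
  static int scheduleTwice(boolean cancelFirst) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    var runs = new AtomicInteger();
    Runnable doLoad = runs::incrementAndGet;

    ScheduledFuture<?> schedule = timer.schedule(doLoad, 20, TimeUnit.MILLISECONDS);
    if (cancelFirst) {
      schedule.cancel(false); // what actually discarding the pending timer requires
    }
    // Overwriting the reference alone leaves the first task scheduled.
    schedule = timer.schedule(doLoad, 20, TimeUnit.MILLISECONDS);

    timer.shutdown(); // by default, already-scheduled delayed tasks still run
    try {
      timer.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return runs.get();
  }
}
```

scheduleTwice(false) returns 2 and scheduleTwice(true) returns 1.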

as for the use of volatile...it's probably for safety to ensure that the queue is never stored in CPU cache...but to your point it's probably unnecessary and could be final.

as for the last point, i thought it did honor the max parameter in line 167 but maybe you are referring to an entirely different thing that I don't quite see.

kindly take these comments for what it's worth. i'm not starting a debate. i'm just a village idiot. 😊

Yes, I agree, this is a great idea and should be sufficient for most use cases. I will actually try this when i get the chance.

ninja edit: @Stephan202 pointed out that @sheepdreamofandroids wrote the class.

@Stephan202
Collaborator

Stephan202 commented Nov 1, 2021

I'm sure @ben-manes can explain better since he wrote it

I'm sure he can explain it (:smile:), but he's not the author; @sheepdreamofandroids is. See #336.

@dalegaspi

@Stephan202 thanks wow that was an oversight. i updated my previous comment.

@sheepdreamofandroids
Contributor

Wow, this is a while ago :-)

Indeed waitingKeys should be final, not volatile, that's just better.

I'm pretty sure that the schedule.cancel() on line 164 should instead be before line 160 to ensure that only one doLoad() is scheduled at a time, even though extra doLoad() invocations will simply not find anything to load and do no harm. I'm not sure what I was smoking when that happened.

doLoad() does respect the maxLoadSize, though. If there are more keys to load, the bulkloader will be called more than once from doLoad().
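The counting-down behavior amounts to draining the queue in chunks of at most maxLoadSize keys and calling the bulk loader once per chunk. Below is a minimal sketch of that loop, not the example's actual code. ChunkedDrain and drain are hypothetical names. When producers run on other threads, a concurrent queue such as ConcurrentLinkedQueue would be passed in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: poll at most maxLoadSize keys per bulk call until the queue is empty.
final class ChunkedDrain {
  private ChunkedDrain() {}

  /** Returns the number of bulk calls made. */
  static <K> int drain(Queue<K> waiting, int maxLoadSize, Consumer<List<K>> bulkLoader) {
    int calls = 0;
    while (true) {
      List<K> chunk = new ArrayList<>(maxLoadSize);
      while (chunk.size() < maxLoadSize) {
        K key = waiting.poll();
        if (key == null) {
          break; // queue is (momentarily) empty
        }
        chunk.add(key);
      }
      if (chunk.isEmpty()) {
        return calls;
      }
      bulkLoader.accept(chunk);
      calls++;
    }
  }
}
```

Draining 12 queued keys with maxLoadSize = 5 makes three bulk calls, of sizes 5, 5, and 2.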

@cruftex
Contributor

cruftex commented Nov 1, 2021

@sheepdreamofandroids:

doLoad() does respect the maxLoadSize, though. If there are more keys to load, the bulkloader will be called more than once from doLoad().

Yes it does! Sorry. I missed the counting down logic.

This was referenced Nov 1, 2021
ben-manes pushed a commit that referenced this issue Nov 2, 2021
made more fields final
applied google formatter
gave the tests more slack to also succeed consistently on Windows 10
@VegetaPn

Hello. Is bulk refresh supported now?

@ben-manes
Owner Author

Not yet; it's been in the backlog, with contributed alternatives in the examples section.

@Hailprob

Hailprob commented Mar 14, 2022

I'm sorry to ask a question here. I override the loadAll(keys) method, which causes getAll(keys) to query ES. My question is: given the bulkLoad(keysToLoad, result, mappingFunction) call in LocalManualCache.getAll() under high concurrency, is there a way to ensure each key is queried only once, as happens with refresh?

@ben-manes
Owner Author

@Hailprob yes, if you use an AsyncCache (see the FAQ for details).
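Here is a simplified sketch of why an async cache avoids the duplicate queries. A future is published for every missing key before the bulk load starts, so an overlapping getAll reuses the in-flight future instead of loading that key again. This illustrates the idea only and is not Caffeine's implementation. DedupingCache and its bulkLoader function are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified illustration: each missing key gets a placeholder future via putIfAbsent,
// and only the call that inserted it includes that key in its bulk request.
final class DedupingCache<K, V> {
  private final ConcurrentHashMap<K, CompletableFuture<V>> futures = new ConcurrentHashMap<>();
  private final Function<Set<K>, CompletableFuture<Map<K, V>>> bulkLoader;

  DedupingCache(Function<Set<K>, CompletableFuture<Map<K, V>>> bulkLoader) {
    this.bulkLoader = bulkLoader;
  }

  Map<K, CompletableFuture<V>> getAll(Set<K> keys) {
    Map<K, CompletableFuture<V>> result = new HashMap<>();
    Map<K, CompletableFuture<V>> owned = new HashMap<>();
    for (K key : keys) {
      var placeholder = new CompletableFuture<V>();
      var existing = futures.putIfAbsent(key, placeholder);
      if (existing == null) {
        owned.put(key, placeholder); // this call is responsible for loading `key`
        result.put(key, placeholder);
      } else {
        result.put(key, existing);   // reuse the in-flight (or completed) future
      }
    }
    if (!owned.isEmpty()) {
      // One bulk request covers only the keys this call registered.
      bulkLoader.apply(Set.copyOf(owned.keySet())).whenComplete((values, error) ->
          owned.forEach((key, future) -> {
            if (error != null) {
              futures.remove(key, future); // allow a later retry
              future.completeExceptionally(error);
            } else {
              future.complete(values.get(key));
            }
          }));
    }
    return result;
  }
}
```

Suppose one call requests a and b, and a second call requests b and c while the first load is still running. The bulk loader sees {a, b} and then only {c}, and both callers share the same future for b.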

@ben-manes
Owner Author

ben-manes commented Jul 17, 2023

This issue will be closed as won't do after the documentation is updated to guide users.

The original ask was an observation that since getAll will perform the initial load using loadAll, it should likewise try to batch the reloads into a single request. The initial loads make sense to batch explicitly since the caller is waiting for the results. The reloads are optimistic, asynchronous, and hidden from the caller. If implemented as originally asked for then it would only reduce some of the parallel reloads and offer modest batching.

For bulk refreshes, the coalescing examples show a better approach because they can capture reloads triggered by independent cache operations, buffer by a max delay or key count, and throttle the total number of reloads in flight. This can all be done by a more intelligent CacheLoader, and no changes to the library are needed.
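Throttling the reloads in flight can be as simple as a counting semaphore around the bulk call. Below is a minimal sketch, assuming each batch has already been formed and runs on a caller-supplied Executor. ThrottledBulkLoader and loadBatch are hypothetical names. Waiting for a permit blocks an executor thread, so this trades threads for simplicity. The Reactor example below instead bounds concurrency with its parallelism argument.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch: at most maxInFlight bulk calls reach the backend at once.
final class ThrottledBulkLoader<K, V> {
  private final Semaphore permits;
  private final Executor executor;
  private final Function<List<K>, Map<K, V>> bulkFunction;

  ThrottledBulkLoader(int maxInFlight, Executor executor,
      Function<List<K>, Map<K, V>> bulkFunction) {
    this.permits = new Semaphore(maxInFlight);
    this.executor = executor;
    this.bulkFunction = bulkFunction;
  }

  CompletableFuture<Map<K, V>> loadBatch(List<K> keys) {
    return CompletableFuture.supplyAsync(() -> {
      permits.acquireUninterruptibly(); // blocks this executor thread while the limit is reached
      try {
        return bulkFunction.apply(keys);
      } finally {
        permits.release();
      }
    }, executor);
  }
}
```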

@sheepdreamofandroids' example is nice and straightforward. It can be further simplified by leveraging a reactive library, such as by using RxJava's buffer(timespan, timeUnit, count) or Reactor's bufferTimeout(maxSize, maxTime). It is then only ~30 LOC to implement and customize to fit the problem at hand.

Here is a short example using Reactor demonstrating this:

CoalescingBulkLoader
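```java
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public final class CoalescingBulkLoader<K, V> implements AsyncCacheLoader<K, V> {
  private final Function<Set<K>, Map<K, V>> mappingFunction;
  private final Sinks.Many<Request<K, V>> sink;

  public CoalescingBulkLoader(int maxSize, Duration maxTime,
      int parallelism, Function<Set<K>, Map<K, V>> mappingFunction) {
    this.mappingFunction = requireNonNull(mappingFunction);
    sink = Sinks.many().unicast().onBackpressureBuffer();
    sink.asFlux()
        .bufferTimeout(maxSize, maxTime)   // emit a batch at maxSize keys or after maxTime
        .parallel(parallelism)             // up to `parallelism` batches are handled at once
        .runOn(Schedulers.boundedElastic())
        .subscribe(this::handle);
  }

  @Override public CompletableFuture<V> asyncLoad(K key, Executor executor) {
    var result = new CompletableFuture<V>();
    // Sinks.many() sinks fail fast (FAIL_NON_SERIALIZED) on concurrent emissions,
    // so serialize callers arriving from different threads.
    synchronized (sink) {
      sink.tryEmitNext(new Request<>(key, result)).orThrow();
    }
    return result;
  }

  private void handle(List<Request<K, V>> requests) {
    try {
      var results = mappingFunction.apply(requests.stream().map(Request::key).collect(toSet()));
      // A key missing from the results completes with null, i.e. no value.
      requests.forEach(request -> request.result().complete(results.get(request.key())));
    } catch (Throwable t) {
      requests.forEach(request -> request.result().completeExceptionally(t));
    }
  }

  private record Request<K, V>(K key, CompletableFuture<V> result) {}
}
```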
Sample test
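```java
// JUnit 5 and Truth are assumed here; any test framework and assertion library will do.
import static com.google.common.truth.Truth.assertThat;
import static java.util.stream.Collectors.toMap;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;

public final class CoalescingBulkLoaderTest {
  @Test
  public void coalesce() {
    AsyncLoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
        .buildAsync(new CoalescingBulkLoader<>(
            /* maxSize */ 5, /* maxTime */ Duration.ofMillis(50), /* parallelism */ 5,
            keys -> keys.stream().collect(toMap(key -> key, key -> -key))));

    var results = new HashMap<Integer, CompletableFuture<Integer>>();
    for (int i = 0; i < 82; i++) {
      results.put(i, cache.get(i));
    }
    for (var entry : results.entrySet()) {
      assertThat(entry.getValue().join()).isEqualTo(-entry.getKey());
    }
  }
}
```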


10 participants