From 7c564d3c369204ca603243ecf8c91fdc97d88a90 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sat, 2 Jan 2021 01:22:01 -0800 Subject: [PATCH] Add VarHandle and ThreadLocal fallback to StripedBuffer (#273) --- .../caffeine/SlotLookupBenchmark.java | 50 ++++++- .../caffeine/cache/StripedBuffer.java | 123 ++++++++++++++---- .../caffeine/cache/StripedBufferTest.java | 27 +++- 3 files changed, 169 insertions(+), 31 deletions(-) diff --git a/caffeine/src/jmh/java/com/github/benmanes/caffeine/SlotLookupBenchmark.java b/caffeine/src/jmh/java/com/github/benmanes/caffeine/SlotLookupBenchmark.java index 5cb046492e..b8519da23b 100644 --- a/caffeine/src/jmh/java/com/github/benmanes/caffeine/SlotLookupBenchmark.java +++ b/caffeine/src/jmh/java/com/github/benmanes/caffeine/SlotLookupBenchmark.java @@ -15,6 +15,8 @@ */ package com.github.benmanes.caffeine; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.Arrays; import java.util.concurrent.ThreadLocalRandom; @@ -50,8 +52,9 @@ */ @State(Scope.Benchmark) public class SlotLookupBenchmark { - static final int ARENA_SIZE = 2 << 6; static final int SPARSE_SIZE = 2 << 14; + static final int ARENA_SIZE = 2 << 6; + static final VarHandle PROBE; ThreadLocal threadLocal; long element; @@ -148,30 +151,63 @@ public int threadHashCode() { } @Benchmark - public long striped64(Blackhole blackhole) { + public long striped64_unsafe(Blackhole blackhole) { // Emulates finding the arena slot by reusing the thread-local random seed (j.u.c.a.Striped64) - int hash = getProbe(); + int hash = getProbe_unsafe(); if (hash == 0) { blackhole.consume(ThreadLocalRandom.current()); // force initialization - hash = getProbe(); + hash = getProbe_unsafe(); } - advanceProbe(hash); + advanceProbe_unsafe(hash); int index = selectSlot(hash); return array[index]; } - private int getProbe() { + private int getProbe_unsafe() { return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), probeOffset); } - private void advanceProbe(int probe) { + private void advanceProbe_unsafe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), probeOffset, probe); } + @Benchmark + public long striped64_varHandle(Blackhole blackhole) { + // Emulates finding the arena slot by reusing the thread-local random seed (j.u.c.a.Striped64) + int hash = getProbe_varHandle(); + if (hash == 0) { + blackhole.consume(ThreadLocalRandom.current()); // force initialization + hash = getProbe_varHandle(); + } + advanceProbe_varHandle(hash); + int index = selectSlot(hash); + return array[index]; + } + + private int getProbe_varHandle() { + return (int) PROBE.get(Thread.currentThread()); + } + + private void advanceProbe_varHandle(int probe) { + probe ^= probe << 13; // xorshift + probe ^= probe >>> 17; + probe ^= probe << 5; + PROBE.set(Thread.currentThread(), probe); + } + private static int selectSlot(int i) { return i & (ARENA_SIZE - 1); } + + static { + try { + PROBE = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup()) + .findVarHandle(Thread.class, "threadLocalRandomProbe", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java index 6b8e213359..78df760734 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java @@ -21,10 +21,15 @@ package com.github.benmanes.caffeine.cache; import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo; +import static java.util.Objects.requireNonNull; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Consumer; +import java.util.function.Supplier; import org.checkerframework.checker.nullness.qual.Nullable; @@ -84,8 +89,10 @@ abstract class StripedBuffer implements Buffer { * again; and for short-lived ones, it does not matter. */ - static final long TABLE_BUSY = UnsafeAccess.objectFieldOffset(StripedBuffer.class, "tableBusy"); - static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); + static final VarHandle TABLE_BUSY; + + /** A probe value for the current thread. */ + static final Probe PROBE; /** Number of CPUS. */ static final int NCPU = Runtime.getRuntime().availableProcessors(); @@ -97,33 +104,22 @@ abstract class StripedBuffer implements Buffer { static final int ATTEMPTS = 3; /** Table of buffers. When non-null, size is a power of 2. */ - transient volatile Buffer @Nullable[] table; + volatile Buffer @Nullable[] table; /** Spinlock (locked via CAS) used when resizing and/or creating Buffers. */ - transient volatile int tableBusy; + volatile int tableBusy; /** CASes the tableBusy field from 0 to 1 to acquire lock. */ final boolean casTableBusy() { - return UnsafeAccess.UNSAFE.compareAndSwapInt(this, TABLE_BUSY, 0, 1); - } - - /** - * Returns the probe value for the current thread. Duplicated from ThreadLocalRandom because of - * packaging restrictions. - */ - static final int getProbe() { - return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); + return TABLE_BUSY.compareAndSet(this, 0, 1); } - /** - * Pseudo-randomly advances and records the given probe value for the given thread. Duplicated - * from ThreadLocalRandom because of packaging restrictions. - */ + /** Pseudo-randomly advances and records the given probe value for the given thread. */ static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; - UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + PROBE.set(probe); return probe; } @@ -144,7 +140,7 @@ public int offer(E e) { Buffer[] buffers = table; if ((buffers == null) || (mask = buffers.length - 1) < 0 - || (buffer = buffers[getProbe() & mask]) == null + || (buffer = buffers[PROBE.get() & mask]) == null || !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) { expandOrRetry(e, uncontended); } @@ -205,9 +201,9 @@ public int writes() { @SuppressWarnings("PMD.ConfusingTernary") final void expandOrRetry(E e, boolean wasUncontended) { int h; - if ((h = getProbe()) == 0) { - ThreadLocalRandom.current(); // force initialization - h = getProbe(); + if ((h = PROBE.get()) == 0) { + PROBE.initialize(); + h = PROBE.get(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty @@ -275,4 +271,87 @@ final void expandOrRetry(E e, boolean wasUncontended) { } } } + + static { + try { + TABLE_BUSY = MethodHandles.lookup() + .findVarHandle(StripedBuffer.class, "tableBusy", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + + Probe probe = null; + List> suppliers = List.of( + UnsafeProbe::new, VarHandleProbe::new, ThreadLocalProbe::new); + for (var supplier : suppliers) { + try { + probe = supplier.get(); + break; + } catch (Throwable ignored) { /* Try next strategy */ } + } + PROBE = requireNonNull(probe, "Unable to determine a probe strategy"); + } + + interface Probe { + int get(); + void set(int value); + void initialize(); + } + + /** Uses the Thread's random probe value, if accessible. */ + static final class UnsafeProbe implements Probe { + static final long PROBE = UnsafeAccess.objectFieldOffset( + Thread.class, "threadLocalRandomProbe"); + + @Override public int get() { + return UnsafeAccess.UNSAFE.getInt(Thread.currentThread(), PROBE); + } + @Override public void set(int probe) { + UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + } + @Override public void initialize() { + ThreadLocalRandom.current(); // force initialization + } + } + + /** Uses the Thread's random probe value, if accessible. */ + static final class VarHandleProbe implements Probe { + static final VarHandle PROBE; + + static { + try { + PROBE = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup()) + .findVarHandle(Thread.class, "threadLocalRandomProbe", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + @Override public int get() { + return (int) PROBE.get(Thread.currentThread()); + } + @Override public void set(int probe) { + PROBE.set(Thread.currentThread(), probe); + } + @Override public void initialize() { + ThreadLocalRandom.current(); // force initialization + } + } + + /** Uses a thread local to maintain a random probe value. */ + static final class ThreadLocalProbe implements Probe { + static final ThreadLocal threadHashCode = new ThreadLocal<>(); + + @Override public int get() { + return threadHashCode.get()[0]; + } + @Override public void set(int probe) { + threadHashCode.get()[0] = probe; + } + @Override public void initialize() { + // Avoid zero to allow xorShift rehash + int hash = 1 | ThreadLocalRandom.current().nextInt(); + threadHashCode.set(new int[] { hash }); + } + } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java index e7a55f016c..e74b379088 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/StripedBufferTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import java.util.function.Consumer; @@ -25,6 +26,10 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.github.benmanes.caffeine.cache.StripedBuffer.Probe; +import com.github.benmanes.caffeine.cache.StripedBuffer.ThreadLocalProbe; +import com.github.benmanes.caffeine.cache.StripedBuffer.UnsafeProbe; +import com.github.benmanes.caffeine.cache.StripedBuffer.VarHandleProbe; import com.github.benmanes.caffeine.testing.ConcurrentTestHarness; import com.google.common.base.MoreObjects; @@ -42,6 +47,24 @@ public void init(FakeBuffer buffer) { assertThat(buffer.table.length, is(1)); } + @Test(dataProvider = "probes") + public void probe(Probe probe) { + probe.initialize(); + assertThat(probe.get(), is(not(0))); + + probe.set(1); + assertThat(probe.get(), is(1)); + } + + @DataProvider(name = "probes") + public Object[][] providesProbes() { + return new Object[][] { + { new UnsafeProbe() }, + { new VarHandleProbe() }, + { new ThreadLocalProbe() }, + }; + } + @Test(dataProvider = "buffers") @SuppressWarnings("ThreadPriorityCheck") public void produce(FakeBuffer buffer) { @@ -65,8 +88,8 @@ public void drain(FakeBuffer buffer) { assertThat(buffer.drains, is(1)); } - @DataProvider - public Object[][] buffers() { + @DataProvider(name = "buffers") + public Object[][] providesBuffers() { return new Object[][] { { new FakeBuffer(Buffer.FULL) }, { new FakeBuffer(Buffer.FAILED) },