Skip to content

Commit

Permalink
Replace synchronization by Lock in FutureImpl (#2912)
Browse files Browse the repository at this point in the history
I replaced _synchronized_ blocks with Lock for the sake of Virtual Threads support. More details:
https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html#GUID-04C03FFC-066D-4857-85B9-E5A27A875AF9

----
Similar to #2845
Koziolek authored Oct 18, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 7508892 commit 82f26e1
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions vavr/src/main/java/io/vavr/concurrent/FutureImpl.java
Original file line number Diff line number Diff line change
@@ -25,7 +25,9 @@

import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/**
@@ -45,7 +47,7 @@ final class FutureImpl<T> implements Future<T> {
/**
* Used to synchronize state changes.
*/
private final Object lock = new Object();
private final Lock lock;

/**
* Indicates if this Future is cancelled
@@ -87,8 +89,10 @@ final class FutureImpl<T> implements Future<T> {

// single constructor
private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, Queue<Thread> waiters, Computation<T> computation) {
this.lock = new ReentrantLock();
this.executor = executor;
synchronized (lock) {
lock.lock();
try {
this.cancelled = false;
this.value = value;
this.actions = actions;
@@ -98,6 +102,8 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
} catch (Throwable x) {
tryComplete(Try.failure(x));
}
} finally {
lock.unlock();
}
}

@@ -219,8 +225,11 @@ private void _await(long start, long timeout, TimeUnit unit) {
public boolean block() {
try {
if (!threadEnqueued) {
synchronized (lock) {
lock.lock();
try {
waiters = waiters.enqueue(waitingThread);
} finally {
lock.unlock();
}
threadEnqueued = true;
}
@@ -256,14 +265,17 @@ public boolean isReleasable() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!isCompleted()) {
synchronized (lock) {
lock.lock();
try {
if (!isCompleted()) {
if (mayInterruptIfRunning && this.thread != null) {
this.thread.interrupt();
}
this.cancelled = tryComplete(Try.failure(new CancellationException()));
return this.cancelled;
}
} finally {
lock.unlock();
}
}
return false;
@@ -272,7 +284,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
private void updateThread() {
// cancellation may have been initiated by a different thread before this.thread is set by the worker thread
if (!isCompleted()) {
synchronized (lock) {
lock.lock();
try {
if (!isCompleted()) {
this.thread = Thread.currentThread();
try {
@@ -281,6 +294,8 @@ private void updateThread() {
// we are not allowed to set the uncaught exception handler of the worker thread ¯\_(ツ)_/¯
}
}
} finally {
lock.unlock();
}
}
}
@@ -322,12 +337,15 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
if (isCompleted()) {
perform(action);
} else {
synchronized (lock) {
lock.lock();
try {
if (isCompleted()) {
perform(action);
} else {
actions = actions.enqueue((Consumer<Try<T>>) action);
}
} finally {
lock.unlock();
}
}
return this;
@@ -362,7 +380,8 @@ boolean tryComplete(Try<? extends T> value) {
final Queue<Consumer<Try<T>>> actions;
final Queue<Thread> waiters;
// it is essential to make the completed state public *before* performing the actions
synchronized (lock) {
lock.lock();
try {
if (isCompleted()) {
actions = null;
waiters = null;
@@ -374,6 +393,8 @@ boolean tryComplete(Try<? extends T> value) {
this.waiters = null;
this.thread = null;
}
} finally {
lock.unlock();
}
if (waiters != null) {
waiters.forEach(this::unlock);

0 comments on commit 82f26e1

Please sign in to comment.