Skip to content

Commit

Permalink
update kafka to 3.6.1
Browse files Browse the repository at this point in the history
update application runtime synchronization to ReentrantLock

Signed-off-by: neo <[email protected]>
  • Loading branch information
neowu committed Dec 13, 2023
1 parent a6cc47b commit c0ead6b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 9 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## Change log

### 9.0.3 (12/12/2023 - )

* kafka: updated client to 3.6.1

### 9.0.2 (12/7/2023 - 12/12/2023)

* stats: dump virtual threads on high cpu
Expand All @@ -9,6 +13,7 @@
> use "core.framework.mysql:mysql-connector-j:8.2.0"
> !!! for db-migration, pls continue to use "com.mysql:mysql-connector-j:8.2.0", as our patched version may remove unused features
> refer to https://github.com/neowu/mysql-connector-j
> refer to https://bugs.mysql.com/bug.php?id=110512
### 9.0.1 (12/01/2023 - 12/7/2023)

Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ subprojects {
}

val elasticVersion = "8.11.1"
val kafkaVersion = "3.6.0"
val kafkaVersion = "3.6.1"
val jacksonVersion = "2.15.3"
val junitVersion = "5.10.0"
val mockitoVersion = "5.6.0"
Expand Down
1 change: 0 additions & 1 deletion buildSrc/src/main/check/spotbugs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@
MDM_RUNTIME_EXIT_OR_HALT,
MDM_RUNFINALIZATION,
MDM_THREAD_PRIORITIES,
MDM_WAIT_WITHOUT_TIMEOUT,
MDM_SIGNAL_NOT_SIGNALALL,
MDM_THREAD_FAIRNESS,
MDM_LOCK_ISLOCKED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static core.framework.log.Markers.errorCode;

Expand All @@ -23,6 +24,7 @@
*/
public final class ExecutorImpl implements Executor {
private final Logger logger = LoggerFactory.getLogger(ExecutorImpl.class);
private final ReentrantLock lock = new ReentrantLock();
private final ExecutorService executor;
private final LogManager logManager;
private final long maxProcessTimeInNano;
Expand All @@ -36,14 +38,17 @@ public ExecutorImpl(ExecutorService executor, LogManager logManager, long maxPro

public void shutdown() {
logger.info("shutting down executor");
synchronized (this) {
lock.lock();
try {
if (scheduler != null) {
List<Runnable> canceledTasks = scheduler.shutdownNow(); // drop all delayed tasks
if (!canceledTasks.isEmpty()) {
logger.warn(errorCode("TASK_REJECTED"), "delayed tasks are canceled due to server is shutting down, tasks={}", canceledTasks);
}
}
executor.shutdown();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -74,14 +79,17 @@ public Future<Void> submit(String action, Task task) {

@Override
public void submit(String action, Task task, Duration delay) {
synchronized (this) {
lock.lock();
try {
if (executor.isShutdown()) {
logger.warn(errorCode("TASK_REJECTED"), "reject task due to server is shutting down, action={}", action); // with current executor impl, rejection only happens when shutdown
return;
}
if (scheduler == null) {
scheduler = ThreadPools.singleThreadScheduler("executor-scheduler-");
}
} finally {
lock.unlock();
}
scheduleDelayedTask(action, task, delay);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
*/
public final class ThreadPools {
static {
// currently jdbc query is not fully support virtual thread yet, db operation will block current virtual thread
// increase parallelism to allow more virtual thread unfriendly tasks to run
// refer to https://bugs.mysql.com/bug.php?id=110512
// refer to java.lang.VirtualThread.createDefaultScheduler
if (System.getProperty("jdk.virtualThreadScheduler.parallelism") == null) {
int parallelism = Math.max(Runtime.getRuntime().availableProcessors(), 16);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author neo
*/
public class APIController {
public final IPv4AccessControl accessControl = new IPv4AccessControl();
private final ReentrantLock lock = new ReentrantLock();

public Set<Class<?>> serviceInterfaces = new LinkedHashSet<>();
public Set<Class<?>> beanClasses = new LinkedHashSet<>(); // custom bean classes not referred by service interfaces
Expand All @@ -46,25 +48,35 @@ public Response message(Request request) {
}

APIDefinitionResponse serviceDefinition() {
synchronized (this) {
if (serviceDefinition != null) return serviceDefinition;

lock.lock();
try {
if (serviceDefinition == null) {
var builder = new APIDefinitionBuilder(serviceInterfaces, beanClasses);
serviceDefinition = builder.build();
serviceInterfaces = null; // release memory
beanClasses = null;
}
return serviceDefinition;
} finally {
lock.unlock();
}
}

MessageAPIDefinitionResponse messageDefinition() {
synchronized (this) {
if (messageDefinition != null) return messageDefinition;

lock.lock();
try {
if (messageDefinition == null) {
var builder = new MessageAPIDefinitionBuilder(topics);
messageDefinition = builder.build();
topics = null; // release memory
}
return messageDefinition;
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/**
* @author ericchung, neo
*/
public class AlertService {
private final LRUMap<String, AlertStat> stats = new LRUMap<>(1000);
private final ReentrantLock lock = new ReentrantLock();
private final String kibanaURL;
private final Matchers ignoredErrors;
private final Matchers criticalErrors;
Expand Down Expand Up @@ -57,7 +59,8 @@ Result check(Alert alert) {
return new Result(false, -1);

String key = alertKey(alert);
synchronized (stats) {
lock.lock();
try {
AlertStat stat = stats.get(key);
if (stat == null) {
stats.put(key, new AlertStat(alert.date));
Expand All @@ -71,6 +74,8 @@ Result check(Alert alert) {
stat.alertCountSinceLastSent++;
return new Result(false, -1);
}
} finally {
lock.unlock();
}
}

Expand Down

0 comments on commit c0ead6b

Please sign in to comment.