Skip to content

Commit

Permalink
Upgrade Payara to Hazelcast 4.1 (#5014)
Browse files Browse the repository at this point in the history
  • Loading branch information
lprimak authored Dec 8, 2020
1 parent a62d616 commit efb512e
Show file tree
Hide file tree
Showing 37 changed files with 306 additions and 288 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
Copyright (c) 2016 Payara Foundation and/or its affiliates. All rights reserved.
Copyright (c) 2016-2020 Payara Foundation and/or its affiliates. All rights reserved.
The contents of this file are subject to the terms of the Common Development
and Distribution License("CDDL") (collectively, the "License"). You
Expand All @@ -17,8 +17,8 @@ and Distribution License("CDDL") (collectively, the "License"). You
package fish.payara.jbatch.persistence.hazelcast;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IdGenerator;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.map.IMap;
import com.ibm.jbatch.container.context.impl.StepContextImpl;
import com.ibm.jbatch.container.jobinstance.JobInstanceImpl;
import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
Expand Down Expand Up @@ -56,8 +56,8 @@ public class HazelcastPersistenceService implements IPersistenceManagerService{
private IMap jobInstanceMap;
private IMap checkpointMAP;
private HazelcastInstance theInstance;
private IdGenerator jobInstanceIdGenerator;
private IdGenerator checkpointIdGenerator;
private FlakeIdGenerator jobInstanceIdGenerator;
private FlakeIdGenerator checkpointIdGenerator;

@Override
public int jobOperatorGetJobInstanceCount(String jobName) {
Expand Down Expand Up @@ -274,8 +274,8 @@ public void init(IBatchConfig batchConfig) {
theInstance = HazelcastInstance.class.cast(ctx.lookup(hazelcastJNDIName));
jobInstanceMap = theInstance.getMap(JOB_INSTANCE_MAP);
checkpointMAP = theInstance.getMap(CHECKPOINTMAP);
jobInstanceIdGenerator = theInstance.getIdGenerator(JOB_INSTANCE_MAP+"ID");
checkpointIdGenerator = theInstance.getIdGenerator(CHECKPOINTMAP + "ID");
jobInstanceIdGenerator = theInstance.getFlakeIdGenerator(JOB_INSTANCE_MAP+"ID");
checkpointIdGenerator = theInstance.getFlakeIdGenerator(CHECKPOINTMAP + "ID");
} catch (NamingException ex) {
Logger.getLogger(HazelcastPersistenceService.class.getName()).log(Level.SEVERE, "Unable to find the Hazelcast Instance for JBatch Persistence", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
package com.sun.enterprise.container.common.impl.util;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import com.sun.enterprise.container.common.spi.ClusteredSingletonLookup;
import fish.payara.nucleus.hazelcast.HazelcastCore;

Expand All @@ -63,7 +63,7 @@ public abstract class ClusteredSingletonLookupImplBase implements ClusteredSingl
private final String keyPrefix;
private final String mapKey;
private final AtomicReference<String> sessionHzKey = new AtomicReference<>();
private final AtomicReference<ILock> lock = new AtomicReference<>();
private final AtomicReference<FencedLock> lock = new AtomicReference<>();
private final AtomicReference<IAtomicLong> count = new AtomicReference<>();


Expand All @@ -87,8 +87,9 @@ public final String getSessionHzKey() {
}

@Override
public ILock getDistributedLock() {
return lock.updateAndGet(v -> v != null ? v : getHazelcastInstance().getLock(makeLockKey()));
public FencedLock getDistributedLock() {
return lock.updateAndGet(v -> v != null ? v : getHazelcastInstance().getCPSubsystem()
.getLock(makeLockKey()));
}

@Override
Expand All @@ -97,8 +98,9 @@ public IMap<String, Object> getClusteredSingletonMap() {
}

@Override
public IAtomicLong getClusteredUsageCount() {
return count.updateAndGet(v -> v != null ? v : getHazelcastInstance().getAtomicLong(makeCountKey()));
public IAtomicLong getClusteredUsageCount() {
return count.updateAndGet(v -> v != null ? v : getHazelcastInstance().getCPSubsystem()
.getAtomicLong(makeCountKey()));
}

private HazelcastInstance getHazelcastInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
*/
package com.sun.enterprise.container.common.spi;

import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import fish.payara.nucleus.hazelcast.HazelcastCore;

/**
Expand All @@ -51,7 +51,7 @@
* @author lprimak
*/
public interface ClusteredSingletonLookup {
ILock getDistributedLock();
FencedLock getDistributedLock();
boolean isDistributedLockEnabled();
IMap<String, Object> getClusteredSingletonMap();
String getClusteredSessionKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@

package com.sun.ejb.containers;

import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IMap;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.map.IMap;
import com.sun.ejb.ComponentContext;
import com.sun.ejb.Container;
import com.sun.ejb.EjbInvocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@

package com.sun.ejb.containers;

import com.hazelcast.cp.lock.FencedLock;
import com.sun.ejb.ComponentContext;
import com.sun.ejb.EjbInvocation;
import com.sun.ejb.InvocationInfo;
import com.sun.ejb.MethodLockInfo;
import static com.sun.ejb.containers.BaseContainer._logger;
import com.sun.enterprise.security.SecurityManager;
import java.lang.reflect.Proxy;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import javax.ejb.ConcurrentAccessException;
import javax.ejb.ConcurrentAccessTimeoutException;
import javax.ejb.IllegalLoopbackException;
Expand Down Expand Up @@ -121,7 +125,21 @@ protected ComponentContext _getContext(EjbInvocation inv) {
? defaultMethodLockInfo : invInfo.methodLockInfo;
Lock theLock;
if(lockInfo.isDistributed()) {
theLock = clusteredLookup.getDistributedLock();
if (_logger.isLoggable(Level.FINE)) {
// log all lock operations
theLock = (Lock) Proxy.newProxyInstance(loader, new Class<?>[]{Lock.class},
(proxy, method, args) -> {
FencedLock fencedLock = clusteredLookup.getDistributedLock();
_logger.log(Level.FINE, "DistributedLock, about to call {0}, Locked: {1}, Locked by Us: {2}, thread ID {3}",
new Object[]{method.getName(), fencedLock.isLocked(), fencedLock.isLockedByCurrentThread(), Thread.currentThread().getId()});
Object rv = method.invoke(fencedLock, args);
_logger.log(Level.FINE, "DistributedLock, after to call {0}, Locked: {1}, Locked by Us: {2}, thread ID {3}",
new Object[]{method.getName(), fencedLock.isLocked(), fencedLock.isLockedByCurrentThread(), Thread.currentThread().getId()});
return rv;
});
} else {
theLock = clusteredLookup.getDistributedLock();
}
}
else {
theLock = lockInfo.isReadLockedMethod() ? readLock : writeLock;
Expand Down Expand Up @@ -171,7 +189,7 @@ protected ComponentContext _getContext(EjbInvocation inv) {

//Now that we have acquired the lock, remember it
inv.setCMCLock(theLock);

//Now that we have the lock return the singletonCtx
return singletonCtx;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2017 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2017-2020 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -47,6 +47,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.jvnet.hk2.annotations.Contract;
Expand All @@ -65,15 +66,15 @@ public interface PayaraInstance {

void addCDIListener(CDIEventListener listener);

Map<String, Future<ClusterCommandResult>> executeClusteredASAdmin(String command, String... parameters);
Map<UUID, Future<ClusterCommandResult>> executeClusteredASAdmin(String command, String... parameters);

Map<String, Future<ClusterCommandResult>> executeClusteredASAdmin(Collection<String> memberGUIDs, String command, String... parameters);
Map<UUID, Future<ClusterCommandResult>> executeClusteredASAdmin(Collection<UUID> memberGUIDs, String command, String... parameters);

ClusterCommandResult executeLocalAsAdmin(String command, String... parameters);

Set<InstanceDescriptor> getClusteredPayaras();

InstanceDescriptor getDescriptor(String member);
InstanceDescriptor getDescriptor(UUID member);

String getInstanceName();

Expand All @@ -91,9 +92,9 @@ public interface PayaraInstance {

void removeCDIListener(CDIEventListener listener);

<T extends Serializable> Map<String, Future<T>> runCallable(Collection<String> memberUUIDS, Callable<T> callable);
<T extends Serializable> Map<UUID, Future<T>> runCallable(Collection<UUID> memberUUIDS, Callable<T> callable);

<T extends Serializable> Map<String, Future<T>> runCallable(Callable<T> callable);
<T extends Serializable> Map<UUID, Future<T>> runCallable(Callable<T> callable);

void setInstanceName(String instanceName);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016-2018 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2016-2020 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -44,6 +44,7 @@
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

/**
* Class describing an instance of Payara
Expand Down Expand Up @@ -107,7 +108,7 @@ public interface InstanceDescriptor extends Serializable {
/**
* @return the memberUUID
*/
String getMemberUUID();
UUID getMemberUUID();

/**
* Checks whether or not this instance is described as a Lite Hazelcast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
Copyright (c) 2016 Payara Foundation. All rights reserved.
Copyright (c) 2016-2020 Payara Foundation. All rights reserved.
The contents of this file are subject to the terms of the Common Development
and Distribution License("CDDL") (collectively, the "License"). You
Expand Down Expand Up @@ -33,6 +33,7 @@ and Distribution License("CDDL") (collectively, the "License"). You
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.logging.Level;
Expand Down Expand Up @@ -140,10 +141,10 @@ public Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> run (Stri

// NEEDS TO HANDLE THE CASE FOR LOCAL RUNNING IF NO CLUSTER ENABLED

Map<String,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(command, args);
Map<UUID,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(command, args);
Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> result = new HashMap<>(commandResult.size());
for (Entry<String,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand All @@ -164,15 +165,15 @@ public Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> run (Stri
@Override
public Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> run (Collection<InstanceDescriptor> members, String command, String... args ) {

HashSet<String> memberUUIDs = new HashSet<>(members.size());
HashSet<UUID> memberUUIDs = new HashSet<>(members.size());
for (InstanceDescriptor member : members) {
memberUUIDs.add(member.getMemberUUID());
}

Map<String,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(memberUUIDs,command, args);
Map<UUID,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(memberUUIDs,command, args);
Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> result = new HashMap<>(commandResult.size());
for (Entry<String,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand All @@ -198,10 +199,10 @@ public <T extends Serializable> Map<InstanceDescriptor, Future<T>> run (Callable

// NEEDS TO HANDLE THE CASE FOR LOCAL RUNNING IF NO CLUSTER ENABLED

Map<String, Future<T>> runCallable = instanceService.runCallable(callable);
Map<UUID, Future<T>> runCallable = instanceService.runCallable(callable);
Map<InstanceDescriptor, Future<T>> result = new HashMap<>(runCallable.size());
for (Entry<String, Future<T>> entry : runCallable.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID, Future<T>> entry : runCallable.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand All @@ -228,15 +229,15 @@ public <T extends Serializable> Map<InstanceDescriptor, Future<T>> run (Callable
@Deprecated
public <T extends Serializable> Map<InstanceDescriptor, Future<T>> run (Collection<InstanceDescriptor> members, Callable<T> callable) {

HashSet<String> memberUUIDs = new HashSet<>(members.size());
HashSet<UUID> memberUUIDs = new HashSet<>(members.size());
for (InstanceDescriptor member : members) {
memberUUIDs.add(member.getMemberUUID());
}

Map<String, Future<T>> runCallable = instanceService.runCallable(memberUUIDs,callable);
Map<UUID, Future<T>> runCallable = instanceService.runCallable(memberUUIDs,callable);
Map<InstanceDescriptor, Future<T>> result = new HashMap<>(runCallable.size());
for (Entry<String, Future<T>> entry : runCallable.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID, Future<T>> entry : runCallable.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ private void init() throws BackingStoreException {
if (!clusteredStore.isEnabled()) {
throw new BackingStoreException("Hazelcast is not enabled, please enable Hazelcast");
}
instanceName = clusteredStore.getInstanceId();
instanceName = clusteredStore.getInstanceId().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import org.jvnet.hk2.config.UnprocessedChangeEvents;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.topic.ITopic;
import com.sun.enterprise.config.serverbeans.Config;
import com.sun.enterprise.config.serverbeans.Domain;
import com.sun.enterprise.config.serverbeans.MonitoringService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016-2018 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2016-2020 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -39,7 +39,6 @@
*/
package fish.payara.persistence.eclipselink.cache.coordination;

import com.hazelcast.core.MessageListener;
import fish.payara.nucleus.eventbus.ClusterMessage;
import fish.payara.nucleus.eventbus.MessageReceiver;
import org.eclipse.persistence.internal.sessions.coordination.broadcast.BroadcastRemoteConnection;
Expand Down
Loading

0 comments on commit efb512e

Please sign in to comment.