Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FISH-791 Update Hazelcast to 4.1, no tenant control #5014

Merged
merged 1 commit into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.

Copyright (c) 2016 Payara Foundation and/or its affiliates. All rights reserved.
Copyright (c) 2016-2020 Payara Foundation and/or its affiliates. All rights reserved.

The contents of this file are subject to the terms of the Common Development
and Distribution License("CDDL") (collectively, the "License"). You
Expand All @@ -17,8 +17,8 @@ and Distribution License("CDDL") (collectively, the "License"). You
package fish.payara.jbatch.persistence.hazelcast;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IdGenerator;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.map.IMap;
import com.ibm.jbatch.container.context.impl.StepContextImpl;
import com.ibm.jbatch.container.jobinstance.JobInstanceImpl;
import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
Expand Down Expand Up @@ -56,8 +56,8 @@ public class HazelcastPersistenceService implements IPersistenceManagerService{
private IMap jobInstanceMap;
private IMap checkpointMAP;
private HazelcastInstance theInstance;
private IdGenerator jobInstanceIdGenerator;
private IdGenerator checkpointIdGenerator;
private FlakeIdGenerator jobInstanceIdGenerator;
private FlakeIdGenerator checkpointIdGenerator;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, could you kill all these comments?
//To change body of generated methods, choose Tools | Templates.


@Override
public int jobOperatorGetJobInstanceCount(String jobName) {
Expand Down Expand Up @@ -274,8 +274,8 @@ public void init(IBatchConfig batchConfig) {
theInstance = HazelcastInstance.class.cast(ctx.lookup(hazelcastJNDIName));
jobInstanceMap = theInstance.getMap(JOB_INSTANCE_MAP);
checkpointMAP = theInstance.getMap(CHECKPOINTMAP);
jobInstanceIdGenerator = theInstance.getIdGenerator(JOB_INSTANCE_MAP+"ID");
checkpointIdGenerator = theInstance.getIdGenerator(CHECKPOINTMAP + "ID");
jobInstanceIdGenerator = theInstance.getFlakeIdGenerator(JOB_INSTANCE_MAP+"ID");
dmatej marked this conversation as resolved.
Show resolved Hide resolved
checkpointIdGenerator = theInstance.getFlakeIdGenerator(CHECKPOINTMAP + "ID");
} catch (NamingException ex) {
Logger.getLogger(HazelcastPersistenceService.class.getName()).log(Level.SEVERE, "Unable to find the Hazelcast Instance for JBatch Persistence", ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
package com.sun.enterprise.container.common.impl.util;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import com.sun.enterprise.container.common.spi.ClusteredSingletonLookup;
import fish.payara.nucleus.hazelcast.HazelcastCore;

Expand All @@ -63,7 +63,7 @@ public abstract class ClusteredSingletonLookupImplBase implements ClusteredSingl
private final String keyPrefix;
private final String mapKey;
private final AtomicReference<String> sessionHzKey = new AtomicReference<>();
private final AtomicReference<ILock> lock = new AtomicReference<>();
private final AtomicReference<FencedLock> lock = new AtomicReference<>();
lprimak marked this conversation as resolved.
Show resolved Hide resolved
private final AtomicReference<IAtomicLong> count = new AtomicReference<>();


Expand All @@ -87,8 +87,9 @@ public final String getSessionHzKey() {
}

@Override
public ILock getDistributedLock() {
return lock.updateAndGet(v -> v != null ? v : getHazelcastInstance().getLock(makeLockKey()));
public FencedLock getDistributedLock() {
return lock.updateAndGet(v -> v != null ? v : getHazelcastInstance().getCPSubsystem()
.getLock(makeLockKey()));
}

@Override
Expand All @@ -97,8 +98,9 @@ public IMap<String, Object> getClusteredSingletonMap() {
}

@Override
public IAtomicLong getClusteredUsageCount() {
return count.updateAndGet(v -> v != null ? v : getHazelcastInstance().getAtomicLong(makeCountKey()));
public IAtomicLong getClusteredUsageCount() {
return count.updateAndGet(v -> v != null ? v : getHazelcastInstance().getCPSubsystem()
.getAtomicLong(makeCountKey()));
}

private HazelcastInstance getHazelcastInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
*/
package com.sun.enterprise.container.common.spi;

import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import fish.payara.nucleus.hazelcast.HazelcastCore;

/**
Expand All @@ -51,7 +51,7 @@
* @author lprimak
*/
public interface ClusteredSingletonLookup {
ILock getDistributedLock();
FencedLock getDistributedLock();
boolean isDistributedLockEnabled();
IMap<String, Object> getClusteredSingletonMap();
String getClusteredSessionKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@

package com.sun.ejb.containers;

import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IMap;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.map.IMap;
import com.sun.ejb.ComponentContext;
import com.sun.ejb.Container;
import com.sun.ejb.EjbInvocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@

package com.sun.ejb.containers;

import com.hazelcast.cp.lock.FencedLock;
import com.sun.ejb.ComponentContext;
import com.sun.ejb.EjbInvocation;
import com.sun.ejb.InvocationInfo;
import com.sun.ejb.MethodLockInfo;
import static com.sun.ejb.containers.BaseContainer._logger;
import com.sun.enterprise.security.SecurityManager;
import java.lang.reflect.Proxy;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import javax.ejb.ConcurrentAccessException;
import javax.ejb.ConcurrentAccessTimeoutException;
import javax.ejb.IllegalLoopbackException;
Expand Down Expand Up @@ -121,7 +125,21 @@ protected ComponentContext _getContext(EjbInvocation inv) {
? defaultMethodLockInfo : invInfo.methodLockInfo;
Lock theLock;
if(lockInfo.isDistributed()) {
theLock = clusteredLookup.getDistributedLock();
if (_logger.isLoggable(Level.FINE)) {
// log all lock operations
theLock = (Lock) Proxy.newProxyInstance(loader, new Class<?>[]{Lock.class},
(proxy, method, args) -> {
FencedLock fencedLock = clusteredLookup.getDistributedLock();
_logger.log(Level.FINE, "DistributedLock, about to call {0}, Locked: {1}, Locked by Us: {2}, thread ID {3}",
new Object[]{method.getName(), fencedLock.isLocked(), fencedLock.isLockedByCurrentThread(), Thread.currentThread().getId()});
Object rv = method.invoke(fencedLock, args);
_logger.log(Level.FINE, "DistributedLock, after to call {0}, Locked: {1}, Locked by Us: {2}, thread ID {3}",
new Object[]{method.getName(), fencedLock.isLocked(), fencedLock.isLockedByCurrentThread(), Thread.currentThread().getId()});
return rv;
});
} else {
theLock = clusteredLookup.getDistributedLock();
}
}
else {
theLock = lockInfo.isReadLockedMethod() ? readLock : writeLock;
Expand Down Expand Up @@ -171,7 +189,7 @@ protected ComponentContext _getContext(EjbInvocation inv) {

//Now that we have acquired the lock, remember it
inv.setCMCLock(theLock);

//Now that we have the lock return the singletonCtx
return singletonCtx;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2017 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2017-2020 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -47,6 +47,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.jvnet.hk2.annotations.Contract;
Expand All @@ -65,15 +66,15 @@ public interface PayaraInstance {

void addCDIListener(CDIEventListener listener);

Map<String, Future<ClusterCommandResult>> executeClusteredASAdmin(String command, String... parameters);
Map<UUID, Future<ClusterCommandResult>> executeClusteredASAdmin(String command, String... parameters);

Map<String, Future<ClusterCommandResult>> executeClusteredASAdmin(Collection<String> memberGUIDs, String command, String... parameters);
Map<UUID, Future<ClusterCommandResult>> executeClusteredASAdmin(Collection<UUID> memberGUIDs, String command, String... parameters);

ClusterCommandResult executeLocalAsAdmin(String command, String... parameters);

Set<InstanceDescriptor> getClusteredPayaras();

InstanceDescriptor getDescriptor(String member);
InstanceDescriptor getDescriptor(UUID member);

String getInstanceName();

Expand All @@ -91,9 +92,9 @@ public interface PayaraInstance {

void removeCDIListener(CDIEventListener listener);

<T extends Serializable> Map<String, Future<T>> runCallable(Collection<String> memberUUIDS, Callable<T> callable);
<T extends Serializable> Map<UUID, Future<T>> runCallable(Collection<UUID> memberUUIDS, Callable<T> callable);

<T extends Serializable> Map<String, Future<T>> runCallable(Callable<T> callable);
<T extends Serializable> Map<UUID, Future<T>> runCallable(Callable<T> callable);

void setInstanceName(String instanceName);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016-2018 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2016-2020 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -44,6 +44,7 @@
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.UUID;

/**
* Class describing an instance of Payara
Expand Down Expand Up @@ -107,7 +108,7 @@ public interface InstanceDescriptor extends Serializable {
/**
* @return the memberUUID
*/
String getMemberUUID();
UUID getMemberUUID();

/**
* Checks whether or not this instance is described as a Lite Hazelcast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.

Copyright (c) 2016 Payara Foundation. All rights reserved.
Copyright (c) 2016-2020 Payara Foundation. All rights reserved.

The contents of this file are subject to the terms of the Common Development
and Distribution License("CDDL") (collectively, the "License"). You
Expand Down Expand Up @@ -33,6 +33,7 @@ and Distribution License("CDDL") (collectively, the "License"). You
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.logging.Level;
Expand Down Expand Up @@ -140,10 +141,10 @@ public Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> run (Stri

// NEEDS TO HANDLE THE CASE FOR LOCAL RUNNING IF NO CLUSTER ENABLED

Map<String,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(command, args);
Map<UUID,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(command, args);
Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> result = new HashMap<>(commandResult.size());
for (Entry<String,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand All @@ -164,15 +165,15 @@ public Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> run (Stri
@Override
public Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> run (Collection<InstanceDescriptor> members, String command, String... args ) {

HashSet<String> memberUUIDs = new HashSet<>(members.size());
HashSet<UUID> memberUUIDs = new HashSet<>(members.size());
for (InstanceDescriptor member : members) {
memberUUIDs.add(member.getMemberUUID());
}

Map<String,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(memberUUIDs,command, args);
Map<UUID,Future<ClusterCommandResult>> commandResult = instanceService.executeClusteredASAdmin(memberUUIDs,command, args);
Map<InstanceDescriptor, Future<? extends ClusterCommandResult>> result = new HashMap<>(commandResult.size());
for (Entry<String,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID,Future<ClusterCommandResult>> entry : commandResult.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand All @@ -198,10 +199,10 @@ public <T extends Serializable> Map<InstanceDescriptor, Future<T>> run (Callable

// NEEDS TO HANDLE THE CASE FOR LOCAL RUNNING IF NO CLUSTER ENABLED

Map<String, Future<T>> runCallable = instanceService.runCallable(callable);
Map<UUID, Future<T>> runCallable = instanceService.runCallable(callable);
Map<InstanceDescriptor, Future<T>> result = new HashMap<>(runCallable.size());
for (Entry<String, Future<T>> entry : runCallable.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID, Future<T>> entry : runCallable.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand All @@ -228,15 +229,15 @@ public <T extends Serializable> Map<InstanceDescriptor, Future<T>> run (Callable
@Deprecated
public <T extends Serializable> Map<InstanceDescriptor, Future<T>> run (Collection<InstanceDescriptor> members, Callable<T> callable) {

HashSet<String> memberUUIDs = new HashSet<>(members.size());
HashSet<UUID> memberUUIDs = new HashSet<>(members.size());
for (InstanceDescriptor member : members) {
memberUUIDs.add(member.getMemberUUID());
}

Map<String, Future<T>> runCallable = instanceService.runCallable(memberUUIDs,callable);
Map<UUID, Future<T>> runCallable = instanceService.runCallable(memberUUIDs,callable);
Map<InstanceDescriptor, Future<T>> result = new HashMap<>(runCallable.size());
for (Entry<String, Future<T>> entry : runCallable.entrySet()) {
String uuid = entry.getKey();
for (Entry<UUID, Future<T>> entry : runCallable.entrySet()) {
UUID uuid = entry.getKey();
InstanceDescriptor id = instanceService.getDescriptor(uuid);
if (id != null) {
result.put(id, entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ private void init() throws BackingStoreException {
if (!clusteredStore.isEnabled()) {
throw new BackingStoreException("Hazelcast is not enabled, please enable Hazelcast");
}
instanceName = clusteredStore.getInstanceId();
instanceName = clusteredStore.getInstanceId().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import org.jvnet.hk2.config.UnprocessedChangeEvents;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.topic.ITopic;
import com.sun.enterprise.config.serverbeans.Config;
import com.sun.enterprise.config.serverbeans.Domain;
import com.sun.enterprise.config.serverbeans.MonitoringService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2016-2018 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2016-2020 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -39,7 +39,6 @@
*/
package fish.payara.persistence.eclipselink.cache.coordination;

import com.hazelcast.core.MessageListener;
import fish.payara.nucleus.eventbus.ClusterMessage;
import fish.payara.nucleus.eventbus.MessageReceiver;
import org.eclipse.persistence.internal.sessions.coordination.broadcast.BroadcastRemoteConnection;
Expand Down
Loading