Skip to content

Commit

Permalink
Merge pull request archiver-appliance#288 from slacmshankar/Issue286
Browse files Browse the repository at this point in the history
Close CA channels properly and do not reuse CA channels
  • Loading branch information
slacmshankar authored Nov 22, 2024
2 parents 64c782c + 5400711 commit a3338d0
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ public LinkedList<Map<String, String>> details(ConfigService configService) {
}
details.add(this.metricDetail(
"PVs pending computation of meta info", Integer.toString(MetaGet.getPendingMetaGetsSize())));
details.add(this.metricDetail(
"Total number of reference counted channels", Integer.toString(PVContext.getChannelCount())));
details.add(this.metricDetail("Total number of CAJ channels", Integer.toString(context.getCAJChannelCount())));

details.addAll(context.getCAJContextDetails());
Expand Down
96 changes: 56 additions & 40 deletions src/main/org/epics/archiverappliance/engine/pv/EPICS_V3_PV.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public int getMask() {
private final CopyOnWriteArrayList<PVListener> listeners = new CopyOnWriteArrayList<PVListener>();

/** JCA channel. LOCK <code>this</code> on change. */
private RefCountedChannel channel_ref = null;
private Channel theChannel = null;

/**
* Either <code>null</code>, or the subscription identifier. LOCK
Expand Down Expand Up @@ -255,7 +255,7 @@ public void getCompleted(final GetEvent event) { // This runs in a CA
PVContext.scheduleCommand(
EPICS_V3_PV.this.name,
EPICS_V3_PV.this.jcaCommandThreadId,
EPICS_V3_PV.this.channel_ref,
EPICS_V3_PV.this.theChannel,
"getCompleted",
new Runnable() {
@Override
Expand Down Expand Up @@ -360,21 +360,21 @@ public void removeListener(final PVListener listener) {
*/
private void connect() throws Exception {
logger.debug("pv of" + this.name + " connectting");
PVContext.scheduleCommand(this.name, this.jcaCommandThreadId, this.channel_ref, "connect", new Runnable() {
PVContext.scheduleCommand(this.name, this.jcaCommandThreadId, this.theChannel, "connect", new Runnable() {
@Override
public void run() {
//
try {
state = PVConnectionState.Connecting;
// Already attempted a connection?
synchronized (this) {
if (channel_ref == null) {
channel_ref =
if (theChannel == null) {
theChannel =
PVContext.getChannel(name, EPICS_V3_PV.this.jcaCommandThreadId, EPICS_V3_PV.this);
}
fireConnectionRequestMade();
if (channel_ref.getChannel().getConnectionState() == ConnectionState.CONNECTED) {
handleConnected(channel_ref.getChannel());
if (theChannel.getConnectionState() == ConnectionState.CONNECTED) {
handleConnected(theChannel);
}
}
} catch (Exception e) {
Expand All @@ -396,16 +396,16 @@ private void disconnect() {
// This code locked the PV, then tried to join the JCA Command thread.
// JCA Command thread tried to lock the PV, so it could not exit.
// --> Don't lock while calling into the PVContext.
RefCountedChannel channel_ref_copy;
Channel theChannel_copy;
synchronized (this) {
// Never attempted a connection?
if (channel_ref == null) return;
channel_ref_copy = channel_ref;
channel_ref = null;
if (theChannel == null) return;
theChannel_copy = theChannel;
theChannel = null;
connected = false;
}
try {
PVContext.releaseChannel(channel_ref_copy, this);
PVContext.releaseChannel(theChannel_copy, this);
} catch (final IllegalStateException ile) {
logger.warn("exception when disconnecting pv " + name, ile);
} catch (final Throwable e) {
Expand All @@ -421,21 +421,22 @@ private void subscribe() {
if (subscription != null) {
logger.error(
"When trying to establish a subscription, there is already a subscription for " + this.name);
this.transientErrorCount++;
return;
}
// Late callback, channel already closed?
final RefCountedChannel ch_ref = channel_ref;
if (ch_ref == null) {
if (theChannel == null) {
logger.error(
"When trying to establish a subscription, the refcounted channel is closed for " + this.name);
this.transientErrorCount++;
return;
}
final Channel channel = ch_ref.getChannel();
// final Logger logger = Activator.getLogger();
try {
if (channel.getConnectionState() != Channel.CONNECTED) {
if (theChannel.getConnectionState() != Channel.CONNECTED) {
logger.error("When trying to establish a subscription, the CA channel is not connected for "
+ this.name);
this.transientErrorCount++;
return;
}
//
Expand All @@ -444,17 +445,17 @@ private void subscribe() {
// So even with N PVs for the same channel, it's
// only one subscription on the network instead of
// N subscriptions.
final DBRType type = DBR_Helper.getTimeType(plain, channel.getFieldType());
final DBRType type = DBR_Helper.getTimeType(plain, theChannel.getFieldType());
state = PVConnectionState.Subscribing;
totalMetaInfo.setStartTime(System.currentTimeMillis());
// isnotTimestampDBR
if (this.name.endsWith(".RTYP")) {
subscription = channel.addMonitor(MonitorMask.ARCHIVE.getMask(), this);
subscription = theChannel.addMonitor(MonitorMask.ARCHIVE.getMask(), this);
} else {
if (this.isDBEProperties && dbePropertiesSubscription == null) {
logger.debug("Adding a DBE_PROPERTIES monitor for " + this.name);
dbePropertiesSubscription = channel.addMonitor(
DBR_Helper.getControlType(channel.getFieldType()),
dbePropertiesSubscription = theChannel.addMonitor(
DBR_Helper.getControlType(theChannel.getFieldType()),
1,
MonitorMask.PROPERTY.getMask(),
new MonitorListener() {
Expand All @@ -467,7 +468,7 @@ public void monitorChanged(MonitorEvent monitorEvent) {
});
}
subscription =
channel.addMonitor(type, channel.getElementCount(), MonitorMask.ARCHIVE.getMask(), this);
theChannel.addMonitor(type, theChannel.getElementCount(), MonitorMask.ARCHIVE.getMask(), this);
}
} catch (final Exception ex) {
logger.error("exception when subscribing pv " + name, ex);
Expand Down Expand Up @@ -529,7 +530,7 @@ public PVConnectionState connectionState() {
@Override
public void stop() {
running = false;
PVContext.scheduleCommand(this.name, this.jcaCommandThreadId, this.channel_ref, "stop", () -> {
PVContext.scheduleCommand(this.name, this.jcaCommandThreadId, this.theChannel, "stop", () -> {
logger.debug("Stopping channel " + EPICS_V3_PV.this.name);
unsubscribe();
disconnect();
Expand All @@ -547,15 +548,15 @@ public void connectionChanged(final ConnectionEvent ev) {
// away'
// when the channel is created, before we even
// get to assign
// the channel_ref. So use the channel from the
// the theChannel. So use the channel from the
// event, not
// the channel_ref which might still be null.
// the theChannel which might still be null.
//
// EngineContext.getInstance().getScheduler().execute(new Runnable()
PVContext.scheduleCommand(
this.name,
this.jcaCommandThreadId,
this.channel_ref,
this.theChannel,
"Connection changed connected",
new Runnable() {
@Override
Expand All @@ -564,20 +565,33 @@ public void run() {
}
});
} else {
state = PVConnectionState.Disconnected;
connected = false;
Monitor sub_copy;
synchronized (this) {
sub_copy = subscription;
state = PVConnectionState.Disconnected;
connected = false;
subscription = null;
}
PVContext.scheduleCommand(
this.name,
this.jcaCommandThreadId,
this.channel_ref,
this.theChannel,
"Connection changed disconnected",
new Runnable() {
@Override
public void run() {
unsubscribe();
if (sub_copy != null) {
try {
sub_copy.clear();
} catch (IllegalStateException ile) {
logger.warn("Illegal state exception when unsubscribing pv " + name, ile);
} catch (final Exception ex) {
logger.error("exception when unsubscribing pv " + name, ex);
}
}
fireDisconnected();
}
});
});
}
}

Expand All @@ -589,6 +603,7 @@ private void handleConnected(final Channel channel) {
if (channel.getConnectionState() != Channel.CONNECTED) {
logger.error("While processing a handleConnected for " + this.name
+ "; the channel is not in a connected state.");
this.transientErrorCount++;
return;
}
} catch (Exception ex) {
Expand All @@ -598,6 +613,7 @@ private void handleConnected(final Channel channel) {
if (state == PVConnectionState.Connected) {
logger.error("While processing a handleConnected for " + this.name
+ "; the state is already in a connected state.");
this.transientErrorCount++;
return;
}
state = PVConnectionState.Connected;
Expand Down Expand Up @@ -686,6 +702,7 @@ public void monitorChanged(final MonitorEvent ev) {
DBR dbr = ev.getDBR();
if (dbr == null) {
logger.error("Ignoring monitor events that does not have a valid DBR for " + this.name);
this.transientErrorCount++;
return;
}
if (this.name.endsWith(".RTYP")) {
Expand Down Expand Up @@ -844,10 +861,10 @@ public void markPVHasMetafields(boolean hasMetaField) {
@Override
public void getLowLevelChannelInfo(List<Map<String, String>> statuses) {
// Commented out when using JCA. This seems to work in CAJ but not in JCA.
/* if(channel_ref != null) {
/* if(theChannel != null) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream out = new PrintStream(os);
channel_ref.getChannel().printInfo(out);
theChannel.getChannel().printInfo(out);
out.close();
return os.toString();
}
Expand Down Expand Up @@ -881,11 +898,10 @@ void addKV(String k, String v) {

ad.addKV(
"Do we have a CA channel?",
Boolean.toString(this.channel_ref != null && this.channel_ref.getChannel() != null));
Boolean.toString(this.theChannel != null && theChannel != null));
ad.addKV("Do we have a subscription?", Boolean.toString(this.subscription != null));
if (this.channel_ref != null
&& this.channel_ref.getChannel() != null
&& (this.channel_ref.getChannel() instanceof CAJChannel cajChannel)) {
if (this.theChannel != null
&& (theChannel instanceof CAJChannel cajChannel)) {
ad.addKV("CAJ Searches", Integer.toString(cajChannel.getSearchTries()));
ad.addKV("CAJ channel ID (CID)", Integer.toString(cajChannel.getChannelID()));
ad.addKV("CAJ server channel ID (SID)", Integer.toString(cajChannel.getServerChannelID()));
Expand Down Expand Up @@ -924,15 +940,15 @@ public void updateTotalMetaInfo() throws IllegalStateException, CAException {
logger.error("The meta get listener was not successful for EPICS_V3_PV " + name);
}
};
if (channel_ref != null) {
if (channel_ref.getChannel().getConnectionState() == ConnectionState.CONNECTED) {
DBRType type = channel_ref.getChannel().getFieldType();
if (theChannel != null) {
if (theChannel.getConnectionState() == ConnectionState.CONNECTED) {
DBRType type = theChannel.getFieldType();
if (!(plain || type.isSTRING())) {
if (type.isDOUBLE() || type.isFLOAT()) type = DBRType.CTRL_DOUBLE;
else if (type.isENUM()) type = DBRType.LABELS_ENUM;
else if (type.isINT()) type = DBRType.CTRL_INT;
else type = DBRType.CTRL_SHORT;
channel_ref.getChannel().get(type, 1, getListener);
theChannel.get(type, 1, getListener);
}
}
}
Expand Down
Loading

0 comments on commit a3338d0

Please sign in to comment.