Skip to content

Commit

Permalink
krasserm#53: change interceptor position (again), fix generics
Browse files Browse the repository at this point in the history
  • Loading branch information
unixoid committed Oct 28, 2014
1 parent 1a6aabf commit a26eeca
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.camel.component.mina2.Mina2Endpoint;
import org.openehealth.ipf.platform.camel.ihe.hl7v2.intercept.Hl7v2Interceptor;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer.ConsumerDispatchingInterceptor;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer.ConsumerRequestDefragmenterInterceptor;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer.ConsumerStringProcessingInterceptor;

import java.util.ArrayList;
Expand All @@ -43,11 +42,7 @@ public MllpDispatchEndpoint(
protected List<Hl7v2Interceptor> createInitialConsumerInterceptorChain() {
List<Hl7v2Interceptor> initialChain = new ArrayList<Hl7v2Interceptor>();
initialChain.add(new ConsumerStringProcessingInterceptor());
if (isSupportUnsolicitedFragmentation()) {
initialChain.add(new ConsumerRequestDefragmenterInterceptor());
}
initialChain.add(new ConsumerDispatchingInterceptor(getCamelContext(), getConfig().getRoutes()));

return initialChain;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,6 @@ public NakFactory getNakFactory() {
return mllpComponent.getNakFactory();
}

/**
* Returns <code>true</code> if this endpoint supports unsolicited message fragmentation.
*/
@ManagedAttribute(description = "Support Unsolicited Fragmentation Enabled")
public boolean isSupportUnsolicitedFragmentation() {
return config.isSupportUnsolicitedFragmentation();
}

/**
* Returns <code>true</code> if this endpoint supports segment fragmentation.
*/
Expand All @@ -230,15 +222,6 @@ public boolean isSupportSegmentFragmentation() {
return config.isSupportSegmentFragmentation();
}

/**
* Returns threshold for unsolicited message fragmentation
* (relevant on producer side only).
*/
@ManagedAttribute(description = "Unsolicited Fragmentation Threshold")
public int getUnsolicitedFragmentationThreshold() {
return config.getUnsolicitedFragmentationThreshold();
}

/**
* Returns threshold for segment fragmentation.
*/
Expand All @@ -247,13 +230,6 @@ public int getSegmentFragmentationThreshold() {
return config.getSegmentFragmentationThreshold();
}

/**
* Returns the unsolicited fragmentation storage bean.
*/
public UnsolicitedFragmentationStorage getUnsolicitedFragmentationStorage() {
return config.getUnsolicitedFragmentationStorage();
}

/**
* @return the sslContext
*/
Expand Down Expand Up @@ -308,12 +284,6 @@ public String[] getIoFilters() {
return toStringArray(filters);
}

@ManagedAttribute(description = "Unsolicited Fragmentation Storage Cache Type")
public String getUnsolicitedFragmentationStorageType() {
return isSupportUnsolicitedFragmentation() ?
getUnsolicitedFragmentationStorage().getClass().getName() : "";
}

@ManagedAttribute(description = "SSL Secure Enabled")
public boolean isSslSecure() {
return getSslContext() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,8 @@ public class MllpEndpointConfiguration implements Serializable {
@Getter private final String[] sslProtocols;
@Getter private final String[] sslCiphers;

@Getter private final boolean supportUnsolicitedFragmentation;
@Getter private final boolean supportSegmentFragmentation;
@Getter private final int unsolicitedFragmentationThreshold;
@Getter private final int segmentFragmentationThreshold;
@Getter private final UnsolicitedFragmentationStorage unsolicitedFragmentationStorage;


protected MllpEndpointConfiguration(MllpComponent component, Map<String, Object> parameters) throws Exception {
Expand All @@ -76,21 +73,11 @@ protected MllpEndpointConfiguration(MllpComponent component, Map<String, Object>
SSLContext.class,
SSLContext.getDefault()) : null;

supportUnsolicitedFragmentation = component.getAndRemoveParameter(
parameters, "supportUnsolicitedFragmentation", boolean.class, false);
unsolicitedFragmentationThreshold = component.getAndRemoveParameter(
parameters, "unsolicitedFragmentationThreshold", int.class, -1); // >= 3 segments

supportSegmentFragmentation = component.getAndRemoveParameter(
parameters, "supportSegmentFragmentation", boolean.class, false);
segmentFragmentationThreshold = component.getAndRemoveParameter(
parameters, "segmentFragmentationThreshold", int.class, -1); // >= 5 characters

unsolicitedFragmentationStorage = component.resolveAndRemoveReferenceParameter(
parameters,
"unsolicitedFragmentationStorage",
UnsolicitedFragmentationStorage.class);

customInterceptorFactories = component.resolveAndRemoveReferenceListParameter(
parameters, "interceptorFactories", Hl7v2InterceptorFactory.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,49 @@ public boolean isAudit() {
return getConfig().isAudit();
}


/**
* Returns client-side audit strategy instance.
*/
public MllpAuditStrategy<AuditDatasetType> getClientAuditStrategy() {
return getMllpComponent().getClientAuditStrategy();
}


/**
* Returns server-side audit strategy instance.
*/
public MllpAuditStrategy<AuditDatasetType> getServerAuditStrategy() {
return getMllpComponent().getServerAuditStrategy();
}

/**
* Returns <code>true</code> if this endpoint supports unsolicited message fragmentation.
*/
@ManagedAttribute(description = "Support Unsolicited Fragmentation Enabled")
public boolean isSupportUnsolicitedFragmentation() {
return getConfig().isSupportUnsolicitedFragmentation();
}

/**
* Returns threshold for unsolicited message fragmentation
* (relevant on producer side only).
*/
@ManagedAttribute(description = "Unsolicited Fragmentation Threshold")
public int getUnsolicitedFragmentationThreshold() {
return getConfig().getUnsolicitedFragmentationThreshold();
}

/**
* Returns the unsolicited fragmentation storage bean.
*/
public UnsolicitedFragmentationStorage getUnsolicitedFragmentationStorage() {
return getConfig().getUnsolicitedFragmentationStorage();
}

@ManagedAttribute(description = "Unsolicited Fragmentation Storage Cache Type")
public String getUnsolicitedFragmentationStorageType() {
return isSupportUnsolicitedFragmentation() ?
getUnsolicitedFragmentationStorage().getClass().getName() : "";
}

/**
* Returns <code>true</code> if this endpoint supports interactive continuation.
Expand All @@ -138,7 +165,6 @@ public boolean isSupportInteractiveContinuation() {
return getConfig().isSupportInteractiveContinuation();
}


/**
* Returns default threshold for interactive continuation
* (relevant on consumer side only).
Expand All @@ -153,15 +179,13 @@ public int getInteractiveContinuationDefaultThreshold() {
return getConfig().getInteractiveContinuationDefaultThreshold();
}


/**
* Returns the interactive continuation storage bean.
*/
public InteractiveContinuationStorage getInteractiveContinuationStorage() {
return getConfig().getInteractiveContinuationStorage();
}


/**
* Returns true, when the producer should automatically send a cancel
* message after it has collected all interactive continuation pieces.
Expand All @@ -171,7 +195,6 @@ public boolean isAutoCancel() {
return getConfig().isAutoCancel();
}


@ManagedAttribute(description = "Interactive Continuation Storage Cache Type")
public String getInteractiveContinuationStorageType() {
return isSupportInteractiveContinuation() ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
public class MllpTransactionEndpointConfiguration extends MllpEndpointConfiguration {
private static final long serialVersionUID = -6154765290339153487L;

@Getter private final boolean supportUnsolicitedFragmentation;
@Getter private final int unsolicitedFragmentationThreshold;
@Getter private final UnsolicitedFragmentationStorage unsolicitedFragmentationStorage;

@Getter private final boolean supportInteractiveContinuation;
@Getter private final int interactiveContinuationDefaultThreshold;
@Getter private final InteractiveContinuationStorage interactiveContinuationStorage;
Expand All @@ -36,6 +40,16 @@ public class MllpTransactionEndpointConfiguration extends MllpEndpointConfigurat
protected MllpTransactionEndpointConfiguration(MllpComponent component, Map<String, Object> parameters) throws Exception {
super(component, parameters);

supportUnsolicitedFragmentation = component.getAndRemoveParameter(
parameters, "supportUnsolicitedFragmentation", boolean.class, false);
unsolicitedFragmentationThreshold = component.getAndRemoveParameter(
parameters, "unsolicitedFragmentationThreshold", int.class, -1); // >= 3 segments

unsolicitedFragmentationStorage = component.resolveAndRemoveReferenceParameter(
parameters,
"unsolicitedFragmentationStorage",
UnsolicitedFragmentationStorage.class);

supportInteractiveContinuation = component.getAndRemoveParameter(
parameters, "supportInteractiveContinuation", boolean.class, false);
interactiveContinuationDefaultThreshold = component.getAndRemoveParameter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* @author Dmytro Rud
*/
public class ConsumerAuditInterceptor<T extends MllpAuditDataset>
extends AbstractMllpInterceptor
extends AbstractMllpInterceptor<MllpTransactionEndpoint<T>>
implements AuditInterceptor<T>
{
@Override
Expand All @@ -43,8 +43,7 @@ public void process(Exchange exchange) throws Exception {

@Override
public MllpAuditStrategy<T> getAuditStrategy() {
MllpTransactionEndpoint mllpEndpoint = (MllpTransactionEndpoint) getMllpEndpoint();
return mllpEndpoint.getServerAuditStrategy();
return getMllpEndpoint().getServerAuditStrategy();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StartupListener;
import org.apache.camel.component.mina2.Mina2Consumer;
import org.openehealth.ipf.modules.hl7.HL7v2Exception;
import org.openehealth.ipf.platform.camel.ihe.hl7v2.Hl7v2AcceptanceException;
import org.openehealth.ipf.platform.camel.ihe.hl7v2.Hl7v2TransactionConfiguration;
import org.openehealth.ipf.platform.camel.ihe.hl7v2.intercept.Hl7v2Interceptor;
import org.openehealth.ipf.platform.camel.ihe.hl7v2.intercept.consumer.ConsumerMarshalInterceptor;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpDispatchEndpoint;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.AbstractMllpInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,11 +39,11 @@
* Interceptor which dispatches an incoming request message to another MLLP route.
* @author Dmytro Rud
*/
public class ConsumerDispatchingInterceptor extends AbstractMllpInterceptor implements StartupListener {
public class ConsumerDispatchingInterceptor extends AbstractMllpInterceptor<MllpDispatchEndpoint> implements StartupListener {
private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerDispatchingInterceptor.class);

private final String[] routeIds;
private IdentityHashMap<Hl7v2Interceptor, String> map;
private IdentityHashMap<String, Hl7v2Interceptor> map;


/**
Expand All @@ -65,15 +66,15 @@ public ConsumerDispatchingInterceptor(CamelContext camelContext, String[] routeI

@Override
public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
map = new IdentityHashMap<Hl7v2Interceptor, String>(routeIds.length);
map = new IdentityHashMap<String, Hl7v2Interceptor>(routeIds.length);
for (String routeId : routeIds) {
Mina2Consumer consumer = (Mina2Consumer) camelContext.getRoute(routeId).getConsumer();
Hl7v2Interceptor interceptor = (Hl7v2Interceptor) consumer.getProcessor();
while (! ConsumerMarshalInterceptor.class.getName().equals(interceptor.getId())) {
while (! (interceptor instanceof ConsumerStringProcessingInterceptor)) {
interceptor = (Hl7v2Interceptor) interceptor.getWrappedProcessor();
}

map.put(interceptor, routeId);
map.put(routeId, (Hl7v2Interceptor) interceptor.getWrappedProcessor());
}
}

Expand Down Expand Up @@ -108,13 +109,14 @@ public void process(Exchange exchange) throws Exception {

// check who can accept the message
boolean found = false;
for (Hl7v2Interceptor interceptor : map.keySet()) {
for (String routeId : routeIds) {
Hl7v2Interceptor interceptor = map.get(routeId);
Hl7v2TransactionConfiguration config = interceptor.getConfigurationHolder().getHl7v2TransactionConfiguration();
try {
config.checkMessageAcceptance(messageType, triggerEvent, messageStructure, version, true);

LOG.debug("Dispatch message with MSH-9-1='{}', MSH-9-2='{}', MSH-9-3='{}', MSH-12='{}' to route '{}'",
messageType, triggerEvent, messageStructure, version, map.get(interceptor));
messageType, triggerEvent, messageStructure, version, routeId);
found = true;
interceptor.process(exchange);
break;
Expand All @@ -126,7 +128,7 @@ public void process(Exchange exchange) throws Exception {
if (! found) {
LOG.debug("Nobody can process message with MSH-9-1='{}', MSH-9-2='{}', MSH-9-3='{}', MSH-12='{}'",
messageType, triggerEvent, messageStructure, version);
Hl7v2AcceptanceException exception = new Hl7v2AcceptanceException("Unsupported message type and/or version", 207);
HL7v2Exception exception = new HL7v2Exception("Unsupported message type and/or version", 207);
resultMessage(exchange).setBody(getNakFactory().createDefaultNak(exception).encode());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,20 @@
* as described in paragraph 5.6.3 of the HL7 v2.5 specification.
* @author Dmytro Rud
*/
public class ConsumerInteractiveResponseSenderInterceptor extends AbstractMllpInterceptor {
public class ConsumerInteractiveResponseSenderInterceptor extends AbstractMllpInterceptor<MllpTransactionEndpoint> {
private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerInteractiveResponseSenderInterceptor.class);
private InteractiveContinuationStorage storage;


@Override
public void setConfigurationHolder(Hl7v2ConfigurationHolder configurationHolder) {
super.setConfigurationHolder(configurationHolder);
MllpTransactionEndpoint endpoint = (MllpTransactionEndpoint) getMllpEndpoint();
this.storage = Validate.notNull(endpoint.getInteractiveContinuationStorage());
this.storage = Validate.notNull(getMllpEndpoint().getInteractiveContinuationStorage());
}


@Override
public void process(Exchange exchange) throws Exception {
MllpTransactionEndpoint endpoint = (MllpTransactionEndpoint) getMllpEndpoint();

Parser parser = getHl7v2TransactionConfiguration().getParser();
MessageAdapter<?> request = (MessageAdapter<?>) exchange.getIn().getHeader(ORIGINAL_MESSAGE_ADAPTER_HEADER_NAME);
Message requestMessage = request.getHapiMessage();
Expand Down Expand Up @@ -113,7 +110,7 @@ public void process(Exchange exchange) throws Exception {
LOG.warn("Cannot parse RCP-2-1, try to use default threshold", nfe);
}
if (threshold < 1) {
threshold = endpoint.getInteractiveContinuationDefaultThreshold();
threshold = getMllpEndpoint().getInteractiveContinuationDefaultThreshold();
}
if (threshold < 1) {
LOG.debug("Cannot perform interactive continuation: invalid or missing threshold");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@
*/
package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.keyString;

import ca.uhn.hl7v2.HL7Exception;
import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.Parser;
import ca.uhn.hl7v2.util.Terser;
import org.apache.camel.Exchange;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.openehealth.ipf.modules.hl7.message.MessageUtils;
import org.openehealth.ipf.platform.camel.core.util.Exchanges;
import org.openehealth.ipf.platform.camel.ihe.hl7v2.Hl7v2ConfigurationHolder;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.UnsolicitedFragmentationStorage;

import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.Parser;
import ca.uhn.hl7v2.util.Terser;
import org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.AbstractMllpInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.keyString;


/**
Expand All @@ -40,7 +40,7 @@
*
* @author Dmytro Rud
*/
public class ConsumerRequestDefragmenterInterceptor extends AbstractMllpInterceptor {
public class ConsumerRequestDefragmenterInterceptor extends AbstractMllpInterceptor<MllpTransactionEndpoint> {
private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerRequestDefragmenterInterceptor.class);

// keys consist of: continuation pointer, MSH-3-1, MSH-3-2, and MSH-3-3
Expand Down
Loading

0 comments on commit a26eeca

Please sign in to comment.