Skip to content

Commit

Permalink
Merge pull request #1 from cr-orilibhaber/master
Browse files Browse the repository at this point in the history
Added message attributes support to PublisherSampler + advanced minor…
  • Loading branch information
rollno748 authored Feb 21, 2021
2 parents 6a2768e + b2b36c4 commit ecbac2d
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 34 deletions.
6 changes: 2 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.di.jmeter.sampler</groupId>
<artifactId>jmeter-pubsub-sampler</artifactId>
<version>1.0</version>
<version>1.1-SNAPSHOT</version>
<properties>
<di-jmeter-version>5.1.1</di-jmeter-version>
<jmeter.lib.scope>provided</jmeter.lib.scope>
Expand Down
66 changes: 47 additions & 19 deletions src/main/java/com/di/jmeter/pubsub/sampler/PublisherSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package com.di.jmeter.pubsub.sampler;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPOutputStream;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;

import com.di.jmeter.pubsub.config.PublisherConfig;

import org.apache.jmeter.config.ConfigTestElement;
import org.apache.jmeter.engine.util.ConfigMergabilityIndicator;
Expand All @@ -37,16 +38,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.di.jmeter.pubsub.config.PublisherConfig;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPOutputStream;

public class PublisherSampler extends PublisherTestElement implements Sampler, TestBean, ConfigMergabilityIndicator {

private static final long serialVersionUID = -2509242423429019193L;
private static final Logger LOGGER = LoggerFactory.getLogger(PublisherSampler.class);
private static final Gson gson = new Gson();
private static final Type TYPED_MAP = new TypeToken<Map<String, String>>() {}.getType();

private Publisher publisher = null;
private static final Set<String> APPLIABLE_CONFIG_CLASSES = new HashSet<>(
Expand All @@ -62,10 +72,12 @@ public SampleResult sample(Entry e) {
result.setContentType("text/plain");
result.setDataEncoding(StandardCharsets.UTF_8.name());

Map<String, String> attributes = convertStringToAttributesMap(getAttributes());

if (isGzipCompression()) {
template = createPubsubMessage(createEventCompressed(getMessage()));
template = createPubsubMessage(createEventCompressed(getMessage()), attributes);
} else {
template = convertStringToPubSubMessage(getMessage());
template = convertStringToPubSubMessage(getMessage(), attributes);
}

result.sampleStart();
Expand All @@ -81,9 +93,25 @@ public SampleResult sample(Entry e) {
return result;
}

private Map<String,String> convertStringToAttributesMap(String attributesAsString){
return Optional.ofNullable(attributesAsString)

.map(str -> {
Map<String, String> result = null;
try {
result = gson.fromJson(str, TYPED_MAP);
} catch (JsonSyntaxException sse) {
LOGGER.error("Failed to convert string attributes: {} to Map<String,String>", attributesAsString, sse);
}
return result;
})

.orElse(Collections.emptyMap());
}

// Returns Modified templates/Message as template for publishing
private PubsubMessage convertStringToPubSubMessage(String message) {
return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build();
private PubsubMessage convertStringToPubSubMessage(String message, Map<String, String> attributes) {
return PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).putAllAttributes(attributes).build();
}

private byte[] createEventCompressed(String message) {
Expand Down Expand Up @@ -124,8 +152,8 @@ private String publish(PubsubMessage template, SampleResult result) {
return resp;
}

public static PubsubMessage createPubsubMessage(byte[] msg) {
return PubsubMessage.newBuilder().setData(ByteString.copyFrom(msg)).build();
public static PubsubMessage createPubsubMessage(byte[] msg, Map<String,String> attributes) {
return PubsubMessage.newBuilder().setData(ByteString.copyFrom(msg)).putAllAttributes(attributes).build();
}

private String request() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,21 @@
*/
package com.di.jmeter.pubsub.sampler;

import java.io.Serializable;

import org.apache.jmeter.gui.Searchable;
import org.apache.jmeter.testelement.AbstractTestElement;
import org.apache.jmeter.testelement.TestElement;
import org.apache.jmeter.testelement.TestStateListener;

public abstract class PublisherTestElement extends AbstractTestElement implements TestStateListener, TestElement, Serializable, Searchable {
import java.io.Serializable;

public abstract class PublisherTestElement extends AbstractTestElement implements TestStateListener, TestElement, Serializable, Searchable {

private static final long serialVersionUID = 7027549399338665744L;

private boolean gzipCompression;
private String message;

private boolean gzipCompression = false;
private String message = "";
private String attributes = "";

public String getMessage() {
return message;
}
Expand All @@ -49,5 +48,12 @@ public void setGzipCompression(boolean gzipCompression) {
this.gzipCompression = gzipCompression;
}

public String getAttributes() {
return attributes;
}

public void setAttributes(String attributes) {
this.attributes = attributes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,30 @@
*/
package com.di.jmeter.pubsub.sampler;

import java.beans.PropertyDescriptor;

import org.apache.jmeter.testbeans.BeanInfoSupport;
import org.apache.jmeter.testbeans.TestBean;
import org.apache.jmeter.testbeans.gui.TypeEditor;

import java.beans.PropertyDescriptor;

public class PublisherTestElementBeanInfoSupport extends BeanInfoSupport {

protected PublisherTestElementBeanInfoSupport(Class<? extends TestBean> beanClass) {
super(beanClass);

createPropertyGroup("Message to publish", new String[] { "gzipCompression","message"});
createPropertyGroup("Message to publish", new String[] { "gzipCompression", "message", "attributes"});

PropertyDescriptor propertyDescriptor = property("message", TypeEditor.TextAreaEditor);
propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE);
propertyDescriptor.setValue(DEFAULT, "{\"demoMessage\":\"Hello World!\"}");

propertyDescriptor = property("gzipCompression");
propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE);
propertyDescriptor.setValue(DEFAULT, Boolean.FALSE);

propertyDescriptor = property("attributes", TypeEditor.TextAreaEditor);
propertyDescriptor.setValue(NOT_UNDEFINED, Boolean.TRUE);
propertyDescriptor.setValue(DEFAULT, "{\"attributeKey\":\"attributeValue\"}");

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ gzipCompression.displayName=Gzip-Compression
gzipCompression.shortDescription=Set to enable compression for messages
message.displayName=Message
message.shortDescription=Message - This will supercedes template Type
attributes.displayName=Attributes
attributes.shortDescription=Attributes - Specify message attributes

0 comments on commit ecbac2d

Please sign in to comment.