Skip to content

Commit

Permalink
Use less ancient avro, maintain same wire format by @radai-rosenblatt (
Browse files Browse the repository at this point in the history
  • Loading branch information
fluffywei authored Mar 29, 2022
1 parent 77f9655 commit e4467c8
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 26 deletions.
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,24 @@ allprojects {

repositories {
mavenCentral()
maven {
url "https://linkedin.jfrog.io/artifactory/avro-util/"
}
}

dependencies {
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
compile 'org.apache.avro:avro:1.4.1'
compile 'org.apache.avro:avro:1.9.2'
compile 'org.json:json:20140107'
compile 'org.jolokia:jolokia-jvm:1.6.2'
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
compile 'com.timgroup:java-statsd-client:3.0.1'
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
compile 'org.apache.commons:commons-lang3:3.12.0'
compile 'com.linkedin.avroutil1:helper-all:0.2.81'
testCompile 'org.mockito:mockito-core:2.24.0'
testCompile 'org.testng:testng:6.8.8'
}
Expand Down
36 changes: 14 additions & 22 deletions src/main/java/com/linkedin/xinfra/monitor/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.ByteArrayOutputStream;
import com.linkedin.avroutil1.compatibility.AvroCodecUtil;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroVersion;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.time.Duration;
Expand All @@ -34,18 +36,16 @@
import javax.management.ObjectName;
import kafka.admin.BrokerMetadata;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.io.Decoder;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -224,29 +224,21 @@ public static String jsonFromFields(String topic, long idx, long timestamp, Stri
* @return GenericRecord that is de-serialized from kafka message w.r.t. expected schema.
*/
public static GenericRecord genericRecordFromJson(String message) {
GenericRecord record = new GenericData.Record(DefaultTopicSchema.MESSAGE_V0);
JSONObject jsonObject = new JSONObject(message);
record.put(DefaultTopicSchema.TOPIC_FIELD.name(), jsonObject.getString(DefaultTopicSchema.TOPIC_FIELD.name()));
record.put(DefaultTopicSchema.INDEX_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.INDEX_FIELD.name()));
record.put(DefaultTopicSchema.TIME_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.TIME_FIELD.name()));
record.put(DefaultTopicSchema.PRODUCER_ID_FIELD.name(),
jsonObject.getString(DefaultTopicSchema.PRODUCER_ID_FIELD.name()));
record.put(DefaultTopicSchema.CONTENT_FIELD.name(), jsonObject.getString(DefaultTopicSchema.CONTENT_FIELD.name()));
return record;
try {
Decoder jsonDecoder = AvroCompatibilityHelper.newCompatibleJsonDecoder(DefaultTopicSchema.MESSAGE_V0, message);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(DefaultTopicSchema.MESSAGE_V0, DefaultTopicSchema.MESSAGE_V0);
return reader.read(null, jsonDecoder);
} catch (Exception e) {
throw new IllegalStateException("unable to deserialize " + message, e);
}
}

public static String jsonFromGenericRecord(GenericRecord record) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(DefaultTopicSchema.MESSAGE_V0);

try {
Encoder encoder = new JsonEncoder(DefaultTopicSchema.MESSAGE_V0, out);
writer.write(record, encoder);
encoder.flush();
return AvroCodecUtil.serializeJson(record, AvroVersion.AVRO_1_4);
} catch (IOException e) {
LOG.error("Unable to serialize avro record due to error " + e);
throw new IllegalStateException("Unable to serialize avro record due to error: " + record, e);
}
return out.toString();
}

public static List<MbeanAttributeValue> getMBeanAttributeValues(String mbeanExpr, String attributeExpr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import net.savantly.graphite.GraphiteClient;
import net.savantly.graphite.GraphiteClientFactory;
import net.savantly.graphite.impl.SimpleCarbonMetric;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down

0 comments on commit e4467c8

Please sign in to comment.