Use less ancient avro, maintain same wire format #359
by @radai-rosenblatt
Merged 1 commit on Mar 29, 2022
7 changes: 6 additions & 1 deletion build.gradle
@@ -27,19 +27,24 @@ allprojects {

   repositories {
     mavenCentral()
+    maven {
+      url "https://linkedin.jfrog.io/artifactory/avro-util/"
+    }
   }

   dependencies {
     compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
     compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
-    compile 'org.apache.avro:avro:1.4.1'
+    compile 'org.apache.avro:avro:1.9.2'
     compile 'org.json:json:20140107'
     compile 'org.jolokia:jolokia-jvm:1.6.2'
     compile 'net.savantly:graphite-client:1.1.0-RELEASE'
     compile 'com.timgroup:java-statsd-client:3.0.1'
     compile 'com.signalfx.public:signalfx-codahale:0.0.47'
     compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
     compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
+    compile 'org.apache.commons:commons-lang3:3.12.0'
+    compile 'com.linkedin.avroutil1:helper-all:0.2.81'
     testCompile 'org.mockito:mockito-core:2.24.0'
     testCompile 'org.testng:testng:6.8.8'
   }
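For context: the new com.linkedin.avroutil1:helper-all dependency (served from the linkedin.jfrog.io repository added above) supplies the AvroCompatibilityHelper shim used later in this diff, so the same code can compile and run against Avro runtimes from 1.4 through 1.9. A minimal sketch, assuming helper-all's getRuntimeAvroVersion() API; this snippet is illustrative and not part of the change:

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroVersion;

public class AvroRuntimeCheck {
  public static void main(String[] args) {
    // helper-all inspects the classpath and reports which Avro runtime is present,
    // which is how one calling convention can cover both avro 1.4 and avro 1.9.
    AvroVersion runtime = AvroCompatibilityHelper.getRuntimeAvroVersion();
    System.out.println("detected avro runtime: " + runtime);
  }
}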
36 changes: 14 additions & 22 deletions src/main/java/com/linkedin/xinfra/monitor/common/Utils.java
@@ -13,7 +13,9 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import java.io.ByteArrayOutputStream;
+import com.linkedin.avroutil1.compatibility.AvroCodecUtil;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
+import com.linkedin.avroutil1.compatibility.AvroVersion;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.time.Duration;
@@ -34,18 +36,16 @@
 import javax.management.ObjectName;
 import kafka.admin.BrokerMetadata;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.io.Decoder;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.PartitionReassignment;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicExistsException;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -224,29 +224,21 @@ public static String jsonFromFields(String topic, long idx, long timestamp, Stri
    * @return GenericRecord that is de-serialized from kafka message w.r.t. expected schema.
    */
   public static GenericRecord genericRecordFromJson(String message) {
-    GenericRecord record = new GenericData.Record(DefaultTopicSchema.MESSAGE_V0);
-    JSONObject jsonObject = new JSONObject(message);
-    record.put(DefaultTopicSchema.TOPIC_FIELD.name(), jsonObject.getString(DefaultTopicSchema.TOPIC_FIELD.name()));
-    record.put(DefaultTopicSchema.INDEX_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.INDEX_FIELD.name()));
-    record.put(DefaultTopicSchema.TIME_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.TIME_FIELD.name()));
-    record.put(DefaultTopicSchema.PRODUCER_ID_FIELD.name(),
-        jsonObject.getString(DefaultTopicSchema.PRODUCER_ID_FIELD.name()));
-    record.put(DefaultTopicSchema.CONTENT_FIELD.name(), jsonObject.getString(DefaultTopicSchema.CONTENT_FIELD.name()));
-    return record;
+    try {
+      Decoder jsonDecoder = AvroCompatibilityHelper.newCompatibleJsonDecoder(DefaultTopicSchema.MESSAGE_V0, message);
+      GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(DefaultTopicSchema.MESSAGE_V0, DefaultTopicSchema.MESSAGE_V0);
+      return reader.read(null, jsonDecoder);
+    } catch (Exception e) {
+      throw new IllegalStateException("unable to deserialize " + message, e);
+    }
   }
 
   public static String jsonFromGenericRecord(GenericRecord record) {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(DefaultTopicSchema.MESSAGE_V0);
-
     try {
-      Encoder encoder = new JsonEncoder(DefaultTopicSchema.MESSAGE_V0, out);
-      writer.write(record, encoder);
-      encoder.flush();
+      return AvroCodecUtil.serializeJson(record, AvroVersion.AVRO_1_4);
     } catch (IOException e) {
-      LOG.error("Unable to serialize avro record due to error " + e);
+      throw new IllegalStateException("Unable to serialize avro record due to error: " + record, e);
     }
-    return out.toString();
   }
 
   public static List<MbeanAttributeValue> getMBeanAttributeValues(String mbeanExpr, String attributeExpr) {
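The two rewritten helpers above carry the "maintain same wire format" part of this change: AvroCodecUtil.serializeJson(record, AvroVersion.AVRO_1_4) asks helper-all to emit JSON the way Avro 1.4 did, and newCompatibleJsonDecoder returns a Decoder that reads that JSON back under the newer runtime. A self-contained sketch of the same round trip, using an illustrative inline schema rather than the real DefaultTopicSchema.MESSAGE_V0 (the schema and field names here are hypothetical):

import com.linkedin.avroutil1.compatibility.AvroCodecUtil;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroVersion;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;

public class JsonRoundTripSketch {
  // Illustrative two-field schema; the production code uses DefaultTopicSchema.MESSAGE_V0.
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Message\",\"fields\":["
      + "{\"name\":\"topic\",\"type\":\"string\"},"
      + "{\"name\":\"index\",\"type\":\"long\"}]}");

  public static void main(String[] args) throws Exception {
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("topic", "xinfra-monitor-topic");
    record.put("index", 42L);

    // Serialize in the Avro 1.4 JSON style so existing consumers keep seeing the old format.
    String json = AvroCodecUtil.serializeJson(record, AvroVersion.AVRO_1_4);

    // Decode with a schema-aware decoder that tolerates the legacy JSON encoding.
    Decoder decoder = AvroCompatibilityHelper.newCompatibleJsonDecoder(SCHEMA, json);
    GenericRecord roundTripped = new GenericDatumReader<GenericRecord>(SCHEMA, SCHEMA).read(null, decoder);
    System.out.println(roundTripped);
  }
}

The only calls taken from the diff are serializeJson and newCompatibleJsonDecoder; everything else is standard Avro 1.9 generic API.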
@@ -23,7 +23,7 @@
 import net.savantly.graphite.GraphiteClient;
 import net.savantly.graphite.GraphiteClientFactory;
 import net.savantly.graphite.impl.SimpleCarbonMetric;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -19,7 +19,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -20,7 +20,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
