Handle Field partitioner to pick value from Map structure #180

Open · wants to merge 3 commits into base: master
Changes from all commits

checkstyle/suppressions.xml (7 changes: 6 additions & 1 deletion)

@@ -13,7 +13,12 @@

   <suppress
       checks="(CyclomaticComplexity)"
-      files="(PartitionerConfig|StorageSchemaCompatibility).java"
+      files="(PartitionerConfig|StorageSchemaCompatibility|FieldPartitioner).java"
   />
 
+  <suppress
+      checks="(JavaNCSSCheck)"
+      files="FieldPartitioner.java"
+  />
+
   <suppress

DateTimeUtils.java

@@ -15,12 +15,14 @@

 package io.confluent.connect.storage.util;
 
-import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
 
+import java.util.concurrent.TimeUnit;
 
 public class DateTimeUtils {
+
+  private static final long DAY_IN_MS = TimeUnit.DAYS.toMillis(1);
+
   /**
    * Calculates next period of periodMs after currentTimeMs
    * starting from midnight in given timeZone.
@@ -37,15 +39,12 @@ public static long getNextTimeAdjustedByDay(
       long periodMs,
       DateTimeZone timeZone
   ) {
-    DateTime currentDT = new DateTime(currentTimeMs).withZone(timeZone);
-    DateTime startOfDayDT = currentDT.withTimeAtStartOfDay();
-    DateTime startOfNextDayDT = startOfDayDT.plusDays(1);
-    Duration currentDayDuration = new Duration(startOfDayDT, startOfNextDayDT);
-    long todayInMs = currentDayDuration.getMillis();
-
-    long startOfDay = startOfDayDT.getMillis();
+    long startOfDay = timeZone.convertLocalToUTC(
+        timeZone.convertUTCToLocal(currentTimeMs) / DAY_IN_MS * DAY_IN_MS,
+        true
+    );
     long nextPeriodOffset = ((currentTimeMs - startOfDay) / periodMs + 1) * periodMs;
-    long offset = Math.min(nextPeriodOffset, todayInMs);
+    long offset = Math.min(nextPeriodOffset, DAY_IN_MS);
     return startOfDay + offset;
   }
 }
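
For orientation, here is a minimal standalone sketch (not part of the diff) of the arithmetic the rewritten method uses: truncate the zone-local timestamp to midnight, round up to the next periodMs boundary, and cap the offset at one nominal day. The class name and main() harness are illustrative only; only joda-time is assumed on the classpath.

import java.util.concurrent.TimeUnit;
import org.joda.time.DateTimeZone;

// Illustrative sketch of the new getNextTimeAdjustedByDay arithmetic.
public class NextPeriodSketch {
  private static final long DAY_IN_MS = TimeUnit.DAYS.toMillis(1);

  static long getNextTimeAdjustedByDay(long currentTimeMs, long periodMs, DateTimeZone tz) {
    // Truncate the zone-local instant to midnight, then map it back to UTC millis.
    long startOfDay = tz.convertLocalToUTC(
        tz.convertUTCToLocal(currentTimeMs) / DAY_IN_MS * DAY_IN_MS, true);
    // Round up to the next periodMs boundary, capped at one nominal 24-hour day.
    long nextPeriodOffset = ((currentTimeMs - startOfDay) / periodMs + 1) * periodMs;
    return startOfDay + Math.min(nextPeriodOffset, DAY_IN_MS);
  }

  public static void main(String[] args) {
    long fiveAm = TimeUnit.HOURS.toMillis(5) + 42; // 05:00:00.042 UTC on 1970-01-01
    long hourly = TimeUnit.HOURS.toMillis(1);
    // Next hourly boundary after 05:00:00.042 is 06:00 -> prints 21600000.
    System.out.println(getNextTimeAdjustedByDay(fiveAm, hourly, DateTimeZone.UTC));
  }
}

Compared with the removed DateTime/Duration version, this avoids per-call Joda object allocation; the trade-off is that the cap is now a fixed 24-hour day rather than the actual day length, which differs on DST-transition days.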

FieldPartitioner.java

@@ -15,10 +15,12 @@

 package io.confluent.connect.storage.partitioner;
 
+import io.confluent.connect.storage.util.DataUtils;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,16 +46,14 @@ public void configure(Map<String, Object> config) {
   @Override
   public String encodePartition(SinkRecord sinkRecord) {
     Object value = sinkRecord.value();
+    StringBuilder builder = new StringBuilder();
     if (value instanceof Struct) {
       final Schema valueSchema = sinkRecord.valueSchema();
       final Struct struct = (Struct) value;
-
-      StringBuilder builder = new StringBuilder();
       for (String fieldName : fieldNames) {
         if (builder.length() > 0) {
           builder.append(this.delim);
         }
-
         Object partitionKey = struct.get(fieldName);
         Type type = valueSchema.field(fieldName).schema().type();
         switch (type) {
@@ -77,6 +77,39 @@ public String encodePartition(SinkRecord sinkRecord) {
         }
       }
       return builder.toString();
+    } else if (value instanceof Map) {
+      Map<?, ?> map = (Map<?, ?>) value;
+      for (String fieldName : fieldNames) {
+        if (builder.length() > 0) {
+          builder.append(this.delim);
+        }
+        Object fieldValue = "null";
+        try {
+          fieldValue = DataUtils.getNestedFieldValue(map, fieldName);
+        } catch (DataException e) {
+          log.warn("{} is unable to extract field '{}' from record: {}",
+              this.getClass(), fieldName, sinkRecord.value());
+        }
+        String[] nestedFieldList = fieldName.split("\\.");
+        String partitionName = nestedFieldList[nestedFieldList.length - 1];
+        if (fieldValue instanceof Number) {
+          Number record = (Number) fieldValue;
+          builder.append(partitionName).append('=').append(record);
+        } else if (fieldValue instanceof String) {
+          builder.append(partitionName).append('=').append((String) fieldValue);
+        } else if (fieldValue instanceof Boolean) {
+          boolean booleanRecord = (boolean) fieldValue;
+          builder.append(partitionName).append('=').append(booleanRecord);
+        } else {
+          log.error(
+              "Unsupported type '{}' for partition field.", fieldValue.getClass()
+          );
+          throw new PartitionException(
+              "Error extracting partition value from record field: " + fieldName
+          );
+        }
+      }
+      return builder.toString();
     } else {
       log.error("Value is not Struct type.");
       throw new PartitionException("Error encoding partition.");
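
The Map branch delegates nested-field lookup to DataUtils.getNestedFieldValue, which is expected to walk a dotted path such as nested.string through nested maps and throw DataException when a segment is missing. A rough sketch of that contract follows; it is an assumption for illustration, not the repository's actual DataUtils implementation.

import java.util.Map;
import org.apache.kafka.connect.errors.DataException;

// Hypothetical stand-in for DataUtils.getNestedFieldValue, shown only to
// illustrate the dotted-path contract the Map branch above relies on.
final class NestedFieldSketch {
  static Object getNestedFieldValue(Map<?, ?> map, String dottedPath) {
    Object current = map;
    for (String segment : dottedPath.split("\\.")) {
      if (!(current instanceof Map)) {
        throw new DataException("Not a nested structure at: " + segment);
      }
      current = ((Map<?, ?>) current).get(segment);
      if (current == null) {
        throw new DataException("The field '" + dottedPath + "' does not exist.");
      }
    }
    return current;
  }
}

With fieldNames = ["nested.string"] and a record value of {"nested": {"string": "def"}}, encodePartition would produce string=def: only the last path segment becomes the partition directory name.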

FieldPartitionerTest.java

@@ -20,9 +20,7 @@
 import io.confluent.connect.storage.errors.PartitionException;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -127,6 +125,42 @@ public void testNotStructPartition() throws PartitionException {
     assertEquals("Error encoding partition.", e.getMessage());
   }
 
+  @Test
+  public void testMapPartition() throws PartitionException {
+    String fieldName = "nested.string";
+
+    Map<String, Object> expectedNestedMap = createMapWithTimestampField(TIMESTAMP);
+    Map<String, Object> map = new HashMap<>();
+    map.put("nested", expectedNestedMap);
+    FieldPartitioner<String> partitioner;
+    SinkRecord sinkRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, null,
+        Schema.STRING_SCHEMA, map, 0L);
+    String path;
+
+
+    Map<String, Object> m = new LinkedHashMap<>();
+    m.put("string", "def");
+    partitioner = getFieldPartitioner(fieldName);
+    path = partitioner.encodePartition(sinkRecord);
+    assertThat(path, is(generateEncodedPartitionFromMap(m)));
+
+    fieldName = "nested.int";
+    m = new LinkedHashMap<>();
+    m.put("int", "12");
+    partitioner = getFieldPartitioner(fieldName);
+    path = partitioner.encodePartition(sinkRecord);
+    assertThat(path, is(generateEncodedPartitionFromMap(m)));
+
+    fieldName = "nested.long";
+    String secondFieldName = "nested.int";
+    m = new LinkedHashMap<>();
+    m.put("long", "12");
+    m.put("int", "12");
+    partitioner = getFieldPartitioner(fieldName, secondFieldName);
+    path = partitioner.encodePartition(sinkRecord);
+    assertThat(path, is(generateEncodedPartitionFromMap(m)));
+  }
+
   @Test
   public void testMultiPartition() {
     FieldPartitioner<String> partitioner = getFieldPartitioner("string", "int");
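
The helpers used above (getFieldPartitioner, createMapWithTimestampField, generateEncodedPartitionFromMap) are defined elsewhere in the test class and are not part of this hunk. A plausible sketch of the expectation builder, assuming it joins key=value pairs with the partitioner's directory delimiter (taken as "/" here, an assumption):

import java.util.Map;
import java.util.StringJoiner;

// Plausible sketch (assumption) of the expectation helper referenced above:
// joins "key=value" pairs with the directory delimiter, e.g. "long=12/int=12".
final class PartitionExpectation {
  static String generateEncodedPartitionFromMap(Map<String, Object> fields) {
    StringJoiner joiner = new StringJoiner("/"); // "/" assumed as the default delimiter
    for (Map.Entry<String, Object> entry : fields.entrySet()) {
      joiner.add(entry.getKey() + "=" + entry.getValue());
    }
    return joiner.toString();
  }
}

A LinkedHashMap is used in the test so that the expected pairs keep their insertion order, matching the order of the configured field names.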

pom.xml (7 changes: 2 additions & 5 deletions)

@@ -59,24 +59,21 @@
         <module>partitioner</module>
         <module>format</module>
         <module>hive</module>
-        <module>htrace-core4-shaded</module>
-        <module>avatica-shaded</module>
         <module>package-kafka-connect-storage-common</module>
     </modules>
 
     <properties>
         <kafka.connect.storage.common.version>10.0.3-SNAPSHOT</kafka.connect.storage.common.version>
         <commons-io.version>2.4</commons-io.version>
         <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
-        <hadoop.version>2.10.1</hadoop.version>
-        <hive.version>2.3.7</hive.version>
+        <hadoop.version>2.10.0</hadoop.version>
+        <hive.version>2.3.6</hive.version>
         <joda.version>2.9.6</joda.version>
         <parquet.version>1.11.1</parquet.version>
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.4</httpcore.version>
         <jline.version>2.12.1</jline.version>
-        <confluent-log4j.version>1.2.17-cp2</confluent-log4j.version>
         <commons.codec.version>1.15</commons.codec.version>
         <maven.release.plugin.version>2.5.3</maven.release.plugin.version>
         <jackson.databind.version>2.10.5.1</jackson.databind.version>
     </properties>