Skip to content

Commit

Permalink
serialize DateTime As Long to improve json serde performance (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijianding authored and jon-wei committed Jun 6, 2017
1 parent d22db30 commit 551a89b
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 9 deletions.
2 changes: 2 additions & 0 deletions docs/content/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The query context is used for various query configuration parameters. The follow
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the broker node level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure "druid.processing.numThreads" is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. |
|serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by broker and the data transportation between broker and compute node|
|serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between broker and compute node|

In addition, some query types offer context parameters specific to that query type.

Expand Down
10 changes: 10 additions & 0 deletions processing/src/main/java/io/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ public static <T> boolean isFinalize(Query<T> query, boolean defaultValue)
return parseBoolean(query, "finalize", defaultValue);
}

public static <T> boolean isSerializeDateTimeAsLong(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "serializeDateTimeAsLong", defaultValue);
}

public static <T> boolean isSerializeDateTimeAsLongInner(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "serializeDateTimeAsLongInner", defaultValue);
}

public static <T> int getUncoveredIntervalsLimit(Query<T> query)
{
return getUncoveredIntervalsLimit(query, DEFAULT_UNCOVERED_INTERVALS_LIMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public EventHolder(
public DateTime getTimestamp()
{
Object retVal = event.get(timestampKey);
if (retVal instanceof String) {
if (retVal instanceof String || retVal instanceof Long) {
return new DateTime(retVal);
} else if (retVal instanceof DateTime) {
return (DateTime) retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private DateTime getDateTimeValue(Object val)

if (val instanceof DateTime) {
return (DateTime) val;
} else if (val instanceof String) {
} else if (val instanceof String || val instanceof Long) {
return new DateTime(val);
} else {
throw new IAE("Cannot get time from type[%s]", val.getClass());
Expand Down
43 changes: 36 additions & 7 deletions server/src/main/java/io/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.ser.DateTimeSerializer;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
Expand All @@ -40,6 +42,7 @@
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryPlus;
Expand Down Expand Up @@ -97,6 +100,8 @@ public class QueryResource implements QueryCountStatsProvider
protected final ServerConfig config;
protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
protected final ObjectMapper serializeDateTimeAsLongJsonMapper;
protected final ObjectMapper serializeDateTimeAsLongSmileMapper;
protected final QuerySegmentWalker texasRanger;
protected final ServiceEmitter emitter;
protected final RequestLogger requestLogger;
Expand Down Expand Up @@ -125,6 +130,8 @@ public QueryResource(
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper);
this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper);
this.texasRanger = texasRanger;
this.emitter = emitter;
this.requestLogger = requestLogger;
Expand Down Expand Up @@ -250,7 +257,11 @@ public Response doPost(
try {
final Query theQuery = query;
final QueryToolChest theToolChest = toolChest;
final ObjectWriter jsonWriter = context.newOutputWriter();
boolean shouldFinalize = QueryContexts.isFinalize(query, true);
boolean serializeDateTimeAsLong =
QueryContexts.isSerializeDateTimeAsLong(query, false)
|| (!shouldFinalize && QueryContexts.isSerializeDateTimeAsLongInner(query, false));
final ObjectWriter jsonWriter = context.newOutputWriter(serializeDateTimeAsLong);
Response.ResponseBuilder builder = Response
.ok(
new StreamingOutput()
Expand Down Expand Up @@ -451,24 +462,41 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
}
}

protected ObjectMapper serializeDataTimeAsLong(ObjectMapper mapper)
{
return mapper.copy().registerModule(new SimpleModule().addSerializer(DateTime.class, new DateTimeSerializer()));
}

protected ResponseContext createContext(String requestType, boolean pretty)
{
boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType) ||
APPLICATION_SMILE.equals(requestType);
String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
return new ResponseContext(contentType, isSmile ? smileMapper : jsonMapper, pretty);
return new ResponseContext(
contentType,
isSmile ? smileMapper : jsonMapper,
isSmile ? serializeDateTimeAsLongSmileMapper : serializeDateTimeAsLongJsonMapper,
pretty
);
}

protected static class ResponseContext
{
private final String contentType;
private final ObjectMapper inputMapper;
private final ObjectMapper serializeDateTimeAsLongInputMapper;
private final boolean isPretty;

ResponseContext(String contentType, ObjectMapper inputMapper, boolean isPretty)
ResponseContext(
String contentType,
ObjectMapper inputMapper,
ObjectMapper serializeDateTimeAsLongInputMapper,
boolean isPretty
)
{
this.contentType = contentType;
this.inputMapper = inputMapper;
this.serializeDateTimeAsLongInputMapper = serializeDateTimeAsLongInputMapper;
this.isPretty = isPretty;
}

Expand All @@ -482,21 +510,22 @@ public ObjectMapper getObjectMapper()
return inputMapper;
}

ObjectWriter newOutputWriter()
ObjectWriter newOutputWriter(boolean serializeDateTimeAsLong)
{
return isPretty ? inputMapper.writerWithDefaultPrettyPrinter() : inputMapper.writer();
ObjectMapper mapper = serializeDateTimeAsLong ? serializeDateTimeAsLongInputMapper : inputMapper;
return isPretty ? mapper.writerWithDefaultPrettyPrinter() : mapper.writer();
}

Response ok(Object object) throws IOException
{
return Response.ok(newOutputWriter().writeValueAsString(object), contentType).build();
return Response.ok(newOutputWriter(false).writeValueAsString(object), contentType).build();
}

Response gotError(Exception e) throws IOException
{
return Response.serverError()
.type(contentType)
.entity(newOutputWriter().writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e)))
.entity(newOutputWriter(false).writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e)))
.build();
}
}
Expand Down

0 comments on commit 551a89b

Please sign in to comment.