Skip to content

Commit

Permalink
Remove Jackson from most of Storage (#41312)
Browse files Browse the repository at this point in the history
  • Loading branch information
alzimmermsft authored Jul 29, 2024
1 parent 4bda1ba commit bfb7c6c
Show file tree
Hide file tree
Showing 26 changed files with 396 additions and 254 deletions.
25 changes: 0 additions & 25 deletions sdk/storage/azure-storage-blob-batch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@
<version>12.27.0</version> <!-- {x-version-update;com.azure:azure-storage-blob;current} -->
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.13.5</version> <!-- {x-version-update;com.fasterxml.jackson.dataformat:jackson-dataformat-xml;external_dependency} -->
</dependency>

<!-- Added this dependency to include necessary annotations used by reactor core.
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
which is used in @Nullable annotation in reactor core classes -->
Expand Down Expand Up @@ -130,25 +124,6 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.4.1</version> <!-- {x-version-update;org.apache.maven.plugins:maven-enforcer-plugin;external_dependency} -->
<configuration>
<rules>
<bannedDependencies>
<includes>
<include>com.fasterxml.jackson.dataformat:jackson-dataformat-xml:[2.13.5]</include> <!-- {x-include-update;com.fasterxml.jackson.dataformat:jackson-dataformat-xml;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>java12plus</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
module com.azure.storage.blob.batch {
requires transitive com.azure.storage.blob;

requires com.fasterxml.jackson.dataformat.xml;

exports com.azure.storage.blob.batch;
exports com.azure.storage.blob.batch.options;
}
30 changes: 5 additions & 25 deletions sdk/storage/azure-storage-blob-changefeed/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
</properties>

<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-json</artifactId>
<version>1.1.0</version> <!-- {x-version-update;com.azure:azure-json;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
Expand All @@ -75,12 +80,6 @@
<version>12.27.0</version> <!-- {x-version-update;com.azure:azure-storage-blob;current} -->
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.13.5</version> <!-- {x-version-update;com.fasterxml.jackson.dataformat:jackson-dataformat-xml;external_dependency} -->
</dependency>

<!-- Added this dependency to include necessary annotations used by reactor core.
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
which is used in @Nullable annotation in reactor core classes -->
Expand Down Expand Up @@ -169,25 +168,6 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.4.1</version> <!-- {x-version-update;org.apache.maven.plugins:maven-enforcer-plugin;external_dependency} -->
<configuration>
<rules>
<bannedDependencies>
<includes>
<include>com.fasterxml.jackson.dataformat:jackson-dataformat-xml:[2.13.5]</include> <!-- {x-include-update;com.fasterxml.jackson.dataformat:jackson-dataformat-xml;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>java12plus</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@

/**
* A class that represents a Changefeed.
*
* <p>
* The changefeed is a log of changes that are organized into hourly segments.
* The listing of the $blobchangefeed/idx/segments/ virtual directory shows these segments ordered by time.
* The path of the segment describes the start of the hourly time-range that the segment represents.
This list can be used to filter out the segments of logs that are of interest.
*
* <p>
* Note: The time represented by the segment is approximate with bounds of 15 minutes. So to ensure consumption of
* all records within a specified time, consume the consecutive previous and next hour segment.
*/
Expand Down Expand Up @@ -56,7 +56,7 @@ class Changefeed {
this.endTime = TimeUtils.roundUpToNearestHour(endTime);
this.userCursor = userCursor;
this.segmentFactory = segmentFactory;
String urlHost = null;
String urlHost;
try {
urlHost = new URL(client.getBlobContainerUrl()).getHost();
} catch (MalformedURLException e) {
Expand Down Expand Up @@ -120,11 +120,11 @@ Mono<OffsetDateTime> populateLastConsumable() {
/* Parse JSON for last consumable. */
.flatMap(jsonNode -> {
/* Last consumable time. The latest time the changefeed can safely be read from.*/
OffsetDateTime lastConsumableTime = OffsetDateTime.parse(jsonNode.get("lastConsumable").asText());
OffsetDateTime lastConsumableTime = OffsetDateTime.parse(String.valueOf(jsonNode.get("lastConsumable")));
/* Soonest time between lastConsumable and endTime. */
OffsetDateTime safeEndTime = this.endTime;
if (lastConsumableTime.isBefore(endTime)) {
safeEndTime = lastConsumableTime.plusHours(1); /* Add an hour since end time is non inclusive. */
safeEndTime = lastConsumableTime.plusHours(1); /* Add an hour since end time is non-inclusive. */
}
return Mono.just(safeEndTime);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
package com.azure.storage.blob.changefeed;

import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.implementation.models.SegmentCursor;
import com.azure.storage.blob.changefeed.implementation.models.ShardCursor;
import com.azure.storage.blob.changefeed.implementation.util.DownloadUtils;
import com.fasterxml.jackson.databind.JsonNode;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Represents a Segment in Changefeed.
Expand Down Expand Up @@ -66,14 +66,15 @@ Flux<BlobChangefeedEventWrapper> getEvents() {
.concatMap(Shard::getEvents);
}

private List<Shard> getShards(JsonNode node) {
@SuppressWarnings("unchecked")
private List<Shard> getShards(Map<String, Object> node) {
List<Shard> shards = new ArrayList<>();

/* Iterate over each shard element. */
for (JsonNode shard : node.withArray(CHUNK_FILE_PATHS)) {
for (Object shard : (List<Object>) node.get(CHUNK_FILE_PATHS)) {
/* Strip out the changefeed container name and the subsequent / */
String shardPath =
shard.asText().substring(BlobChangefeedClientBuilder.CHANGEFEED_CONTAINER_NAME.length() + 1);
String shardPath = String.valueOf(shard)
.substring(BlobChangefeedClientBuilder.CHANGEFEED_CONTAINER_NAME.length() + 1);

ShardCursor shardCursor = null; /* By default, read shard from the beginning. */
if (userCursor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
package com.azure.storage.blob.changefeed.implementation.models;

import com.azure.core.annotation.Fluent;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.azure.json.JsonProviders;
import com.azure.json.JsonReader;
import com.azure.json.JsonSerializable;
import com.azure.json.JsonToken;
import com.azure.json.JsonWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Objects;

Expand All @@ -22,24 +24,12 @@
* Represents a cursor for BlobChangefeed.
*/
@Fluent
public class ChangefeedCursor {

public class ChangefeedCursor implements JsonSerializable<ChangefeedCursor> {
private static final ClientLogger LOGGER = new ClientLogger(ChangefeedCursor.class);
private static final ObjectMapper MAPPER = new ObjectMapper()
.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE)
.registerModule(new JavaTimeModule());

@JsonProperty("CursorVersion")
private int cursorVersion;

@JsonProperty("UrlHost")
private String urlHost;

@JsonProperty("EndTime")
@JsonFormat(shape = JsonFormat.Shape.STRING)
private OffsetDateTime endTime;

@JsonProperty("CurrentSegmentCursor")
private SegmentCursor currentSegmentCursor;

/**
Expand Down Expand Up @@ -174,9 +164,11 @@ public ChangefeedCursor setCurrentSegmentCursor(SegmentCursor currentSegmentCurs
* @return The resulting serialized cursor.
*/
public String serialize() {
try {
return MAPPER.writer().writeValueAsString(this);
} catch (JsonProcessingException e) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
JsonWriter jsonWriter = JsonProviders.createWriter(outputStream)) {
jsonWriter.writeJson(this).flush();
return outputStream.toString(StandardCharsets.UTF_8.name());
} catch (IOException e) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
}
}
Expand All @@ -188,13 +180,55 @@ public String serialize() {
* @return The resulting {@link ChangefeedCursor cursor}.
*/
public static ChangefeedCursor deserialize(String cursor, ClientLogger logger) {
try {
return MAPPER.readerFor(ChangefeedCursor.class).readValue(cursor);
try (JsonReader jsonReader = JsonProviders.createReader(cursor)) {
return fromJson(jsonReader);
} catch (IOException e) {
throw logger.logExceptionAsError(new UncheckedIOException(e));
}
}

@Override
public JsonWriter toJson(JsonWriter jsonWriter) throws IOException {
return jsonWriter.writeStartObject()
.writeIntField("CursorVersion", cursorVersion)
.writeStringField("UrlHost", urlHost)
.writeStringField("EndTime", endTime == null ? null : endTime.toString())
.writeJsonField("CurrentSegmentCursor", currentSegmentCursor)
.writeEndObject();
}

/**
 * Deserialize a ChangefeedCursor from JSON.
 *
 * @param jsonReader The JSON reader to deserialize from.
 * @return The deserialized ChangefeedCursor.
 * @throws IOException If the JSON object cannot be properly deserialized.
 */
public static ChangefeedCursor fromJson(JsonReader jsonReader) throws IOException {
    return jsonReader.readObject(reader -> {
        ChangefeedCursor changefeedCursor = new ChangefeedCursor();

        // Walk the JSON object one field at a time until the closing brace.
        while (reader.nextToken() != JsonToken.END_OBJECT) {
            String fieldName = reader.getFieldName();
            reader.nextToken();

            if ("CursorVersion".equals(fieldName)) {
                changefeedCursor.cursorVersion = reader.getInt();
            } else if ("UrlHost".equals(fieldName)) {
                changefeedCursor.urlHost = reader.getString();
            } else if ("EndTime".equals(fieldName)) {
                changefeedCursor.endTime = CoreUtils.parseBestOffsetDateTime(reader.getString());
            } else if ("CurrentSegmentCursor".equals(fieldName)) {
                // Nested object; delegate to SegmentCursor's own deserializer.
                changefeedCursor.currentSegmentCursor = SegmentCursor.fromJson(reader);
            } else {
                // Unknown property: skip it (and any children) for forward compatibility.
                reader.skipChildren();
            }
        }

        return changefeedCursor;
    });
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ public class InternalBlobChangefeedEventData implements BlobChangefeedEventData
* @param requestId The request id.
* @param eTag The eTag.
* @param contentType The content type.
* @param contentLength Th4e content length.
* @param contentLength The content length.
* @param blobType {@link BlobType}
* @param contentOffset The content offset.
* @param destinationUrl The destination url.
* @param sourceUrl The source url.
* @param blobUrl The blob url.
* @param recursive Whether or not this operation was recursive.
* @param recursive Whether this operation was recursive.
* @param sequencer The sequencer.
*/
public InternalBlobChangefeedEventData(String api, String clientRequestId, String requestId, String eTag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package com.azure.storage.blob.changefeed.implementation.models;

import com.azure.core.annotation.Fluent;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.azure.json.JsonReader;
import com.azure.json.JsonSerializable;
import com.azure.json.JsonToken;
import com.azure.json.JsonWriter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -15,15 +19,9 @@
* Represents a cursor for a segment in BlobChangefeed.
*/
@Fluent
public class SegmentCursor {

@JsonProperty("ShardCursors")
public class SegmentCursor implements JsonSerializable<SegmentCursor> {
private List<ShardCursor> shardCursors;

@JsonProperty("CurrentShardPath")
private String currentShardPath; // 'log/00/2020/07/06/1600/'

@JsonProperty("SegmentPath")
private String segmentPath; // 'idx/segments/2020/07/06/1600/meta.json'

/**
Expand Down Expand Up @@ -159,6 +157,45 @@ public SegmentCursor setCurrentShardPath(String currentShardPath) {
return this;
}

@Override
public JsonWriter toJson(JsonWriter jsonWriter) throws IOException {
    // Serialize this segment cursor as a JSON object. Field order matches the
    // legacy Jackson output so existing cursors remain readable.
    jsonWriter.writeStartObject();
    // Each shard cursor knows how to serialize itself; delegate per element.
    jsonWriter.writeArrayField("ShardCursors", shardCursors, JsonWriter::writeJson);
    jsonWriter.writeStringField("CurrentShardPath", currentShardPath);
    jsonWriter.writeStringField("SegmentPath", segmentPath);
    return jsonWriter.writeEndObject();
}

/**
 * Deserialize a SegmentCursor from JSON.
 *
 * @param jsonReader The JSON reader to deserialize from.
 * @return The deserialized SegmentCursor.
 * @throws IOException If the JSON object cannot be properly deserialized.
 */
public static SegmentCursor fromJson(JsonReader jsonReader) throws IOException {
    return jsonReader.readObject(reader -> {
        SegmentCursor deserialized = new SegmentCursor();

        // Consume fields one at a time until the object's closing brace.
        while (reader.nextToken() != JsonToken.END_OBJECT) {
            String name = reader.getFieldName();
            reader.nextToken();

            if ("ShardCursors".equals(name)) {
                // Array of nested shard cursors; each element deserializes itself.
                deserialized.shardCursors = reader.readArray(ShardCursor::fromJson);
            } else if ("CurrentShardPath".equals(name)) {
                deserialized.currentShardPath = reader.getString();
            } else if ("SegmentPath".equals(name)) {
                deserialized.segmentPath = reader.getString();
            } else {
                // Unrecognized property: skip it (and any children) for forward compatibility.
                reader.skipChildren();
            }
        }

        return deserialized;
    });
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit bfb7c6c

Please sign in to comment.