Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: compress support #156

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.api;

public enum CompressMethod {
GZIP("gzip"),
SNAPPY("snappy"),
ZSTD("zstd");

private final String value;

CompressMethod(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class Configuration {

BatchConfig batchConfig;

ContentType contentType;

CompressMethod compressMethod;

// deprecated, will use compressMethod and contentType
boolean gzipEnabled;

HttpClientConfig httpConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.api;

public enum ContentType {
JSON("application/json"),
MSGPACK("application/msgpack");

private final String value;

ContentType(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
10 changes: 10 additions & 0 deletions opengemini-client-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.8.4</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.2-4</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public BaseClient(Configuration conf) {
contentEncodingHeader.add("gzip");
headers.put("Content-Encoding", contentEncodingHeader);
}

applyCodec(conf, headers);

String httpPrefix;
if (conf.getHttpConfig().tlsConfig() != null) {
httpPrefix = "https://";
Expand All @@ -77,6 +80,36 @@ public BaseClient(Configuration conf) {
scheduler.ifPresent(this::startHealthCheck);
}

private void applyCodec(Configuration config, Map<String, List<String>> headers) {
if (config.getContentType() != null) {
List<String> acceptHeader = new ArrayList<>();
switch (config.getContentType()) {
case MSGPACK:
acceptHeader.add("application/msgpack");
break;
case JSON:
acceptHeader.add("application/json");
break;
}
headers.put("Accept", acceptHeader);
}
if (config.getCompressMethod() != null) {
List<String> acceptEncodingHeader = new ArrayList<>();
switch (config.getCompressMethod()) {
case GZIP:
acceptEncodingHeader.add("gzip");
break;
case ZSTD:
acceptEncodingHeader.add("zstd");
break;
case SNAPPY:
acceptEncodingHeader.add("snappy");
break;
}
headers.put("Accept-Encoding", acceptEncodingHeader);
}
}

/**
* Health Check
* Start schedule task(period 10s) to ping all server url
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.common.compress;

public interface Compressor {

byte[] compress(byte[] data);

byte[] decompress(byte[] data);

String getName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.common.compress;

import io.opengemini.client.api.CompressMethod;
import lombok.AllArgsConstructor;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

@AllArgsConstructor
public class GzipCompressor implements Compressor {

@Override
public byte[] compress(byte[] data) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(data);
gzipOutputStream.finish();
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Failed to compress data", e);
}
}

@Override
public byte[] decompress(byte[] data) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = gzipInputStream.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, len);
}
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Failed to decompress data", e);
}
}

@Override
public String getName() {
return CompressMethod.GZIP.getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.common.compress;

import io.opengemini.client.api.CompressMethod;
import lombok.AllArgsConstructor;
import org.xerial.snappy.Snappy;

import java.io.IOException;

@AllArgsConstructor
public class SnappyCompressor implements Compressor {

@Override
public byte[] compress(byte[] data) {
try {
return Snappy.compress(data);
} catch (IOException e) {
throw new RuntimeException("Failed to compress data", e);
}
}

@Override
public byte[] decompress(byte[] data) {
try {
return Snappy.uncompress(data);
} catch (IOException e) {
throw new RuntimeException("Failed to decompress data", e);
}
}

@Override
public String getName() {
return CompressMethod.SNAPPY.getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.common.compress;

import com.github.luben.zstd.Zstd;
import io.opengemini.client.api.CompressMethod;

public class ZstdCompressor implements Compressor {

@Override
public byte[] compress(byte[] data) {
return Zstd.compress(data);
}

@Override
public byte[] decompress(byte[] data) {
try {
long decompressedSize = Zstd.decompressedSize(data);
byte[] decompressedData = new byte[(int) decompressedSize];
Zstd.decompress(decompressedData, data);
return decompressedData;
} catch (Exception e) {
throw new RuntimeException("Failed to decompress data", e);
}
}

@Override
public String getName() {
return CompressMethod.ZSTD.getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.common.compress;
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 openGemini Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.opengemini.client.common.compress;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class GzipCompressorTest {

private final GzipCompressor gzipCompressor = new GzipCompressor();

@Test
void testCompressAndDecompress() {
String originalString = "This is a test string for GZIP compression";
byte[] originalData = originalString.getBytes();

// Compress the data
byte[] compressedData = gzipCompressor.compress(originalData);
Assertions.assertNotNull(compressedData);
Assertions.assertNotEquals(0, compressedData.length);

// Decompress the data
byte[] decompressedData = gzipCompressor.decompress(compressedData);
Assertions.assertNotNull(decompressedData);
Assertions.assertArrayEquals(originalData, decompressedData);

// Verify the decompressed string is the same as the original
String decompressedString = new String(decompressedData);
Assertions.assertEquals(originalString, decompressedString);
}
}
Loading
Loading