Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Dec 2, 2024
1 parent 42a7734 commit 1551ef9
Show file tree
Hide file tree
Showing 11 changed files with 422 additions and 2 deletions.
12 changes: 11 additions & 1 deletion be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ namespace doris::vectorized {
M(TypeIndex::Date, ColumnVector<Int64>, Int64) \
M(TypeIndex::DateV2, ColumnVector<UInt32>, UInt32) \
M(TypeIndex::DateTime, ColumnVector<Int64>, Int64) \
M(TypeIndex::DateTimeV2, ColumnVector<UInt64>, UInt64)
M(TypeIndex::DateTimeV2, ColumnVector<UInt64>, UInt64) \
M(TypeIndex::IPv4, ColumnVector<UInt32>, UInt32) \
M(TypeIndex::IPv6, ColumnVector<UInt128>, UInt128)

Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
Expand Down Expand Up @@ -450,6 +452,10 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
return "float";
case TYPE_DOUBLE:
return "double";
case TYPE_IPV4:
return "ipv4";
case TYPE_IPV6:
return "ipv6";
case TYPE_VARCHAR:
[[fallthrough]];
case TYPE_CHAR:
Expand Down Expand Up @@ -534,6 +540,10 @@ std::string JniConnector::get_jni_type(const TypeDescriptor& desc) {
return "float";
case TYPE_DOUBLE:
return "double";
case TYPE_IPV4:
return "ipv4";
case TYPE_IPV6:
return "ipv6";
case TYPE_VARCHAR: {
buffer << "varchar(" << desc.len << ")";
return buffer.str();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -55,6 +56,9 @@ public class JavaUdfDataType {
public static final JavaUdfDataType DECIMAL64 = new JavaUdfDataType("DECIMAL64", TPrimitiveType.DECIMAL64, 8);
public static final JavaUdfDataType DECIMAL128 = new JavaUdfDataType("DECIMAL128", TPrimitiveType.DECIMAL128I,
16);

public static final JavaUdfDataType IPV4 = new JavaUdfDataType("IPV4", TPrimitiveType.IPV4, 4);
public static final JavaUdfDataType IPV6 = new JavaUdfDataType("IPV6", TPrimitiveType.IPV6, 16);
public static final JavaUdfDataType ARRAY_TYPE = new JavaUdfDataType("ARRAY_TYPE", TPrimitiveType.ARRAY, 0);
public static final JavaUdfDataType MAP_TYPE = new JavaUdfDataType("MAP_TYPE", TPrimitiveType.MAP, 0);
public static final JavaUdfDataType STRUCT_TYPE = new JavaUdfDataType("STRUCT_TYPE", TPrimitiveType.STRUCT, 0);
Expand Down Expand Up @@ -83,6 +87,8 @@ public class JavaUdfDataType {
JavaUdfDataTypeSet.add(ARRAY_TYPE);
JavaUdfDataTypeSet.add(MAP_TYPE);
JavaUdfDataTypeSet.add(STRUCT_TYPE);
JavaUdfDataTypeSet.add(IPV4);
JavaUdfDataTypeSet.add(IPV6);
}

private final String description;
Expand Down Expand Up @@ -156,6 +162,8 @@ public static Set<JavaUdfDataType> getCandidateTypes(Class<?> c) {
return Sets.newHashSet(JavaUdfDataType.ARRAY_TYPE, JavaUdfDataType.STRUCT_TYPE);
} else if (c == java.util.HashMap.class) {
return Sets.newHashSet(JavaUdfDataType.MAP_TYPE);
} else if (c == InetAddress.class) {
return Sets.newHashSet(JavaUdfDataType.IPV4, JavaUdfDataType.IPV6);
}
return Sets.newHashSet(JavaUdfDataType.INVALID_TYPE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -58,6 +60,20 @@ public static BigInteger getBigInteger(byte[] bytes) {
return new BigInteger(originalBytes);
}

public static InetAddress getInetAddress(byte[] bytes) {
// Convert the byte order back if necessary
byte[] originalBytes = convertByteOrder(bytes);
try {
return InetAddress.getByAddress(originalBytes);
} catch (UnknownHostException e) {
return null;
}
}

public static byte[] getInetAddressBytes(InetAddress v) {
return convertByteOrder(v.getAddress()).clone();
}

public static byte[] getDecimalBytes(BigDecimal v, int scale, int size) {
BigDecimal retValue = v.setScale(scale, RoundingMode.HALF_EVEN);
BigInteger data = retValue.unscaledValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ public static Pair<Boolean, JavaUdfDataType[]> setArgTypes(Type[] parameterTypes
StructType structType = (StructType) parameterTypes[finalI];
ArrayList<StructField> fields = structType.getFields();
inputArgTypes[i].setFields(fields);
} else if (parameterTypes[finalI].isIP()) {
if (parameterTypes[finalI].isIPv4()) {
inputArgTypes[i] = new JavaUdfDataType(JavaUdfDataType.IPV4);
} else {
inputArgTypes[i] = new JavaUdfDataType(JavaUdfDataType.IPV6);
}
}
if (res.length == 0) {
return Pair.of(false, inputArgTypes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public enum Type {
DECIMAL32(4),
DECIMAL64(8),
DECIMAL128(16),
IPV4(4),
IPV6(16),
STRING(-1),
ARRAY(-1),
MAP(-1),
Expand Down Expand Up @@ -155,6 +157,18 @@ public boolean isArray() {
return type == Type.ARRAY;
}

public boolean isIpv4() {
return type == Type.IPV4;
}

public boolean isIpv6() {
return type == Type.IPV6;
}

public boolean isIp() {
return isIpv4() || isIpv6();
}

public boolean isMap() {
return type == Type.MAP;
}
Expand Down Expand Up @@ -287,6 +301,12 @@ public static ColumnType parseType(String columnName, String hiveType) {
case "double":
type = Type.DOUBLE;
break;
case "ipv4":
type = Type.IPV4;
break;
case "ipv6":
type = Type.IPV6;
break;
case "datev1":
type = Type.DATE;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -73,6 +75,19 @@ public class VectorColumn {
// todo: support pruned struct fields
private List<Integer> structFieldIndex;


public static final InetAddress DEFAULT_IPV4;
public static final InetAddress DEFAULT_IPV6;

static {
try {
DEFAULT_IPV4 = InetAddress.getByName("127.0.0.1");
DEFAULT_IPV6 = InetAddress.getByName("::1");
} catch (UnknownHostException e) {
throw new RuntimeException("Failed to initialize default InetAddress values", e);
}
}

// Create writable column
private VectorColumn(ColumnType columnType, int capacity) {
this.columnType = columnType;
Expand Down Expand Up @@ -351,6 +366,10 @@ public int appendNull(ColumnType.Type typeValue) {
return appendLong(0);
case LARGEINT:
return appendBigInteger(BigInteger.ZERO);
case IPV4:
return appendInetAddress(DEFAULT_IPV4);
case IPV6:
return appendInetAddress(DEFAULT_IPV6);
case FLOAT:
return appendFloat(0);
case DOUBLE:
Expand Down Expand Up @@ -832,6 +851,56 @@ public BigInteger[] getBigIntegerColumn(int start, int end) {
return result;
}

public byte[] getInetAddressBytes(int rowId) {
int typeSize = columnType.getTypeSize();
byte[] bytes = new byte[typeSize];
OffHeap.copyMemory(null, data + (long) rowId * typeSize, bytes, OffHeap.BYTE_ARRAY_OFFSET, typeSize);
return bytes;
}

public InetAddress getInetAddress(int rowId) {
return TypeNativeBytes.getInetAddress(getInetAddressBytes(rowId));
}

public InetAddress[] getInetAddressColumn(int start, int end) {
InetAddress[] result = new InetAddress[end - start];
for (int i = start; i < end; ++i) {
if (!isNullAt(i)) {
result[i - start] = getInetAddress(i);
}
}
return result;
}

public int appendInetAddress(InetAddress v) {
reserve(appendIndex + 1);
putInetAddress(appendIndex, v);
return appendIndex++;
}

public void appendInetAddress(InetAddress[] batch, boolean isNullable) {
reserve(appendIndex + batch.length);
for (InetAddress v : batch) {
if (v == null) {
putNull(appendIndex);
if (columnType.isIpv4()) {
putInetAddress(appendIndex, DEFAULT_IPV4);
} else {
putInetAddress(appendIndex, DEFAULT_IPV6);
}
} else {
putInetAddress(appendIndex, v);
}
appendIndex++;
}
}

private void putInetAddress(int rowId, InetAddress v) {
int typeSize = columnType.getTypeSize();
byte[] bytes = TypeNativeBytes.getInetAddressBytes(v);
OffHeap.copyMemory(bytes, OffHeap.BYTE_ARRAY_OFFSET, null, data + (long) rowId * typeSize, typeSize);
}

public int appendDecimal(BigDecimal v) {
reserve(appendIndex + 1);
putDecimal(appendIndex, v);
Expand Down Expand Up @@ -1357,6 +1426,9 @@ public Object[] newObjectContainerArray(ColumnType.Type type, int size) {
return new Long[size];
case LARGEINT:
return new BigInteger[size];
case IPV4:
case IPV6:
return new InetAddress[size];
case FLOAT:
return new Float[size];
case DOUBLE:
Expand Down Expand Up @@ -1406,6 +1478,10 @@ public void appendObjectColumn(Object[] batch, boolean isNullable) {
case LARGEINT:
appendBigInteger((BigInteger[]) batch, isNullable);
break;
case IPV4:
case IPV6:
appendInetAddress((InetAddress[]) batch, isNullable);
break;
case FLOAT:
appendFloat((Float[]) batch, isNullable);
break;
Expand Down Expand Up @@ -1463,6 +1539,9 @@ public Object[] getObjectColumn(int start, int end) {
return getLongColumn(start, end);
case LARGEINT:
return getBigIntegerColumn(start, end);
case IPV4:
case IPV6:
return getInetAddressColumn(start, end);
case FLOAT:
return getFloatColumn(start, end);
case DOUBLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -321,7 +322,8 @@ public abstract class Type {
.put(PrimitiveType.FLOAT, Sets.newHashSet(Float.class, float.class))
.put(PrimitiveType.DOUBLE, Sets.newHashSet(Double.class, double.class))
.put(PrimitiveType.BIGINT, Sets.newHashSet(Long.class, long.class))
.put(PrimitiveType.IPV4, Sets.newHashSet(Integer.class, int.class))
.put(PrimitiveType.IPV4, Sets.newHashSet(InetAddress.class))
.put(PrimitiveType.IPV6, Sets.newHashSet(InetAddress.class))
.put(PrimitiveType.STRING, Sets.newHashSet(String.class))
.put(PrimitiveType.DATE, DATE_SUPPORTED_JAVA_TYPE)
.put(PrimitiveType.DATEV2, DATE_SUPPORTED_JAVA_TYPE)
Expand Down
40 changes: 40 additions & 0 deletions regression-test/data/nereids_p0/javaudf/test_javaudf_ip.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_ipv4_1 --
1 0.0.0.123 /0.0.0.123
2 0.0.12.42 /0.0.12.42
3 0.119.130.67 /0.119.130.67
4 \N null

-- !select_ipv4_2 --
1 0.0.0.123 0.0.0.123
2 0.0.0.123 0.0.0.123
3 2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D 202.254.240.13
4 2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D 202.254.240.13

-- !select_ipv4_3 --
nulludf/0.0.0.123udf/0.0.12.42udf/0.119.130.67udf

-- !select_ipv4_4 --
1 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"]
2 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"]
3 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"]
4 ["127.0.0.1", "127.0.0.1", "127.0.0.1", null, null, "127.0.0.1"]

-- !select_ipv6_1 --
1 ::855d /0:0:0:0:0:0:0:855d
2 ::0.4.221.183 /0:0:0:0:0:0:4:ddb7
3 ::a:7429:d0d6:6e08:9f5f /0:0:0:a:7429:d0d6:6e08:9f5f
4 \N null

-- !select_ipv6_2 --
1 0.0.0.123 ::7b
2 0.0.0.123 ::7b
3 2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D 2001:db8:ac10:fe01:feed:babe:cafe:f00d
4 2001:0DB8:AC10:FE01:FEED:BABE:CAFE:F00D 2001:db8:ac10:fe01:feed:babe:cafe:f00d

-- !select_ipv6_4 --
1 ["::1", "::1", "::1", null, null, "::1"]
2 ["::1", "::1", "::1", null, null, "::1"]
3 ["::1", "::1", "::1", null, null, "::1"]
4 ["::1", "::1", "::1", null, null, "::1"]

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.apache.doris.udf;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;

public class IPV4TypeTest {
// input ipv4
public String evaluate(InetAddress x) {
if (x == null) {
return "null";
}
return x.toString();
}

// output ipv4
public InetAddress evaluate(String s) {
try {
InetAddress ipv4Address = InetAddress.getByName(s);
return ipv4Address;
} catch (UnknownHostException e) {
return null;
}
}

// input array<ipv4>
public String evaluate(ArrayList<InetAddress> s) {
String ret = "";
for (InetAddress ip : s) {
ret += evaluate(ip) + "udf";
}
return ret;
}

// output array<ipv4>
public ArrayList<InetAddress> evaluate() {
ArrayList<InetAddress> ret = new ArrayList<InetAddress>();
InetAddress DEFAULT_IPV = null;
try {
DEFAULT_IPV = InetAddress.getByName("127.0.0.1");
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

ret.add(DEFAULT_IPV);
ret.add(DEFAULT_IPV);
ret.add(DEFAULT_IPV);
ret.add(null);
ret.add(null);
ret.add(DEFAULT_IPV);
return ret;
}
}
Loading

0 comments on commit 1551ef9

Please sign in to comment.