Skip to content

Commit

Permalink
Demonstrate usage of foreign arrow function (#9150)
Browse files Browse the repository at this point in the history
Includes the Arrow language in the distribution by default and adds a basic example of creating an Arrow array.
Ensures that the memory layout agrees with the Arrow specification (padding, contiguous allocation of memory chunks).
Related to #9118.

This should unblock work on serialization/deserialization to/from Parquet, but I'd like to defer that work to a follow-up ticket, as it is going to be a significant amount of specialized work.
  • Loading branch information
hubertp authored Mar 8, 2024
1 parent a3bf5a0 commit f80dd9f
Show file tree
Hide file tree
Showing 14 changed files with 772 additions and 343 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2216,8 +2216,7 @@ lazy val `engine-runner` = project
"com.sun.imageio",
"com.sun.jna.internal.Cleaner",
"com.sun.jna.Structure$FFIType",
"akka.http",
"org.enso.interpreter.arrow.util.MemoryUtil"
"akka.http"
)
)
.dependsOn(assembly)
Expand Down Expand Up @@ -3030,7 +3029,8 @@ buildEngineDistribution := {
val _ = (`engine-runner` / assembly).value
updateLibraryManifests.value
val modulesToCopy = componentModulesPaths.value.map(_.data)
val engineModules = Seq(file("runtime.jar"))
val arrow = Seq((`runtime-language-arrow` / Compile / packageBin).value)
val engineModules = Seq(file("runtime.jar")) ++ arrow
val root = engineDistributionRoot.value
val log = streams.value.log
val cacheFactory = streams.value.cacheStoreFactory
Expand Down
1 change: 1 addition & 0 deletions distribution/lib/Standard/Base/0.0.0-dev/src/Polyglot.enso
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Polyglot
Reads the element in a given polyglot array object.

Arguments:
- array: The array on which to perform the operation.
- index: The index to get the element from.
read_array_element : Any -> Integer -> Any
read_array_element array index = @Builtin_Method "Polyglot.read_array_element"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public record Result(PhysicalLayout physicalLayout, LogicalLayout logicalLayout,

public static Result parse(Source source) {
String src = source.getCharacters().toString();
Matcher m = ARRAY_PATTERN.matcher(src);
Matcher m = NEW_ARRAY_CONSTR.matcher(src);
if (m.find()) {
try {
var layout = LogicalLayout.valueOf(m.group(1));
Expand All @@ -36,8 +36,8 @@ public static Result parse(Source source) {
return null;
}

private static final Pattern ARRAY_PATTERN = Pattern.compile("new\\[(.+)\\]");
private static final Pattern CAST_PATTERN = Pattern.compile("cast\\[(.+)\\]");
private static final Pattern NEW_ARRAY_CONSTR = Pattern.compile("^new\\[(.+)\\]$");
private static final Pattern CAST_PATTERN = Pattern.compile("^cast\\[(.+)\\]$");

public enum Mode {
Allocate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.enso.interpreter.arrow.LogicalLayout;
import org.enso.interpreter.arrow.util.MemoryUtil;

@ExportLibrary(InteropLibrary.class)
public class ArrowCastToFixedSizeArrayFactory implements TruffleObject {
Expand Down Expand Up @@ -45,7 +42,7 @@ static Object doDate32(
Object[] args,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
var unit = ArrowFixedArrayDate.DateUnit.Day;
var unit = LogicalLayout.Date32;
return new ArrowFixedArrayDate(pointer(args, iop, unit), unit);
}

Expand All @@ -55,7 +52,7 @@ static Object doDate64(
Object[] args,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
var unit = ArrowFixedArrayDate.DateUnit.Millisecond;
var unit = LogicalLayout.Date64;
return new ArrowFixedArrayDate(pointer(args, iop, unit), unit);
}

Expand All @@ -65,7 +62,7 @@ static Object doInt8(
Object[] args,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
var unit = ArrowFixedArrayInt.IntUnit.Byte1;
var unit = LogicalLayout.Int8;
return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
}

Expand All @@ -75,7 +72,7 @@ static Object doInt16(
Object[] args,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
var unit = ArrowFixedArrayInt.IntUnit.Byte2;
var unit = LogicalLayout.Int16;
return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
}

Expand All @@ -85,7 +82,7 @@ static Object doInt32(
Object[] args,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
var unit = ArrowFixedArrayInt.IntUnit.Byte4;
var unit = LogicalLayout.Int32;
return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
}

Expand All @@ -95,7 +92,7 @@ static Object doInt64(
Object[] args,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException, ArityException, UnsupportedTypeException {
var unit = ArrowFixedArrayInt.IntUnit.Byte8;
var unit = LogicalLayout.Int64;
return new ArrowFixedArrayInt(pointer(args, iop, unit), unit);
}

Expand All @@ -114,20 +111,16 @@ private static ByteBufferDirect pointer(Object[] args, InteropLibrary interop, S
new Object[] {args[0]}, "Size of allocated memory is invalid");
}

var size = interop.asInt(args[1]);
var targetSize = size * unit.sizeInBytes();
ByteBuffer buffer = MemoryUtil.directBuffer(interop.asLong(args[0]), targetSize);
buffer.order(ByteOrder.LITTLE_ENDIAN);
var capacity = interop.asInt(args[1]);
if (args.length == 3) {
if (!interop.isNumber(args[2]) || !interop.fitsInLong(args[2])) {
throw UnsupportedTypeException.create(
new Object[] {args[2]}, "Address of non-null bitmap is invalid");
}
ByteBuffer validityMap =
MemoryUtil.directBuffer(interop.asLong(args[2]), (int) Math.ceil(size / 8) + 1);
return new ByteBufferDirect(buffer, validityMap);
return ByteBufferDirect.fromAddress(
interop.asLong(args[0]), interop.asLong(args[2]), capacity, unit);
} else {
return new ByteBufferDirect(buffer, size);
return ByteBufferDirect.fromAddress(interop.asLong(args[0]), capacity, unit);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,43 +1,40 @@
package org.enso.interpreter.arrow.runtime;

import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.Cached;
import com.oracle.truffle.api.dsl.ImportStatic;
import com.oracle.truffle.api.dsl.Specialization;
import com.oracle.truffle.api.interop.InteropLibrary;
import com.oracle.truffle.api.interop.TruffleObject;
import com.oracle.truffle.api.interop.UnsupportedMessageException;
import com.oracle.truffle.api.library.CachedLibrary;
import com.oracle.truffle.api.library.ExportLibrary;
import com.oracle.truffle.api.library.ExportMessage;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import org.enso.interpreter.arrow.LogicalLayout;

@ExportLibrary(InteropLibrary.class)
public final class ArrowFixedArrayDate implements TruffleObject {
private final int size;
private final ByteBufferDirect buffer;
private final LogicalLayout unit;

private final DateUnit unit;

public ArrowFixedArrayDate(int size, DateUnit unit) {
public ArrowFixedArrayDate(int size, LogicalLayout unit) {
this.size = size;
this.unit = unit;
this.buffer = allocateBuffer(size * unit.sizeInBytes(), size);
this.buffer = ByteBufferDirect.forSize(size, unit);
}

public ArrowFixedArrayDate(ByteBufferDirect buffer, DateUnit unit)
public ArrowFixedArrayDate(ByteBufferDirect buffer, LogicalLayout unit)
throws UnsupportedMessageException {
this.size = buffer.capacity() / unit.sizeInBytes();
this.unit = unit;
this.buffer = buffer;
}

public DateUnit getUnit() {
public LogicalLayout getUnit() {
return unit;
}

Expand All @@ -47,86 +44,42 @@ public boolean hasArrayElements() {
}

@ExportMessage
@ImportStatic(ArrowFixedArrayDate.DateUnit.class)
@ImportStatic(LogicalLayout.class)
static class ReadArrayElement {
@Specialization(guards = "receiver.getUnit() == Day")
@Specialization(guards = "receiver.getUnit() == Date32")
static Object doDay(ArrowFixedArrayDate receiver, long index)
throws UnsupportedMessageException {
if (receiver.buffer.isNull((int) index)) {
return NullValue.get();
}
var at = typeAdjustedIndex(index, receiver.unit);
var daysSinceEpoch = receiver.buffer.getInt(at);
var localDate = localDateFromDays(daysSinceEpoch);
return new ArrowDate(localDate);
return readDay(receiver.buffer, index);
}

@Specialization(guards = "receiver.getUnit() == Millisecond")
@Specialization(guards = "receiver.getUnit() == Date64")
static Object doMilliseconds(ArrowFixedArrayDate receiver, long index)
throws UnsupportedMessageException {
if (receiver.buffer.isNull((int) index)) {
return NullValue.get();
}
var at = typeAdjustedIndex(index, receiver.unit);
var secondsPlusNanoSinceEpoch = receiver.buffer.getLong(at);
var seconds = Math.floorDiv(secondsPlusNanoSinceEpoch, nanoDiv);
var nano = Math.floorMod(secondsPlusNanoSinceEpoch, nanoDiv);
var zonedDateTime = zonedDateTimeFromSeconds(seconds, nano, utc);
return new ArrowZonedDateTime(zonedDateTime);
return readMilliseconds(receiver.buffer, index);
}
}

@ExportMessage
@ImportStatic(ArrowFixedArrayDate.DateUnit.class)
static class WriteArrayElement {
@Specialization(guards = "receiver.getUnit() == Day")
static void doDay(
ArrowFixedArrayDate receiver,
long index,
Object value,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException {
if (!iop.isDate(value)) {
throw UnsupportedMessageException.create();
}
var at = typeAdjustedIndex(index, receiver.unit);
var time = iop.asDate(value).toEpochDay();
receiver.buffer.putInt(at, Math.toIntExact(time));
}

@Specialization(guards = {"receiver.getUnit() == Millisecond", "!iop.isNull(value)"})
static void doMilliseconds(
ArrowFixedArrayDate receiver,
long index,
Object value,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop)
throws UnsupportedMessageException {
if (!iop.isDate(value) || !iop.isTime(value)) {
throw UnsupportedMessageException.create();
}

var at = typeAdjustedIndex(index, receiver.unit);
if (iop.isTimeZone(value)) {
var zoneDateTimeInstant =
instantForZone(iop.asDate(value), iop.asTime(value), iop.asTimeZone(value), utc);
var secondsPlusNano =
zoneDateTimeInstant.getEpochSecond() * nanoDiv + zoneDateTimeInstant.getNano();
receiver.buffer.putLong(at, secondsPlusNano);
} else {
var dateTime = instantForOffset(iop.asDate(value), iop.asTime(value), ZoneOffset.UTC);
var secondsPlusNano = dateTime.getEpochSecond() * nanoDiv + dateTime.getNano();
receiver.buffer.putLong(at, secondsPlusNano);
}
/**
 * Reads one element of {@code buffer} as a date stored as a 4-byte count of days since the epoch.
 *
 * @param buffer the buffer that holds the values and answers null checks
 * @param index logical element index (not a byte offset)
 * @return {@code NullValue.get()} when the buffer reports the slot as null, otherwise an
 *     {@code ArrowDate} wrapping the decoded local date
 * @throws UnsupportedMessageException declared by the signature; presumably propagated from the
 *     buffer accessors (TODO confirm)
 */
static Object readDay(ByteBufferDirect buffer, long index) throws UnsupportedMessageException {
// The null check happens before any value bytes are read.
// NOTE(review): the (int) cast truncates silently, while the offset computation below uses an
// exact conversion; confirm callers never pass an index beyond the int range.
if (buffer.isNull((int) index)) {
return NullValue.get();
}
// Each element is 4 bytes wide, so the byte offset is index * 4.
var at = typeAdjustedIndex(index, 4);
var daysSinceEpoch = buffer.getInt(at);
var localDate = localDateFromDays(daysSinceEpoch);
return new ArrowDate(localDate);
}

@Specialization(guards = "iop.isNull(value)")
static void doNull(
ArrowFixedArrayDate receiver,
long index,
Object value,
@Cached.Shared("interop") @CachedLibrary(limit = "1") InteropLibrary iop) {
receiver.buffer.setNull((int) index);
/**
 * Reads one element of {@code buffer} as an 8-byte timestamp and converts it to a zoned date-time
 * in {@code UTC}.
 *
 * @param buffer the buffer that holds the values and answers null checks
 * @param index logical element index (not a byte offset)
 * @return {@code NullValue.get()} when the buffer reports the slot as null, otherwise an
 *     {@code ArrowZonedDateTime} wrapping the decoded instant
 * @throws UnsupportedMessageException declared by the signature; presumably propagated from the
 *     buffer accessors (TODO confirm)
 */
static Object readMilliseconds(ByteBufferDirect buffer, long index)
throws UnsupportedMessageException {
// The null check happens before any value bytes are read.
if (buffer.isNull((int) index)) {
return NullValue.get();
}
// Each element is 8 bytes wide, so the byte offset is index * 8.
var at = typeAdjustedIndex(index, 8);
var secondsPlusNanoSinceEpoch = buffer.getLong(at);
// The stored long is split with NANO_DIV (1_000_000_000): the quotient becomes epoch seconds and
// the remainder becomes the nanosecond adjustment. floorDiv/floorMod keep the remainder
// non-negative for values before the epoch.
// NOTE(review): despite the method name, this arithmetic treats the stored value as nanoseconds
// rather than milliseconds; confirm the intended unit against the writer side.
var seconds = Math.floorDiv(secondsPlusNanoSinceEpoch, NANO_DIV);
var nano = Math.floorMod(secondsPlusNanoSinceEpoch, NANO_DIV);
var zonedDateTime = zonedDateTimeFromSeconds(seconds, nano, UTC);
return new ArrowZonedDateTime(zonedDateTime);
}

@ExportMessage
Expand All @@ -139,16 +92,6 @@ boolean isArrayElementReadable(long index) {
return index >= 0 && index < size && !buffer.isNull((int) index);
}

@ExportMessage
boolean isArrayElementModifiable(long index) {
return index >= 0 && index < size;
}

@ExportMessage
boolean isArrayElementInsertable(long index) {
return index >= 0 && index < size;
}

@ExportLibrary(InteropLibrary.class)
static class ArrowDate implements TruffleObject {
private LocalDate date;
Expand Down Expand Up @@ -207,11 +150,6 @@ public ZoneId asTimeZone() {
}
}

@CompilerDirectives.TruffleBoundary
private static ByteBufferDirect allocateBuffer(int sizeInBytes, int size) {
return new ByteBufferDirect(sizeInBytes, size);
}

@CompilerDirectives.TruffleBoundary
private static LocalDate localDateFromDays(int daysSinceEpoch) {
return LocalDate.ofEpochDay(daysSinceEpoch);
Expand All @@ -222,37 +160,11 @@ private static ZonedDateTime zonedDateTimeFromSeconds(long seconds, long nano, Z
return Instant.ofEpochSecond(seconds, nano).atZone(zone);
}

@CompilerDirectives.TruffleBoundary
private static Instant instantForZone(
LocalDate date, LocalTime time, ZoneId zone, ZoneId target) {
return date.atTime(time).atZone(zone).withZoneSameLocal(target).toInstant();
}

@CompilerDirectives.TruffleBoundary
private static Instant instantForOffset(LocalDate date, LocalTime time, ZoneOffset offset) {
return date.atTime(time).toInstant(offset);
}

public enum DateUnit implements SizeInBytes {
Day(4),
Millisecond(8);

private final int bytes;

DateUnit(int bytes) {
this.bytes = bytes;
}

public int sizeInBytes() {
return bytes;
}
}

private static final long nanoDiv = 1000000000L;
static final long NANO_DIV = 1000000000L;

private static final ZoneId utc = ZoneId.of("UTC");
static final ZoneId UTC = ZoneId.of("UTC");

private static int typeAdjustedIndex(long index, SizeInBytes unit) {
return Math.toIntExact(index * unit.sizeInBytes());
static int typeAdjustedIndex(long index, int daySizeInBytes) {
return Math.toIntExact(index * daySizeInBytes);
}
}
Loading

0 comments on commit f80dd9f

Please sign in to comment.