Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port DH-11692: Add native support for java.time types #3385

Merged
merged 14 commits into from
Mar 3, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use WritableColumnSource as return type of ArrayBackedColumnSource#ge…
…tMemoryColumnSource
nbauernfeind committed Mar 3, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 41e5effb0e08f557ca19d95488524801d805ac21
Original file line number Diff line number Diff line change
@@ -90,10 +90,10 @@ public static <TYPE> void forEachGroup(@NotNull final Map<TYPE, RowSet> groupToR
* @return A pair of a flat key column source and a flat RowSet column source
*/
@SuppressWarnings("unused")
public static <TYPE> Pair<ArrayBackedColumnSource<TYPE>, ObjectArraySource<TrackingWritableRowSet>> groupingToFlatSources(
public static <TYPE> Pair<WritableColumnSource<TYPE>, ObjectArraySource<TrackingWritableRowSet>> groupingToFlatSources(
@NotNull final ColumnSource<TYPE> originalKeyColumnSource, @NotNull final Map<TYPE, RowSet> groupToRowSet) {
final int numGroups = groupToRowSet.size();
final ArrayBackedColumnSource<TYPE> resultKeyColumnSource = getMemoryColumnSource(
final WritableColumnSource<TYPE> resultKeyColumnSource = getMemoryColumnSource(
numGroups, originalKeyColumnSource.getType(), originalKeyColumnSource.getComponentType());
final ObjectArraySource<TrackingWritableRowSet> resultIndexColumnSource =
new ObjectArraySource<>(TrackingWritableRowSet.class);
@@ -138,13 +138,13 @@ public static <TYPE> void forEachResponsiveGroup(@NotNull final Map<TYPE, RowSet
* @param responsiveGroups Set to the number of responsive groups on exit
* @return A pair of a flat key column source and a flat rowSet column source
*/
public static <TYPE> Pair<ArrayBackedColumnSource<TYPE>, ObjectArraySource<TrackingWritableRowSet>> groupingToFlatSources(
public static <TYPE> Pair<WritableColumnSource<TYPE>, ObjectArraySource<TrackingWritableRowSet>> groupingToFlatSources(
@NotNull final ColumnSource<TYPE> originalKeyColumnSource,
@NotNull final Map<TYPE, RowSet> groupToRowSet,
@NotNull final RowSet intersect,
@NotNull final MutableInt responsiveGroups) {
final int numGroups = groupToRowSet.size();
final ArrayBackedColumnSource<TYPE> resultKeyColumnSource = getMemoryColumnSource(
final WritableColumnSource<TYPE> resultKeyColumnSource = getMemoryColumnSource(
numGroups, originalKeyColumnSource.getType(), originalKeyColumnSource.getComponentType());
final ObjectArraySource<TrackingWritableRowSet> resultIndexColumnSource =
new ObjectArraySource<>(TrackingWritableRowSet.class);
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@ interface StateTrackingCallbackWithRightIndex {
// endmixin rehash

// the keys for our hash entries
private final ArrayBackedColumnSource<?>[] keySources;
private final WritableColumnSource<?>[] keySources;
// the location of any overflow entry in this bucket
private final IntegerArraySource overflowLocationSource = new IntegerArraySource();

@@ -150,7 +150,7 @@ interface StateTrackingCallbackWithRightIndex {

// the keys for overflow
private int nextOverflowLocation = 0;
private final ArrayBackedColumnSource<?> [] overflowKeySources;
private final WritableColumnSource<?> [] overflowKeySources;
// the location of the next key in an overflow bucket
private final IntegerArraySource overflowOverflowLocationSource = new IntegerArraySource();
// the overflow buckets for the state source
@@ -1300,10 +1300,10 @@ private int allocateOverflowLocation() {
return nextOverflowLocation++;
}

private static long updateWriteThroughChunks(ResettableWritableChunk<Values>[] writeThroughChunks, long currentHashLocation, ArrayBackedColumnSource<?>[] sources) {
final long firstBackingChunkPosition = sources[0].resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
private static long updateWriteThroughChunks(ResettableWritableChunk<Values>[] writeThroughChunks, long currentHashLocation, WritableColumnSource<?>[] sources) {
final long firstBackingChunkPosition = ((ChunkedBackingStoreExposedWritableSource)sources[0]).resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
for (int jj = 1; jj < sources.length; ++jj) {
if (sources[jj].resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
if (((ChunkedBackingStoreExposedWritableSource)sources[jj]).resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
}
if (writeThroughChunks[jj].size() != writeThroughChunks[0].size()) {
@@ -1367,9 +1367,10 @@ private void fillOverflowKeys(ColumnSource.FillContext[] fillContexts, WritableC
fillKeys(overflowKeySources, fillContexts, keyChunks, overflowLocationsChunk);
}

private static void fillKeys(ArrayBackedColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk<Values>[] keyChunks, WritableLongChunk<RowKeys> keyIndices) {
private static void fillKeys(WritableColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk<Values>[] keyChunks, WritableLongChunk<RowKeys> keyIndices) {
for (int ii = 0; ii < keySources.length; ++ii) {
keySources[ii].fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
//noinspection unchecked
((FillUnordered<Values>) keySources[ii]).fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
}
}

Original file line number Diff line number Diff line change
@@ -1873,9 +1873,9 @@ private Table snapshotHistoryInternal(final Table baseTable) {
checkInitiateOperation();

// resultColumns initially contains the trigger columns, then we insert the base columns into it
final Map<String, ArrayBackedColumnSource<?>> resultColumns = SnapshotUtils
final Map<String, WritableColumnSource<?>> resultColumns = SnapshotUtils
.createColumnSourceMap(this.getColumnSourceMap(), ArrayBackedColumnSource::getMemoryColumnSource);
final Map<String, ArrayBackedColumnSource<?>> baseColumns = SnapshotUtils.createColumnSourceMap(
final Map<String, WritableColumnSource<?>> baseColumns = SnapshotUtils.createColumnSourceMap(
baseTable.getColumnSourceMap(), ArrayBackedColumnSource::getMemoryColumnSource);
resultColumns.putAll(baseColumns);

@@ -1962,7 +1962,7 @@ private Table snapshotInternal(Table baseTable, boolean doInitialSnapshot, Strin
}

// Establish the "base" columns using the same names and types as the table being snapshotted
final Map<String, ArrayBackedColumnSource<?>> baseColumns =
final Map<String, WritableColumnSource<?>> baseColumns =
SnapshotUtils.createColumnSourceMap(baseTable.getColumnSourceMap(),
ArrayBackedColumnSource::getMemoryColumnSource);

Original file line number Diff line number Diff line change
@@ -127,7 +127,7 @@ interface StateTrackingCallback {
// endmixin rehash

// the keys for our hash entries
private final ArrayBackedColumnSource<?>[] keySources;
private final WritableColumnSource<?>[] keySources;
// the location of any overflow entry in this bucket
private final IntegerArraySource overflowLocationSource = new IntegerArraySource();

@@ -140,7 +140,7 @@ interface StateTrackingCallback {

// the keys for overflow
private int nextOverflowLocation = 0;
private final ArrayBackedColumnSource<?> [] overflowKeySources;
private final WritableColumnSource<?> [] overflowKeySources;
// the location of the next key in an overflow bucket
private final IntegerArraySource overflowOverflowLocationSource = new IntegerArraySource();
// the overflow buckets for the state source
@@ -212,8 +212,8 @@ interface StateTrackingCallback {
this.tableHashPivot = tableSize;
// endmixin rehash

overflowKeySources = new ArrayBackedColumnSource[keyColumnCount];
keySources = new ArrayBackedColumnSource[keyColumnCount];
overflowKeySources = new WritableColumnSource[keyColumnCount];
keySources = new WritableColumnSource[keyColumnCount];

keyChunkTypes = new ChunkType[keyColumnCount];
chunkHashers = new ChunkHasher[keyColumnCount];
@@ -1615,10 +1615,10 @@ private int allocateOverflowLocation() {
return nextOverflowLocation++;
}

private static long updateWriteThroughChunks(ResettableWritableChunk<Values>[] writeThroughChunks, long currentHashLocation, ArrayBackedColumnSource<?>[] sources) {
final long firstBackingChunkPosition = sources[0].resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
private static long updateWriteThroughChunks(ResettableWritableChunk<Values>[] writeThroughChunks, long currentHashLocation, WritableColumnSource<?>[] sources) {
final long firstBackingChunkPosition = ((ChunkedBackingStoreExposedWritableSource)sources[0]).resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
for (int jj = 1; jj < sources.length; ++jj) {
if (sources[jj].resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
if (((ChunkedBackingStoreExposedWritableSource)sources[jj]).resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
}
if (writeThroughChunks[jj].size() != writeThroughChunks[0].size()) {
@@ -1682,9 +1682,10 @@ private void fillOverflowKeys(ColumnSource.FillContext[] fillContexts, WritableC
fillKeys(overflowKeySources, fillContexts, keyChunks, overflowLocationsChunk);
}

private static void fillKeys(ArrayBackedColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk<Values>[] keyChunks, WritableLongChunk<RowKeys> keyIndices) {
private static void fillKeys(WritableColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk<Values>[] keyChunks, WritableLongChunk<RowKeys> keyIndices) {
for (int ii = 0; ii < keySources.length; ++ii) {
keySources[ii].fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
//noinspection unchecked
((FillUnordered<Values>) keySources[ii]).fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
}
}

Original file line number Diff line number Diff line change
@@ -457,7 +457,7 @@ private static SortMapping doMegaSortOne(SortingOrder order, ColumnSource<Compar
RowSet rowSet, boolean usePrev, long sortSize) {
final LongArraySource resultIndices = new LongArraySource();
resultIndices.ensureCapacity(sortSize, false);
final ArrayBackedColumnSource<?> valuesToMerge =
final WritableColumnSource<?> valuesToMerge =
ArrayBackedColumnSource.getMemoryColumnSource(0, columnSource.getType());
valuesToMerge.ensureCapacity(sortSize, false);

Original file line number Diff line number Diff line change
@@ -83,8 +83,8 @@ private static final class UnderlyingTableMaintainer {
private final Predicate<ImmutableTableLocationKey> locationKeyMatcher;

private final TrackingWritableRowSet resultRows;
private final ArrayBackedColumnSource<TableLocationKey> resultTableLocationKeys;
private final ArrayBackedColumnSource<Table> resultLocationTables;
private final WritableColumnSource<TableLocationKey> resultTableLocationKeys;
private final WritableColumnSource<Table> resultLocationTables;
private final QueryTable result;

private final UpdateSourceCombiner refreshCombiner;
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ interface StateTrackingCallback {
// endmixin rehash

// the keys for our hash entries
private final ArrayBackedColumnSource<?>[] keySources;
private final WritableColumnSource<?>[] keySources;
// the location of any overflow entry in this bucket
private final IntegerArraySource overflowLocationSource = new IntegerArraySource();

@@ -134,7 +134,7 @@ interface StateTrackingCallback {

// the keys for overflow
private int nextOverflowLocation = 0;
private final ArrayBackedColumnSource<?> [] overflowKeySources;
private final WritableColumnSource<?> [] overflowKeySources;
// the location of the next key in an overflow bucket
private final IntegerArraySource overflowOverflowLocationSource = new IntegerArraySource();
// the overflow buckets for the state source
@@ -197,8 +197,8 @@ interface StateTrackingCallback {
this.tableHashPivot = tableSize;
// endmixin rehash

overflowKeySources = new ArrayBackedColumnSource[keyColumnCount];
keySources = new ArrayBackedColumnSource[keyColumnCount];
overflowKeySources = new WritableColumnSource[keyColumnCount];
keySources = new WritableColumnSource[keyColumnCount];

keyChunkTypes = new ChunkType[keyColumnCount];
chunkHashers = new ChunkHasher[keyColumnCount];
@@ -1257,10 +1257,10 @@ private int allocateOverflowLocation() {
return nextOverflowLocation++;
}

private static long updateWriteThroughChunks(ResettableWritableChunk<Values>[] writeThroughChunks, long currentHashLocation, ArrayBackedColumnSource<?>[] sources) {
final long firstBackingChunkPosition = sources[0].resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
private static long updateWriteThroughChunks(ResettableWritableChunk<Values>[] writeThroughChunks, long currentHashLocation, WritableColumnSource<?>[] sources) {
final long firstBackingChunkPosition = ((ChunkedBackingStoreExposedWritableSource)sources[0]).resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
for (int jj = 1; jj < sources.length; ++jj) {
if (sources[jj].resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
if (((ChunkedBackingStoreExposedWritableSource)sources[jj]).resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
}
if (writeThroughChunks[jj].size() != writeThroughChunks[0].size()) {
@@ -1324,9 +1324,10 @@ private void fillOverflowKeys(ColumnSource.FillContext[] fillContexts, WritableC
fillKeys(overflowKeySources, fillContexts, keyChunks, overflowLocationsChunk);
}

private static void fillKeys(ArrayBackedColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk<Values>[] keyChunks, WritableLongChunk<RowKeys> keyIndices) {
private static void fillKeys(WritableColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk<Values>[] keyChunks, WritableLongChunk<RowKeys> keyIndices) {
for (int ii = 0; ii < keySources.length; ++ii) {
keySources[ii].fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
//noinspection unchecked
((FillUnordered<Values>)keySources[ii]).fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
}
}

Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ public static Table streamToAppendOnlyTable(final Table streamTable) {

ConstructSnapshot.callDataSnapshotFunction("streamToAppendOnlyTable", swapListener.makeSnapshotControl(),
(boolean usePrev, long beforeClockValue) -> {
final Map<String, ArrayBackedColumnSource<?>> columns = new LinkedHashMap<>();
final Map<String, WritableColumnSource<?>> columns = new LinkedHashMap<>();
final Map<String, ? extends ColumnSource<?>> columnSourceMap =
baseStreamTable.getColumnSourceMap();
final int columnCount = columnSourceMap.size();
@@ -62,7 +62,7 @@ public static Table streamToAppendOnlyTable(final Table streamTable) {
for (Map.Entry<String, ? extends ColumnSource<?>> nameColumnSourceEntry : columnSourceMap
.entrySet()) {
final ColumnSource<?> existingColumn = nameColumnSourceEntry.getValue();
final ArrayBackedColumnSource<?> newColumn = ArrayBackedColumnSource.getMemoryColumnSource(
final WritableColumnSource<?> newColumn = ArrayBackedColumnSource.getMemoryColumnSource(
0, existingColumn.getType(), existingColumn.getComponentType());
columns.put(nameColumnSourceEntry.getKey(), newColumn);
// for the source columns, we would like to read primitives instead of objects in cases
Loading