Skip to content

Commit

Permalink
update dictionarybuilder and recordbatchwriter to support delta dicti…
Browse files Browse the repository at this point in the history
…onaries
  • Loading branch information
trxcllnt committed Jun 9, 2019
1 parent 8b0752f commit b12d842
Show file tree
Hide file tree
Showing 26 changed files with 300 additions and 273 deletions.
4 changes: 2 additions & 2 deletions js/bin/print-buffer-alignment.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ const { VectorLoader } = require(`../targets/apache-arrow/visitor/vectorloader`)
})().catch((e) => { console.error(e); process.exit(1); });

function loadRecordBatch(schema, header, body) {
return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers).visitMany(schema.fields));
return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields));
}

function loadDictionaryBatch(header, body, dictionaryType) {
return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers).visitMany([dictionaryType]));
return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]));
}
16 changes: 2 additions & 14 deletions js/src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,7 @@ export abstract class Builder<T extends DataType = any, TNull = any> {
* @nocollapse
*/
public static throughIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
const build = throughIterable(options);
if (!DataType.isDictionary(options.type)) {
return build;
}
return function*(source: Iterable<T['TValue'] | TNull>) {
const chunks = []; for (const chunk of build(source)) { chunks.push(chunk); } yield* chunks;
};
return throughIterable(options);
}

/**
Expand Down Expand Up @@ -192,13 +186,7 @@ export abstract class Builder<T extends DataType = any, TNull = any> {
* @nocollapse
*/
public static throughAsyncIterable<T extends DataType = any, TNull = any>(options: IterableBuilderOptions<T, TNull>) {
const build = throughAsyncIterable(options);
if (!DataType.isDictionary(options.type)) {
return build;
}
return async function* (source: Iterable<T['TValue'] | TNull> | AsyncIterable<T['TValue'] | TNull>) {
const chunks = []; for await (const chunk of build(source)) { chunks.push(chunk); } yield* chunks;
};
return throughAsyncIterable(options);
}

/**
Expand Down
46 changes: 25 additions & 21 deletions js/src/builder/dictionary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ export interface DictionaryBuilderOptions<T extends DataType = any, TNull = any>
/** @ignore */
export class DictionaryBuilder<T extends Dictionary, TNull = any> extends Builder<T, TNull> {

protected _codes = Object.create(null);
protected _dictionariesOffset: number;
protected _dictionary: Vector<T['dictionary']> | null;
protected _keysToIndices: { [key: string]: number };
public readonly indices: IntBuilder<T['indices']>;
public readonly dictionary: Builder<T['dictionary']>;

constructor({ 'type': type, 'nullValues': nulls, 'dictionaryHashFunction': hashFn }: DictionaryBuilderOptions<T, TNull>) {
super({ type });
super({ type: new Dictionary(type.dictionary, type.indices, type.id, type.isOrdered) as T });
this._dictionary = null;
this._nulls = <any> null;
this._dictionariesOffset = 0;
this._keysToIndices = Object.create(null);
this.indices = Builder.new({ 'type': this.type.indices, 'nullValues': nulls }) as IntBuilder<T['indices']>;
this.dictionary = Builder.new({ 'type': this.type.dictionary, 'nullValues': null }) as Builder<T['dictionary']>;
if (typeof hashFn === 'function') {
Expand All @@ -46,9 +51,9 @@ export class DictionaryBuilder<T extends Dictionary, TNull = any> extends Builde
public get values() { return this.indices.values; }
public get nullCount() { return this.indices.nullCount; }
public get nullBitmap() { return this.indices.nullBitmap; }
public get byteLength() { return this.indices.byteLength; }
public get reservedLength() { return this.indices.reservedLength; }
public get reservedByteLength() { return this.indices.reservedByteLength; }
public get byteLength() { return this.indices.byteLength + this.dictionary.byteLength; }
public get reservedLength() { return this.indices.reservedLength + this.dictionary.reservedLength; }
public get reservedByteLength() { return this.indices.reservedByteLength + this.dictionary.reservedByteLength; }
public isValid(value: T['TValue'] | TNull) { return this.indices.isValid(value); }
public setValid(index: number, valid: boolean) {
const indices = this.indices;
Expand All @@ -57,37 +62,36 @@ export class DictionaryBuilder<T extends Dictionary, TNull = any> extends Builde
return valid;
}
public setValue(index: number, value: T['TValue']) {
let keysToCodesMap = this._codes;
let keysToIndices = this._keysToIndices;
let key = this.valueToKey(value);
let idx = keysToCodesMap[key];
let idx = keysToIndices[key];
if (idx === undefined) {
keysToCodesMap[key] = idx = this.dictionary.append(value).length - 1;
keysToIndices[key] = idx = this._dictionariesOffset + this.dictionary.append(value).length - 1;
}
return this.indices.setValue(index, idx);
}
public flush() {
const chunk = this.indices.flush().clone(this.type);
const type = this.type;
const prev = this._dictionary;
const curr = this.dictionary.toVector();
const data = this.indices.flush().clone(type);
data.dictionary = prev ? prev.concat(curr) : curr;
this.finished || (this._dictionariesOffset += curr.length);
this._dictionary = data.dictionary as Vector<T['dictionary']>;
this.clear();
return chunk;
return data;
}
public finish() {
this.type.dictionaryVector = Vector.new(this.dictionary.finish().flush());
this.indices.finish();
this.dictionary.finish();
return super.finish();
}
public clear() {
this.indices.clear();
this.dictionary.clear();
return super.clear();
}
public valueToKey(val: any) {
let str = typeof val === 'string' ? val : `${val}`;
let h1 = 0xdeadbeef ^ 0, h2 = 0x41c6ce57 ^ 0;
for (let i = -1, n = str.length, ch; ++i < n;) {
ch = str.charCodeAt(i);
h1 = Math.imul(h1 ^ ch, 2654435761);
h2 = Math.imul(h2 ^ ch, 1597334677);
}
h1 = Math.imul(h1 ^ h1 >>> 16, 2246822507) ^ Math.imul(h2 ^ h2 >>> 13, 3266489909);
h2 = Math.imul(h2 ^ h2 >>> 16, 2246822507) ^ Math.imul(h1 ^ h1 >>> 13, 3266489909);
return 4294967296 * (2097151 & h2) + (h1 >>> 0) as any;
return (typeof val === 'string' ? val : `${val}`) as any;
}
}
2 changes: 1 addition & 1 deletion js/src/column.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export class Column<T extends DataType = any>

if (typeof field === 'string') {
const type = chunks[0].data.type;
field = new Field(field, type, chunks.some(({ nullCount }) => nullCount > 0));
field = new Field(field, type, true);
} else if (!field.nullable && chunks.some(({ nullCount }) => nullCount > 0)) {
field = field.clone({ nullable: true });
}
Expand Down
19 changes: 13 additions & 6 deletions js/src/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ export class Data<T extends DataType = DataType> {
public readonly offset: number;
public readonly stride: number;
public readonly childData: Data[];

/**
* The dictionary for this Vector, if any. Only used for Dictionary type.
*/
public dictionary?: Vector;

public readonly values: Buffers<T>[BufferType.DATA];
// @ts-ignore
public readonly typeIds: Buffers<T>[BufferType.TYPE];
Expand Down Expand Up @@ -98,8 +104,9 @@ export class Data<T extends DataType = DataType> {
return nullCount;
}

constructor(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[]) {
constructor(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[], dictionary?: Vector) {
this.type = type;
this.dictionary = dictionary;
this.offset = Math.floor(Math.max(offset || 0, 0));
this.length = Math.floor(Math.max(length || 0, 0));
this._nullCount = Math.floor(Math.max(nullCount || 0, -1));
Expand All @@ -123,7 +130,7 @@ export class Data<T extends DataType = DataType> {
}

public clone<R extends DataType>(type: R, offset = this.offset, length = this.length, nullCount = this._nullCount, buffers: Buffers<R> = <any> this, childData: (Data | Vector)[] = this.childData) {
return new Data(type, offset, length, nullCount, buffers, childData);
return new Data(type, offset, length, nullCount, buffers, childData, this.dictionary);
}

public slice(offset: number, length: number): Data<T> {
Expand Down Expand Up @@ -173,12 +180,12 @@ export class Data<T extends DataType = DataType> {
// Convenience methods for creating Data instances for each of the Arrow Vector types
//
/** @nocollapse */
public static new<T extends DataType>(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[]): Data<T> {
public static new<T extends DataType>(type: T, offset: number, length: number, nullCount?: number, buffers?: Partial<Buffers<T>> | Data<T>, childData?: (Data | Vector)[], dictionary?: Vector): Data<T> {
if (buffers instanceof Data) { buffers = buffers.buffers; } else if (!buffers) { buffers = [] as Partial<Buffers<T>>; }
switch (type.typeId) {
case Type.Null: return <unknown> Data.Null( <unknown> type as Null, offset, length, nullCount || 0, buffers[BufferType.VALIDITY]) as Data<T>;
case Type.Int: return <unknown> Data.Int( <unknown> type as Int, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
case Type.Dictionary: return <unknown> Data.Dictionary( <unknown> type as Dictionary, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
case Type.Dictionary: return <unknown> Data.Dictionary( <unknown> type as Dictionary, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || [], dictionary!) as Data<T>;
case Type.Float: return <unknown> Data.Float( <unknown> type as Float, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
case Type.Bool: return <unknown> Data.Bool( <unknown> type as Bool, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
case Type.Decimal: return <unknown> Data.Decimal( <unknown> type as Decimal, offset, length, nullCount || 0, buffers[BufferType.VALIDITY], buffers[BufferType.DATA] || []) as Data<T>;
Expand Down Expand Up @@ -207,8 +214,8 @@ export class Data<T extends DataType = DataType> {
return new Data(type, offset, length, nullCount, [undefined, toArrayBufferView(type.ArrayType, data), toUint8Array(nullBitmap)]);
}
/** @nocollapse */
public static Dictionary<T extends Dictionary>(type: T, offset: number, length: number, nullCount: number, nullBitmap: NullBuffer, data: DataBuffer<T>) {
return new Data(type, offset, length, nullCount, [undefined, toArrayBufferView<T['TArray']>(type.indices.ArrayType, data), toUint8Array(nullBitmap)]);
public static Dictionary<T extends Dictionary>(type: T, offset: number, length: number, nullCount: number, nullBitmap: NullBuffer, data: DataBuffer<T>, dictionary: Vector<T['dictionary']>) {
return new Data(type, offset, length, nullCount, [undefined, toArrayBufferView<T['TArray']>(type.indices.ArrayType, data), toUint8Array(nullBitmap)], [], dictionary);
}
/** @nocollapse */
public static Float<T extends Float>(type: T, offset: number, length: number, nullCount: number, nullBitmap: NullBuffer, data: DataBuffer<T>) {
Expand Down
4 changes: 2 additions & 2 deletions js/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ export type BuilderType<T extends Type | DataType = any, TNull = any> =

/** @ignore */
export type VectorCtor<T extends Type | DataType | VectorType> =
T extends VectorType ? VectorCtorType<T> :
T extends VectorType ? VectorCtorType<T> :
T extends Type ? VectorCtorType<VectorType<T>> :
T extends DataType ? VectorCtorType<VectorType<T['TType']>> :
VectorCtorType<vecs.BaseVector>
Expand All @@ -157,7 +157,7 @@ export type BuilderCtor<T extends Type | DataType = any> =
/** @ignore */
export type DataTypeCtor<T extends Type | DataType | VectorType = any> =
T extends DataType ? ConstructorType<T> :
T extends VectorType ? ConstructorType<T['type']> :
T extends VectorType ? ConstructorType<T['type']> :
T extends Type ? ConstructorType<TypeToDataType<T>> :
never
;
Expand Down
15 changes: 0 additions & 15 deletions js/src/io/node/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {

constructor(builder: Builder<T, TNull>, options: BuilderDuplexOptions<T, TNull>) {

const isDictionary = DataType.isDictionary(builder.type);
const { queueingStrategy = 'count', autoDestroy = true } = options;
const { highWaterMark = queueingStrategy !== 'bytes' ? 1000 : 2 ** 14 } = options;

Expand All @@ -58,20 +57,6 @@ class BuilderDuplex<T extends DataType = any, TNull = any> extends Duplex {
this._builder = builder;
this._desiredSize = highWaterMark;
this._getSize = queueingStrategy !== 'bytes' ? builderLength : builderByteLength;

if (isDictionary) {
let chunks: any[] = [];
this.push = (chunk: any, _?: string) => {
if (chunk !== null) {
chunks.push(chunk);
return true;
}
const chunks_ = chunks;
chunks = [];
chunks_.forEach((x) => super.push(x));
return super.push(null) && false;
};
}
}
_read(size: number) {
this._maybeFlush(this._builder, this._desiredSize = size);
Expand Down
16 changes: 0 additions & 16 deletions js/src/io/whatwg/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,6 @@ export class BuilderTransform<T extends DataType = any, TNull = any> {
'highWaterMark': writableHighWaterMark,
'size': (value: T['TValue'] | TNull) => this._writeValueAndReturnChunkSize(value),
});

if (DataType.isDictionary(builderOptions.type)) {
let chunks: any[] = [];
this._enqueue = (controller: ReadableStreamDefaultController<V<T>>, chunk: V<T> | null) => {
this._bufferedSize = 0;
if (chunk !== null) {
chunks.push(chunk);
} else {
const chunks_ = chunks;
chunks = [];
chunks_.forEach((x) => controller.enqueue(x));
controller.close();
this._controller = null;
}
};
}
}

private _writeValueAndReturnChunkSize(value: T['TValue'] | TNull) {
Expand Down
29 changes: 13 additions & 16 deletions js/src/ipc/metadata/json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import { DictionaryBatch, RecordBatch, FieldNode, BufferRegion } from './message
import { TimeUnit, Precision, IntervalUnit, UnionMode, DateUnit } from '../../enum';

/** @ignore */
export function schemaFromJSON(_schema: any, dictionaries: Map<number, DataType> = new Map(), dictionaryFields: Map<number, Field<Dictionary>[]> = new Map()) {
export function schemaFromJSON(_schema: any, dictionaries: Map<number, DataType> = new Map()) {
return new Schema(
schemaFieldsFromJSON(_schema, dictionaries, dictionaryFields),
schemaFieldsFromJSON(_schema, dictionaries),
customMetadataFromJSON(_schema['customMetadata']),
dictionaries, dictionaryFields
dictionaries
);
}

Expand All @@ -53,13 +53,13 @@ export function dictionaryBatchFromJSON(b: any) {
}

/** @ignore */
function schemaFieldsFromJSON(_schema: any, dictionaries?: Map<number, DataType>, dictionaryFields?: Map<number, Field<Dictionary>[]>) {
return (_schema['fields'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries, dictionaryFields));
function schemaFieldsFromJSON(_schema: any, dictionaries?: Map<number, DataType>) {
return (_schema['fields'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries));
}

/** @ignore */
function fieldChildrenFromJSON(_field: any, dictionaries?: Map<number, DataType>, dictionaryFields?: Map<number, Field<Dictionary>[]>): Field[] {
return (_field['children'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries, dictionaryFields));
function fieldChildrenFromJSON(_field: any, dictionaries?: Map<number, DataType>): Field[] {
return (_field['children'] || []).filter(Boolean).map((f: any) => Field.fromJSON(f, dictionaries));
}

/** @ignore */
Expand Down Expand Up @@ -93,19 +93,18 @@ function nullCountFromJSON(validity: number[]) {
}

/** @ignore */
export function fieldFromJSON(_field: any, dictionaries?: Map<number, DataType>, dictionaryFields?: Map<number, Field<Dictionary>[]>) {
export function fieldFromJSON(_field: any, dictionaries?: Map<number, DataType>) {

let id: number;
let keys: TKeys | null;
let field: Field | void;
let dictMeta: any;
let type: DataType<any>;
let dictType: Dictionary;
let dictField: Field<Dictionary>;

// If no dictionary encoding
if (!dictionaries || !dictionaryFields || !(dictMeta = _field['dictionary'])) {
type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries, dictionaryFields));
if (!dictionaries || !(dictMeta = _field['dictionary'])) {
type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries));
field = new Field(_field['name'], type, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
}
// tslint:disable
Expand All @@ -115,19 +114,17 @@ export function fieldFromJSON(_field: any, dictionaries?: Map<number, DataType>,
else if (!dictionaries.has(id = dictMeta['id'])) {
// a dictionary index defaults to signed 32 bit int if unspecified
keys = (keys = dictMeta['indexType']) ? indexTypeFromJSON(keys) as TKeys : new Int32();
dictionaries.set(id, type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries, dictionaryFields)));
dictionaries.set(id, type = typeFromJSON(_field, fieldChildrenFromJSON(_field, dictionaries)));
dictType = new Dictionary(type, keys, id, dictMeta['isOrdered']);
dictField = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
dictionaryFields.set(id, [field = dictField]);
field = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
}
// If dictionary encoded, and have already seen this dictionary Id in the schema, then reuse the
// data type and wrap in a new Dictionary type and field.
else {
// a dictionary index defaults to signed 32 bit int if unspecified
keys = (keys = dictMeta['indexType']) ? indexTypeFromJSON(keys) as TKeys : new Int32();
dictType = new Dictionary(dictionaries.get(id)!, keys, id, dictMeta['isOrdered']);
dictField = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
dictionaryFields.get(id)!.push(field = dictField);
field = new Field(_field['name'], dictType, _field['nullable'], customMetadataFromJSON(_field['customMetadata']));
}
return field || null;
}
Expand Down
Loading

0 comments on commit b12d842

Please sign in to comment.