Skip to content

Commit

Permalink
[storage][stg74] quick query new output format "arrow" (#11423)
Browse files Browse the repository at this point in the history
* quick query new outputSerialization "arrow"

* for datalake, and remove preprod
  • Loading branch information
ljian3377 authored Sep 24, 2020
1 parent 9b78225 commit 58a959d
Show file tree
Hide file tree
Showing 12 changed files with 552 additions and 13 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion sdk/storage/storage-blob/review/storage-blob.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,23 @@ export interface BlobProperties {
tagCount?: number;
}

// @public
export interface BlobQueryArrowConfiguration {
kind: "arrow";
schema: BlobQueryArrowField[];
}

// @public
export interface BlobQueryArrowField {
name?: string;
precision?: number;
scale?: number;
type: BlobQueryArrowFieldType;
}

// @public
export type BlobQueryArrowFieldType = "int64" | "bool" | "timestamp[ms]" | "string" | "double" | "decimal";

// @public
export interface BlobQueryCsvTextConfiguration {
columnSeparator?: string;
Expand Down Expand Up @@ -1363,7 +1380,7 @@ export interface BlockBlobQueryOptions extends CommonOptions {
inputTextConfiguration?: BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration;
onError?: (error: BlobQueryError) => void;
onProgress?: (progress: TransferProgressEvent) => void;
outputTextConfiguration?: BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration;
outputTextConfiguration?: BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration | BlobQueryArrowConfiguration;
}

// @public
Expand Down
34 changes: 31 additions & 3 deletions sdk/storage/storage-blob/src/Clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ import {
TagConditions,
MatchConditions,
ModificationConditions,
ModifiedAccessConditions
ModifiedAccessConditions,
BlobQueryArrowField
} from "./models";
import {
PageBlobGetPageRangesDiffResponse,
Expand Down Expand Up @@ -3307,6 +3308,30 @@ export interface BlobQueryCsvTextConfiguration {
hasHeaders?: boolean;
}

/**
* Options to query blob with Apache Arrow format. Only valid for {@link BlockBlobQueryOptions.outputTextConfiguration};
* {@link BlockBlobQueryOptions.inputTextConfiguration} accepts only the JSON and CSV configurations.
*
* @export
* @interface BlobQueryArrowConfiguration
*/
export interface BlobQueryArrowConfiguration {
/**
* Discriminator that identifies this configuration as an Apache Arrow one.
* Always the literal string "arrow".
*
* @type {"arrow"}
* @memberof BlobQueryArrowConfiguration
*/
kind: "arrow";

/**
* List of {@link BlobQueryArrowField} describing the schema of the data.
*
* @type {BlobQueryArrowField[]}
* @memberof BlobQueryArrowConfiguration
*/
schema: BlobQueryArrowField[];
}

/**
* Options to configure {@link BlockBlobClient.query} operation.
*
Expand All @@ -3332,10 +3357,13 @@ export interface BlockBlobQueryOptions extends CommonOptions {
/**
* Configurations for the query output.
*
* @type {BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration}
* @type {BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration | BlobQueryArrowConfiguration}
* @memberof BlockBlobQueryOptions
*/
outputTextConfiguration?: BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration;
outputTextConfiguration?:
| BlobQueryJsonTextConfiguration
| BlobQueryCsvTextConfiguration
| BlobQueryArrowConfiguration;
/**
* Callback to receive events on the progress of query operation.
*
Expand Down
4 changes: 3 additions & 1 deletion sdk/storage/storage-blob/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export {
BlobDownloadResponseParsed,
ObjectReplicationPolicy,
ObjectReplicationRule,
ObjectReplicationStatus
ObjectReplicationStatus,
BlobQueryArrowField,
BlobQueryArrowFieldType
} from "./models";
export * from "./Pipeline";
export * from "./policies/AnonymousCredentialPolicy";
Expand Down
56 changes: 55 additions & 1 deletion sdk/storage/storage-blob/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ export interface ObjectReplicationRule {
* This is used when retrieving the Object Replication Properties on the source blob. The policy id for the
* destination blob is set in ObjectReplicationDestinationPolicyId of the respective method responses
* (e.g. {@link BlobProperties.ObjectReplicationDestinationPolicyId}).
*
* @export
* @interface ObjectReplicationPolicy
*/
export interface ObjectReplicationPolicy {
/**
Expand All @@ -236,7 +239,7 @@ export interface ObjectReplicationPolicy {
/**
* The Rule ID(s) and respective Replication Status(s) that are under the Policy ID.
*
* @type {string}
* @type {ObjectReplicationRule[]}
* @memberof ObjectReplicationPolicy
*/
rules: ObjectReplicationRule[];
Expand Down Expand Up @@ -265,3 +268,54 @@ export interface BlobDownloadResponseParsed extends BlobDownloadResponseModel {
*/
objectReplicationDestinationPolicyId?: string;
}

/**
* The type of a {@link BlobQueryArrowField}.
*
* A field whose type is "decimal" must also provide
* {@link BlobQueryArrowField.precision} and {@link BlobQueryArrowField.scale}.
*/
export type BlobQueryArrowFieldType =
| "int64"
| "bool"
| "timestamp[ms]"
| "string"
| "double"
| "decimal";

/**
* Describes a field in {@link BlobQueryArrowConfiguration}.
*
* Only `type` is mandatory at the type level; `precision` and `scale` are
* declared optional but are required when `type` is "decimal".
*
* @export
* @interface BlobQueryArrowField
*/
export interface BlobQueryArrowField {
/**
* The type of the field.
*
* @type {BlobQueryArrowFieldType}
* @memberof BlobQueryArrowField
*/
type: BlobQueryArrowFieldType;

/**
* The name of the field.
*
* @type {string}
* @memberof BlobQueryArrowField
*/
name?: string;

/**
* The precision of the field. Required if type is "decimal".
*
* @type {number}
* @memberof BlobQueryArrowField
*/
precision?: number;

/**
* The scale of the field. Required if type is "decimal".
*
* @type {number}
* @memberof BlobQueryArrowField
*/
scale?: number;
}
23 changes: 20 additions & 3 deletions sdk/storage/storage-blob/src/utils/utils.common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import { AbortSignalLike } from "@azure/abort-controller";
import { HttpHeaders, isNode, URLBuilder } from "@azure/core-http";

import { BlobQueryCsvTextConfiguration, BlobQueryJsonTextConfiguration } from "../Clients";
import {
BlobQueryArrowConfiguration,
BlobQueryCsvTextConfiguration,
BlobQueryJsonTextConfiguration
} from "../Clients";
import { QuerySerialization, BlobTags } from "../generated/src/models";
import { DevelopmentConnectionString, HeaderConstants, URLConstants } from "./constants";
import {
Expand Down Expand Up @@ -650,11 +654,14 @@ export function toTags(tags?: BlobTags): Tags | undefined {
* Convert BlobQueryTextConfiguration to QuerySerialization type.
*
* @export
* @param {(BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration)} [textConfiguration]
* @param {(BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration | BlobQueryArrowConfiguration)} [textConfiguration]
* @returns {(QuerySerialization | undefined)}
*/
export function toQuerySerialization(
textConfiguration?: BlobQueryJsonTextConfiguration | BlobQueryCsvTextConfiguration
textConfiguration?:
| BlobQueryJsonTextConfiguration
| BlobQueryCsvTextConfiguration
| BlobQueryArrowConfiguration
): QuerySerialization | undefined {
if (textConfiguration === undefined) {
return undefined;
Expand Down Expand Up @@ -683,6 +690,16 @@ export function toQuerySerialization(
}
}
};
case "arrow":
return {
format: {
type: "arrow",
arrowConfiguration: {
schema: textConfiguration.schema
}
}
};

default:
throw Error("Invalid BlobQueryTextConfiguration.");
}
Expand Down
24 changes: 24 additions & 0 deletions sdk/storage/storage-blob/src/utils/utils.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,30 @@ export async function streamToBuffer2(
});
}

/**
 * Drains a readable stream to its end and returns everything it emitted as a
 * single Buffer.
 *
 * Chunks that arrive as Buffers are kept as-is; chunks that arrive as strings
 * are converted with `Buffer.from(chunk, encoding)`. The returned promise
 * rejects with the stream's error if the stream emits "error".
 *
 * @export
 * @param {NodeJS.ReadableStream} readableStream A Node.js Readable stream
 * @param {string} [encoding] Encoding used to convert string chunks into Buffers
 * @returns {Promise<Buffer>} A Buffer holding the concatenation of all chunks read.
 */
export async function streamToBuffer3(
  readableStream: NodeJS.ReadableStream,
  encoding?: string
): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const collected: Buffer[] = [];

    // Normalize every incoming piece to a Buffer before storing it.
    const onData = (piece: Buffer | string): void => {
      if (piece instanceof Buffer) {
        collected.push(piece);
      } else {
        collected.push(Buffer.from(piece, encoding));
      }
    };

    // Once the stream is exhausted, hand back one contiguous Buffer.
    const onEnd = (): void => {
      resolve(Buffer.concat(collected));
    };

    readableStream.on("data", onData);
    readableStream.on("end", onEnd);
    readableStream.on("error", reject);
  });
}

/**
* ONLY AVAILABLE IN NODE.JS RUNTIME.
*
Expand Down
Loading

0 comments on commit 58a959d

Please sign in to comment.