diff --git a/rush.json b/rush.json index 75483477fc65..eac316fdb2d5 100644 --- a/rush.json +++ b/rush.json @@ -457,6 +457,11 @@ "projectFolder": "sdk/storage/storage-blob", "versionPolicyName": "client" }, + { + "packageName": "@azure/storage-blob-change-feed", + "projectFolder": "sdk/storage/storage-blob-change-feed", + "versionPolicyName": "client" + }, { "packageName": "@azure/storage-file-share", "projectFolder": "sdk/storage/storage-file-share", @@ -493,4 +498,4 @@ "versionPolicyName": "utility" } ] -} +} \ No newline at end of file diff --git a/sdk/storage/storage-blob-change-feed/.vscode/extensions.json b/sdk/storage/storage-blob-change-feed/.vscode/extensions.json new file mode 100644 index 000000000000..c83e26348e1f --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/.vscode/extensions.json @@ -0,0 +1,3 @@ +{ + "recommendations": ["esbenp.prettier-vscode"] +} diff --git a/sdk/storage/storage-blob-change-feed/.vscode/launch.json b/sdk/storage/storage-blob-change-feed/.vscode/launch.json new file mode 100644 index 000000000000..24dbfc9d74c4 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/.vscode/launch.json @@ -0,0 +1,59 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Debug Javascript Samples", + "program": "${workspaceFolder}/samples/javascript/basic.js", + "preLaunchTask": "npm: build:js-samples" + }, + { + "type": "node", + "request": "launch", + "name": "Debug Typescript Samples", + "program": "${workspaceFolder}/samples/typescript/basic.ts", + "preLaunchTask": "npm: build:ts-samples", + "outFiles": ["${workspaceFolder}/dist-esm/samples/typescript/*.js"] + }, + { + "type": "node", + "request": "launch", + "name": "Debug Mocha Test [Without Rollup]", + "program": "${workspaceFolder}/node_modules/mocha/bin/_mocha", + "args": [ + "-r", + "ts-node/register", + "--timeout", + "999999", + "--colors", + "${workspaceFolder}/test/*.spec.ts", + "${workspaceFolder}/test/node/*.spec.ts" + ], + "env": { "TS_NODE_COMPILER_OPTIONS": "{\"module\": \"commonjs\"}" }, + "envFile": "${workspaceFolder}/../.env", + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen", + "protocol": "inspector" + }, + { + "type": "node", + "request": "launch", + "name": "Debug Unit Tests", + "program": "${workspaceFolder}/node_modules/mocha/bin/_mocha", + "args": [ + "-u", + "tdd", + "--timeout", + "999999", + "--colors", + "${workspaceFolder}/dist-test/index.node.js" + ], + "internalConsoleOptions": "openOnSessionStart", + "preLaunchTask": "npm: build:test" + } + ] +} diff --git a/sdk/storage/storage-blob-change-feed/.vscode/settings.json b/sdk/storage/storage-blob-change-feed/.vscode/settings.json new file mode 100644 index 000000000000..7ceb5ace3e9d --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/.vscode/settings.json @@ -0,0 +1,27 @@ +{ + "files.exclude": { + "**/.git": true, + "**/.svn": true, + "**/.DS_Store": true + }, + "[typescript]": { + "editor.formatOnSave": true, + "editor.tabSize": 2, + "editor.detectIndentation": false + }, + "[json]": { + "editor.formatOnSave": true, + "editor.tabSize": 2, + "editor.detectIndentation": false + }, + "[yaml]": { + "editor.formatOnSave": true, + "editor.tabSize": 2, + "editor.detectIndentation": false + }, + "editor.rulers": [ + 100 + ], + "typescript.preferences.quoteStyle": "double", + "javascript.preferences.quoteStyle": "double" + } \ No newline at end of file diff --git a/sdk/storage/storage-blob-change-feed/LICENSE b/sdk/storage/storage-blob-change-feed/LICENSE new file mode 100644 index 000000000000..ea8fb1516028 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2020 Microsoft + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/sdk/storage/storage-blob-change-feed/api-extractor.json b/sdk/storage/storage-blob-change-feed/api-extractor.json new file mode 100644 index 000000000000..24dd27bb1c02 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/api-extractor.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json", + "mainEntryPointFilePath": "typings/latest/storage-blob-change-feed/src/index.d.ts", + "docModel": { + "enabled": false + }, + "apiReport": { + "enabled": true, + "reportFolder": "./review" + }, + "dtsRollup": { + "enabled": true, + "untrimmedFilePath": "", + "publicTrimmedFilePath": "./typings/latest/storage-blob-change-feed.d.ts" + }, + "messages": { + "tsdocMessageReporting": { + "default": { + "logLevel": "none" + } + }, + "extractorMessageReporting": { + "ae-missing-release-tag": { + "logLevel": "none" + }, + "ae-unresolved-link": { + "logLevel": "none" + } + } + } +} \ No newline at end of file diff --git a/sdk/storage/storage-blob-change-feed/package.json b/sdk/storage/storage-blob-change-feed/package.json new file mode 100644 index 000000000000..9ff00638608f --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/package.json @@ -0,0 +1,159 @@ +{ + "name": "@azure/storage-blob-change-feed", + "sdk-type": "client", + "version": "12.0.0-preview", + "description": "Microsoft Azure Storage SDK for JavaScript - Blob Change Feed", + "main": "./dist/index.js", + "module": "./dist-esm/storage-blob-change-feed/src/index.js", + "browser": { + "./dist-esm/storage-blob-change-feed/src/utils/utils.node.js": "./dist-esm/storage-blob-change-feed/src/utils/utils.browser.js", + "./dist-esm/storage-blob-change-feed/test/utils/index.js": "./dist-esm/storage-blob-change-feed/test/utils/index.browser.js", + "fs": false, + "os": false, + "process": false + }, + "types": "./typings/latest/storage-blob-change-feed.d.ts", + "typesVersions": { + "<3.6": { + "*": [ + "./typings/3.1/storage-blob-change-feed.d.ts" + ] + } + }, + "engine": { + "node": ">=8.0.0" + }, + "scripts": { + "build:es6": "tsc -p tsconfig.json", + "build:nodebrowser": "rollup -c 2>&1", + "build:samples": "npm run clean && npm run build:es6 && cross-env ONLY_NODE=true rollup -c 2>&1 && npm run build:prep-samples", + "build:prep-samples": "node ../../../common/scripts/prep-samples.js && cd samples && tsc", + "build:test": "npm run build:es6 && rollup -c rollup.test.config.js 2>&1", + "build:types": "downlevel-dts typings/latest typings/3.1", + "build": "npm run build:es6 && npm run build:nodebrowser && api-extractor run --local && npm run build:types", + "check-format": "prettier --list-different --config ../../.prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", + "clean": "rimraf dist dist-esm dist-test typings temp dist-browser/*.js* dist-browser/*.zip statistics.html coverage coverage-browser .nyc_output *.tgz *.log test*.xml TEST*.xml", + "clean:samples": "rimraf samples/javascript/node_modules samples/typescript/node_modules samples/typescript/dist samples/typescript/package-lock.json samples/javascript/package-lock.json", + "extract-api": "tsc -p . && api-extractor run --local", + "execute:js-samples": "node ../../../common/scripts/run-samples.js samples/javascript/", + "execute:ts-samples": "node ../../../common/scripts/run-samples.js samples/typescript/dist/samples/typescript/src/", + "execute:samples": "npm run build:samples && npm run execute:js-samples && npm run execute:ts-samples", + "format": "prettier --write --config ../../.prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", + "integration-test:browser": "karma start --single-run", + "integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --full-trace -t 300000 dist-esm/storage-blob-change-feed/test/*.spec.js dist-esm/storage-blob-change-feed/test/node/*.spec.js", + "integration-test": "npm run integration-test:node && npm run integration-test:browser", + "pack": "npm pack 2>&1", + "prebuild": "npm run clean", + "test:browser": "npm run clean && npm run build:test && npm run unit-test:browser", + "test:node": "npm run clean && npm run build:test && npm run unit-test:node", + "test": "npm run clean && npm run build:test && npm run unit-test", + "unit-test:browser": "karma start --single-run", + "unit-test:node": "mocha --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --full-trace -t 120000 dist-test/index.node.js", + "unit-test": "npm run unit-test:node && npm run unit-test:browser", + "emulator-tests": "cross-env STORAGE_CONNECTION_STRING=UseDevelopmentStorage=true && npm run test:node" + }, + "files": [ + "BreakingChanges.md", + "types/", + "dist/", + "dist-browser/", + "dist-esm/storage-blob-change-feed/src/", + "dist-esm/storage-internal-avro/src/", + "typings/latest/storage-blob-change-feed.d.ts", + "typings/3.1/storage-blob-change-feed.d.ts", + "README.md", + "LICENSE" + ], + "repository": { + "type": "git", + "url": "git+https://github.com/Azure/azure-sdk-for-js.git" + }, + "keywords": [ + "Azure", + "Storage", + "Blob", + "ChangeFeed", + "Node.js", + "TypeScript", + "JavaScript", + "Browser" + ], + "author": "Microsoft Corporation", + "license": "MIT", + "bugs": { + "url": "https://github.com/Azure/azure-sdk-for-js/issues" + }, + "homepage": "https://github.com/Azure/azure-sdk-for-js#readme", + "sideEffects": false, + "//metadata": { + "constantPaths": [ + { + "path": "src/utils/constants.ts", + "prefix": "SDK_VERSION" + } + ] + }, + "dependencies": { + "@azure/storage-blob": "^12.1.2", + "@azure/abort-controller": "^1.0.0", + "@azure/core-http": "^1.1.1", + "@azure/core-lro": "^1.0.2", + "@azure/core-paging": "^1.1.1", + "@azure/core-tracing": "1.0.0-preview.8", + "@azure/logger": "^1.0.0", + "tslib": "^1.10.0" + }, + "devDependencies": { + "@azure/identity": "^1.1.0-preview", + "@azure/test-utils-recorder": "^1.0.0", + "@microsoft/api-extractor": "7.7.11", + "@rollup/plugin-multi-entry": "^3.0.0", + "@rollup/plugin-replace": "^2.2.0", + "@types/mocha": "^7.0.2", + "@types/node": "^8.0.0", + "@typescript-eslint/eslint-plugin": "^2.0.0", + "@typescript-eslint/parser": "^2.0.0", + "assert": "^1.4.1", + "cross-env": "^6.0.3", + "dotenv": "^8.2.0", + "downlevel-dts": "~0.4.0", + "es6-promise": "^4.2.5", + "eslint": "^6.1.0", + "eslint-config-prettier": "^6.0.0", + "eslint-plugin-no-null": "^1.0.2", + "eslint-plugin-no-only-tests": "^2.3.0", + "eslint-plugin-promise": "^4.1.1", + "esm": "^3.2.18", + "inherits": "^2.0.3", + "karma": "^4.0.1", + "karma-chrome-launcher": "^3.0.0", + "karma-coverage": "^2.0.0", + "karma-edge-launcher": "^0.4.2", + "karma-env-preprocessor": "^0.1.1", + "karma-firefox-launcher": "^1.1.0", + "karma-ie-launcher": "^1.0.0", + "karma-json-preprocessor": "^0.3.3", + "karma-json-to-file-reporter": "^1.0.1", + "karma-junit-reporter": "^2.0.1", + "karma-mocha": "^1.3.0", + "karma-mocha-reporter": "^2.2.5", + "karma-remap-istanbul": "^0.6.0", + "mocha": "^7.1.1", + "mocha-junit-reporter": "^1.18.0", + "nyc": "^14.0.0", + "prettier": "^1.16.4", + "puppeteer": "^2.0.0", + "rimraf": "^3.0.0", + "rollup": "^1.16.3", + "@rollup/plugin-commonjs": "11.0.2", + "@rollup/plugin-node-resolve": "^7.0.0", + "rollup-plugin-shim": "^1.0.0", + "rollup-plugin-sourcemaps": "^0.4.2", + "rollup-plugin-terser": "^5.1.1", + "rollup-plugin-visualizer": "^3.1.1", + "source-map-support": "^0.5.9", + "ts-node": "^8.3.0", + "typescript": "~3.8.3", + "util": "^0.12.1" + } +} \ No newline at end of file diff --git a/sdk/storage/storage-blob-change-feed/review/storage-blob-change-feed.api.md b/sdk/storage/storage-blob-change-feed/review/storage-blob-change-feed.api.md new file mode 100644 index 000000000000..02586dbb008f --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/review/storage-blob-change-feed.api.md @@ -0,0 +1,61 @@ +## API Report File for "@azure/storage-blob-change-feed" + +> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/). + +```ts + +import { BlobServiceClient } from '@azure/storage-blob'; +import { PagedAsyncIterableIterator } from '@azure/core-paging'; + +// @public (undocumented) +export class BlobChangeFeedClient { + constructor(blobServiceClient: BlobServiceClient); + // (undocumented) + getChanges(options?: ChangeFeedGetChangesOptions): PagedAsyncIterableIterator; + } + +// @public (undocumented) +export interface BlobChangeFeedEvent { + // Warning: (ae-forgotten-export) The symbol "BlobChangeFeedEventData" needs to be exported by the entry point index.d.ts + // + // (undocumented) + data: BlobChangeFeedEventData; + // (undocumented) + dataVersion?: string; + // (undocumented) + eventTime: string; + // Warning: (ae-forgotten-export) The symbol "BlobChangeFeedEventType" needs to be exported by the entry point index.d.ts + // + // (undocumented) + eventType: BlobChangeFeedEventType; + // (undocumented) + id: string; + // (undocumented) + metadataVersion: string; + // (undocumented) + subject: string; + // (undocumented) + topic: string; +} + +// @public (undocumented) +export class BlobChangeFeedEventPage { + constructor(); + // (undocumented) + continuationToken: string; + // (undocumented) + events: BlobChangeFeedEvent[]; +} + +// @public (undocumented) +export interface ChangeFeedGetChangesOptions { + // (undocumented) + end?: Date; + // (undocumented) + start?: Date; +} + + +// (No @packageDocumentation comment for this package) + +``` diff --git a/sdk/storage/storage-blob-change-feed/rollup.base.config.js b/sdk/storage/storage-blob-change-feed/rollup.base.config.js new file mode 100644 index 000000000000..3f08ad78868c --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/rollup.base.config.js @@ -0,0 +1,178 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import nodeResolve from "@rollup/plugin-node-resolve"; +import multiEntry from "@rollup/plugin-multi-entry"; +import cjs from "@rollup/plugin-commonjs"; +import replace from "@rollup/plugin-replace"; +import { terser } from "rollup-plugin-terser"; +import sourcemaps from "rollup-plugin-sourcemaps"; +import shim from "rollup-plugin-shim"; +// import visualizer from "rollup-plugin-visualizer"; + +const version = require("./package.json").version; +const banner = [ + "/*!", + ` * Azure Storage SDK for JavaScript - Blob, ${version}`, + " * Copyright (c) Microsoft and contributors. All rights reserved.", + " */" +].join("\n"); + +const pkg = require("./package.json"); +const depNames = Object.keys(pkg.dependencies); +const production = process.env.NODE_ENV === "production"; + +export function nodeConfig(test = false) { + const externalNodeBuiltins = [ + "@azure/core-http", + "crypto", + "fs", + "events", + "os", + "stream", + "util" + ]; + const baseConfig = { + input: "dist-esm/storage-blob-change-feed/src/index.js", + external: depNames.concat(externalNodeBuiltins), + output: { + file: "dist/index.js", + format: "cjs", + sourcemap: true + }, + preserveSymlinks: false, + plugins: [ + sourcemaps(), + replace({ + delimiters: ["", ""], + values: { + // replace dynamic checks with if (true) since this is for node only. + // Allows rollup's dead code elimination to be more aggressive. + "if (isNode)": "if (true)" + } + }), + nodeResolve({ preferBuiltins: true }), + cjs() + ], + onwarn(warning, warn) { + if (warning.code === "CIRCULAR_DEPENDENCY") { + throw new Error(warning.message); + } + warn(warning); + } + }; + + if (test) { + // entry point is every test file + baseConfig.input = [ + "dist-esm/storage-blob-change-feed/test/*.spec.js", + "dist-esm/storage-blob-change-feed/test/node/*.spec.js", + "dist-esm/storage-blob-change-feed/src/index.js" + ]; + baseConfig.plugins.unshift(multiEntry()); + + // different output file + baseConfig.output.file = "dist-test/index.node.js"; + + // mark assert as external + baseConfig.external.push("assert", "fs", "path", "buffer", "zlib"); + + baseConfig.context = "null"; + + // Disable tree-shaking of test code. In rollup-plugin-node-resolve@5.0.0, rollup started respecting + // the "sideEffects" field in package.json. Since our package.json sets "sideEffects=false", this also + // applies to test code, which causes all tests to be removed by tree-shaking. + baseConfig.treeshake = false; + } else if (production) { + baseConfig.plugins.push(terser()); + } + + return baseConfig; +} + +export function browserConfig(test = false) { + const baseConfig = { + input: "dist-esm/storage-blob-change-feed/src/index.js", + // input: "dist-esm/storage-blob-change-feed/src/index.browser.js", + output: { + file: "dist-browser/azure-storage-blob-change-feed.js", + banner: banner, + format: "umd", + name: "azblob", + sourcemap: true + }, + preserveSymlinks: false, + plugins: [ + sourcemaps(), + replace({ + delimiters: ["", ""], + values: { + // replace dynamic checks with if (false) since this is for + // browser only. Rollup's dead code elimination will remove + // any code guarded by if (isNode) { ... } + "if (isNode)": "if (false)" + } + }), + // fs and os are not used by the browser bundle, so just shim it + // dotenv doesn't work in the browser, so replace it with a no-op function + shim({ + dotenv: `export function config() { }`, + fs: ` + export function stat() { } + export function createReadStream() { } + export function createWriteStream() { } + `, + os: ` + export const type = 1; + export const release = 1; + `, + util: ` + export function promisify() { } + ` + }), + nodeResolve({ + mainFields: ["module", "browser"], + preferBuiltins: false + }), + cjs({ + namedExports: { + events: ["EventEmitter"], + assert: [ + "ok", + "deepEqual", + "equal", + "fail", + "strictEqual", + "deepStrictEqual", + "notDeepEqual", + "notDeepStrictEqual" + ], + "@opentelemetry/api": ["CanonicalCode", "SpanKind", "TraceFlags"] + } + }) + ], + onwarn(warning, warn) { + if (warning.code === "CIRCULAR_DEPENDENCY") { + throw new Error(warning.message); + } + warn(warning); + } + }; + + if (test) { + baseConfig.input = ["dist-esm/storage-blob-change-feed/test/*.spec.js", "dist-esm/storage-blob-change-feed/test/browser/*.spec.js"]; + baseConfig.plugins.unshift(multiEntry({ exports: false })); + baseConfig.output.file = "dist-test/index.browser.js"; + // mark fs-extra as external + baseConfig.external = ["fs-extra"]; + + baseConfig.context = "null"; + + // Disable tree-shaking of test code. In rollup-plugin-node-resolve@5.0.0, rollup started respecting + // the "sideEffects" field in package.json. Since our package.json sets "sideEffects=false", this also + // applies to test code, which causes all tests to be removed by tree-shaking. + baseConfig.treeshake = false; + } + + return baseConfig; +} diff --git a/sdk/storage/storage-blob-change-feed/rollup.config.js b/sdk/storage/storage-blob-change-feed/rollup.config.js new file mode 100644 index 000000000000..a62dabd573b4 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/rollup.config.js @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as base from "./rollup.base.config"; + +const inputs = []; + +if (!process.env.ONLY_BROWSER) { + inputs.push(base.nodeConfig()); +} + +// Disable this until we are ready to run rollup for the browser. +// if (!process.env.ONLY_NODE) { +// inputs.push(base.browserConfig()); +// } + +export default inputs; diff --git a/sdk/storage/storage-blob-change-feed/rollup.test.config.js b/sdk/storage/storage-blob-change-feed/rollup.test.config.js new file mode 100644 index 000000000000..ad98718cce46 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/rollup.test.config.js @@ -0,0 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as base from "./rollup.base.config"; + +export default [base.nodeConfig(true), base.browserConfig(true)]; diff --git a/sdk/storage/storage-blob-change-feed/src/AvroReaderFactory.ts b/sdk/storage/storage-blob-change-feed/src/AvroReaderFactory.ts new file mode 100644 index 000000000000..9e4d58a9966a --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/AvroReaderFactory.ts @@ -0,0 +1,23 @@ +import { AvroReadable, AvroReader } from '../../storage-internal-avro/src'; + +export class AvroReaderFactory { + public buildAvroReader(dataStream: AvroReadable): AvroReader; + public buildAvroReader( + dataStream: AvroReadable, + headerStream: AvroReadable, + blockOffset: number, + eventIndex: number): AvroReader; + + public buildAvroReader( + dataStream: AvroReadable, + headerStream?: AvroReadable, + blockOffset?: number, + eventIndex?: number + ): AvroReader { + if (headerStream) { + return new AvroReader(dataStream, headerStream, blockOffset!, eventIndex!); + } else { + return new AvroReader(dataStream); + } + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/BlobChangeFeedClient.ts b/sdk/storage/storage-blob-change-feed/src/BlobChangeFeedClient.ts new file mode 100644 index 000000000000..94bbc40bfd3f --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/BlobChangeFeedClient.ts @@ -0,0 +1,98 @@ +import { BlobServiceClient } from "@azure/storage-blob"; +import { PagedAsyncIterableIterator, PageSettings } from "@azure/core-paging"; +import { BlobChangeFeedEvent } from "./models/BlobChangeFeedEvent"; +import { ChangeFeedFactory } from "./ChangFeedFactory"; +import { ChangeFeed } from "./ChangFeed"; +import { CHANGE_FEED_DEFAULT_PAGE_SIZE } from "./utils/constants"; + +export interface ChangeFeedGetChangesOptions { + start?: Date; + end?: Date; +} + +export class BlobChangeFeedEventPage { + public events: BlobChangeFeedEvent[]; + public continuationToken: string; + + constructor() { + this.events = []; + this.continuationToken = ""; + } +} + +export class BlobChangeFeedClient { + /** + * blobServiceClient provided by @azure/storage-blob package. + * + * @private + * @type {BlobServiceClient} + * @memberof DataLakeServiceClient + */ + private _blobServiceClient: BlobServiceClient; + private _changeFeedFactory: ChangeFeedFactory; + + public constructor(blobServiceClient: BlobServiceClient) { + this._blobServiceClient = blobServiceClient; + this._changeFeedFactory = new ChangeFeedFactory(); + } + + public getChanges(options: ChangeFeedGetChangesOptions = {}) + : PagedAsyncIterableIterator { + const iter = this.getChange(options); + return { + /** + * @member {Promise} [next] The next method, part of the iteration protocol + */ + async next() { + return iter.next(); + }, + /** + * @member {Symbol} [asyncIterator] The connection to the async iterator, part of the iteration protocol + */ + [Symbol.asyncIterator]() { + return this; + }, + /** + * @member {Function} [byPage] Return an AsyncIterableIterator that works a page at a time + */ + byPage: (settings: PageSettings = {}) => { + return this.getPage(settings.continuationToken, settings.maxPageSize, options); + } + }; + } + + private async *getChange(options: ChangeFeedGetChangesOptions = {}) + : AsyncIterableIterator { + for await (const eventPage of this.getPage(undefined, undefined, options)) { + for (const event of eventPage.events) { + yield event; + } + } + } + + private async *getPage(continuationToken?: string, maxPageSize?: number, options: ChangeFeedGetChangesOptions = {}) + : AsyncIterableIterator { + const changeFeed: ChangeFeed = await this._changeFeedFactory.buildChangeFeed( + this._blobServiceClient, + continuationToken, + options.start, + options.end + ); + + maxPageSize = maxPageSize || CHANGE_FEED_DEFAULT_PAGE_SIZE; + while (changeFeed.hasNext()) { + let eventPage = new BlobChangeFeedEventPage(); + while (changeFeed.hasNext() && eventPage.events.length < maxPageSize) { + const event = await changeFeed.getChange(); + if (event) { + eventPage.events.push(event); + } + } + if (changeFeed.hasNext()) { + // FIXME: won't this token be too long? + eventPage.continuationToken = JSON.stringify(changeFeed.getCursor()); + } + yield eventPage; + } + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/ChangFeed.ts b/sdk/storage/storage-blob-change-feed/src/ChangFeed.ts new file mode 100644 index 000000000000..488fc31e44fe --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/ChangFeed.ts @@ -0,0 +1,133 @@ +import { ContainerClient } from '@azure/storage-blob'; +import { Segment } from './Segment'; +import { SegmentFactory } from './SegmentFactory'; +import { BlobChangeFeedEvent } from "./models/BlobChangeFeedEvent"; +import { ChangeFeedCursor } from "./models/ChangeFeedCursor"; +import { getURLPath, hashString, getSegmentsInYear, minDate } from "./utils/utils.common"; + +export class ChangeFeed { + /** + * BlobContainerClient for making List Blob requests and creating Segments. + * + * @private + * @type {ContainerClient} + * @memberof ChangeFeed + */ + private readonly _containerClient?: ContainerClient; + + private readonly _segmentFactory?: SegmentFactory; + + private readonly _years: number[]; + + private _segments: string[]; + + private _currentSegment?: Segment; + + private _lastConsumable?: Date; + + private _startTime?: Date; + + private _endTime?: Date; + + private _end?: Date; + + constructor(); + constructor(containerClient: ContainerClient, + segmentFactory: SegmentFactory, + years: number[], + segments: string[], + currentSegment: Segment, + lastConsumable: Date, + startTime?: Date, + endTime?: Date + ); + + constructor(containerClient?: ContainerClient, + segmentFactory?: SegmentFactory, + years?: number[], + segments?: string[], + currentSegment?: Segment, + lastConsumable?: Date, + startTime?: Date, + endTime?: Date + ) { + this._containerClient = containerClient; + this._segmentFactory = segmentFactory; + this._years = years || []; + this._segments = segments || []; + this._currentSegment = currentSegment; + this._lastConsumable = lastConsumable; + this._startTime = startTime; + this._endTime = endTime; + if (this._lastConsumable) { + this._end = minDate(this._lastConsumable, this._endTime); + } + } + + public hasNext(): boolean { + // ChangeFeed not initialized with proper data + if (!this._currentSegment) { + return false; + } + + if (this._segments.length == 0 && this._years.length == 0 && !this._currentSegment.hasNext()) { + return false; + } + + // FIXME: == ? + return this._currentSegment.dateTime <= this._end!; + } + + public async getChange(): Promise { + if (!this.hasNext()) { + throw new Error("Change feed doesn't have any more events"); + } + + let event: BlobChangeFeedEvent | undefined; + do { + // TODO: filter by lastConsumable? + event = await this._currentSegment!.getChange(); + await this.advanceSegmentIfNecessary(); + } while (!event && this.hasNext()); + return event; + } + + public getCursor(): ChangeFeedCursor { + if (!this._currentSegment) { + throw new Error("Change Feed not fully initialized shouldn't call this function."); + } + + return { + urlHash: hashString(getURLPath(this._containerClient!.url)!), + endTime: this._endTime, + currentSegmentCursor: this._currentSegment!.getCursor() + }; + } + + private async advanceSegmentIfNecessary(): Promise { + if (!this._currentSegment) { + throw new Error("Change Feed not fully initialized shouldn't call this function."); + } + + // If the current segment has more Events, we don't need to do anything. + if (this._currentSegment.hasNext()) { + return; + } + + // If the current segment is completed, remove it + if (this._segments.length > 0) { + this._currentSegment = await this._segmentFactory!.buildSegment(this._containerClient!, this._segments.shift()!); + } + // If _segments is empty, refill it + else if (this._segments.length === 0 && this._years.length > 0) { + const year = this._years.shift(); + this._segments = await getSegmentsInYear(this._containerClient!, year!, this._startTime, this._end); + + if (this._segments.length > 0) { + this._currentSegment = await this._segmentFactory!.buildSegment(this._containerClient!, this._segments.shift()!); + } else { + throw new Error("Year in the middle should have returned valid segments."); + } + } + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/ChangFeedFactory.ts b/sdk/storage/storage-blob-change-feed/src/ChangFeedFactory.ts new file mode 100644 index 000000000000..c3be341b3c5b --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/ChangFeedFactory.ts @@ -0,0 +1,132 @@ +import { BlobServiceClient, ContainerClient } from "@azure/storage-blob"; +import { ChangeFeed } from "./ChangFeed"; +import { ChangeFeedCursor } from "./models/ChangeFeedCursor"; +import { + CHANGE_FEED_CONTAINER_NAME, + CHANGE_FEED_META_SEGMENT_PATH +} from './utils/constants'; +import { + ceilToNearestHour, + floorToNearestHour, + getURLPath, + hashString, + getYearsPaths, + getSegmentsInYear, + minDate +} from './utils/utils.common'; +import { + bodyToString +} from './utils/utils.node'; +import { SegmentFactory } from "./SegmentFactory"; +import { ShardFactory } from "./ShardFactory"; +import { ChunkFactory } from "./ChunkFactory"; +import { AvroReaderFactory } from "./AvroReaderFactory"; +import { Segment } from "./Segment"; + +interface MetaSegments { + version?: number; + lastConsumable: Date; +} + +export class ChangeFeedFactory { + private readonly _segmentFactory: SegmentFactory; + + constructor(); + constructor(segmentFactory: SegmentFactory); + constructor(segmentFactory?: SegmentFactory) { + if (segmentFactory) { + this._segmentFactory = segmentFactory; + } + else { + this._segmentFactory = new SegmentFactory( + new ShardFactory( + new ChunkFactory( + new AvroReaderFactory()))); + } + } + + public async buildChangeFeed( + blobServiceClient: BlobServiceClient, + continuationToken?: string, + startTime?: Date, + endTime?: Date + ): Promise { + const containerClient = blobServiceClient.getContainerClient(CHANGE_FEED_CONTAINER_NAME); + let cursor: ChangeFeedCursor | undefined = undefined; + // Create cursor. + if (continuationToken) { + cursor = JSON.parse(continuationToken); + ChangeFeedFactory.validateCursor(containerClient, cursor!); + startTime = cursor!.currentSegmentCursor?.segmentTime; + endTime = cursor!.endTime; + } + // Round start and end time if we are not using the cursor. + else { + startTime = floorToNearestHour(startTime); + endTime = ceilToNearestHour(endTime); + } + + // Check if Change Feed has been enabled for this account. + let changeFeedContainerExists = await containerClient.exists(); + if (!changeFeedContainerExists) { + throw new Error("Change Feed hasn't been enabled on this account, or is currently being enabled."); + } + + // Get last consumable. + const blobClient = containerClient.getBlobClient(CHANGE_FEED_META_SEGMENT_PATH); + const blobDownloadRes = await blobClient.download(); + const lastConsumable = (JSON.parse(await bodyToString(blobDownloadRes)) as MetaSegments).lastConsumable; + + // Get year paths + const years: number[] = await getYearsPaths(containerClient); + + // Dequeue any years that occur before start time. + if (startTime) { + let startYear = startTime.getUTCFullYear(); + while (years.length > 0 && years[0] < startYear) { + years.shift(); + } + } + + if (years.length === 0) { + return new ChangeFeed(); + } + + let segments: string[] = []; + while (segments.length === 0 && years.length !== 0) { + const firstYear = years.shift(); + segments = await getSegmentsInYear( + containerClient, + firstYear!, + startTime, + minDate(lastConsumable, endTime)); + } + if (segments.length === 0) { + return new ChangeFeed(); + } + + const currentSegment: Segment = await this._segmentFactory.buildSegment( + containerClient, + segments.shift()!, + cursor?.currentSegmentCursor); + + return new ChangeFeed( + containerClient, + this._segmentFactory, + years, + segments, + currentSegment, + lastConsumable, + startTime, + endTime); + } + + private static validateCursor( + containerClient: ContainerClient, + cursor: ChangeFeedCursor + ): void { + if (hashString(getURLPath(containerClient.url)!) != cursor.urlHash) { + throw new Error("Cursor URL does not match container URL."); + } + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/Chunk.ts b/sdk/storage/storage-blob-change-feed/src/Chunk.ts new file mode 100644 index 000000000000..64242c7dea71 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/Chunk.ts @@ -0,0 +1,46 @@ +import { AvroReader } from '../../storage-internal-avro/src'; +import { BlobChangeFeedEvent } from "./models/BlobChangeFeedEvent"; + +export class Chunk { + private readonly _avroReader: AvroReader; + private readonly _iter: AsyncIterableIterator; + + private _blockOffset: number; + public get blockOffset(): number { + return this._blockOffset; + } + + private _eventIndex: number; + public get eventIndex(): number { + return this._eventIndex; + } + + constructor( + avroReader: AvroReader, + blockOffset: number, + eventIndex: number + ) { + this._avroReader = avroReader; + this._blockOffset = blockOffset; + this._eventIndex = eventIndex; + + this._iter = this._avroReader.parseObjects(); + } + + public hasNext(): boolean { + return this._avroReader.hasNext(); + } + + public async getChange(): Promise { + if (!this.hasNext()) { + return undefined; + } + + const next = await this._iter.next(); + if (next.done) { + return undefined; + } else { + return next.value as BlobChangeFeedEvent; + } + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/ChunkFactory.ts b/sdk/storage/storage-blob-change-feed/src/ChunkFactory.ts new file mode 100644 index 000000000000..54dc7770d21a --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/ChunkFactory.ts @@ -0,0 +1,43 @@ +import { AvroReaderFactory } from "./AvroReaderFactory"; +import { ContainerClient } from '@azure/storage-blob'; +import { Chunk } from "./Chunk"; +import { AvroReader } from "../../storage-internal-avro/src" +import { bodyToAvroReadable } from "./utils/utils.node"; + + +export class ChunkFactory { + private readonly _avroReaderFactory: AvroReaderFactory; + + constructor(avroReaderFactory: AvroReaderFactory) { + this._avroReaderFactory = avroReaderFactory; + } + + public async buildChunk( + containerClient: ContainerClient, + chunkPath: string, + blockOffset?: number, + eventIndex?: number + ): Promise { + const blobClient = containerClient.getBlobClient(chunkPath); + blockOffset = blockOffset || 0; + eventIndex = eventIndex || 0; + + const downloadRes = await blobClient.download(blockOffset, undefined, { + onProgress: (ev) => { + console.log(ev.loadedBytes); // for debug purpose + } + }); + + const dataStream = bodyToAvroReadable(downloadRes); + let avroReader: AvroReader; + if (blockOffset != 0) { + const headerDownloadRes = await blobClient.download(0); + const headerStream = bodyToAvroReadable(headerDownloadRes); + avroReader = this._avroReaderFactory.buildAvroReader(dataStream, headerStream, blockOffset, eventIndex); + } else { + avroReader = this._avroReaderFactory.buildAvroReader(dataStream); + } + + return new Chunk(avroReader, blockOffset, eventIndex); + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/Segment.ts b/sdk/storage/storage-blob-change-feed/src/Segment.ts new file mode 100644 index 000000000000..82ebb66c28a0 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/Segment.ts @@ -0,0 +1,90 @@ +import { BlobChangeFeedEvent } from "./models/BlobChangeFeedEvent"; +// import { ContainerClient } from "@azure/storage-blob"; +import { Shard } from './Shard'; +import { SegmentCursor, ShardCursor } from './models/ChangeFeedCursor'; + +export class Segment { + // private readonly _containerClient: ContainerClient; + + private readonly _shards: Shard[]; + + private _shardDone: boolean[]; + + private _shardIndex: number; + + private _finalized: boolean; + public get finalized(): boolean { + return this._finalized; + } + + private _dateTime: Date; + public get dateTime(): Date { + return this._dateTime; + } + + constructor( + // containerClient: ContainerClient, + shards: Shard[], + shardIndex: number, + dateTime: Date, + finalized: boolean + ) { + // this._containerClient = containerClient; + this._shards = shards; + this._shardIndex = shardIndex; + this._dateTime = dateTime; + this._finalized = finalized; + + // TODO: add polyfill for Array.prototype.fill for IE11 + this._shardDone = Array(shards.length).fill(false); + } + + public hasNext(): boolean { + return this._shards.length > 0; + } + + public async getChange(): Promise { + if (!this.hasNext()) { + return undefined; + } + + let event: BlobChangeFeedEvent | undefined = undefined; + let allShardsDone = false; + // Round robin with shards + while (!event && !allShardsDone) { + const currentShard = this._shards[this._shardIndex]; + event = await currentShard.getChange(); + + if (!currentShard.hasNext()) { + this._shardDone[this._shardIndex] = true; + } + + let start = this._shardIndex; + while (true) { + this._shardIndex = (this._shardIndex + 1) % this._shards.length; + if (!this._shardDone[this._shardIndex]) { + break; + } + + if (this._shardIndex === start) { + allShardsDone = true; + break; + } + } + } + return event; + } + + public getCursor(): SegmentCursor { + let shardCursors: ShardCursor[] = []; + for (const shard of this._shards) { + shardCursors.push(shard.getCursor()); + } + + return { + shardCursors, + shardIndex: this._shardIndex, + segmentTime: this._dateTime + } + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/SegmentFactory.ts b/sdk/storage/storage-blob-change-feed/src/SegmentFactory.ts new file mode 100644 index 000000000000..19e2097b454e --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/SegmentFactory.ts @@ -0,0 +1,52 @@ +import { ShardFactory } from "./ShardFactory"; +import { ContainerClient } from "@azure/storage-blob"; +import { CHANGE_FEED_STATUS_FINALIZED } from './utils/constants'; +import { Shard } from './Shard'; +import { Segment } from './Segment'; +import { SegmentCursor } from './models/ChangeFeedCursor'; +import { bodyToString } from "./utils/utils.node"; +import { parseDateFromSegmentPath } from "./utils/utils.common"; + +export interface SegmentManifest { + version?: number; + begin?: Date; + intervalSecs?: number; + status: string; + config?: any; + chunkFilePaths: string[]; +} + +export class SegmentFactory { + private readonly _shardFactory?: ShardFactory; + + // FIXME: what for? + constructor(); + constructor(shardFactory: ShardFactory); + constructor(shardFactory?: ShardFactory) { + this._shardFactory = shardFactory; + } + + public async buildSegment(containerClient: ContainerClient, + manifestPath: string, + cursor?: SegmentCursor + ): Promise { + let shards: Shard[] = []; + const dateTime: Date = parseDateFromSegmentPath(manifestPath); + const shardIndex = cursor?.shardIndex || 0; + + const blobClient = containerClient.getBlobClient(manifestPath); + const blobDownloadRes = await blobClient.download(); + const blobContent: string = await bodyToString(blobDownloadRes); + + const segmentManifest = JSON.parse(blobContent) as SegmentManifest; + const finalized = segmentManifest.status === CHANGE_FEED_STATUS_FINALIZED; + + let i = 0; + for (const shardPath of segmentManifest.chunkFilePaths) { + const shard: Shard = await this._shardFactory!.buildShard(containerClient, shardPath, cursor?.shardCursors[i++]); + shards.push(shard); + } + + return new Segment(shards, shardIndex, dateTime, finalized); + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/Shard.ts b/sdk/storage/storage-blob-change-feed/src/Shard.ts new file mode 100644 index 000000000000..daa3d5b82de3 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/Shard.ts @@ -0,0 +1,63 @@ +import { ContainerClient } from "@azure/storage-blob"; +import { ChunkFactory } from "./ChunkFactory"; +import { Chunk } from "./Chunk"; +import { BlobChangeFeedEvent } from "./models/BlobChangeFeedEvent"; +import { ShardCursor } from "./models/ChangeFeedCursor"; + +export class Shard { + private readonly _containerClient: ContainerClient; + + private readonly _chunkFactory: ChunkFactory; + + private readonly _chunks: string[]; + + private _currentChunk: Chunk; + + private _chunkIndex: number; + + constructor( + containerClient: ContainerClient, + chunkFactory: ChunkFactory, + chunks: string[], + currentChunk: Chunk, + chunkIndex: number) { + this._containerClient = containerClient; + this._chunkFactory = chunkFactory; + this._chunks = chunks; + this._currentChunk = currentChunk; + this._chunkIndex = chunkIndex; + } + + public hasNext(): boolean { + return this._chunks.length > 0 || this._currentChunk.hasNext(); + } + + public async getChange(): Promise { + if (!this.hasNext()) { + return undefined; + } + + let event: BlobChangeFeedEvent | undefined = undefined; + do { + event = await this._currentChunk.getChange(); + + // Remove currentChunk if it doesn't have more events. + if (!this._currentChunk.hasNext() && this._chunks.length > 0) { + this._currentChunk = await this._chunkFactory.buildChunk( + this._containerClient, + this._chunks.shift()! + ); + this._chunkIndex++; + } + } while (!event && !this.hasNext()) + return event; + } + + public getCursor(): ShardCursor { + return { + chunkIndex: this._chunkIndex, + blockOffset: this._currentChunk.blockOffset, + eventIndex: this._currentChunk.eventIndex + }; + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/ShardFactory.ts b/sdk/storage/storage-blob-change-feed/src/ShardFactory.ts new file mode 100644 index 000000000000..04fcfb7b42f4 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/ShardFactory.ts @@ -0,0 +1,39 @@ +import { ChunkFactory } from './ChunkFactory'; +import { ShardCursor } from './models/ChangeFeedCursor'; +import { Shard } from "./Shard"; +import { ContainerClient } from "@azure/storage-blob"; + +export class ShardFactory { + private readonly _chunkFactory: ChunkFactory; + + constructor(chunkFactory: ChunkFactory) { + this._chunkFactory = chunkFactory; + } + + public async buildShard( + containerClient: ContainerClient, + shardPath: string, + shardCursor?: ShardCursor + ) { + let chunks: string[] = []; + const chunkIndex: number = shardCursor?.chunkIndex || 0; + const blockOffset: number = shardCursor?.blockOffset || 0; + const eventIndex: number = shardCursor?.eventIndex || 0; + + for await (const blobItem of containerClient.listBlobsFlat({ prefix: shardPath })) { + chunks.push(blobItem.name); + } + + // Fast forward to current Chunk. + if (chunkIndex > 0) { + chunks.splice(0, chunkIndex); + } + + const currentChunk = await this._chunkFactory.buildChunk( + containerClient, + chunks.shift()!, + blockOffset, + eventIndex); + return new Shard(containerClient, this._chunkFactory, chunks, currentChunk, chunkIndex); + } +} diff --git a/sdk/storage/storage-blob-change-feed/src/index.ts b/sdk/storage/storage-blob-change-feed/src/index.ts new file mode 100644 index 000000000000..21fb6f33570c --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/index.ts @@ -0,0 +1,2 @@ +export * from './BlobChangeFeedClient'; +export { BlobChangeFeedEvent } from './models/BlobChangeFeedEvent'; diff --git a/sdk/storage/storage-blob-change-feed/src/models/BlobChangeFeedEvent.ts b/sdk/storage/storage-blob-change-feed/src/models/BlobChangeFeedEvent.ts new file mode 100644 index 000000000000..f24419dc4d08 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/models/BlobChangeFeedEvent.ts @@ -0,0 +1,33 @@ +export type BlobChangeFeedEventType = "BlobCreate" | "BlobDeleted"; + +export interface BlobChangeFeedEvent { + topic: string; + subject: string; + eventType: BlobChangeFeedEventType; + eventTime: string; + id: string; // GUID + data: BlobChangeFeedEventData; + dataVersion?: string; + metadataVersion: string; +} + + +export type BlobType = "BlockBlob" | "AppendBlob" | "PageBlob"; + +export interface BlobChangeFeedEventData { + api: string; + clientRequestId: string; // GUID + requestId: string; // GUID + eTag: string; + contentType: string; + contentLength: number; + blobType: BlobType; + url: string; + sequencer: string; + + // For HNS only. + // FIXME: what's the reference other than C# code. + destinationUrl?: string; + sourceUrl?: string; + recursive?: string; +} diff --git a/sdk/storage/storage-blob-change-feed/src/models/ChangeFeedCursor.ts b/sdk/storage/storage-blob-change-feed/src/models/ChangeFeedCursor.ts new file mode 100644 index 000000000000..57ecdfd641eb --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/models/ChangeFeedCursor.ts @@ -0,0 +1,20 @@ + +export interface ChangeFeedCursor { + urlHash: number; + endTime?: Date; + currentSegmentCursor: SegmentCursor; +} + + +export interface SegmentCursor { + shardCursors: ShardCursor[]; + shardIndex: number; + segmentTime: Date; +} + + +export interface ShardCursor { + chunkIndex: number; + blockOffset: number; + eventIndex: number; +} diff --git a/sdk/storage/storage-blob-change-feed/src/utils/constants.ts b/sdk/storage/storage-blob-change-feed/src/utils/constants.ts new file mode 100644 index 000000000000..4f56e4b2e6e6 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/utils/constants.ts @@ -0,0 +1,6 @@ +export const CHANGE_FEED_CONTAINER_NAME: string = "$blobchangefeed"; +export const CHANGE_FEED_META_SEGMENT_PATH: string = "meta/segments.json"; +export const CHANGE_FEED_DEFAULT_PAGE_SIZE: number = 512; +export const CHANGE_FEED_STATUS_FINALIZED: string = "Finalized"; +export const CHANGE_FEED_SEGMENT_PREFIX: string = "idx/segments/"; +export const CHANGE_FEED_INITIALIZATION_SEGMENT: string = "1601"; diff --git a/sdk/storage/storage-blob-change-feed/src/utils/utils.browser.ts b/sdk/storage/storage-blob-change-feed/src/utils/utils.browser.ts new file mode 100644 index 000000000000..1f5c744532e7 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/utils/utils.browser.ts @@ -0,0 +1,29 @@ +/** + * Read body from downloading operation methods to string. + * Work on both Node.js and browser environment. + * + * @param response Convenience layer methods response with downloaded body + * @param length Length of Readable stream, needed for Node.js environment + */ +export async function bodyToString( + response: { + readableStreamBody?: NodeJS.ReadableStream; + blobBody?: Promise; + }, + // tslint:disable-next-line:variable-name + _length?: number +): Promise { + const blob = await response.blobBody!; + return blobToString(blob); +} + +export async function blobToString(blob: Blob): Promise { + const fileReader = new FileReader(); + return new Promise((resolve, reject) => { + fileReader.onloadend = (ev: any) => { + resolve(ev.target!.result); + }; + fileReader.onerror = reject; + fileReader.readAsText(blob); + }); +} diff --git a/sdk/storage/storage-blob-change-feed/src/utils/utils.common.ts b/sdk/storage/storage-blob-change-feed/src/utils/utils.common.ts new file mode 100644 index 000000000000..6bd56a4e64f2 --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/utils/utils.common.ts @@ -0,0 +1,94 @@ +import { URLBuilder } from "@azure/core-http"; +import { ContainerClient } from "@azure/storage-blob"; +import { CHANGE_FEED_SEGMENT_PREFIX, CHANGE_FEED_INITIALIZATION_SEGMENT } from "./constants"; + +const millisecondsInAnHour = 60 * 60 * 1000; +export function ceilToNearestHour(date: Date | undefined): Date | undefined { + if (date === undefined) { + return undefined; + } + return new Date(Math.ceil(date.getTime() / millisecondsInAnHour) * millisecondsInAnHour); +} + +export function floorToNearestHour(date: Date | undefined): Date | undefined { + if (date === undefined) { + return undefined; + } + return new Date(Math.floor(date.getTime() / millisecondsInAnHour) * millisecondsInAnHour); +} + +/** + * Get URL path from an URL string. + * + * @export + * @param {string} url Source URL string + * @returns {(string | undefined)} + */ +export function getURLPath(url: string): string | undefined { + const urlParsed = URLBuilder.parse(url); + return urlParsed.getPath(); +} + +// s[0]*31^(n - 1) + s[1]*31^(n - 2) + ... + s[n - 1] +export function hashString(str: string): number { + let hash = 0; + for (let i = 0; i < str.length; i++) { + hash = ((hash << 5) - hash) + str.charCodeAt(i); + hash |= 0;; // Bit operation converts operands to 32-bit integers + } + return hash; +} + +export async function getYearsPaths(containerClient: ContainerClient): Promise { + let years: number[] = []; + for await (const item of containerClient.listBlobsByHierarchy("/", { prefix: CHANGE_FEED_SEGMENT_PREFIX })) { + // TODO: add String.prototype.includes polyfill for IE11 + if (item.kind === "prefix" && !item.name.includes(CHANGE_FEED_INITIALIZATION_SEGMENT)) { + let yearStr = item.name.slice(CHANGE_FEED_SEGMENT_PREFIX.length, -1); + years.push(parseInt(yearStr)); + } + } + return years.sort((a, b) => a - b); +} + +export async function getSegmentsInYear(containerClient: ContainerClient, year: number, startTime?: Date, endTime?: Date): Promise { + let segments: string[] = []; + const prefix = `${CHANGE_FEED_SEGMENT_PREFIX}${year}/` + for await (const item of containerClient.listBlobsFlat({ prefix })) { + const segmentTime = parseDateFromSegmentPath(item.name); + if (startTime && segmentTime < startTime + || endTime && segmentTime > endTime) { + continue; + } + segments.push(item.name); + } + return segments; +} + +export function parseDateFromSegmentPath(segmentPath: string): Date { + const splitPath = segmentPath.split('/'); + if (splitPath.length < 3) { + throw new Error(`${segmentPath} is not a valid segment path.`); + } + + let segmentTime = new Date(0); + segmentTime.setUTCFullYear(parseInt(splitPath[2])); + + if (splitPath.length >= 4) { + segmentTime.setUTCMonth(parseInt(splitPath[3])); + } + if (splitPath.length >= 5) { + segmentTime.setUTCDate(parseInt(splitPath[4])); + } + if (splitPath.length >= 6) { + segmentTime.setUTCHours(parseInt(splitPath[5]) / 100); + } + return segmentTime; +} + +export function minDate(dateA: Date, dateB?: Date): Date { + if (dateB && dateB < dateA) { + return dateB; + } + return dateA; +} diff --git a/sdk/storage/storage-blob-change-feed/src/utils/utils.node.ts b/sdk/storage/storage-blob-change-feed/src/utils/utils.node.ts new file mode 100644 index 000000000000..f0d33bf0446c --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/src/utils/utils.node.ts @@ -0,0 +1,41 @@ +import { AvroReadable, AvroReadableFromStream } from '../../../storage-internal-avro/src'; + +/** + * Read body from downloading operation methods to string. + * Work on both Node.js and browser environment. + * + * @param response Convenience layer methods response with downloaded body + * @param length Length of Readable stream, needed for Node.js environment + */ +export async function bodyToString( + response: { + readableStreamBody?: NodeJS.ReadableStream; + blobBody?: Promise; + }, + length?: number +): Promise { + return new Promise((resolve, reject) => { + response.readableStreamBody!.on("readable", () => { + let chunk; + chunk = response.readableStreamBody!.read(length); + if (chunk) { + resolve(chunk.toString()); + } + }); + + response.readableStreamBody!.on("error", reject); + response.readableStreamBody!.on("end", () => { + resolve(""); + }); + }); +} + + +export function bodyToAvroReadable( + response: { + readableStreamBody?: NodeJS.ReadableStream; + blobBody?: Promise; + } +): AvroReadable { + return new AvroReadableFromStream(response.readableStreamBody!); +} diff --git a/sdk/storage/storage-blob-change-feed/tsconfig.json b/sdk/storage/storage-blob-change-feed/tsconfig.json new file mode 100644 index 000000000000..f89af540f58e --- /dev/null +++ b/sdk/storage/storage-blob-change-feed/tsconfig.json @@ -0,0 +1,26 @@ +{ + "compilerOptions": { + "alwaysStrict": true, + "noImplicitAny": true, + "preserveConstEnums": true, + "sourceMap": true, + "inlineSources": true, + "newLine": "LF", + "target": "es5", + "moduleResolution": "node", + "noUnusedLocals": true, + "noUnusedParameters": true, + "strict": true, + "module": "esNext", + "outDir": "./dist-esm", + "declaration": true, + "declarationMap": true, + "importHelpers": true, + "declarationDir": "./typings/latest", + "lib": ["dom", "es5", "es6", "es7", "esnext"], + "esModuleInterop": true + }, + "compileOnSave": true, + "exclude": ["node_modules", "../storage-internal-avro/node_modules", "./samples/**"], + "include": ["./src/**/*.ts", "./test/**/*.ts", "../storage-internal-avro/**/*.ts"] +}