Skip to content

Commit

Permalink
me
Browse files Browse the repository at this point in the history
  • Loading branch information
Lin Jian committed Jun 2, 2020
1 parent a3415cd commit e0db47b
Show file tree
Hide file tree
Showing 23 changed files with 230 additions and 109 deletions.
4 changes: 2 additions & 2 deletions rush.json
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@
"versionPolicyName": "client"
},
{
"packageName": "@azure/storage-blob-change-feed",
"projectFolder": "sdk/storage/storage-blob-change-feed",
"packageName": "@azure/storage-blob-changefeed",
"projectFolder": "sdk/storage/storage-blob-changefeed",
"versionPolicyName": "client"
},
{
Expand Down
4 changes: 2 additions & 2 deletions sdk/storage/storage-blob-change-feed/api-extractor.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json",
"mainEntryPointFilePath": "typings/latest/storage-blob-change-feed/src/index.d.ts",
"mainEntryPointFilePath": "typings/latest/storage-blob-changefeed/src/index.d.ts",
"docModel": {
"enabled": false
},
Expand All @@ -11,7 +11,7 @@
"dtsRollup": {
"enabled": true,
"untrimmedFilePath": "",
"publicTrimmedFilePath": "./typings/latest/storage-blob-change-feed.d.ts"
"publicTrimmedFilePath": "./typings/latest/storage-blob-changefeed.d.ts"
},
"messages": {
"tsdocMessageReporting": {
Expand Down
22 changes: 11 additions & 11 deletions sdk/storage/storage-blob-change-feed/package.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
{
"name": "@azure/storage-blob-change-feed",
"name": "@azure/storage-blob-changefeed",
"sdk-type": "client",
"version": "12.0.0-preview.1",
"description": "Microsoft Azure Storage SDK for JavaScript - Blob Change Feed",
"main": "./dist/index.js",
"module": "./dist-esm/storage-blob-change-feed/src/index.js",
"module": "./dist-esm/storage-blob-changefeed/src/index.js",
"browser": {
"./dist-esm/storage-blob-change-feed/src/utils/utils.node.js": "./dist-esm/storage-blob-change-feed/src/utils/utils.browser.js",
"./dist-esm/storage-blob-change-feed/test/utils/index.js": "./dist-esm/storage-blob-change-feed/test/utils/index.browser.js",
"./dist-esm/storage-blob-changefeed/src/utils/utils.node.js": "./dist-esm/storage-blob-changefeed/src/utils/utils.browser.js",
"./dist-esm/storage-blob-changefeed/test/utils/index.js": "./dist-esm/storage-blob-changefeed/test/utils/index.browser.js",
"fs": false,
"os": false,
"process": false
},
"types": "./typings/latest/storage-blob-change-feed.d.ts",
"types": "./typings/latest/storage-blob-changefeed.d.ts",
"typesVersions": {
"<3.6": {
"*": [
"./typings/3.1/storage-blob-change-feed.d.ts"
"./typings/3.1/storage-blob-changefeed.d.ts"
]
}
},
Expand All @@ -40,7 +40,7 @@
"execute:samples": "npm run build:samples && npm run execute:js-samples && npm run execute:ts-samples",
"format": "prettier --write --config ../../.prettierrc.json \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"",
"integration-test:browser": "karma start --single-run",
"integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --full-trace -t 300000 dist-esm/storage-blob-change-feed/test/*.spec.js dist-esm/storage-blob-change-feed/test/node/*.spec.js",
"integration-test:node": "nyc mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --full-trace -t 300000 dist-esm/storage-blob-changefeed/test/*.spec.js dist-esm/storage-blob-changefeed/test/node/*.spec.js",
"integration-test": "npm run integration-test:node && npm run integration-test:browser",
"pack": "npm pack 2>&1",
"prebuild": "npm run clean",
Expand All @@ -57,10 +57,10 @@
"types/",
"dist/",
"dist-browser/",
"dist-esm/storage-blob-change-feed/src/",
"dist-esm/storage-blob-changefeed/src/",
"dist-esm/storage-internal-avro/src/",
"typings/latest/storage-blob-change-feed.d.ts",
"typings/3.1/storage-blob-change-feed.d.ts",
"typings/latest/storage-blob-changefeed.d.ts",
"typings/3.1/storage-blob-changefeed.d.ts",
"README.md",
"LICENSE"
],
Expand All @@ -72,7 +72,7 @@
"Azure",
"Storage",
"Blob",
"Change Feed",
"Change feed",
"Node.js",
"TypeScript",
"JavaScript",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## API Report File for "@azure/storage-blob-change-feed"
## API Report File for "@azure/storage-blob-changefeed"

> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
Expand All @@ -16,16 +16,12 @@ export class BlobChangeFeedClient {

// @public (undocumented)
export interface BlobChangeFeedEvent {
// Warning: (ae-forgotten-export) The symbol "BlobChangeFeedEventData" needs to be exported by the entry point index.d.ts
//
// (undocumented)
data: BlobChangeFeedEventData;
// (undocumented)
dataVersion?: string;
// (undocumented)
eventTime: string;
// Warning: (ae-forgotten-export) The symbol "BlobChangeFeedEventType" needs to be exported by the entry point index.d.ts
//
// (undocumented)
eventType: BlobChangeFeedEventType;
// (undocumented)
Expand All @@ -38,6 +34,34 @@ export interface BlobChangeFeedEvent {
topic: string;
}

// @public (undocumented)
export interface BlobChangeFeedEventData {
// (undocumented)
api: string;
// (undocumented)
blobType: BlobType;
// (undocumented)
clientRequestId: string;
// (undocumented)
contentLength: number;
// (undocumented)
contentType: string;
// (undocumented)
destinationUrl?: string;
// (undocumented)
eTag: string;
// (undocumented)
recursive?: string;
// (undocumented)
requestId: string;
// (undocumented)
sequencer: string;
// (undocumented)
sourceUrl?: string;
// (undocumented)
url: string;
}

// @public (undocumented)
export class BlobChangeFeedEventPage {
constructor();
Expand All @@ -47,6 +71,12 @@ export class BlobChangeFeedEventPage {
events: BlobChangeFeedEvent[];
}

// @public (undocumented)
export type BlobChangeFeedEventType = "BlobCreate" | "BlobDeleted";

// @public (undocumented)
export type BlobType = "BlockBlob" | "AppendBlob" | "PageBlob";

// @public (undocumented)
export interface ChangeFeedGetChangesOptions {
// (undocumented)
Expand Down
15 changes: 7 additions & 8 deletions sdk/storage/storage-blob-change-feed/rollup.base.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function nodeConfig(test = false) {
"util"
];
const baseConfig = {
input: "dist-esm/storage-blob-change-feed/src/index.js",
input: "dist-esm/storage-blob-changefeed/src/index.js",
external: depNames.concat(externalNodeBuiltins),
output: {
file: "dist/index.js",
Expand Down Expand Up @@ -65,9 +65,9 @@ export function nodeConfig(test = false) {
if (test) {
// entry point is every test file
baseConfig.input = [
"dist-esm/storage-blob-change-feed/test/*.spec.js",
"dist-esm/storage-blob-change-feed/test/node/*.spec.js",
"dist-esm/storage-blob-change-feed/src/index.js"
"dist-esm/storage-blob-changefeed/test/*.spec.js",
"dist-esm/storage-blob-changefeed/test/node/*.spec.js",
"dist-esm/storage-blob-changefeed/src/index.js"
];
baseConfig.plugins.unshift(multiEntry());

Expand All @@ -92,10 +92,9 @@ export function nodeConfig(test = false) {

export function browserConfig(test = false) {
const baseConfig = {
input: "dist-esm/storage-blob-change-feed/src/index.js",
// input: "dist-esm/storage-blob-change-feed/src/index.browser.js",
input: "dist-esm/storage-blob-changefeed/src/index.browser.js",
output: {
file: "dist-browser/azure-storage-blob-change-feed.js",
file: "dist-browser/azure-storage-blob-changefeed.js",
banner: banner,
format: "umd",
name: "azblob",
Expand Down Expand Up @@ -160,7 +159,7 @@ export function browserConfig(test = false) {
};

if (test) {
baseConfig.input = ["dist-esm/storage-blob-change-feed/test/*.spec.js", "dist-esm/storage-blob-change-feed/test/browser/*.spec.js"];
baseConfig.input = ["dist-esm/storage-blob-changefeed/test/*.spec.js", "dist-esm/storage-blob-changefeed/test/browser/*.spec.js"];
baseConfig.plugins.unshift(multiEntry({ exports: false }));
baseConfig.output.file = "dist-test/index.browser.js";
// mark fs-extra as external
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BlobServiceClient, StorageSharedKeyCredential } from "@azure/storage-blob";
// import { BlobChangeFeedClient } from "@azure/storage-blob-change-feed";
// import { BlobChangeFeedClient } from "@azure/storage-blob-changefeed";
import { BlobChangeFeedClient } from "../../src";

// Load the .env file if it exists
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "azure-storage-blob-change-feed-samples-ts",
"name": "azure-storage-blob-changefeed-samples-ts",
"private": true,
"version": "0.1.0",
"description": "Azure Storage Blob Change Feed client library samples for TypeScript",
Expand Down Expand Up @@ -33,7 +33,7 @@
"@azure/abort-controller": "latest",
"@azure/identity": "latest",
"@azure/storage-blob": "latest",
"@azure/storage-blob-change-feed": "latest",
"@azure/storage-blob-changefeed": "latest",
"dotenv": "^8.2.0"
},
"devDependencies": {
Expand Down
27 changes: 22 additions & 5 deletions sdk/storage/storage-blob-change-feed/src/BlobChangeFeedClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,24 @@ export class BlobChangeFeedClient {

private async *getChange(options: ChangeFeedGetChangesOptions = {})
: AsyncIterableIterator<BlobChangeFeedEvent> {
for await (const eventPage of this.getPage(undefined, undefined, options)) {
for (const event of eventPage.events) {
const changeFeed: ChangeFeed = await this._changeFeedFactory.buildChangeFeed(
this._blobServiceClient,
undefined,
options.start,
options.end
);

while (changeFeed.hasNext()) {
const event = await changeFeed.getChange();
if (event) {
yield event;
} else {
return;
}
}
}

// start in ChangeFeedGetChangesOptions will be ignored when continuationToken is specified.
private async *getPage(continuationToken?: string, maxPageSize?: number, options: ChangeFeedGetChangesOptions = {})
: AsyncIterableIterator<BlobChangeFeedEventPage> {
const changeFeed: ChangeFeed = await this._changeFeedFactory.buildChangeFeed(
Expand All @@ -79,8 +90,9 @@ export class BlobChangeFeedClient {
options.end
);

maxPageSize = maxPageSize || CHANGE_FEED_DEFAULT_PAGE_SIZE;
maxPageSize = maxPageSize || 2;
if (!maxPageSize || maxPageSize > CHANGE_FEED_DEFAULT_PAGE_SIZE) {
maxPageSize = CHANGE_FEED_DEFAULT_PAGE_SIZE;
}
while (changeFeed.hasNext()) {
let eventPage = new BlobChangeFeedEventPage();
while (changeFeed.hasNext() && eventPage.events.length < maxPageSize) {
Expand All @@ -92,7 +104,12 @@ export class BlobChangeFeedClient {
if (changeFeed.hasNext()) {
eventPage.continuationToken = JSON.stringify(changeFeed.getCursor());
}
yield eventPage;
if (eventPage.events.length > 0) {
yield eventPage;
}
else {
return;
}
}
}
}
20 changes: 9 additions & 11 deletions sdk/storage/storage-blob-change-feed/src/ChangeFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ export class ChangeFeed {
}

public hasNext(): boolean {
// ChangeFeed not initialized with proper data. Using _currentSegment as the indicator.
// Empty ChangeFeed, using _currentSegment as the indicator.
if (!this._currentSegment) {
return false;
}

if (this._segments.length == 0 && this._years.length == 0 && !this._currentSegment.hasNext()) {
if (this._segments.length === 0 && this._years.length === 0 && !this._currentSegment.hasNext()) {
return false;
}

Expand All @@ -82,29 +82,29 @@ export class ChangeFeed {
throw new Error("Change feed doesn't have any more events");
}

let event: BlobChangeFeedEvent | undefined;
do {
let event: BlobChangeFeedEvent | undefined = undefined;
while (!event && this.hasNext()) {
event = await this._currentSegment!.getChange();
await this.advanceSegmentIfNecessary();
} while (!event && this.hasNext());
}
return event;
}

public getCursor(): ChangeFeedCursor {
if (!this._currentSegment) {
throw new Error("Change Feed not fully initialized shouldn't call this function.");
throw new Error("Empty Change Feed shouldn't call this function.");
}

return {
urlHash: hashString(getURLPath(this._containerClient!.url)!),
endTime: this._endTime,
endTime: this._endTime?.toJSON(),
currentSegmentCursor: this._currentSegment!.getCursor()
};
}

private async advanceSegmentIfNecessary(): Promise<void> {
if (!this._currentSegment) {
throw new Error("Change Feed not fully initialized shouldn't call this function.");
throw new Error("Empty Change Feed shouldn't call this function.");
}

// If the current segment has more Events, we don't need to do anything.
Expand All @@ -114,9 +114,7 @@ export class ChangeFeed {

// If the current segment is completed, remove it
if (this._segments.length > 0) {
console.log(`segment done, advancing to next one: ${this._segments[0]}`);
this._currentSegment = await this._segmentFactory!.buildSegment(this._containerClient!, this._segments.shift()!);
console.log(`segment finalized: ${this._currentSegment.finalized}`);
}
// If _segments is empty, refill it
else if (this._segments.length === 0 && this._years.length > 0) {
Expand All @@ -126,7 +124,7 @@ export class ChangeFeed {
if (this._segments.length > 0) {
this._currentSegment = await this._segmentFactory!.buildSegment(this._containerClient!, this._segments.shift()!);
} else {
throw new Error("Year in the middle should have returned valid segments.");
throw new Error(`Year ${year} in the middle should have returned valid segments.`);
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions sdk/storage/storage-blob-change-feed/src/ChangeFeedFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ export class ChangeFeedFactory {
if (continuationToken) {
cursor = JSON.parse(continuationToken);
ChangeFeedFactory.validateCursor(containerClient, cursor!);
startTime = cursor!.currentSegmentCursor?.segmentTime;
endTime = cursor!.endTime;
// startTime passed in is ignored
startTime = new Date(cursor!.currentSegmentCursor.segmentTime);
if (cursor!.endTime) {
endTime = new Date(cursor!.endTime!);
}
}
// Round start and end time if we are not using the cursor.
else {
Expand Down Expand Up @@ -87,24 +90,21 @@ export class ChangeFeedFactory {
years.shift();
}
}

if (years.length === 0) {
return new ChangeFeed();
}

let segments: string[] = [];
while (segments.length === 0 && years.length !== 0) {
const firstYear = years.shift();
segments = await getSegmentsInYear(
containerClient,
firstYear!,
years.shift()!,
startTime,
minDate(lastConsumable, endTime));
}
if (segments.length === 0) {
return new ChangeFeed();
}

const currentSegment: Segment = await this._segmentFactory.buildSegment(
containerClient,
segments.shift()!,
Expand All @@ -125,7 +125,7 @@ export class ChangeFeedFactory {
containerClient: ContainerClient,
cursor: ChangeFeedCursor
): void {
if (hashString(getURLPath(containerClient.url)!) != cursor.urlHash) {
if (hashString(getURLPath(containerClient.url)!) !== cursor.urlHash) {
throw new Error("Cursor URL does not match container URL.");
}
}
Expand Down
Loading

0 comments on commit e0db47b

Please sign in to comment.