Skip to content

Commit

Permalink
feat(storage-r2): add performance reporting capability
Browse files Browse the repository at this point in the history
  • Loading branch information
Klowner committed Jun 6, 2024
1 parent 3b80854 commit e4a9860
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 19 deletions.
2 changes: 1 addition & 1 deletion packages/storage-r2/src/storage.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import TussleStateMemory from '@tussle/state-memory';
import { R2UploadState, TussleStorageR2 } from './storage';
import { MemoryStorage } from "@miniflare/storage-memory";
import { R2Bucket } from "@miniflare/r2";
import {firstValueFrom} from 'rxjs';
import {firstValueFrom, take, tap, toArray} from 'rxjs';
import {mockIncomingRequest} from '@tussle/spec';

storageServiceTests(
Expand Down
102 changes: 84 additions & 18 deletions packages/storage-r2/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
import type {TussleStateService} from "@tussle/spec/interface/state";
import type {
TussleStorageCreateFileParams,
TussleStorageCreateFileResponse,
TussleStorageDeleteFileParams,
TussleStorageDeleteFileResponse,
TussleStorageFileInfo,
TussleStorageFileInfoParams,
TussleStoragePatchFileParams,
TussleStoragePatchFileResponse,
TussleStorageService,
UploadConcatFinal,
UploadConcatPartial
import {
TussleStoragePerfEvent,
type TussleStorageCreateFileParams,
type TussleStorageCreateFileResponse,
type TussleStorageDeleteFileParams,
type TussleStorageDeleteFileResponse,
type TussleStorageFileInfo,
type TussleStorageFileInfoParams,
type TussleStoragePatchFileParams,
type TussleStoragePatchFileResponse,
type TussleStorageService,
type UploadConcatFinal,
type UploadConcatPartial
} from "@tussle/spec/interface/storage";
import type {TusProtocolExtension} from "@tussle/spec/interface/tus";
import {ChunkOffsetError} from '@tussle/spec/lib/error';
import {
catchError, concat, concatMap, defaultIfEmpty, defer, EMPTY, filter,
EMPTY,
MonoTypeOperatorFunction, Observable,
Subject,
catchError, concat, concatMap, defaultIfEmpty, defer,
filter,
firstValueFrom,
from, map,
mergeMap, MonoTypeOperatorFunction, Observable,
mergeMap,
of,
pipe,
share, switchMap,
take, takeLast, throwError, throwIfEmpty, toArray
share,
switchMap,
take, takeLast, tap, throwError, throwIfEmpty, toArray
} from "rxjs";
import {lousyUUID} from "./lousyuuid";
import {deleteR2Records, R2File} from './r2file';
import {ChunkOffsetError} from '@tussle/spec/lib/error';
import {R2File, deleteR2Records} from './r2file';

interface Part {
key: string;
Expand Down Expand Up @@ -57,6 +63,8 @@ export interface TussleStorageR2Options {
// Skip the automatic merging of uploaded chunks into a single R2 record
// (otherwise use R2File for reads)
skipMerge?: boolean;
// Get current time for performance reporting (default: Date.now)
now?: () => number;
}

function isNonNull<T>(value: T): value is NonNullable<T> {
Expand Down Expand Up @@ -223,8 +231,13 @@ export class TussleStorageR2 implements TussleStorageService {
readonly extensionsRequired: TusProtocolExtension[] = [];
readonly extensionsSupported: TusProtocolExtension[] = EXTENSIONS_SUPPORTED;

private readonly event = new Subject<TussleStoragePerfEvent>();
readonly event$ = this.event.asObservable().pipe(share());

constructor(readonly options: TussleStorageR2Options) {}

readonly now = this.options.now ?? (() => Date.now());

private readonly state = this.options.stateService;

private createInitialState(
Expand Down Expand Up @@ -425,6 +438,12 @@ export class TussleStorageR2 implements TussleStorageService {
offset: state.currentOffset,
success: true,
})),
this.emitPerformanceEvents(({success, location}) => ({
action: 'create',
bytes: 0,
location,
success,
})),
catchError(err => {
return of<TussleStorageCreateFileResponse>({
location: params.path,
Expand Down Expand Up @@ -692,6 +711,12 @@ export class TussleStorageR2 implements TussleStorageService {
this.optionallyMergeAndDiscardChunksIfComplete,
map(state => this.asPatchResponse(state)),
defaultIfEmpty(this.invalidPatchResponse(location)),
this.emitPerformanceEvents(({location, success}) => ({
action: 'patch',
bytes: params.length,
location,
success,
})),
);
}

Expand Down Expand Up @@ -724,6 +749,12 @@ export class TussleStorageR2 implements TussleStorageService {
location,
info: null,
}),
this.emitPerformanceEvents(({location}) => ({
action: 'info',
success: true,
location,
bytes: 0,
})),
);
return response$;
}
Expand Down Expand Up @@ -752,6 +783,12 @@ export class TussleStorageR2 implements TussleStorageService {
location: path,
success: false,
}),
this.emitPerformanceEvents(({success, location}) => ({
action: 'delete',
success,
location,
bytes: 0,
})),
);
}

Expand All @@ -772,6 +809,35 @@ export class TussleStorageR2 implements TussleStorageService {
success: !error,
};
}

protected emitPerformanceEvents<T extends {location: string}>(
prepare: (params: T) => {
success: boolean;
location: string;
action: TussleStoragePerfEvent['action'];
bytes: number;
},
): MonoTypeOperatorFunction<T> {
const storage = this;
const {event} = storage;
let start: number;

return tap({
subscribe() {
start = storage.now();
},
next(state) {
const now = storage.now();
const elapsed_time_ms = now - start;
event.next({
...prepare(state),
storage,
elapsed_time_ms,
});
start = now; // in case of more emissions
},
});
}
}

function createR2FileFromState(
Expand Down

0 comments on commit e4a9860

Please sign in to comment.