Skip to content

Commit

Permalink
feat(scheduler): reimplement scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
jkuri committed Sep 28, 2017
1 parent 0a99847 commit cfc7e3d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 52 deletions.
110 changes: 63 additions & 47 deletions src/ngx-uploader/classes/ngx-uploader.class.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { EventEmitter } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { Subscription } from 'rxjs/Subscription';
import { Subscriber } from 'rxjs/Subscriber';
import 'rxjs/add/observable/of';
Expand All @@ -10,12 +11,16 @@ import 'rxjs/add/operator/map';
export interface BlobFile extends Blob {
name: string;
}

export enum UploadStatus {
Queue,
Uploading,
Done,
Canceled
Cancelled
}

export interface UploaderOptions {
concurrency: number;
}

export interface UploadProgress {
Expand Down Expand Up @@ -62,7 +67,6 @@ export interface UploadInput {
file?: UploadFile;
data?: { [key: string]: string | Blob };
headers?: { [key: string]: string };
concurrency?: number;
withCredentials?: boolean;
}

Expand All @@ -79,17 +83,24 @@ export function humanizeBytes(bytes: number): string {
}

export class NgUploaderService {
files: UploadFile[];
uploads: { file?: UploadFile, files?: UploadFile[], sub: {instance: Subscription} }[];
queue: UploadFile[];
serviceEvents: EventEmitter<UploadOutput>;
uploadScheduler: Subject<{ file: UploadFile, event: UploadInput }>;
subs: { id: string, sub: Subscription }[];

constructor() {
this.files = [];
constructor(concurrency: number = Number.POSITIVE_INFINITY) {
this.queue = [];
this.serviceEvents = new EventEmitter<any>();
this.uploadScheduler = new Subject();
this.subs = [];

this.uploadScheduler
.mergeMap(upload => this.startUpload(upload), concurrency)
.subscribe(uploadOutput => this.serviceEvents.emit(uploadOutput));
}

handleFiles(incomingFiles: FileList): void {
this.files.push(...[].map.call(incomingFiles, (file: File, i: number) => {
this.queue.push(...[].map.call(incomingFiles, (file: File, i: number) => {
const uploadFile: UploadFile = {
fileIndex: i,
id: this.generateId(),
Expand All @@ -113,7 +124,6 @@ export class NgUploaderService {
sub: undefined,
nativeFile: file
};
i = i + 1;

this.serviceEvents.emit({ type: 'addedToQueue', file: uploadFile });
return uploadFile;
Expand All @@ -126,77 +136,83 @@ export class NgUploaderService {
input.subscribe((event: UploadInput) => {
switch (event.type) {
case 'uploadFile':
const uploadFileIndex = this.files.findIndex(file => file === event.file);
const uploadFileIndex = this.queue.findIndex(file => file === event.file);
if (uploadFileIndex !== -1 && event.file) {

this.files[uploadFileIndex].sub = this.uploadFile(event.file, event).subscribe(data => {
this.serviceEvents.emit(data);
});
this.uploadScheduler.next({ file: this.queue[uploadFileIndex], event: event });
}
break;
case 'uploadAll':
const concurrency = typeof event.concurrency !== 'undefined' && event.concurrency > 0 ? event.concurrency : Number.POSITIVE_INFINITY;
const files = this.files.filter(file => file.progress.status === UploadStatus.Queue);
if (!files.length) {
return;
}
Observable.of(...files)
.mergeMap(file => {
return this.uploadFile(file, event);
}, concurrency)
.subscribe(data => {
this.serviceEvents.emit(data);
});
const files = this.queue.filter(file => file.progress.status === UploadStatus.Queue);
files.forEach(file => this.uploadScheduler.next({ file: file, event: event }));
break;
case 'cancel':
const id = event.id || null;
if (!id) {
return;
}

const index = this.files.findIndex(file => file.id === id);
if (index !== -1) {
if (this.files[index].sub) {
this.files[index].sub.unsubscribe();
}
const index = this.subs.findIndex(sub => sub.id === id);
if (index !== -1 && this.subs[index].sub) {
this.subs[index].sub.unsubscribe();

this.serviceEvents.emit({ type: 'cancelled', file: this.files[index] });
this.files[index].progress.status = UploadStatus.Canceled;
const fileIndex = this.queue.findIndex(file => file.id === id);
if (fileIndex !== -1) {
this.queue[fileIndex].progress.status = UploadStatus.Cancelled;
this.serviceEvents.emit({ type: 'cancelled', file: this.queue[fileIndex] });
}
}
break;
case 'cancelAll':

this.files.forEach(file => {
if (file.sub) {
file.sub.unsubscribe();
this.subs.forEach(sub => {
if (sub.sub) {
sub.sub.unsubscribe();
}

file.progress.status = UploadStatus.Canceled;
this.serviceEvents.emit({ type: 'cancelled', file: file });
const file = this.queue.find(uploadFile => uploadFile.id === sub.id);
if (file) {
file.progress.status = UploadStatus.Cancelled;
this.serviceEvents.emit({ type: 'cancelled', file: file });
}
});
break;
case 'remove':
if (!event.id) {
return;
}

const i = this.files.findIndex(file => file.id === event.id);
const i = this.queue.findIndex(file => file.id === event.id);
if (i !== -1) {
const file = this.files[i];
this.files.splice(i, 1);
const file = this.queue[i];
this.queue.splice(i, 1);
this.serviceEvents.emit({ type: 'removed', file: file });
}
break;
case 'removeAll':
if (this.files.length) {
this.files = [];
if (this.queue.length) {
this.queue = [];
this.serviceEvents.emit({ type: 'removedAll' });
}
break;
}
});
}

startUpload(upload: { file: UploadFile, event: UploadInput }): Observable<UploadOutput> {
return new Observable(observer => {
const sub = this.uploadFile(upload.file, upload.event)
.subscribe(output => {
observer.next(output);
}, err => {
observer.error(err);
observer.complete();
}, () => {
observer.complete();
});

this.subs.push({ id: upload.file.id, sub: sub });
});
}

uploadFile(file: UploadFile, event: UploadInput): Observable<UploadOutput> {
return new Observable(observer => {
const url = event.url || '';
Expand Down Expand Up @@ -266,7 +282,7 @@ export class NgUploaderService {
}

observer.next({ type: 'done', file: file });

observer.complete();
}
};
Expand All @@ -276,9 +292,9 @@ export class NgUploaderService {

try {
const uploadFile = <BlobFile>file.nativeFile;
const uploadIndex = this.files.findIndex(file => file.nativeFile === uploadFile);
const uploadIndex = this.queue.findIndex(file => file.nativeFile === uploadFile);

if (this.files[uploadIndex].progress.status === UploadStatus.Canceled) {
if (this.queue[uploadIndex].progress.status === UploadStatus.Cancelled) {
observer.complete();
}

Expand Down
9 changes: 6 additions & 3 deletions src/ngx-uploader/directives/ng-file-drop.directive.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import { Directive, ElementRef, EventEmitter, Input, Output, OnInit, OnDestroy, HostListener } from '@angular/core';
import { isPlatformServer } from '@angular/common';
import { NgUploaderService, UploadOutput, UploadInput, UploadFile } from '../classes/ngx-uploader.class';
import { NgUploaderService, UploadOutput, UploadInput, UploadFile, UploaderOptions } from '../classes/ngx-uploader.class';

@Directive({
selector: '[ngFileDrop]'
})
export class NgFileDropDirective implements OnInit, OnDestroy {
@Input() options: UploaderOptions;
@Input() uploadInput: EventEmitter<UploadInput>;
@Output() uploadOutput: EventEmitter<UploadOutput>;

upload: NgUploaderService;
el: HTMLInputElement;

constructor( private elementRef: ElementRef) {
this.upload = new NgUploaderService();
constructor(private elementRef: ElementRef) {
this.uploadOutput = new EventEmitter<UploadOutput>();
}

ngOnInit() {
const concurrency = this.options && this.options.concurrency || Number.POSITIVE_INFINITY;
this.upload = new NgUploaderService(concurrency);

this.el = this.elementRef.nativeElement;

this.upload.serviceEvents.subscribe((event: UploadOutput) => {
Expand Down
7 changes: 5 additions & 2 deletions src/ngx-uploader/directives/ng-file-select.directive.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import { Directive, ElementRef, EventEmitter, Input, Output, OnInit, OnDestroy } from '@angular/core';
import { isPlatformServer } from '@angular/common';
import { NgUploaderService, UploadOutput, UploadInput, UploadFile } from '../classes/ngx-uploader.class';
import { NgUploaderService, UploadOutput, UploadInput, UploadFile, UploaderOptions } from '../classes/ngx-uploader.class';

@Directive({
selector: '[ngFileSelect]'
})
export class NgFileSelectDirective implements OnInit, OnDestroy {
@Input() options: UploaderOptions;
@Input() uploadInput: EventEmitter<any>;
@Output() uploadOutput: EventEmitter<UploadOutput>;

upload: NgUploaderService;
el: HTMLInputElement;

constructor(private elementRef: ElementRef) {
this.upload = new NgUploaderService();
this.uploadOutput = new EventEmitter<UploadOutput>();
}

ngOnInit() {
const concurrency = this.options && this.options.concurrency || Number.POSITIVE_INFINITY;
this.upload = new NgUploaderService(concurrency);

this.el = this.elementRef.nativeElement;
this.el.addEventListener('change', this.fileListener, false);

Expand Down

0 comments on commit cfc7e3d

Please sign in to comment.