Skip to content

Commit

Permalink
Merge branch 'master' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
yoavniran committed Feb 11, 2022
2 parents 794884f + 74bf36a commit 9b14e0d
Show file tree
Hide file tree
Showing 22 changed files with 326 additions and 286 deletions.
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@
"mocha-junit-reporter": "^2.0.2",
"mocha-multi-reporters": "^1.5.1",
"octokit-plugin-create-pull-request": "^3.10.0",
"pacote": "^12.0.2",
"rc-progress": "^3.1.4",
"react": "^17.0.2",
"react-dnd-html5-backend": "^14.0.2",
Expand All @@ -150,7 +149,7 @@
"semver-utils": "^1.1.4",
"shelljs": "^0.8.4",
"styled-components": "^5.3.3",
"typescript": "^3.9.5",
"typescript": "^4.5.5",
"wait-on": "^6.0.0",
"weak-napi": "^2.0.2",
"webpack": "^5.68.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// @flow
import { logger } from "@rpldy/shared";
import xhrSend from "@rpldy/sender";
import { getMandatoryOptions } from "../utils";
import processChunks from "./processChunks";

import type { BatchItem } from "@rpldy/shared";
import type { OnProgress, SendResult } from "@rpldy/sender";
import type { TriggerMethod } from "@rpldy/life-events";
import type { ChunkedOptions, ChunkedSender, ChunkedSendOptions } from "../types";

const createChunkedSender = (chunkedOptions: ?ChunkedOptions, trigger: TriggerMethod): ChunkedSender => {
const options = getMandatoryOptions(chunkedOptions);

const send = (items: BatchItem[], url: ?string, sendOptions: ChunkedSendOptions, onProgress: OnProgress): SendResult => {
let result;

if (!options.chunked || items.length > 1 || items[0].url || !items[0].file.size) {
result = xhrSend(items, url, sendOptions, onProgress);
logger.debugLog(`chunkedSender: sending items as normal, un-chunked requests`);
} else {
logger.debugLog(`chunkedSender: sending file as a chunked request`);
result = processChunks(
items[0],
options,
url,
sendOptions,
onProgress,
trigger);
}

return result;
};

return {
send,
};
};

export default createChunkedSender;
40 changes: 40 additions & 0 deletions packages/core/chunked-sender/src/chunkedSender/getChunkedState.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// @flow
import createState from "@rpldy/simple-state";

import type { ChunkedSendOptions, MandatoryChunkedOptions } from "../types";
import type { Chunk, ChunkedState, State } from "./types";

const getChunkedState = (
chunks: Chunk[],
url: ?string,
sendOptions: ChunkedSendOptions,
chunkedOptions: MandatoryChunkedOptions
): ChunkedState => {
const { state, update } = createState<State>({
finished: false,
aborted: false,
error: false,
uploaded: {},
requests: {},
responses: [],
chunkCount: chunks.length,
startByte: sendOptions.startByte || 0,
chunks,
url,
sendOptions,
...chunkedOptions,
});

const getState = (): State => state;

const updateState = (updater: (State) => void) => {
update(updater);
};

return {
getState,
updateState,
};
};

export default getChunkedState;
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// @flow
import ChunkedSendError from "./ChunkedSendError";
import type { State, Chunk } from "./types";
import type { Chunk, ChunkedState } from "./types";

const getChunksToSend = (chunkedState: ChunkedState): Array<Chunk> => {
const state = chunkedState.getState();

const getChunksToSend = (state: State): Array<Chunk> => {
const chunks = [],
inProgressIds = Object.keys(state.requests),
parallel = state.parallel || 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,44 @@ import { CHUNK_EVENTS } from "../consts";
import type { BatchItem } from "@rpldy/shared";
import type { OnProgress, SendResult } from "@rpldy/sender";
import type { TriggerMethod } from "@rpldy/life-events";
import type { State } from "./types";
import type { ChunkedState } from "./types";

const handleChunkRequest = (
state: State,
chunkedState: ChunkedState,
item: BatchItem,
chunkId: string,
chunkSendResult: SendResult,
trigger: TriggerMethod,
onProgress: OnProgress,
): Promise<void> => {
state.requests[chunkId] = {
id: chunkId,
abort: chunkSendResult.abort,
};
chunkedState.updateState((state) => {
state.requests[chunkId] = {
id: chunkId,
abort: chunkSendResult.abort,
};
});

return chunkSendResult.request
.then((result) => {
logger.debugLog(`chunkedSender: request finished for chunk: ${chunkId} - `, result);

delete state.requests[chunkId];
chunkedState.updateState((state) => {
delete state.requests[chunkId];
});

const index = state.chunks.findIndex((c) => c.id === chunkId);
const chunks = chunkedState.getState().chunks;
const index = chunks.findIndex((c) => c.id === chunkId);

if (~index) {
if (result.state === FILE_STATES.FINISHED) {
//remove chunk so eventually there are no more chunks to send
//TODO: splicing array is dangerous. Need to find a better (immutable) way to progress chunk upload
const spliced = state.chunks.splice(index, 1);
const finishedChunk = spliced[0];
const finishedChunk = chunks[index];

chunkedState.updateState((state) => {
//remove chunk so eventually there are no more chunks to send
state.chunks = state.chunks.slice(0, index)
.concat(state.chunks.slice(index + 1));
});

const chunkSize = finishedChunk.end - finishedChunk.start;

//issue progress event when chunk finished uploading, so item progress data is updated
Expand All @@ -47,10 +56,14 @@ const handleChunkRequest = (
});
} else if (result.state !== FILE_STATES.ABORTED) {
//increment attempt in case chunk failed (and not aborted)
state.chunks[index].attempt += 1;
chunkedState.updateState((state) => {
state.chunks[index].attempt += 1;
});
}

state.responses.push(result.response);
chunkedState.updateState((state) => {
state.responses.push(result.response);
});
}
});
};
Expand Down
41 changes: 1 addition & 40 deletions packages/core/chunked-sender/src/chunkedSender/index.js
Original file line number Diff line number Diff line change
@@ -1,40 +1 @@
// @flow
import { logger } from "@rpldy/shared";
import xhrSend from "@rpldy/sender";
import { getMandatoryOptions } from "../utils";
import processChunks from "./processChunks";

import type { BatchItem } from "@rpldy/shared";
import type { OnProgress, SendResult } from "@rpldy/sender";
import type { TriggerMethod } from "@rpldy/life-events";
import type { ChunkedOptions, ChunkedSender, ChunkedSendOptions } from "../types";

const createChunkedSender = (chunkedOptions: ?ChunkedOptions, trigger: TriggerMethod): ChunkedSender => {
const options = getMandatoryOptions(chunkedOptions);

const send = (items: BatchItem[], url: ?string, sendOptions: ChunkedSendOptions, onProgress: OnProgress): SendResult => {
let result;

if (!options.chunked || items.length > 1 || items[0].url || !items[0].file.size) {
result = xhrSend(items, url, sendOptions, onProgress);
logger.debugLog(`chunkedSender: sending items as normal, un-chunked requests`);
} else {
logger.debugLog(`chunkedSender: sending file as a chunked request`);
result = processChunks(
items[0],
options,
url,
sendOptions,
onProgress,
trigger);
}

return result;
};

return {
send,
};
};

export default createChunkedSender;
export default from "./createChunkedSender";
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
// @flow
import type { BatchItem } from "@rpldy/shared";
import type { State } from "./types";
import type { ChunkedState } from "./types";

const processChunkProgressData = (state: State, item: BatchItem, chunkId: string, chunkUploaded: number): { loaded: number, total: number } => {
state.uploaded[chunkId] = Math.max(chunkUploaded, (state.uploaded[chunkId] || 0));
const processChunkProgressData = (chunkedState: ChunkedState, item: BatchItem, chunkId: string, chunkUploaded: number):
{ loaded: number, total: number } => {
chunkedState.updateState((state) => {
state.uploaded[chunkId] = Math.max(chunkUploaded, (state.uploaded[chunkId] || 0));
});

const state = chunkedState.getState();

const loadedSum = Object.keys(state.uploaded)
.reduce((res, id) => res + state.uploaded[id],
Expand Down
37 changes: 14 additions & 23 deletions packages/core/chunked-sender/src/chunkedSender/processChunks.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ import getChunks from "./getChunks";
import sendChunks from "./sendChunks";
import { CHUNKED_SENDER_TYPE } from "../consts";
import processChunkProgressData from "./processChunkProgressData";
import getChunkedState from "./getChunkedState";

import type { BatchItem } from "@rpldy/shared";
import type { OnProgress, SendResult } from "@rpldy/sender";
import type { TriggerMethod } from "@rpldy/life-events";
import type { MandatoryChunkedOptions, ChunkedSendOptions } from "../types";
import type { State, ChunksSendResponse, Chunk } from "./types";
import type { ChunksSendResponse, Chunk, ChunkedState } from "./types";

export const abortChunkedRequest = (state: State, item: BatchItem): boolean => {
export const abortChunkedRequest = (chunkedState: ChunkedState, item: BatchItem): boolean => {
logger.debugLog(`chunkedSender: aborting chunked upload for item: ${item.id}`);
const state = chunkedState.getState();

if (!state.finished && !state.aborted) {
Object.keys(state.requests)
Expand All @@ -21,31 +23,33 @@ export const abortChunkedRequest = (state: State, item: BatchItem): boolean => {
state.requests[chunkId].abort();
});

state.aborted = true;
chunkedState.updateState((state) => {
state.aborted = true;
});
}

return state.aborted;
};

export const process = (
state: State,
chunkedState: ChunkedState,
item: BatchItem,
onProgress: OnProgress,
trigger: TriggerMethod,
): ChunksSendResponse => {
const onChunkProgress = (e, chunks: Chunk[]) => {
//we only ever send one chunk per request
const progressData = processChunkProgressData(state, item, chunks[0].id, e.loaded);
const progressData = processChunkProgressData(chunkedState, item, chunks[0].id, e.loaded);
onProgress(progressData, [item]);
};

const sendPromise = new Promise((resolve) => {
sendChunks(state, item, onChunkProgress, resolve, trigger);
sendChunks(chunkedState, item, onChunkProgress, resolve, trigger);
});

return {
sendPromise,
abort: () => abortChunkedRequest(state, item),
abort: () => abortChunkedRequest(chunkedState, item),
};
};

Expand All @@ -58,24 +62,11 @@ const processChunks = (
trigger: TriggerMethod
): SendResult => {
const chunks = getChunks(item, chunkedOptions, sendOptions.startByte);
logger.debugLog(`chunkedSender: created ${chunks.length} chunks for: ${item.file.name}`);
const chunkedState = getChunkedState(chunks, url, sendOptions, chunkedOptions);

const state = {
finished: false,
aborted: false,
error: false,
uploaded: {},
requests: {},
responses: [],
chunkCount: chunks.length,
startByte: sendOptions.startByte || 0,
chunks,
url,
sendOptions,
...chunkedOptions,
};
logger.debugLog(`chunkedSender: created ${chunks.length} chunks for: ${item.file.name}`);

const { sendPromise, abort } = process(state, item, onProgress, trigger);
const { sendPromise, abort } = process(chunkedState, item, onProgress, trigger);

return {
request: sendPromise,
Expand Down
Loading

0 comments on commit 9b14e0d

Please sign in to comment.