Skip to content

Commit

Permalink
fix: process files in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
mcarvin8 committed Jan 21, 2025
1 parent 5a3f2de commit 042121f
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 46 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
},
"dependencies": {
"tslib": "^2.6.2",
"xml-disassembler": "^1.3.8",
"xml-disassembler": "^1.3.9",
"yaml": "^2.7.0"
},
"repository": {
Expand Down
8 changes: 4 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 14 additions & 8 deletions src/service/deleteReassembledXML.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
"use strict";

import { stat, readdir, rm } from "node:fs/promises";
import { readdir, rm } from "node:fs/promises";
import { join } from "node:path/posix";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";
import { withConcurrencyLimit } from "./withConcurrencyLimit";

export async function deleteReassembledXML(
disassembledPath: string,
): Promise<void> {
const files = await readdir(disassembledPath);
const tasks: (() => Promise<void>)[] = [];
const files = await readdir(disassembledPath, { withFileTypes: true });
const concurrencyLimit = getConcurrencyThreshold();

for (const file of files) {
const filePath = join(disassembledPath, file);
const fileStat = await stat(filePath);
if (fileStat.isFile() && filePath.endsWith(".xml")) {
await rm(filePath);
} else if (fileStat.isDirectory()) {
await deleteReassembledXML(filePath);
const subFilePath = join(disassembledPath, file.name);

if (file.isFile() && subFilePath.endsWith(".xml")) {
tasks.push(() => rm(subFilePath));
} else if (file.isDirectory()) {
tasks.push(() => deleteReassembledXML(subFilePath));
}
}
await withConcurrencyLimit(tasks, concurrencyLimit);
}
10 changes: 10 additions & 0 deletions src/service/getConcurrencyThreshold.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"use strict";

import { availableParallelism } from "node:os";

/**
 * Decides how many tasks may be started side by side.
 *
 * @returns The value reported by `availableParallelism()` from `node:os`,
 * capped at 6. When that function is not exported by the running Node.js
 * version, the cap itself (6) is returned.
 */
export function getConcurrencyThreshold(): number {
  const upperBound = 6;

  if (availableParallelism) {
    return Math.min(availableParallelism(), upperBound);
  }

  // No parallelism hint available: fall back to the cap.
  return upperBound;
}
30 changes: 18 additions & 12 deletions src/service/transform2YAML.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
"use strict";

import { readdir, rm, stat, writeFile } from "node:fs/promises";
import { readdir, rm, writeFile } from "node:fs/promises";
import { join } from "node:path/posix";
import { stringify } from "yaml";
import { parseXML } from "xml-disassembler";

import { logger } from "@src/index";
import { withConcurrencyLimit } from "./withConcurrencyLimit";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export async function transform2YAML(xmlPath: string): Promise<void> {
const subFiles = await readdir(xmlPath);
for (const subFile of subFiles) {
const subFilePath = join(xmlPath, subFile);
if ((await stat(subFilePath)).isDirectory()) {
await transform2YAML(subFilePath);
} else if (
(await stat(subFilePath)).isFile() &&
subFilePath.endsWith(".xml")
) {
await writeYAML(subFilePath);
await rm(subFilePath);
const tasks: (() => Promise<void>)[] = [];
const files = await readdir(xmlPath, { withFileTypes: true });
const concurrencyLimit = getConcurrencyThreshold();
const foldersToRemote = [];

for (const subFile of files) {
const subFilePath = join(xmlPath, subFile.name);
if (subFile.isDirectory()) {
tasks.push(() => transform2YAML(subFilePath));
} else if (subFile.isFile() && subFilePath.endsWith(".xml")) {
tasks.push(() => writeYAML(subFilePath));
foldersToRemote.push(subFilePath);
}
}
await withConcurrencyLimit(tasks, concurrencyLimit);
const deleteTasks = foldersToRemote.map((filePath) => () => rm(filePath));
await withConcurrencyLimit(deleteTasks, concurrencyLimit);
}

async function writeYAML(xmlPath: string): Promise<void> {
Expand Down
22 changes: 22 additions & 0 deletions src/service/withConcurrencyLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
 * Starts the given task factories in order, pausing before the next launch
 * whenever the number of still-pending tasks has reached `limit`.
 *
 * A task is only dropped from the pending set when it fulfils; a rejection
 * surfaces through the pending race or the final `Promise.all`, so the
 * returned promise rejects on the first observed failure.
 *
 * @param tasks - Zero-argument functions that each start one unit of async work.
 * @param limit - Pending-task count at which launching further tasks pauses.
 * @returns The fulfilled values, in the same order as `tasks`.
 */
export async function withConcurrencyLimit<T>(
  tasks: (() => Promise<T>)[],
  limit: number,
): Promise<T[]> {
  const started: Promise<T>[] = [];
  const inFlight = new Set<Promise<T>>();

  for (const startTask of tasks) {
    const tracked: Promise<T> = startTask().then((value) => {
      // Free this task's slot as soon as it fulfils.
      inFlight.delete(tracked);
      return value;
    });
    started.push(tracked);
    inFlight.add(tracked);

    if (inFlight.size >= limit) {
      // Block until any pending task settles before launching another.
      await Promise.race(inFlight);
    }
  }

  return Promise.all(started);
}
38 changes: 24 additions & 14 deletions src/service/xml2yamlDisassembler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { resolve, join, basename, dirname, extname } from "node:path/posix";
import { logger } from "@src/index";
import { disassembleHandler } from "@src/service/disassembleHandler";
import { transform2YAML } from "@src/service/transform2YAML";
import { withConcurrencyLimit } from "./withConcurrencyLimit";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export class XmlToYamlDisassembler {
async disassemble(xmlAttributes: {
Expand All @@ -23,6 +25,8 @@ export class XmlToYamlDisassembler {
postPurge = false,
ignorePath = ".xmldisassemblerignore",
} = xmlAttributes;
const concurrencyLimit = getConcurrencyThreshold();
const tasks = [];
const fileStat = await stat(filePath);

if (fileStat.isFile()) {
Expand All @@ -31,28 +35,34 @@ export class XmlToYamlDisassembler {
logger.error(`The file path is not an XML file: ${resolvedPath}`);
return;
}
await this.processFile({
filePath: resolvedPath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
});
tasks.push(() =>
this.processFile({
filePath: resolvedPath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
}),
);
} else if (fileStat.isDirectory()) {
const subFiles = await readdir(filePath);
for (const subFile of subFiles) {
const subFilePath = join(filePath, subFile);
if (subFilePath.endsWith(".xml")) {
await this.processFile({
filePath: subFilePath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
});
tasks.push(() =>
this.processFile({
filePath: subFilePath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
}),
);
}
}
}

await withConcurrencyLimit(tasks, concurrencyLimit);
}

async processFile(xmlAttributes: {
Expand Down
20 changes: 13 additions & 7 deletions src/service/yaml2xmlReassembler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { logger } from "@src/index";
import { reassembleHandler } from "@src/service/reassembleHandler";
import { transform2XML } from "@src/service/transform2XML";
import { deleteReassembledXML } from "@src/service/deleteReassembledXML";
import { withConcurrencyLimit } from "./withConcurrencyLimit";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export class YamlToXmlReassembler {
async reassemble(xmlAttributes: {
Expand Down Expand Up @@ -34,15 +36,19 @@ export class YamlToXmlReassembler {
}

async processFile(filePath: string): Promise<void> {
const files = await readdir(filePath);
const tasks: (() => Promise<void>)[] = [];
const files = await readdir(filePath, { withFileTypes: true });
const concurrencyLimit = getConcurrencyThreshold();

for (const file of files) {
const subFilePath = join(filePath, file);
const subFileStat = await stat(subFilePath);
if (subFileStat.isFile() && subFilePath.endsWith(".yaml")) {
await transform2XML(subFilePath);
} else if (subFileStat.isDirectory()) {
await this.processFile(subFilePath);
const subFilePath = join(filePath, file.name);

if (file.isFile() && subFilePath.endsWith(".yaml")) {
tasks.push(() => transform2XML(subFilePath));
} else if (file.isDirectory()) {
tasks.push(() => this.processFile(subFilePath));
}
}
await withConcurrencyLimit(tasks, concurrencyLimit);
}
}
28 changes: 28 additions & 0 deletions test/main.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ describe("main function", () => {

afterEach(async () => {
jest.restoreAllMocks();
jest.resetModules();
});

afterAll(async () => {
Expand Down Expand Up @@ -119,6 +120,33 @@ describe("main function", () => {
await rm(fakeFile);
expect(logger.error).toHaveBeenCalled();
});
// Each case below registers its own mock for "node:os" and only then requires
// getConcurrencyThreshold, so the module under test is loaded against that mock.

// Reported parallelism (4) is below the cap of 6, so it is returned unchanged.
it("should return the minimum of available parallelism and 6", () => {
jest.mock("node:os", () => ({
availableParallelism: jest.fn(() => 4), // Mock availableParallelism to return 4
}));
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(4);
});
// Reported parallelism (10) is above the cap, so the result is clamped to 6.
it("should return 6 if availableParallelism returns a higher value", () => {
jest.mock("node:os", () => ({
availableParallelism: jest.fn(() => 10), // Mock availableParallelism to return 10
}));
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(6);
});
// With no availableParallelism export, the function falls back to the cap of 6.
it("should return 6 if availableParallelism is undefined", () => {
jest.mock("node:os", () => ({
availableParallelism: undefined, // Simulate unavailable function
}));
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(6);
});
// This should always be the final test
it("should compare the files created in the mock directory against the baselines to confirm no changes.", async () => {
await compareDirectories(baselineDir, mockDir);
Expand Down

0 comments on commit 042121f

Please sign in to comment.