Skip to content

Commit

Permalink
Large asset Ex OOM fix when in s3 asset mode (#3598)
Browse files Browse the repository at this point in the history
This PR makes the following changes: 

- Improves the s3 backend get() requests to grab assets in a more memory
efficient way
- This resolves an issue where pulling and decompressing large assets
from s3 would cause an OOM on the execution controller at job startup
- Adds an error message, shown when the asset loader closes with an error, that
advises what to do in the case of an OOM issue

Ref to issue #3595
  • Loading branch information
sotojn authored Apr 30, 2024
1 parent 959d1e0 commit 1baff7a
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 19 deletions.
115 changes: 109 additions & 6 deletions e2e/test/cases/assets/simple-spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import 'jest-extended';
import { createReadStream } from 'node:fs';
import fs from 'node:fs';
import os from 'os';
import path from 'path';
import decompress from 'decompress';
import archiver from 'archiver';
import {
createS3Client,
getS3Object,
S3Client,
} from '@terascope/file-asset-apis';
import { Teraslice } from '@terascope/types';
import { pWhile } from '@terascope/utils';
import crypto from 'crypto';
import { TerasliceHarness, JobFixtureNames } from '../../teraslice-harness.js';
import {
ASSET_STORAGE_CONNECTION_TYPE, MINIO_ACCESS_KEY, MINIO_HOST, MINIO_SECRET_KEY, TEST_PLATFORM
Expand All @@ -32,7 +38,7 @@ describe('assets', () => {
* @param {string} assetPath the relative path to the asset file
*/
async function submitAndValidateAssetJob(jobSpecName: JobFixtureNames, assetPath: string) {
const fileStream = createReadStream(assetPath);
const fileStream = fs.createReadStream(assetPath);
const jobSpec = terasliceHarness.newJob(jobSpecName);
// Set resource constraints on workers within CI
if (TEST_PLATFORM === 'kubernetes') {
Expand All @@ -57,7 +63,7 @@ describe('assets', () => {
}

it('after uploading an asset, it can be deleted', async () => {
const testStream = createReadStream('test/fixtures/assets/example_asset_1.zip');
const testStream = fs.createReadStream('test/fixtures/assets/example_asset_1.zip');

const result = await terasliceHarness.teraslice.assets.upload(
testStream,
Expand All @@ -79,7 +85,7 @@ describe('assets', () => {
// {"error":"asset.json was not found in root directory of asset bundle
// nor any immediate sub directory"}
it('uploading a bad asset returns an error', async () => {
const testStream = createReadStream('test/fixtures/assets/example_bad_asset_1.zip');
const testStream = fs.createReadStream('test/fixtures/assets/example_bad_asset_1.zip');

try {
await terasliceHarness.teraslice.assets.upload(testStream, { blocking: true });
Expand Down Expand Up @@ -113,7 +119,7 @@ describe('assets', () => {
it('can update an asset bundle and use the new asset', async () => {
const assetPath = 'test/fixtures/assets/example_asset_1updated.zip';

const fileStream = createReadStream(assetPath);
const fileStream = fs.createReadStream(assetPath);
// the asset on this job already points to 'ex1' so it should use the latest available asset
const jobSpec = terasliceHarness.newJob('generator-asset');
// Set resource constraints on workers within CI
Expand Down Expand Up @@ -173,6 +179,8 @@ describe('assets', () => {
describe('s3 asset storage', () => {
// If the connection type is S3 run tests to ensure assets are stored in S3
if (ASSET_STORAGE_CONNECTION_TYPE === 's3') {
/// keep 'largeAssetPath' in outer scope so afterAll can cleanup even on failure
const largeAssetPath = fs.mkdtempSync(path.join(os.tmpdir(), 'example_large_asset_top'));
let terasliceInfo: Teraslice.ApiRootResponse;
let terasliceHarness: TerasliceHarness;
let s3client: S3Client;
Expand All @@ -197,9 +205,14 @@ describe('s3 asset storage', () => {
bucketName = `ts-assets-${terasliceInfo.name}`.replaceAll('_', '-');
});

afterAll(async () => {
/// cleanup
fs.rmSync(largeAssetPath, { recursive: true, force: true });
});

it('stores assets in s3', async () => {
const assetPath = 'test/fixtures/assets/example_asset_1updated.zip';
const fileStream = createReadStream(assetPath);
const fileStream = fs.createReadStream(assetPath);
const assetResponse = await terasliceHarness.teraslice.assets.upload(fileStream, {
blocking: true
});
Expand All @@ -221,5 +234,95 @@ describe('s3 asset storage', () => {
expect(record._source?.blob).toBeUndefined();
}
});

it('can upload and use large asset', async () => {
    /// Create a large asset within the test so we don't have to upload
    /// a large binary file to the repo
    const assetPath = 'test/fixtures/assets/example_asset_1updated.zip';
    if (!fs.existsSync(largeAssetPath)) {
        fs.mkdirSync(largeAssetPath, { recursive: true });
    }
    const largeAssetPathSub = path.join(largeAssetPath, 'example_large_asset_sub');
    if (!fs.existsSync(largeAssetPathSub)) {
        fs.mkdirSync(largeAssetPathSub, { recursive: true });
    }

    /// Unpack the small example asset to use as the skeleton of the large one
    const assetBuffer = fs.readFileSync(assetPath);
    await decompress(assetBuffer, largeAssetPathSub);
    fs.mkdirSync(path.join(largeAssetPathSub, '__static_assets'), { recursive: true });
    const largeDocumentPath = path.join(largeAssetPathSub, '__static_assets', 'data.txt');

    /// TODO: This functionality could be moved to utils at some point.
    /// Write ~60mb of random data to data.txt, 5mb at a time, so the whole
    /// payload is never held in memory at once. The data must be random so
    /// the file stays large after compression.
    /// Wrapping the stream in a promise (instead of polling a flag) means
    /// write errors reject the test instead of becoming unhandled throws
    /// inside stream event handlers.
    await new Promise<void>((resolve, reject) => {
        /// createWriteStream truncates/creates the file by default
        const writer = fs.createWriteStream(largeDocumentPath);
        const chunkSize = 5242880; /// 5mb per chunk
        const targetBytes = 62914560; /// 60mb total

        /// Once the previous chunk is processed, write another chunk
        /// until the bytes written is >= 60mb
        function writeChunk() {
            writer.write(crypto.randomBytes(chunkSize), (error) => {
                if (error) {
                    reject(error);
                } else if (writer.bytesWritten >= targetBytes) {
                    writer.end();
                } else {
                    writeChunk();
                }
            });
        }

        writer.on('error', reject);
        /// 'finish' fires after writer.end() once all data is flushed
        writer.on('finish', resolve);
        /// Once the write stream is ready start writing data to the file
        writer.on('ready', writeChunk);
    });

    /// Change name in asset.json so it doesn't collide with the original asset
    const assetJSONPath = path.join(largeAssetPathSub, 'asset.json');
    const assetJSON = JSON.parse(fs.readFileSync(assetJSONPath, 'utf8'));
    assetJSON.name = 'large-example-asset';
    fs.writeFileSync(assetJSONPath, JSON.stringify(assetJSON, null, 2));

    /// Zip the large asset. Wait for the output file stream's 'close'
    /// (not just finalize()) so the zip is fully flushed to disk before
    /// it is read back for upload.
    const zipPath = path.join(largeAssetPath, 'example_large_asset.zip');
    await new Promise<void>((resolve, reject) => {
        const zippedFile = fs.createWriteStream(zipPath);
        const zipper = archiver('zip');
        zippedFile.on('close', resolve);
        zippedFile.on('error', reject);
        zipper.on('error', reject);
        zipper.pipe(zippedFile);
        zipper.directory(largeAssetPathSub, false);
        zipper.finalize();
    });

    const fileStream = fs.createReadStream(zipPath);

    /// Will throw error if unable to upload
    await terasliceHarness.teraslice.assets.upload(fileStream, {
        blocking: true
    });

    const jobSpec = terasliceHarness.newJob('generator-large-asset');
    // Set resource constraints on workers within CI
    if (TEST_PLATFORM === 'kubernetes') {
        jobSpec.resources_requests_cpu = 0.1;
    }

    const ex = await terasliceHarness.submitAndStart(jobSpec);
    const status = await ex.waitForStatus('completed');

    expect(status).toBe('completed');
});
}
});
19 changes: 19 additions & 0 deletions e2e/test/fixtures/jobs/generator-large-asset.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "generator",
"slicers": 1,
"lifecycle": "once",
"workers": 3,
"analytics": false,
"assets": ["standard", "large-example-asset"],
"max_retries": 0,
"operations": [
{
"_op": "data_generator",
"size": 1000,
"stress_test": false
},
{
"_op": "noop"
}
]
}
2 changes: 2 additions & 0 deletions e2e/test/teraslice-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { scaleWorkers, getElapsed } from './docker-helpers.js';
import signale from './signale.js';
import generatorToESJob from './fixtures/jobs/generate-to-es.json' assert { type: 'json' };
import generatorAssetJob from './fixtures/jobs/generator-asset.json' assert { type: 'json' };
import generatorLargeAssetJob from './fixtures/jobs/generator-large-asset.json' assert { type: 'json' };
import generatorJob from './fixtures/jobs/generator.json' assert { type: 'json' };
import idJob from './fixtures/jobs/id.json' assert { type: 'json' };
import kafkaReaderJob from './fixtures/jobs/kafka-reader.json' assert { type: 'json' };
Expand All @@ -29,6 +30,7 @@ import { defaultAssetBundles } from './download-assets.js';
const JobDict = Object.freeze({
'generate-to-es': generatorToESJob,
'generator-asset': generatorAssetJob,
'generator-large-asset': generatorLargeAssetJob,
generator: generatorJob,
id: idJob,
'kafka-reader': kafkaReaderJob,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice-workspace",
"displayName": "Teraslice",
"version": "1.3.0",
"version": "1.3.1",
"private": true,
"homepage": "https://github.com/terascope/teraslice",
"bugs": {
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "teraslice",
"displayName": "Teraslice",
"version": "1.3.0",
"version": "1.3.1",
"description": "Distributed computing platform for processing JSON data",
"homepage": "https://github.com/terascope/teraslice#readme",
"bugs": {
Expand Down
27 changes: 21 additions & 6 deletions packages/teraslice/src/lib/storage/backends/s3_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,39 @@ export class S3Store {

// TODO: if we want to use the S3 store more generically we can't
// assume the key will have a '.zip' extension
async get(recordId: string) {
async get(recordId: string): Promise<Buffer> {
const command = {
Bucket: this.bucket,
Key: `${recordId}.zip`
};
try {
this.logger.debug(`getting record with id: ${recordId} from s3 ${this.connection} connection, ${this.bucket} bucket.`);
const client = this.api;
const bufferArray: Buffer[] = [];
let triggerReturn = false;
const response = await s3RequestWithRetry({
client,
func: getS3Object,
params: command
});
const s3File = await response.Body?.transformToString('base64');
if (typeof s3File !== 'string') {
throw new TSError(`Unable to get recordId ${recordId} from s3 ${this.connection} connection, ${this.bucket} bucket.`);
}
return s3File;
/// Convert the response body to a Node read stream
const s3Stream = response.Body as NodeJS.ReadableStream;

/// Store the data coming into s3 into a buffer array
s3Stream.on('data', (chunk: Buffer) => {
bufferArray.push(chunk);
});
s3Stream.on('end', () => {
triggerReturn = true;
});
s3Stream.on('error', (err) => {
throw new TSError(`Unable to get recordId ${recordId} from s3 ${this.connection} connection, ${this.bucket} bucket.
Reason: ${err.message}`);
});

await pWhile(async () => triggerReturn);

return Buffer.concat(bufferArray);
} catch (err) {
if (err instanceof S3ClientResponse.NoSuchKey) {
throw new TSError(`recordId ${recordId} does not exist in s3 ${this.connection} connection, ${this.bucket} bucket.`, {
Expand Down
8 changes: 7 additions & 1 deletion packages/teraslice/src/lib/workers/assets/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,14 @@ export class AssetLoader {

const assetRecord = await this.assetsStorage.get(assetIdentifier);
this.logger.info(`loading assets: ${assetIdentifier}`);
let buff: Buffer;

if (this.context.sysconfig.terafoundation.asset_storage_connection_type === 's3') {
buff = assetRecord.blob as Buffer;
} else {
buff = Buffer.from(assetRecord.blob as string, 'base64');
}

const buff = Buffer.from(assetRecord.blob as string, 'base64');
const saveResult = await saveAsset(
this.logger,
this.assetsDirectory,
Expand Down
3 changes: 2 additions & 1 deletion packages/teraslice/src/lib/workers/assets/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ export async function spawnAssetLoader(

if (!isSuccess) {
const errMsg = get(message, 'error', `exit code ${code}`);
const error = new Error(`Failure to get assets, caused by ${errMsg}`);
const errOOM = 'If running out of memory, try consider increasing the memory allocation for the process by adding/modifying the "memory_execution_controller" or "resources_limits_memory" (for workers) field in the job file.';
const error = new Error(`Failure to get assets, caused by ${errMsg}\n${errOOM}`);
reject(error);
} else {
resolve(get(message, 'assetIds', []));
Expand Down
5 changes: 4 additions & 1 deletion packages/teraslice/test/storage/assets_storage-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ describe('AssetsStorage using S3 backend', () => {
});

it('can get an asset from S3', async () => {
/// create a buffer copy of example_asset_1.zip to test if it equals what s3 sends back
const filePath = 'e2e/test/fixtures/assets/example_asset_1.zip';
const buffer = fs.readFileSync(filePath);
const assetRecord = await storage.get('2909ec5fd38466cf6276cc14ede25096f1f34ee9');
expect(assetRecord.blob).toStartWith('UEsDBAoAAAAAANxV');
expect(buffer.equals(assetRecord.blob as Buffer)).toBe(true);
expect(assetRecord.name).toBe('ex1');
});

Expand Down
5 changes: 3 additions & 2 deletions packages/teraslice/test/storage/s3_store-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ describe('S3 backend test', () => {

it('should be able to download asset', async () => {
const filePath = 'e2e/test/fixtures/assets/example_asset_2.zip';
await s3Backend.save('ex2', fse.readFileSync(filePath), 30000);
const fileBuffer = fse.readFileSync(filePath);
await s3Backend.save('ex2', fileBuffer, 30000);

const result = await s3Backend.get('ex2');

expect(result).toStartWith('UEsDBAo');
expect(result.equals(fileBuffer)).toBe(true);
await s3Backend.remove('ex2');
});
});
Expand Down

0 comments on commit 1baff7a

Please sign in to comment.