Skip to content

Commit

Permalink
Adding cleanup process for older temp files (Azure#14935)
Browse files Browse the repository at this point in the history
* Adding cleanup process for older temp files

* Lint

* Increase wait time
  • Loading branch information
hectorhdzg authored Apr 20, 2021
1 parent 9e3fd22 commit 9110a9b
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 24 deletions.
20 changes: 14 additions & 6 deletions sdk/monitor/monitor-opentelemetry-exporter/src/export/trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,20 @@ export class AzureMonitorTraceExporter implements SpanExporter {
if (result) {
diag.info(result);
const breezeResponse = JSON.parse(result) as BreezeResponse;
const filteredEnvelopes = breezeResponse.errors.reduce(
(acc, v) => [...acc, envelopes[v.index]],
[] as Envelope[]
);
// calls resultCallback(ExportResult) based on result of persister.push
return await this._persist(filteredEnvelopes);
let filteredEnvelopes: Envelope[] = [];
breezeResponse.errors.forEach((error) => {
if (error.statusCode && isRetriable(error.statusCode)) {
filteredEnvelopes.push(envelopes[error.index]);
}
});
if (filteredEnvelopes.length > 0) {
// calls resultCallback(ExportResult) based on result of persister.push
return await this._persist(filteredEnvelopes);
}
// Failed -- not retriable
return {
code: ExportResultCode.FAILED
};
} else {
// calls resultCallback(ExportResult) based on result of persister.push
return await this._persist(envelopes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ export class FileSystemPersist implements PersistentStorage {

static FILENAME_SUFFIX = ".ai.json";

fileRetemptionPeriod = 7 * 24 * 60 * 60 * 1000; // 7 days
cleanupTimeOut = 60 * 60 * 1000; // 1 hour
maxBytesOnDisk: number = 50_000_000; // ~50MB

private _tempDirectory: string;
private _fileCleanupTimer: NodeJS.Timer | null = null;

private readonly _options: AzureExporterInternalConfig;

constructor(options: Partial<AzureExporterInternalConfig> = {}) {
Expand All @@ -36,6 +41,17 @@ export class FileSystemPersist implements PersistentStorage {
`No instrumentation key was provided to FileSystemPersister. Files may not be properly persisted`
);
}
this._tempDirectory = path.join(
os.tmpdir(),
FileSystemPersist.TEMPDIR_PREFIX + this._options.instrumentationKey
);
// Starts file cleanup task
if (!this._fileCleanupTimer) {
this._fileCleanupTimer = setTimeout(() => {
this._fileCleanupTask();
}, this.cleanupTimeOut);
this._fileCleanupTimer.unref();
}
}

push(value: unknown[]): Promise<boolean> {
Expand All @@ -61,23 +77,18 @@ export class FileSystemPersist implements PersistentStorage {
* reads the first file if exist, deletes it and tries to send its load
*/
private async _getFirstFileOnDisk(): Promise<Buffer | null> {
const tempDir = path.join(
os.tmpdir(),
FileSystemPersist.TEMPDIR_PREFIX + this._options.instrumentationKey
);

try {
const stats = await statAsync(tempDir);
const stats = await statAsync(this._tempDirectory);
if (stats.isDirectory()) {
const origFiles = await readdirAsync(tempDir);
const origFiles = await readdirAsync(this._tempDirectory);
const files = origFiles.filter((f) =>
path.basename(f).includes(FileSystemPersist.FILENAME_SUFFIX)
);
if (files.length === 0) {
return null;
} else {
const firstFile = files[0];
const filePath = path.join(tempDir, firstFile);
const filePath = path.join(this._tempDirectory, firstFile);
const payload = await readFileAsync(filePath);
// delete the file first to prevent double sending
await unlinkAsync(filePath);
Expand All @@ -96,20 +107,15 @@ export class FileSystemPersist implements PersistentStorage {
}

private async _storeToDisk(payload: string): Promise<boolean> {
const directory = path.join(
os.tmpdir(),
FileSystemPersist.TEMPDIR_PREFIX + this._options.instrumentationKey
);

try {
await confirmDirExists(directory);
await confirmDirExists(this._tempDirectory);
} catch (error) {
diag.warn(`Error while checking/creating directory: `, error && error.message);
return false;
}

try {
const size = await getShallowDirectorySize(directory);
const size = await getShallowDirectorySize(this._tempDirectory);
if (size > this.maxBytesOnDisk) {
diag.warn(
`Not saving data due to max size limit being met. Directory size in bytes is: ${size}`
Expand All @@ -121,10 +127,8 @@ export class FileSystemPersist implements PersistentStorage {
return false;
}

// create file - file name for now is the timestamp, @todo: a better approach would be a UUID but that
// would require an external dependency
const fileName = `${new Date().getTime()}${FileSystemPersist.FILENAME_SUFFIX}`;
const fileFullPath = path.join(directory, fileName);
const fileFullPath = path.join(this._tempDirectory, fileName);

// Mode 600 is w/r for creator and no read access for others
diag.info(`saving data to disk at: ${fileFullPath}`);
Expand All @@ -136,4 +140,36 @@ export class FileSystemPersist implements PersistentStorage {
}
return true;
}

private async _fileCleanupTask(): Promise<boolean> {
try {
const stats = await statAsync(this._tempDirectory);
if (stats.isDirectory()) {
const origFiles = await readdirAsync(this._tempDirectory);
const files = origFiles.filter((f) =>
path.basename(f).includes(FileSystemPersist.FILENAME_SUFFIX)
);
if (files.length === 0) {
return false;
} else {
files.forEach(async (file) => {
// Check expiration
let fileCreationDate: Date = new Date(
parseInt(file.split(FileSystemPersist.FILENAME_SUFFIX)[0])
);
let expired = new Date(+new Date() - this.fileRetemptionPeriod) > fileCreationDate;
if (expired) {
var filePath = path.join(this._tempDirectory, file);
await unlinkAsync(filePath);
}
});
return true;
}
}
return false;
} catch (error) {
diag.info(`Failed cleanup of persistent file storage expired files`, error);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ describe("#AzureMonitorBaseExporter", () => {
assert.strictEqual(persistedEnvelopes?.length, 2);
});

it("should not persist partial non retriable failed telemetry", async () => {
const exporter = new TestExporter();
const response = partialBreezeResponse([407, 501, 408]);
scope.reply(206, JSON.stringify(response));

const result = await exporter.exportEnvelopesPrivate([envelope, envelope, envelope]);
assert.strictEqual(result.code, ExportResultCode.SUCCESS);

const persistedEnvelopes = (await exporter["_persister"].shift()) as Envelope[];
assert.strictEqual(persistedEnvelopes?.length, 1);
});

it("should not persist non-retriable failed telemetry", async () => {
const exporter = new TestExporter();
const response = failedBreezeResponse(1, 400);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,21 @@ describe("FileSystemPersist", () => {
assert.deepStrictEqual(value2, secondBatch);
});
});

describe("#fileCleanupTask()", () => {
it("must clean old files from temp location", async () => {
const sleep = promisify(setTimeout);
const persister = new FileSystemPersist({ instrumentationKey });
const firstBatch = [{ batch: "first" }];
const success1 = await persister.push(firstBatch);
assert.strictEqual(success1, true);
persister.fileRetemptionPeriod = 1;
// wait 100 ms
await sleep(100);
let cleanup = await persister["_fileCleanupTask"]();
assert.strictEqual(cleanup, true);
let fileValue = await persister.shift();
assert.deepStrictEqual(fileValue, null); //File doesn't exist anymore
});
});
});

0 comments on commit 9110a9b

Please sign in to comment.