Skip to content

Commit

Permalink
feat(cli): make cogs will update the process job status if exists. (#…
Browse files Browse the repository at this point in the history
…2180)

* Deprecate action job batch cli

* Update the job:failure if catch errors.

* Update Processing job complete

* Export JobStatus and ProcessingJobFailed from basemaps config

* Some minor refiment.

* Minor fix.

* Update Config ProcessingJob to use isWriteable.

* Add processing Id in the Cog Job.

* Update job status by job.processingId.

* Add projection, tileset id and url into processing job config.

* Minor refinements.

* Use tilematrix identifier in job config, and throw error if config not found.

* Add tileMatrix and tilesetId into Processing job.
  • Loading branch information
Wentao-Kuang authored May 13, 2022
1 parent 1b3a7eb commit 855ce1c
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 75 deletions.
8 changes: 4 additions & 4 deletions packages/cli/src/cli/cogify/__test__/action.batch.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import o from 'ospec';
import { CogJob } from '../../../cog/types.js';
import { ActionBatchJob, extractResolutionFromName } from '../action.batch.js';
import { BatchJob, extractResolutionFromName } from '../batch.job.js';

o.spec('action.batch', () => {
o('extractResolutionFromName', () => {
Expand All @@ -15,17 +15,17 @@ o.spec('action.batch', () => {

o('should create valid jobNames', () => {
const fakeJob = { id: '01FHRPYJ5FV1XAARZAC4T4K6MC', name: 'geographx_nz_texture_shade_2012_8-0m' } as CogJob;
o(ActionBatchJob.id(fakeJob, '0')).equals('01FHRPYJ5FV1XAARZAC4T4K6MC-9af5e139bbb3e502-0');
o(BatchJob.id(fakeJob, '0')).equals('01FHRPYJ5FV1XAARZAC4T4K6MC-9af5e139bbb3e502-0');

fakeJob.name = 'ōtorohanga_urban_2021_0.1m_RGB';
o(ActionBatchJob.id(fakeJob, '0')).equals('01FHRPYJ5FV1XAARZAC4T4K6MC-5294acface81c107-0');
o(BatchJob.id(fakeJob, '0')).equals('01FHRPYJ5FV1XAARZAC4T4K6MC-5294acface81c107-0');
});

o('should truncate job names to 128 characters', () => {
const fakeJob = { id: '01FHRPYJ5FV1XAARZAC4T4K6MC', name: 'geographx_nz_texture_shade_2012_8-0m' } as CogJob;

o(
ActionBatchJob.id(
BatchJob.id(
fakeJob,
'this is a really long file name it should over flow 128 characters so it should be truncated at some point.tiff',
),
Expand Down
28 changes: 26 additions & 2 deletions packages/cli/src/cli/cogify/action.cog.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Env, fsa, LogConfig, LoggerFatalError, LogType } from '@basemaps/shared';
import { Config, Env, fsa, LogConfig, LoggerFatalError, LogType } from '@basemaps/shared';
import {
CommandLineAction,
CommandLineFlagParameter,
Expand All @@ -16,6 +16,8 @@ import { Gdal } from '../../gdal/gdal.js';
import { CliId } from '../base.cli.js';
import { makeTempFolder } from '../folder.js';
import path from 'path';
import { JobStatus, ProcessingJobComplete, ProcessingJobFailed } from '@basemaps/config';
import { prepareUrl } from '../util.js';

export class ActionCogCreate extends CommandLineAction {
private job?: CommandLineStringParameter;
Expand Down Expand Up @@ -126,6 +128,17 @@ export class ActionCogCreate extends CommandLineAction {
} else {
logger.warn('DryRun:Done');
}
} catch (e) {
if (job.processingId != null) {
// Update job status if this is the processing job.
const jobConfig = await Config.ProcessingJob.get(job.processingId);
if (jobConfig == null) throw new Error('Unable to find Job Processing Config:' + job.processingId);
const jobFailed = jobConfig as ProcessingJobFailed;
jobFailed.status = JobStatus.Fail;
jobFailed.error = String(e);
if (Config.ProcessingJob.isWriteable()) await Config.ProcessingJob.put(jobFailed);
else throw new Error('Unable update the Processing Job status:' + jobFailed.id);
}
} finally {
// Cleanup!
await fs.rm(tmpFolder, { recursive: true });
Expand All @@ -147,7 +160,18 @@ export class ActionCogCreate extends CommandLineAction {
}

if (expectedTiffs.size === 0) {
logger.info({ tiffCount: jobSize, tiffTotal: jobSize }, 'CogCreate:JobComplete');
const url = await prepareUrl(job);
if (job.processingId != null) {
// Update job status if this is the processing job.
const jobConfig = await Config.ProcessingJob.get(job.processingId);
if (jobConfig == null) throw new Error('Unable to find Job Processing Config:' + job.processingId);
const jobComplete = jobConfig as ProcessingJobComplete;
jobComplete.status = JobStatus.Complete;
jobComplete.url = url;
if (Config.ProcessingJob.isWriteable()) await Config.ProcessingJob.put(jobConfig);
else throw new Error('Unable update the Processing Job status:' + jobConfig.id);
}
logger.info({ tiffCount: jobSize, tiffTotal: jobSize, url }, 'CogCreate:JobComplete');
} else {
logger.info({ tiffCount: jobSize, tiffRemaining: expectedTiffs.size }, 'CogCreate:JobProgress');
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { TileMatrixSet } from '@basemaps/geo';
import { Env, fsa, LogConfig, LogType, Projection } from '@basemaps/shared';
import { CommandLineAction, CommandLineFlagParameter, CommandLineStringParameter } from '@rushstack/ts-command-line';
import Batch from 'aws-sdk/clients/batch.js';
import { createHash } from 'crypto';
import { CogStacJob } from '../../cog/cog.stac.job.js';
import { CogJob } from '../../cog/types.js';

const JobQueue = 'CogBatchJobQueue';
Expand All @@ -25,19 +23,7 @@ export function extractResolutionFromName(name: string): number {
return parseFloat(matches[1].replace('-', '.')) * 1000;
}

export class ActionBatchJob extends CommandLineAction {
private job?: CommandLineStringParameter;
private commit?: CommandLineFlagParameter;
private oneCog?: CommandLineStringParameter;

public constructor() {
super({
actionName: 'batch',
summary: 'AWS batch jobs',
documentation: 'Submit a list of cogs to a AWS Batch queue to be process',
});
}

export class BatchJob {
/**
* Create a id for a job
*
Expand All @@ -59,7 +45,7 @@ export class ActionBatchJob extends CommandLineAction {
name: string,
isCommit: boolean,
): Promise<{ jobName: string; jobId: string; memory: number }> {
const jobName = ActionBatchJob.id(job, name);
const jobName = BatchJob.id(job, name);
const tile = TileMatrixSet.nameToTile(name);
const alignmentLevels = Projection.findAlignmentLevels(job.tileMatrix, tile, job.source.gsd);
// Give 25% more memory to larger jobs
Expand Down Expand Up @@ -112,19 +98,6 @@ export class ActionBatchJob extends CommandLineAction {
return okMap;
}

async onExecute(): Promise<void> {
if (this.job?.value == null) {
throw new Error('Failed to read parameters');
}

await ActionBatchJob.batchJob(
await CogStacJob.load(this.job.value),
this.commit?.value,
this.oneCog?.value,
LogConfig.get(),
);
}

static async batchJob(job: CogJob, commit = false, oneCog: string | undefined, logger: LogType): Promise<void> {
const jobPath = job.getJobPath('job.json');
if (!jobPath.startsWith('s3://')) {
Expand All @@ -136,12 +109,12 @@ export class ActionBatchJob extends CommandLineAction {
const batch = new Batch({ region });

fsa.configure(job.output.location);
const runningJobs = await ActionBatchJob.getCurrentJobList(batch);
const runningJobs = await BatchJob.getCurrentJobList(batch);

const stats = await Promise.all(
job.output.files.map(async ({ name }) => {
if (oneCog != null && oneCog !== name) return { name, ok: true };
const jobName = ActionBatchJob.id(job, name);
const jobName = BatchJob.id(job, name);
const isRunning = runningJobs.get(jobName);
if (isRunning) {
logger.info({ jobName }, 'JobRunning');
Expand Down Expand Up @@ -172,7 +145,7 @@ export class ActionBatchJob extends CommandLineAction {
);

for (const name of toSubmit) {
const jobStatus = await ActionBatchJob.batchOne(jobPath, job, batch, name, commit);
const jobStatus = await BatchJob.batchOne(jobPath, job, batch, name, commit);
logger.info(jobStatus, 'JobSubmitted');
}

Expand All @@ -181,26 +154,4 @@ export class ActionBatchJob extends CommandLineAction {
return;
}
}

protected onDefineParameters(): void {
this.job = this.defineStringParameter({
argumentName: 'JOB',
parameterLongName: '--job',
description: 'Job config source to access',
required: true,
});

this.oneCog = this.defineStringParameter({
argumentName: 'COG_NAME',
parameterLongName: '--one-cog',
description: 'Restrict batch to build a single COG file',
required: false,
});

this.commit = this.defineFlagParameter({
parameterLongName: '--commit',
description: 'Begin the transformation',
required: false,
});
}
}
2 changes: 0 additions & 2 deletions packages/cli/src/cli/cogify/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env node
import 'source-map-support/register.js';
import { BaseCommandLine } from '../base.cli.js';
import { ActionBatchJob } from './action.batch.js';
import { ActionCogCreate } from './action.cog.js';
import { ActionJobCreate } from './action.job.js';

Expand All @@ -13,7 +12,6 @@ export class CogifyCommandLine extends BaseCommandLine {
});
this.addAction(new ActionCogCreate());
this.addAction(new ActionJobCreate());
this.addAction(new ActionBatchJob());
}
}

Expand Down
17 changes: 16 additions & 1 deletion packages/cli/src/cli/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fsa, LogConfig } from '@basemaps/shared';
import { Env, fsa, LogConfig, Projection } from '@basemaps/shared';
import CloudFormation from 'aws-sdk/clients/cloudformation.js';
import CloudFront from 'aws-sdk/clients/cloudfront.js';
import S3 from 'aws-sdk/clients/s3.js';
Expand All @@ -7,6 +7,7 @@ import crypto from 'crypto';
import path from 'path';
import { gzip } from 'zlib';
import { promisify } from 'util';
import { CogStacJob } from '../cog/cog.stac.job.js';

// Cloudfront has to be defined in us-east-1
const cloudFormation = new CloudFormation({ region: 'us-east-1' });
Expand Down Expand Up @@ -126,3 +127,17 @@ export async function uploadStaticFile(
.promise();
return true;
}

/**
* Prepare QA urls with center location
*/
export async function prepareUrl(job: CogStacJob): Promise<string> {
const bounds = job.output.bounds;
const center = { x: bounds.x + bounds.width / 2, y: bounds.y + bounds.height / 2 };
const proj = Projection.get(job.tileMatrix.projection);
const centerLatLon = proj.toWgs84([center.x, center.y]).map((c) => c.toFixed(6));
const targetZoom = Math.max(job.tileMatrix.findBestZoom(job.output.gsd) - 12, 0);
const base = Env.get(Env.PublicUrlBase);
const url = `${base}/?i=${job.id}&p=${job.tileMatrix.identifier}&debug#@${centerLatLon[1]},${centerLatLon[0]},z${targetZoom}`;
return url;
}
9 changes: 9 additions & 0 deletions packages/cli/src/cog/cog.stac.job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ export interface JobCreationContext {
* @Default GdalCogBuilderDefaults.resampling
*/
resampling?: GdalCogBuilderResampling;

/**
* Override job processing Id
*/
processingId?: string;
};

/**
Expand Down Expand Up @@ -110,6 +115,7 @@ export class CogStacJob implements CogJob {
gsd: number;
zoom: number;
};
processingId: string;

/**
* Load the job.json
Expand Down Expand Up @@ -210,6 +216,9 @@ export class CogStacJob implements CogJob {
},
});

// Only include processing ID if exists
if (ctx.override?.processingId != null) job.processingId = ctx.override?.processingId;

const nowStr = new Date().toISOString();

const sourceProj = Projection.get(job.source.epsg);
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/cog/job.factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { fsa, isConfigS3Role, isFileConfigPath, LogConfig } from '@basemaps/shar
import { basename } from 'path';
import * as ulid from 'ulid';
import { CogBuilder } from '../index.js';
import { ActionBatchJob } from '../cli/cogify/action.batch.js';
import { BatchJob } from '../cli/cogify/batch.job.js';
import { Gdal } from '../gdal/gdal.js';
import { CogStacJob, JobCreationContext } from './cog.stac.job.js';
import { Cutline } from './cutline.js';
Expand Down Expand Up @@ -108,7 +108,7 @@ export const CogJobFactory = {
cutlinePoly: cutline.clipPoly,
});

if (ctx.batch) await ActionBatchJob.batchJob(job, true, undefined, logger);
if (ctx.batch) await BatchJob.batchJob(job, true, undefined, logger);
logger.info({ tileMatrix: ctx.tileMatrix.identifier, job: job.getJobPath() }, 'Done');

return job;
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/src/cog/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ export interface CogJobJson {

source: CogSourceProperties;
output: CogOutputProperties;

processingId?: string;
}

export interface CogJob extends CogJobJson {
Expand Down
18 changes: 14 additions & 4 deletions packages/config/src/config/processing.job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@ import { BaseConfig } from './base.js';
export enum JobStatus {
Processing = 'processing',
Complete = 'complete',
Fail = 'failed',
}

export type ConfigProcessingJob = ProcessingJob | ProcessingJobFailed;
export type ConfigProcessingJob = ProcessingJob | ProcessingJobComplete | ProcessingJobFailed;

export interface ProcessingJob extends BaseConfig {
/** Job Status for the imagery importing batch jobs */
status: JobStatus;
/** Processed Imagery projection */
tileMatrix: string;
/** Processed TileSet Id */
tileSet: string;
/** Basemaps TileSet url */
}

export interface ProcessingJobFailed extends BaseConfig {
status: 'failed';
export interface ProcessingJobComplete extends ProcessingJob {
status: JobStatus.Complete;
url: string;
}

export interface ProcessingJobFailed extends ProcessingJob {
status: JobStatus.Fail;
/** Job Batch processing error messages */
error: string;
}
8 changes: 7 additions & 1 deletion packages/config/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ export { BaseConfig } from './config/base.js';
export { ConfigImagery } from './config/imagery.js';
export { ConfigPrefix } from './config/prefix.js';
export { ConfigProvider } from './config/provider.js';
export { ConfigProcessingJob } from './config/processing.job.js';
export {
JobStatus,
ProcessingJob,
ProcessingJobComplete,
ProcessingJobFailed,
ConfigProcessingJob,
} from './config/processing.job.js';
export {
ConfigLayer,
ConfigTileSet,
Expand Down
16 changes: 11 additions & 5 deletions packages/lambda-tiler/src/routes/import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import { createHash } from 'crypto';
import { findImagery, RoleRegister } from '../import/imagery.find.js';
import { Nztm2000Tms, TileMatrixSets } from '@basemaps/geo';
import { getJobCreationContext } from '../import/make.cog.js';
import { ConfigProcessingJob } from '@basemaps/config';
import { ConfigProcessingJob, JobStatus } from '@basemaps/config';
import { CogJobFactory } from '@basemaps/cli';
import * as ulid from 'ulid';

/**
* Trigger import imagery job by this endpoint
Expand Down Expand Up @@ -35,18 +36,23 @@ export async function Import(req: LambdaHttpRequest): Promise<LambdaHttpResponse
// Prepare Cog jobs
const ctx = await getJobCreationContext(path, targetTms, role, files);

const id = createHash('sha256').update(JSON.stringify(ctx)).digest('base64');
const jobId = Config.ProcessingJob.id(id);
const hash = createHash('sha256').update(JSON.stringify(ctx)).digest('base64');
const jobId = Config.ProcessingJob.id(hash);
let jobConfig = await Config.ProcessingJob.get(jobId);
if (jobConfig == null) {
// Add id back to JobCreationContext
// Add ids into JobCreationContext
const id = ulid.ulid();
ctx.override!.id = id;
ctx.override!.processingId = jobId;
ctx.outputLocation.path = fsa.join(ctx.outputLocation.path, id);

// Insert Processing job config
jobConfig = {
id: jobId,
name: path,
status: 'processing',
status: JobStatus.Processing,
tileMatrix: targetTms.identifier,
tileSet: Config.TileSet.id(id),
} as ConfigProcessingJob;

if (Config.ProcessingJob.isWriteable()) await Config.ProcessingJob.put(jobConfig);
Expand Down

0 comments on commit 855ce1c

Please sign in to comment.