diff --git a/.github/workflows/planet-cdk-cd.yml b/.github/workflows/planet-cdk-cd.yml index d103247..a7ede0c 100644 --- a/.github/workflows/planet-cdk-cd.yml +++ b/.github/workflows/planet-cdk-cd.yml @@ -32,13 +32,25 @@ jobs: aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: ${{ secrets.AWS_REGION }} - - name: Deploy CDK + - name: Deploy Planet Raster Tiles CDK run: | cd tiles-generation/cdk cdk deploy \ - PlanetTileGenerationStack \ - PlanetEmailNotificationStack \ - PlanetSlackNotificationStack \ + PlanetVectorTileGeneration \ + EmailSNSPlanetRasterTileGeneration \ + SlackSNSPlanetRasterTileGeneration \ + --context BUCKET=${{secrets.TILE_S3_BUCKET}} \ + --context EMAIL=${{secrets.EMAIL}} \ + --context SLACK=${{secrets.SLACK}} \ + --require-approval never + + - name: Deploy Planet Vector Tiles CDK + run: | + cd tiles-generation/cdk + cdk deploy \ + PlanetVectorTileGeneration \ + EmailSNSPlanetVectorTileGeneration \ + SlackSNSPlanetVectorTileGeneration \ --context BUCKET=${{secrets.TILE_S3_BUCKET}} \ --context EMAIL=${{secrets.EMAIL}} \ --context SLACK=${{secrets.SLACK}} \ diff --git a/.github/workflows/test-cdk-cd.yml b/.github/workflows/test-cdk-cd.yml index 7a4751c..86a8c4e 100644 --- a/.github/workflows/test-cdk-cd.yml +++ b/.github/workflows/test-cdk-cd.yml @@ -44,22 +44,42 @@ jobs: aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: ${{ secrets.AWS_REGION }} - - name: Deploy CDK + - name: Deploy Test Raster Tile CDK run: | cd tiles-generation/cdk cdk deploy \ - TestTileGenerationStack \ - TestSlackNotificationStack \ - TestEmailNotificationStack \ + TestRasterTileGeneration \ + SlackSNSTestRasterTileGeneration \ + EmailSNSTestRasterTileGeneration \ --context BUCKET=${{secrets.TILE_S3_BUCKET}} \ --context EMAIL=${{secrets.EMAIL}} \ --context SLACK=${{secrets.SLACK}} \ --require-approval never \ - --outputs-file ./cdk-outputs.json + --outputs-file ./cdk-outputs-raster.json - - name: Run ECS task + - name: Run Test Raster Tiles ECS task run: | - ClusterName=`jq -r '.TestTileGenerationStack .ClusterName' tiles-generation/cdk/cdk-outputs.json` - CapacityProviderName=`jq -r '.TestTileGenerationStack .CapacityProviderName' tiles-generation/cdk/cdk-outputs.json` - TaskDefinitionArn=`jq -r '.TestTileGenerationStack .TaskDefinitionArn' tiles-generation/cdk/cdk-outputs.json` + ClusterName=`jq -r '.TestRasterTileGeneration .ClusterName' tiles-generation/cdk/cdk-outputs-raster.json` + CapacityProviderName=`jq -r '.TestRasterTileGeneration .CapacityProviderName' tiles-generation/cdk/cdk-outputs-raster.json` + TaskDefinitionArn=`jq -r '.TestRasterTileGeneration .TaskDefinitionArn' tiles-generation/cdk/cdk-outputs-raster.json` aws ecs run-task --cluster $ClusterName --task-definition $TaskDefinitionArn --capacity-provider-strategy capacityProvider=$CapacityProviderName + + - name: Deploy Test Vector Tile CDK + run: | + cd tiles-generation/cdk + cdk deploy \ + TestVectorTileGeneration \ + SlackSNSTestVectorTileGeneration \ + EmailSNSTestVectorTileGeneration \ + --context BUCKET=${{secrets.TILE_S3_BUCKET}} \ + --context EMAIL=${{secrets.EMAIL}} \ + --context SLACK=${{secrets.SLACK}} \ + --require-approval never \ + --outputs-file ./cdk-outputs-vector.json + + - name: Run Test Vector Tiles ECS task + run: | + ClusterName=`jq -r '.TestVectorTileGeneration .ClusterName' tiles-generation/cdk/cdk-outputs-vector.json` + CapacityProviderName=`jq -r '.TestRasterTileGeneration .CapacityProviderName' tiles-generation/cdk/cdk-outputs-vector.json` + TaskDefinitionArn=`jq 
-r '.TestRasterTileGeneration .TaskDefinitionArn' tiles-generation/cdk/cdk-outputs-vector.json` + aws ecs run-task --cluster $ClusterName --task-definition $TaskDefinitionArn --capacity-provider-strategy capacityProvider=$CapacityProviderName \ No newline at end of file diff --git a/tiles-generation/cdk/.prettierrc.js b/tiles-generation/cdk/.prettierrc.js new file mode 100644 index 0000000..7a13d67 --- /dev/null +++ b/tiles-generation/cdk/.prettierrc.js @@ -0,0 +1,6 @@ +module.exports = { + trailingComma: 'es5', + tabWidth: 4, + semi: true, + singleQuote: true, +}; diff --git a/tiles-generation/cdk/README.md b/tiles-generation/cdk/README.md index 08b0517..3176642 100644 --- a/tiles-generation/cdk/README.md +++ b/tiles-generation/cdk/README.md @@ -1,6 +1,6 @@ # Tiles generation pipeline with CDK - The tiles generation stack builds a raster map tiles generation pipeline on [Amazon ECS](https://aws.amazon.com/ecs/) using [AWS CDK](https://aws.amazon.com/cdk/). The docker base image used in ECS comes from the [openstreetmap-tile-server](https://github.com/Overv/openstreetmap-tile-server) project. +This project helps to build raster tiles and vector tiles generation pipeline on [Amazon ECS](https://aws.amazon.com/ecs/) using [AWS CDK](https://aws.amazon.com/cdk/). The raster tiles generation docker base image comes from the [openstreetmap-tile-server](https://github.com/Overv/openstreetmap-tile-server) project. The vector tiles generation leverage [Planetiler](https://github.com/onthegomap/planetiler) to generate tiles into MBTiles(sqlite) and then leverage [MBUtil](https://github.com/mapbox/mbutil) to extract it to PBF(Google Protobufs) files. ## Diagram ![alt text](./tiles-generation-diagram.png) @@ -39,33 +39,40 @@ Visit [here](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-in ``` { "BUCKET":"s3BucketName", - "EMAIL":"emailAdress", + "EMAIL":"emailAddress", "SLACK":"slackWebHookURL" } ``` * `BUCKET` - the S3 bucket to store generated tiles. This bucket must be in the same aws account. * `EMAIL` - optional, the email to receive Notification for ECS task status change. -* `SLACK` - optional, the slack Webhook url to send notification to slakc channel. +* `SLACK` - optional, the slack Webhook url to send notification to Slack channel. 3. Deploy CDK stacks, all AWS resources defined within the scope of a stack. -* Test tile generation stack, it's used to test with light data for development and tuning performance. Since planet tiles generation task will take days, when you want to update the code source, it's recommended to test first with the tile generation test stack. -``` -cdk deploy TestTileGenerationStack -``` +- Test tile generation stack, it's used to test with light data for development and tuning performance. Since planet tiles generation task will take days, when you want to update the code source, it's recommended to test first with the tile generation test stack. -* Planet tile generation stack, currently we are using OSM planet data from [OpenStreetMap on AWS](https://registry.opendata.aws/osm/), the ECS task will download the latest version PBF file from there. -``` -cdk deploy PlanetTileGenerationStack -``` + - Test raster tiles + - `cdk deploy TestVectorTileGeneration` + - Test vector tiles + - `cdk deploy TestRasterTileGeneration` + +* Planet tile generation stack, currently we are using OSM planet data from [OpenStreetMap on AWS](https://registry.opendata.aws/osm/), the ECS task will download the latest PBF file from there. 
+ - Planet raster tiles + - `cdk deploy PlanetRasterTileGeneration` + - Planet vector tiles + - `cdk deploy PlanetVectorTileGeneration` -* (Optional) If you want to get notifications of the tiles generation ECS task state changes, you can optionally deploy the below stacks. It allowes you have slack notification and email notification. You will need to update the context values of EMAIL, SLACK in `cdk.context.json` file before deploy these stacks. When creating Slack webhooks, you need set a `Content` variable for the webhook. Learn how to [get Slack webhook URL](https://slack.com/help/articles/360041352714-Create-more-advanced-workflows-using-webhooks). +* (Optional) If you want to get notifications of the tiles generation ECS task state changes, you can optionally deploy the below stacks. It allows you get Slack notification and email notification. You will need to update the context values of EMAIL, SLACK in `cdk.context.json` file before deploy these stacks. When creating Slack webhooks, you need set a `Content` variable for the webhook. Learn how to [get Slack webhook URL](https://slack.com/help/articles/360041352714-Create-more-advanced-workflows-using-webhooks). ``` -cdk deploy TestSlackNotificationStack -cdk deploy TestEmailNotificationStack -cdk deploy PlanetSlackNotificationStack -cdk deploy PlanetEmailNotificationStack +cdk deploy EmailSNSPlanetRasterTileGeneration +cdk deploy EmailSNSPlanetVectorTileGeneration +cdk deploy EmailSNSTestRasterTileGeneration +cdk deploy EmailSNSTestVectorTileGeneration +cdk deploy SlackSNSPlanetRasterTileGeneration +cdk deploy SlackSNSPlanetVectorTileGeneration +cdk deploy SlackSNSTestRasterTileGeneration +cdk deploy SlackSNSTestVectorTileGeneration ``` ### Execute ECS task diff --git a/tiles-generation/cdk/bin/cdk.ts b/tiles-generation/cdk/bin/cdk.ts index e8fdf30..a5d683b 100644 --- a/tiles-generation/cdk/bin/cdk.ts +++ b/tiles-generation/cdk/bin/cdk.ts @@ -11,110 +11,199 @@ import { App } from 'aws-cdk-lib'; import { TileGenerationStack } from '../lib/tile-generation-stack'; import { SlackNotificationStack } from '../lib/slack-notification-stack'; import { EmailNotificationStack } from '../lib/email-notification-stack'; -import * as ec2 from "aws-cdk-lib/aws-ec2"; -import * as asc from "aws-cdk-lib/aws-autoscaling"; +import * as ec2 from 'aws-cdk-lib/aws-ec2'; +import * as asc from 'aws-cdk-lib/aws-autoscaling'; +import { tilesGenerationJob } from '../types/tilesGenerationJob'; const app = new App(); const env = { - region: app.node.tryGetContext('region') || process.env.CDK_INTEG_REGION || process.env.CDK_DEFAULT_REGION, - account: app.node.tryGetContext('account') || process.env.CDK_INTEG_ACCOUNT || process.env.CDK_DEFAULT_ACCOUNT + region: + app.node.tryGetContext('region') || + process.env.CDK_INTEG_REGION || + process.env.CDK_DEFAULT_REGION, + account: + app.node.tryGetContext('account') || + process.env.CDK_INTEG_ACCOUNT || + process.env.CDK_DEFAULT_ACCOUNT, }; -const testInstanceType = ec2.InstanceType.of(ec2.InstanceClass.MEMORY3, ec2.InstanceSize.XLARGE); - -// Define volume size for test stack instance -const testEC2Volume = asc.BlockDeviceVolume.ebs(30, { - deleteOnTermination: true, - // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html - volumeType: asc.EbsDeviceVolumeType.IO1, - // The number of I/O operations per second (IOPS) to provision for the volume - iops: 1000 -}); - -//Define docker environment for test stack ecs task -const testDockerEnv = { - // Bounding box defines the area that tiles will be 
generated, it's the area for luxembourg - 'BBOX':'5.654568,49.431689,6.574673,50.177945', - // Maps raw data - 'DOWNLOAD_PBF':'https://download.geofabrik.de/europe/luxembourg-latest.osm.pbf', - // Configuration for OSM2OGSQL https://osm2pgsql.org/doc/manual.html - 'OSM2PGSQL_EXTRA_ARGS':'-C 10000', - // Minimum zoom level for generation tiles - 'MIN_ZOOM':'0', - // Maximum level for generation tiles - 'MAX_ZOOM':'10', - // Threads for import and rendering tiles process. - 'THREADS':'8', - // Make generate_tiles.py logs output to container log - 'PYTHONUNBUFFERED':'1', - // The database use the autovacuum feature by default. This behavior can be changed with AUTOVACUUM environment variable - 'AUTOVACUUM':'off' -}; - -const testTileGenerationStack = new TileGenerationStack(app, 'TestTileGenerationStack', { - env: env, - instanceType: testInstanceType, - volume: testEC2Volume, - // The value for the size (in MiB) of the /dev/shm volume for container default - // Default limit 64MB is too low for container - sharedMemorySize: 200, - // Docker environment for test stack ecs task - dockerEnv: testDockerEnv, - // Memory reservation for the task. - memoryReservationMiB: 25000 -}); - -const PlanetInstanceType = ec2.InstanceType.of(ec2.InstanceClass.X1E, ec2.InstanceSize.XLARGE8); - -// For planet tile generation, the whole process maximum volume will reach to about 1300GB -const PlanetEC2Volume = asc.BlockDeviceVolume.ebs(1500, { - deleteOnTermination: true, - volumeType: asc.EbsDeviceVolumeType.IO1, - iops: 5000 -}); - -const planetDockerEnv = { - // Enable DOWNLOAD_PLANET to download OSM PBF file from public S3 bucket - 'DOWNLOAD_PLANET':'enabled', - // Bounding box for whole planet - 'BBOX':'-180.0,-90.0,180.0,90.0', - // Configuration for OSM2OGSQL https://osm2pgsql.org/doc/manual.html - 'OSM2PGSQL_EXTRA_ARGS':'-C 400000 --flat-nodes /nodes/flat_nodes.bin', - 'MIN_ZOOM':'0', - 'MAX_ZOOM':'10', - 'THREADS':'30', - 'PYTHONUNBUFFERED':'1', - 'AUTOVACUUM':'off', -}; +createValidationStacks(); +createProdStacks(); -// Stack for generating planet tiles -const planetTileGenerationStack = new TileGenerationStack(app, 'PlanetTileGenerationStack', { - env: env, - instanceType: PlanetInstanceType, - volume: PlanetEC2Volume, - sharedMemorySize: 1000, - dockerEnv: planetDockerEnv, - memoryReservationMiB: 900000 -}); - -//ECS task state change notification stacks -const testSlackNotificationStack = new SlackNotificationStack(app, 'TestSlackNotificationStack', { - env: env, - cluster: testTileGenerationStack.cluster -}); - -const testEmailNotificationStack = new EmailNotificationStack(app, 'TestEmailNotificationStack', { - env: env, - cluster: testTileGenerationStack.cluster -}); - -const planetSlackNotificationStack = new SlackNotificationStack(app, 'PlanetSlackNotificationStack', { - env: env, - cluster: planetTileGenerationStack.cluster -}); - -const planetEmailNotificationStack = new EmailNotificationStack(app, 'PlanetEmailNotificationStack', { - env: env, - cluster: planetTileGenerationStack.cluster -}); \ No newline at end of file +/** + * Create small area tiles generation stacks for validation + */ +function createValidationStacks() { + // Define ec2 instance type for test small area tile generation + const testInstanceType = ec2.InstanceType.of( + ec2.InstanceClass.MEMORY3, + ec2.InstanceSize.XLARGE + ); + + // Define volume size for test stack instance + const testEC2Volume = asc.BlockDeviceVolume.ebs(30, { + deleteOnTermination: true, + // 
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html + volumeType: asc.EbsDeviceVolumeType.IO1, + // The number of I/O operations per second (IOPS) to provision for the volume + iops: 1000, + }); + + //Define docker environment for test stack ecs task + const testRasterTileDockerEnv = { + // Bounding box defines the area that tiles will be generated, it's the area for luxembourg + BBOX: '5.654568,49.431689,6.574673,50.177945', + // Maps raw data + DOWNLOAD_PBF: + 'https://download.geofabrik.de/europe/luxembourg-latest.osm.pbf', + // Configuration for OSM2OGSQL https://osm2pgsql.org/doc/manual.html + OSM2PGSQL_EXTRA_ARGS: '-C 10000', + // Minimum zoom level for generation tiles + MIN_ZOOM: '0', + // Maximum level for generation tiles + MAX_ZOOM: '10', + // Threads for import and rendering tiles process. + THREADS: '8', + // Make generate_tiles.py logs output to container log + PYTHONUNBUFFERED: '1', + // The database use the autovacuum feature by default. This behavior can be changed with AUTOVACUUM environment variable + AUTOVACUUM: 'off', + }; + + const testVectorTileDockerEnv = { + AREA: 'Azores', + JAVA_TOOL_OPTIONS: '-Xmx1g', + }; + + const testTileGenJobs: tilesGenerationJob[] = [ + { + id: 'TestRasterTileGeneration', + isProd: false, + props: { + env: env, + instanceType: testInstanceType, + volume: testEC2Volume, + // The value for the size (in MiB) of the /dev/shm volume for container + // Default limit 64MB is too low for container + sharedMemorySize: 200, + // Docker environment for test stack ecs task + dockerEnv: testRasterTileDockerEnv, + // Memory reservation for the task. + memoryReservationMiB: 25000, + containerImage: '../../docker/raster-tile/', + containerCommand: 'generatetiles', + }, + }, + { + id: 'TestVectorTileGeneration', + isProd: false, + props: { + env: env, + instanceType: testInstanceType, + volume: testEC2Volume, + sharedMemorySize: 200, + dockerEnv: testVectorTileDockerEnv, + memoryReservationMiB: 25000, + containerImage: '../../docker/vector-tile/', + containerCommand: 'generate', + }, + }, + ]; + + createStacks(testTileGenJobs); +} + +/** + * Create planet tiles generation stacks for prod + */ +function createProdStacks() { + // Define ec2 instance type for planet tile generation + const PlanetInstanceType = ec2.InstanceType.of( + ec2.InstanceClass.X1E, + ec2.InstanceSize.XLARGE8 + ); + + // For planet tile generation, the whole process maximum volume will reach to about 1300GB + const planetEC2Volume = asc.BlockDeviceVolume.ebs(1500, { + deleteOnTermination: true, + volumeType: asc.EbsDeviceVolumeType.IO1, + iops: 5000, + }); + + const planetRasterTileDockerEnv = { + // Enable DOWNLOAD_PLANET to download OSM PBF file from public S3 bucket + DOWNLOAD_PLANET: 'enabled', + // Bounding box for whole planet + BBOX: '-180.0,-90.0,180.0,90.0', + // Configuration for OSM2OGSQL https://osm2pgsql.org/doc/manual.html + OSM2PGSQL_EXTRA_ARGS: '-C 400000 --flat-nodes /nodes/flat_nodes.bin', + MIN_ZOOM: '0', + MAX_ZOOM: '10', + THREADS: '30', + PYTHONUNBUFFERED: '1', + AUTOVACUUM: 'off', + }; + + const planetVectorTileDockerEnv = { + AREA: 'planet', + JAVA_TOOL_OPTIONS: '-Xmx500g -XX:MaxHeapFreeRatio=40', + STORAGE: 'ram', + }; + + const planetTileGenJobs: tilesGenerationJob[] = [ + { + id: 'PlanetRasterTileGeneration', + isProd: true, + props: { + env: env, + instanceType: PlanetInstanceType, + volume: planetEC2Volume, + sharedMemorySize: 1000, + dockerEnv: planetRasterTileDockerEnv, + memoryReservationMiB: 900000, + containerImage: 
'../../docker/raster-tile/', + containerCommand: 'generatetiles', + }, + }, + { + id: 'PlanetVectorTileGeneration', + isProd: true, + props: { + env: env, + instanceType: PlanetInstanceType, + volume: planetEC2Volume, + sharedMemorySize: 1000, + dockerEnv: planetVectorTileDockerEnv, + memoryReservationMiB: 900000, + containerImage: '../../docker/vector-tile/', + containerCommand: 'generate', + }, + }, + ]; + + createStacks(planetTileGenJobs); +} + +/** + * Create CDK stacks + */ +function createStacks(tileGenJobs: tilesGenerationJob[]) { + tileGenJobs.forEach((tileGenJob) => { + const tileGenStack = new TileGenerationStack( + app, + tileGenJob.id, + tileGenJob.props + ); + + new SlackNotificationStack(app, `SlackSNS${tileGenJob.id}`, { + env: env, + cluster: tileGenStack.cluster, + }); + + new EmailNotificationStack(app, `EmailSNS${tileGenJob.id}`, { + env: env, + cluster: tileGenStack.cluster, + }); + }); +} diff --git a/tiles-generation/cdk/cdk.context.json b/tiles-generation/cdk/cdk.context.json index 5b427bf..21e92ef 100644 --- a/tiles-generation/cdk/cdk.context.json +++ b/tiles-generation/cdk/cdk.context.json @@ -1,5 +1,5 @@ { "BUCKET":"s3Bucket", - "EMAIL":"emailAdress", + "EMAIL":"emailAddress", "SLACK":"slackWebHookURL" } \ No newline at end of file diff --git a/tiles-generation/cdk/lambda/email-notification.js b/tiles-generation/cdk/lambda/email-notification.js index 99529d6..62ce4da 100644 --- a/tiles-generation/cdk/lambda/email-notification.js +++ b/tiles-generation/cdk/lambda/email-notification.js @@ -1,7 +1,7 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 - * + * * Lambda function for tiles generation ECS task state change email notification. */ @@ -9,38 +9,40 @@ const aws = require('aws-sdk'); const sns = new aws.SNS(); const eventProcess = require('./event-process'); -exports.handler = function(event, context, callback) { +exports.handler = function (event, context, callback) { try { const eventText = JSON.stringify(event, null, 2); console.log(`\n${eventText}`); - + let topicArn; topicArn = process.env.topicArn; - + const message = eventProcess.getStateInformation(event); - + const params = { Subject: 'ECS task state change notification', Message: message, - TopicArn: topicArn + TopicArn: topicArn, }; const publishMessagePromise = sns.publish(params).promise(); publishMessagePromise - .then(data => { - console.log(`Message\n ${params.Message}\nsent to the topic ${params.TopicArn}`); - callback(null, data); - }) - .catch(err => { - console.log(err); - callback(err); - }); + .then((data) => { + console.log( + `Message\n ${params.Message}\nsent to the topic ${params.TopicArn}` + ); + callback(null, data); + }) + .catch((err) => { + console.log(err); + callback(err); + }); console.log(publishMessagePromise); } catch (err) { console.log(err); } -}; \ No newline at end of file +}; diff --git a/tiles-generation/cdk/lambda/event-process.js b/tiles-generation/cdk/lambda/event-process.js index d7885ad..f0a526e 100644 --- a/tiles-generation/cdk/lambda/event-process.js +++ b/tiles-generation/cdk/lambda/event-process.js @@ -1,17 +1,17 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 - * + * * Helper function to process lambda event information. 
*/ -const getStateInformation = function(event) { +const getStateInformation = function (event) { try { let stateInfo; const taskArn = event.resources; const taskLastStatus = event.detail.lastStatus; const container = event.detail.containers[0]; - + if (container === undefined) { if (taskLastStatus == 'PROVISIONING') { stateInfo = `Task arn: ${taskArn}\n\nTask state: task in provisioning before launch`; @@ -41,11 +41,11 @@ const getStateInformation = function(event) { } } return stateInfo; - } catch(err) { + } catch (err) { console.log(err); } -} +}; module.exports = { - getStateInformation -} \ No newline at end of file + getStateInformation, +}; diff --git a/tiles-generation/cdk/lambda/slack-notification.js b/tiles-generation/cdk/lambda/slack-notification.js index 5a641b2..3117fdb 100644 --- a/tiles-generation/cdk/lambda/slack-notification.js +++ b/tiles-generation/cdk/lambda/slack-notification.js @@ -1,32 +1,31 @@ /* * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 - * + * * Lambda function for tiles generation ECS task state change slack notification. */ -const eventProcess = require("./event-process"); +const eventProcess = require('./event-process'); const https = require('https'); const executePostRequest = (event) => { - const eventText = JSON.stringify(event, null, 2); - + console.log(`\n${eventText}`); let slackHookPath; - + try { slackHookPath = process.env.slackHookPath; - } catch (error){ + } catch (error) { console.error(error, 'Caught error for slackHookPath'); } - + return new Promise((resolve, reject) => { const options = { host: 'hooks.slack.com', path: slackHookPath, - method: 'POST' + method: 'POST', }; const req = https.request(options, (res) => { resolve(JSON.stringify(res.statusCode)); @@ -34,21 +33,25 @@ const executePostRequest = (event) => { req.on('error', (e) => { reject(e.message); }); - + const stateInfo = eventProcess.getStateInformation(event); - + const message = { - "Content": `${stateInfo}\n-------------------------------------` + Content: `${stateInfo}\n-------------------------------------`, }; - + req.write(JSON.stringify(message)); - + req.end(); }); }; exports.handler = async (event) => { - await executePostRequest(event) - .then(result => console.log(`Status code: ${result}`)) - .catch(e => console.error(`Error execute the request: ${JSON.stringify(event)} => ${e}`)); -}; \ No newline at end of file + await executePostRequest(event) + .then((result) => console.log(`Status code: ${result}`)) + .catch((e) => + console.error( + `Error execute the request: ${JSON.stringify(event)} => ${e}` + ) + ); +}; diff --git a/tiles-generation/cdk/lib/email-notification-stack.ts b/tiles-generation/cdk/lib/email-notification-stack.ts index 7423843..46334e9 100644 --- a/tiles-generation/cdk/lib/email-notification-stack.ts +++ b/tiles-generation/cdk/lib/email-notification-stack.ts @@ -8,54 +8,58 @@ import { Environment, Stack, StackProps } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import { EventRuleStack } from './event-rule-stack'; -import * as iam from "aws-cdk-lib/aws-iam"; -import * as sns from "aws-cdk-lib/aws-sns"; -import * as subscriptions from "aws-cdk-lib/aws-sns-subscriptions" -import * as targets from "aws-cdk-lib/aws-events-targets"; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as sns from 'aws-cdk-lib/aws-sns'; +import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions'; +import * as targets from 'aws-cdk-lib/aws-events-targets'; import * as lambda from 'aws-cdk-lib/aws-lambda'; -import * as 
ecs from "aws-cdk-lib/aws-ecs"; +import * as ecs from 'aws-cdk-lib/aws-ecs'; export interface EmailNotificationStackProps extends StackProps { - env: Environment, - cluster: ecs.Cluster + env: Environment; + cluster: ecs.Cluster; } export class EmailNotificationStack extends Stack { - constructor(scope: Construct, id: string, props: EmailNotificationStackProps ) { + constructor( + scope: Construct, + id: string, + props: EmailNotificationStackProps + ) { super(scope, id); const eventRuleStack = new EventRuleStack(this, 'EventRuleStack', { - cluster: props.cluster + cluster: props.cluster, }); - const { rule } = eventRuleStack + const { rule } = eventRuleStack; - const topic = new sns.Topic(this, "topic"); + const topic = new sns.Topic(this, 'topic'); const email = this.node.tryGetContext('EMAIL'); topic.addSubscription(new subscriptions.EmailSubscription(email)); - const taskStatusEmailNotifyLambda = new lambda.Function(this, 'emailFunction', { - runtime: lambda.Runtime.NODEJS_14_X, - code: lambda.Code.fromAsset("lambda"), - handler: "email-notification.handler", - environment: { - "topicArn": topic.topicArn + const taskStatusEmailNotifyLambda = new lambda.Function( + this, + 'emailFunction', + { + runtime: lambda.Runtime.NODEJS_14_X, + code: lambda.Code.fromAsset('lambda'), + handler: 'email-notification.handler', + environment: { + topicArn: topic.topicArn, + }, } - }); + ); - const lambdaRolePolicy = new iam.PolicyStatement({ - actions: [ - "sns:Publish" - ], - resources: [topic.topicArn] + const lambdaRolePolicy = new iam.PolicyStatement({ + actions: ['sns:Publish'], + resources: [topic.topicArn], }); taskStatusEmailNotifyLambda.addToRolePolicy(lambdaRolePolicy); - rule.addTarget(new targets.LambdaFunction( - taskStatusEmailNotifyLambda - )); + rule.addTarget(new targets.LambdaFunction(taskStatusEmailNotifyLambda)); } -} \ No newline at end of file +} diff --git a/tiles-generation/cdk/lib/event-rule-stack.ts b/tiles-generation/cdk/lib/event-rule-stack.ts index 9ff4512..3b79841 100644 --- a/tiles-generation/cdk/lib/event-rule-stack.ts +++ b/tiles-generation/cdk/lib/event-rule-stack.ts @@ -7,29 +7,29 @@ import { StackProps, NestedStack } from 'aws-cdk-lib'; import { Construct } from 'constructs'; -import * as ecs from "aws-cdk-lib/aws-ecs"; -import * as events from "aws-cdk-lib/aws-events"; +import * as ecs from 'aws-cdk-lib/aws-ecs'; +import * as events from 'aws-cdk-lib/aws-events'; export interface EventRuleStackProps extends StackProps { - cluster: ecs.Cluster + cluster: ecs.Cluster; } export class EventRuleStack extends NestedStack { public readonly rule: events.Rule; - constructor(scope: Construct, id: string, props: EventRuleStackProps ) { + constructor(scope: Construct, id: string, props: EventRuleStackProps) { super(scope, id); - const clusterArn = props.cluster.clusterArn + const clusterArn = props.cluster.clusterArn; this.rule = new events.Rule(this, 'ecs-task-state-change-rule', { eventPattern: { - source: ["aws.ecs"], - detailType: ["ECS Task State Change"], - detail: { - "clusterArn": [clusterArn], - "lastStatus": ["RUNNING", "STOPPED", "PROVISIONING"] - }, + source: ['aws.ecs'], + detailType: ['ECS Task State Change'], + detail: { + clusterArn: [clusterArn], + lastStatus: ['RUNNING', 'STOPPED', 'PROVISIONING'], + }, }, }); } -} \ No newline at end of file +} diff --git a/tiles-generation/cdk/lib/slack-notification-stack.ts b/tiles-generation/cdk/lib/slack-notification-stack.ts index d03febd..5b8fa7e 100644 --- a/tiles-generation/cdk/lib/slack-notification-stack.ts +++ 
b/tiles-generation/cdk/lib/slack-notification-stack.ts @@ -8,38 +8,44 @@ import { Environment, Stack, StackProps } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import { EventRuleStack } from './event-rule-stack'; -import * as targets from "aws-cdk-lib/aws-events-targets"; +import * as targets from 'aws-cdk-lib/aws-events-targets'; import * as lambda from 'aws-cdk-lib/aws-lambda'; -import * as ecs from "aws-cdk-lib/aws-ecs"; +import * as ecs from 'aws-cdk-lib/aws-ecs'; export interface SlackNotificationStackProps extends StackProps { - env: Environment, - cluster: ecs.Cluster + env: Environment; + cluster: ecs.Cluster; } export class SlackNotificationStack extends Stack { - constructor(scope: Construct, id: string, props: SlackNotificationStackProps ) { + constructor( + scope: Construct, + id: string, + props: SlackNotificationStackProps + ) { super(scope, id); const slackHookPath = this.node.tryGetContext('SLACK'); const eventRuleStack = new EventRuleStack(this, 'EventRuleStack', { - cluster: props.cluster + cluster: props.cluster, }); const { rule } = eventRuleStack; - const taskStatusSlackNotifyLambda = new lambda.Function(this, 'slack-function', { - runtime: lambda.Runtime.NODEJS_14_X, - code: lambda.Code.fromAsset("lambda"), - handler: "slack-notification.handler", - environment: { - "slackHookPath": slackHookPath + const taskStatusSlackNotifyLambda = new lambda.Function( + this, + 'slack-function', + { + runtime: lambda.Runtime.NODEJS_14_X, + code: lambda.Code.fromAsset('lambda'), + handler: 'slack-notification.handler', + environment: { + slackHookPath: slackHookPath, + }, } - }); + ); - rule.addTarget(new targets.LambdaFunction( - taskStatusSlackNotifyLambda - )); + rule.addTarget(new targets.LambdaFunction(taskStatusSlackNotifyLambda)); } -} \ No newline at end of file +} diff --git a/tiles-generation/cdk/lib/tile-generation-stack.ts b/tiles-generation/cdk/lib/tile-generation-stack.ts index 3508ec1..2b25d0b 100644 --- a/tiles-generation/cdk/lib/tile-generation-stack.ts +++ b/tiles-generation/cdk/lib/tile-generation-stack.ts @@ -8,26 +8,25 @@ import { CfnOutput, Environment, Stack, StackProps } from 'aws-cdk-lib'; import * as path from 'path'; import { Construct } from 'constructs'; -import * as ec2 from "aws-cdk-lib/aws-ec2"; -import * as ecs from "aws-cdk-lib/aws-ecs"; -import * as iam from "aws-cdk-lib/aws-iam"; -import * as asc from "aws-cdk-lib/aws-autoscaling"; - - export interface TileGenerationStackProps extends StackProps { - env: Environment, - instanceType: ec2.InstanceType, - volume: asc.BlockDeviceVolume, - sharedMemorySize: number, - dockerEnv: {[key: string]: string}, - memoryReservationMiB: number +import * as ec2 from 'aws-cdk-lib/aws-ec2'; +import * as ecs from 'aws-cdk-lib/aws-ecs'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as asc from 'aws-cdk-lib/aws-autoscaling'; + +export interface TileGenerationStackProps extends StackProps { + env: Environment; + instanceType: ec2.InstanceType; + volume: asc.BlockDeviceVolume; + sharedMemorySize: number; + dockerEnv: { [key: string]: string }; + memoryReservationMiB: number; + containerImage: string; + containerCommand: string; } export class TileGenerationStack extends Stack { - public readonly cluster : ecs.Cluster; - constructor( - scope: Construct, - id: string, - props: TileGenerationStackProps) { + public readonly cluster: ecs.Cluster; + constructor(scope: Construct, id: string, props: TileGenerationStackProps) { super(scope, id); const tilesDestinationBucket = 
this.node.tryGetContext('BUCKET'); @@ -37,73 +36,89 @@ export class TileGenerationStack extends Stack { const vpc = new ec2.Vpc(this, `vpc-${this.stackName}`, { cidr: '10.0.0.0/16', natGateways: 1, - maxAzs: 2 + maxAzs: 2, }); - this.cluster = new ecs.Cluster(this, `cluster-${this.stackName}`, { - vpc: vpc, + this.cluster = new ecs.Cluster(this, 'cluster', { + vpc: vpc, }); - new CfnOutput(this, "ClusterName", { + new CfnOutput(this, 'ClusterName', { value: this.cluster.clusterName, }); - const autoScalingGroup = new asc.AutoScalingGroup(this, `autoScalingGroup-${this.stackName}`, { - vpc, - instanceType: props.instanceType, - machineImage: ecs.EcsOptimizedImage.amazonLinux2(), - minCapacity: 0, - desiredCapacity: 0, - maxCapacity: 1, - newInstancesProtectedFromScaleIn: true, - blockDevices: [{ - deviceName: "/dev/xvda", - volume: props.volume - }] - }); - - const capacityProvider = new ecs.AsgCapacityProvider(this, `AsgCapacityProvider-${this.stackName}`, { - autoScalingGroup, - enableManagedScaling: true - }); + const autoScalingGroup = new asc.AutoScalingGroup( + this, + `autoScalingGroup-${this.stackName}`, + { + vpc, + instanceType: props.instanceType, + machineImage: ecs.EcsOptimizedImage.amazonLinux2(), + minCapacity: 0, + desiredCapacity: 0, + maxCapacity: 2, + newInstancesProtectedFromScaleIn: true, + blockDevices: [ + { + deviceName: '/dev/xvda', + volume: props.volume, + }, + ], + } + ); + + const capacityProvider = new ecs.AsgCapacityProvider( + this, + 'asg-capacity-provider', + { + autoScalingGroup, + enableManagedScaling: true, + } + ); new CfnOutput(this, 'CapacityProviderName', { - value: capacityProvider.capacityProviderName + value: capacityProvider.capacityProviderName, }); this.cluster.addAsgCapacityProvider(capacityProvider); - const taskDefinition = new ecs.Ec2TaskDefinition(this, 'TaskDef'); + const taskDefinition = new ecs.Ec2TaskDefinition( + this, + 'task-definition' + ); new CfnOutput(this, 'TaskDefinitionArn', { - value: taskDefinition.taskDefinitionArn + value: taskDefinition.taskDefinitionArn, }); - const taskRolePolicy = new iam.PolicyStatement({ - actions: [ - "s3:PutObject", - "s3:GetObject", - ], - resources: ['*'] - }) + const taskRolePolicy = new iam.PolicyStatement({ + actions: ['s3:PutObject', 's3:GetObject'], + resources: ['*'], + }); taskDefinition.addToTaskRolePolicy(taskRolePolicy); const logging = new ecs.AwsLogDriver({ - streamPrefix: `logs-${this.stackName}` + streamPrefix: `logs-${this.stackName}`, }); - const linuxParameters = new ecs.LinuxParameters(this, `LinuxParameters-${this.stackName}`, { - sharedMemorySize: props.sharedMemorySize - }); - - taskDefinition.addContainer('DefaultContainer', { - image: ecs.ContainerImage.fromAsset(path.join(__dirname, '../../dockerAssets/')), + const linuxParameters = new ecs.LinuxParameters( + this, + `LinuxParameters-${this.stackName}`, + { + sharedMemorySize: props.sharedMemorySize, + } + ); + + taskDefinition.addContainer('container', { + image: ecs.ContainerImage.fromAsset( + path.join(__dirname, props.containerImage) + ), memoryReservationMiB: props.memoryReservationMiB, - command: ['generatetiles'], - logging:logging, + command: [props.containerCommand], + logging: logging, environment: props.dockerEnv, - linuxParameters: linuxParameters + linuxParameters: linuxParameters, }); } -} \ No newline at end of file +} diff --git a/tiles-generation/cdk/package-lock.json b/tiles-generation/cdk/package-lock.json index da247ff..2bec406 100644 --- a/tiles-generation/cdk/package-lock.json +++ 
b/tiles-generation/cdk/package-lock.json @@ -8,7 +8,7 @@ "name": "tiles-generation-cdk", "version": "0.1.0", "dependencies": { - "aws-cdk-lib": "2.12.0", + "aws-cdk-lib": "^2.27.0", "constructs": "^10.0.0", "source-map-support": "^0.5.16" }, @@ -18,8 +18,9 @@ "devDependencies": { "@types/jest": "^26.0.10", "@types/node": "^10.17.27", - "aws-cdk": "2.12.0", + "aws-cdk": "^2.12.0", "jest": "^26.4.2", + "prettier": "^2.7.1", "ts-jest": "^26.2.0", "ts-node": "^9.0.0", "typescript": "~3.9.7" @@ -1249,9 +1250,9 @@ } }, "node_modules/aws-cdk-lib": { - "version": "2.12.0", - "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.12.0.tgz", - "integrity": "sha512-ot2gJycvHy9OqKkiDVSOP1QNOxapNUbjahEWSp0Mdxkweark0OdrzBv4JNaJYfGMn8IdZZ6FV/hpsi4MYHQ/+w==", + "version": "2.27.0", + "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.27.0.tgz", + "integrity": "sha512-CsTy/+RIBMvP9LlZjkQakkKPpwU50+HAMCMaGOcyIHADDQdXobGcZXTO+Tq00sH2DoQjPV1ZhhBim5z3k/2KXw==", "bundleDependencies": [ "@balena/dockerignore", "case", @@ -1269,9 +1270,9 @@ "fs-extra": "^9.1.0", "ignore": "^5.2.0", "jsonschema": "^1.4.0", - "minimatch": "^3.0.5", + "minimatch": "^3.1.2", "punycode": "^2.1.1", - "semver": "^7.3.5", + "semver": "^7.3.7", "yaml": "1.10.2" }, "engines": { @@ -1336,7 +1337,7 @@ } }, "node_modules/aws-cdk-lib/node_modules/graceful-fs": { - "version": "4.2.9", + "version": "4.2.10", "inBundle": true, "license": "ISC" }, @@ -1379,7 +1380,7 @@ } }, "node_modules/aws-cdk-lib/node_modules/minimatch": { - "version": "3.0.5", + "version": "3.1.2", "inBundle": true, "license": "ISC", "dependencies": { @@ -1398,7 +1399,7 @@ } }, "node_modules/aws-cdk-lib/node_modules/semver": { - "version": "7.3.5", + "version": "7.3.7", "inBundle": true, "license": "ISC", "dependencies": { @@ -6265,6 +6266,21 @@ "node": ">= 0.8.0" } }, + "node_modules/prettier": { + "version": "2.7.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.7.1.tgz", + "integrity": "sha512-ujppO+MkdPqoVINuDFDRLClm7D78qbDt0/NR+wp5FqEZOoTNAjPHWj17QRhu7geIHJfcNhRk1XVQmF8Bp3ye+g==", + "dev": true, + "bin": { + "prettier": "bin-prettier.js" + }, + "engines": { + "node": ">=10.13.0" + }, + "funding": { + "url": "https://github.com/prettier/prettier?sponsor=1" + } + }, "node_modules/pretty-format": { "version": "26.6.2", "resolved": "https://registry.npmjs.org/pretty-format/-/pretty-format-26.6.2.tgz", @@ -10801,18 +10817,18 @@ } }, "aws-cdk-lib": { - "version": "2.12.0", - "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.12.0.tgz", - "integrity": "sha512-ot2gJycvHy9OqKkiDVSOP1QNOxapNUbjahEWSp0Mdxkweark0OdrzBv4JNaJYfGMn8IdZZ6FV/hpsi4MYHQ/+w==", + "version": "2.27.0", + "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.27.0.tgz", + "integrity": "sha512-CsTy/+RIBMvP9LlZjkQakkKPpwU50+HAMCMaGOcyIHADDQdXobGcZXTO+Tq00sH2DoQjPV1ZhhBim5z3k/2KXw==", "requires": { "@balena/dockerignore": "^1.0.2", "case": "1.6.3", "fs-extra": "^9.1.0", "ignore": "^5.2.0", "jsonschema": "^1.4.0", - "minimatch": "^3.0.5", + "minimatch": "^3.1.2", "punycode": "^2.1.1", - "semver": "^7.3.5", + "semver": "^7.3.7", "yaml": "1.10.2" }, "dependencies": { @@ -10855,7 +10871,7 @@ } }, "graceful-fs": { - "version": "4.2.9", + "version": "4.2.10", "bundled": true }, "ignore": { @@ -10882,7 +10898,7 @@ } }, "minimatch": { - "version": "3.0.5", + "version": "3.1.2", "bundled": true, "requires": { "brace-expansion": "^1.1.7" @@ -10893,7 +10909,7 @@ "bundled": true }, "semver": { - "version": "7.3.5", + "version": "7.3.7", 
"bundled": true, "requires": { "lru-cache": "^6.0.0" @@ -13312,6 +13328,12 @@ "integrity": "sha1-IZMqVJ9eUv/ZqCf1cOBL5iqX2lQ=", "dev": true }, + "prettier": { + "version": "2.7.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-2.7.1.tgz", + "integrity": "sha512-ujppO+MkdPqoVINuDFDRLClm7D78qbDt0/NR+wp5FqEZOoTNAjPHWj17QRhu7geIHJfcNhRk1XVQmF8Bp3ye+g==", + "dev": true + }, "pretty-format": { "version": "26.6.2", "resolved": "https://registry.npmjs.org/pretty-format/-/pretty-format-26.6.2.tgz", diff --git a/tiles-generation/cdk/package.json b/tiles-generation/cdk/package.json index d9cae5a..39d8e90 100644 --- a/tiles-generation/cdk/package.json +++ b/tiles-generation/cdk/package.json @@ -13,14 +13,15 @@ "devDependencies": { "@types/jest": "^26.0.10", "@types/node": "^10.17.27", - "aws-cdk": "2.12.0", + "aws-cdk": "^2.12.0", "jest": "^26.4.2", + "prettier": "^2.7.1", "ts-jest": "^26.2.0", "ts-node": "^9.0.0", "typescript": "~3.9.7" }, "dependencies": { - "aws-cdk-lib": "2.12.0", + "aws-cdk-lib": "^2.27.0", "constructs": "^10.0.0", "source-map-support": "^0.5.16" } diff --git a/tiles-generation/cdk/types/tilesGenerationJob.ts b/tiles-generation/cdk/types/tilesGenerationJob.ts new file mode 100644 index 0000000..c6c9875 --- /dev/null +++ b/tiles-generation/cdk/types/tilesGenerationJob.ts @@ -0,0 +1,18 @@ +import { Environment } from 'aws-cdk-lib'; +import * as ec2 from 'aws-cdk-lib/aws-ec2'; +import * as asc from 'aws-cdk-lib/aws-autoscaling'; + +export type tilesGenerationJob = { + id: string; + isProd: boolean; + props: { + env: Environment; + instanceType: ec2.InstanceType; + volume: asc.BlockDeviceVolume; + sharedMemorySize: number; + dockerEnv: { [key: string]: string }; + memoryReservationMiB: number; + containerImage: string; + containerCommand: string; + }; +}; diff --git a/tiles-generation/dockerAssets/Dockerfile b/tiles-generation/docker/raster-tile/Dockerfile similarity index 100% rename from tiles-generation/dockerAssets/Dockerfile rename to tiles-generation/docker/raster-tile/Dockerfile diff --git a/tiles-generation/dockerAssets/entrypoint.sh b/tiles-generation/docker/raster-tile/entrypoint.sh similarity index 100% rename from tiles-generation/dockerAssets/entrypoint.sh rename to tiles-generation/docker/raster-tile/entrypoint.sh diff --git a/tiles-generation/dockerAssets/generate_tiles.py b/tiles-generation/docker/raster-tile/generate_tiles.py similarity index 100% rename from tiles-generation/dockerAssets/generate_tiles.py rename to tiles-generation/docker/raster-tile/generate_tiles.py diff --git a/tiles-generation/dockerAssets/postgresql.custom.conf.tmpl b/tiles-generation/docker/raster-tile/postgresql.custom.conf.tmpl similarity index 100% rename from tiles-generation/dockerAssets/postgresql.custom.conf.tmpl rename to tiles-generation/docker/raster-tile/postgresql.custom.conf.tmpl diff --git a/tiles-generation/docker/vector-tile/Dockerfile b/tiles-generation/docker/vector-tile/Dockerfile new file mode 100644 index 0000000..5f694cd --- /dev/null +++ b/tiles-generation/docker/vector-tile/Dockerfile @@ -0,0 +1,31 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +FROM amazonlinux:2 + +RUN yum clean all && \ + yum update -y && \ + amazon-linux-extras install epel && \ + yum install -y java-17-amazon-corretto-headless wget tar go make python37 + +RUN wget https://github.com/onthegomap/planetiler/releases/download/v0.5.0/planetiler.jar + +RUN wget -O mbutil.tar.gz 
https://github.com/mapbox/mbutil/archive/refs/tags/v0.3.0.tar.gz && \ + tar -xzf mbutil.tar.gz && \ + cd /mbutil-0.3.0 && \ + python3 setup.py install + +RUN wget -O s5cmd.tar.gz https://github.com/peak/s5cmd/archive/refs/tags/v2.0.0-beta.2.tar.gz && \ + tar -xzf s5cmd.tar.gz +COPY modified-s5cmd/s3.go /s5cmd-2.0.0-beta.2/storage/s3.go +COPY modified-s5cmd/storage.go /s5cmd-2.0.0-beta.2/storage/storage.go +COPY modified-s5cmd/cp.go /s5cmd-2.0.0-beta.2/command/cp.go +RUN cd /s5cmd-2.0.0-beta.2/ && \ + CGO_ENABLED=0 make build && \ + mv /s5cmd-2.0.0-beta.2/s5cmd / + +COPY entrypoint.sh / +RUN chmod +x entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"] diff --git a/tiles-generation/docker/vector-tile/entrypoint.sh b/tiles-generation/docker/vector-tile/entrypoint.sh new file mode 100755 index 0000000..f5c1615 --- /dev/null +++ b/tiles-generation/docker/vector-tile/entrypoint.sh @@ -0,0 +1,78 @@ +#!/bin/sh + +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 + +set -e + +usage() { + echo "" + echo "This script is used to build map vector tiles to pbf files and upload to s3 bucket." + echo "For planet setting, reference to https://github.com/onthegomap/planetiler/blob/main/PLANET.md" + echo "--------------------------------------------------------------------------" + echo "" + echo "Docker container environment variables:" + echo -e "AREA \tSpecify the area that we want to generate, for planet, it's 'planet', other areas name reference to https://download.geofabrik.de/" + echo -e "TILE_S3_BUCKET \tSpecify the S3 bucket to store the generated vector pbf tiles" + echo -e "AWS_ACCESS_KEY_ID \tSpecify the AWS access key id used used for s3 copy, not required if have access role instead" + echo -e "AWS_SECRET_ACCESS_KEY \tSpecify the AWS AWS secret access key used for s3 copy, not required if have access role instead" + echo -e "JAVA_TOOL_OPTIONS \tSpecify the JVM config, for example:'-Xmx1g'" + echo -e "STORAGE \tSpecify node location cache in memory by 'ram' or in a temporary memory-mapped file by 'mmap', Needed for planet tile generation" + echo "" + echo "--------------------------------------------------------------------------" +} + +# Validate the required parameters to present +validateParameter() { + if [ -z "$AREA" ] || [ -z "$TILE_S3_BUCKET" ]; then + echo "You must specify '-e AREA', '-e TILE_S3_BUCKET' in container environment variables" + usage + exit 1 + fi +} + +if [ "$1" == "generate" ] +then + validateParameter + + if [ "$AREA" == "planet" ] + then + java "${JAVA_TOOL_OPTIONS}" \ + -jar planetiler.jar \ + # Download the latest planet.osm.pbf from s3://osm-pds bucket + --area=planet --bounds=planet --download \ + # Accelerate the download by fetching the 10 1GB chunks at a time in parallel + --download-threads=10 --download-chunk-size-mb=1000 \ + # Download name translations from wikidata + --fetch-wikidata \ + --mbtiles="${AREA}".mbtiles \ + # Store temporary node locations in memory + --nodemap-type=array --storage="${STORAGE}" \ + # compact_db is not compatible with mbutil, set it to false + --compact_db=false + else + java "${JAVA_TOOL_OPTIONS}" \ + -jar planetiler.jar \ + --download \ + --area="${AREA}" \ + --mbtiles="${AREA}".mbtiles \ + --compact_db=false + fi + + # Extract mbtiles to zxy pbf tiles + mb-util "${AREA}".mbtiles tiles --image_format=pbf + + # Upload to S3 Bucket + currentTime=$(date "+%Y%m%d%H%M%S") + /s5cmd cp tiles/ s3://"${TILE_S3_BUCKET}"/"${AREA}"-VT-"${currentTime}"/vector/ + echo "tiles copy finished" + exit 0 
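    # A minimal local smoke test, written here as a comment sketch and assuming
    # Docker is available: it uses the same container environment variables that
    # usage() documents above. The image tag and bucket name are placeholders,
    # and the Azores / -Xmx1g values mirror the test stack's docker environment
    # in bin/cdk.ts.
    #
    #   docker build -t vector-tile-gen tiles-generation/docker/vector-tile/
    #   docker run --rm \
    #       -e AREA=Azores \
    #       -e TILE_S3_BUCKET=my-tile-bucket \
    #       -e JAVA_TOOL_OPTIONS=-Xmx1g \
    #       -e AWS_ACCESS_KEY_ID=... \
    #       -e AWS_SECRET_ACCESS_KEY=... \
    #       vector-tile-gen generate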
+fi + +if [ "$1" == "help" ]; then + usage + exit 0 +fi + +echo "invalid command" +exit 1 diff --git a/tiles-generation/docker/vector-tile/modified-s5cmd/cp.go b/tiles-generation/docker/vector-tile/modified-s5cmd/cp.go new file mode 100644 index 0000000..874a3af --- /dev/null +++ b/tiles-generation/docker/vector-tile/modified-s5cmd/cp.go @@ -0,0 +1,868 @@ +// Copyright OpenSearch Contributors +// SPDX-License-Identifier: Apache-2.0 + +// In order to support add metadata 'Content-Encoding=gzip' and 'Content-Type=application/x-protobuf' on copy command, +// modified this file from https://github.com/peak/s5cmd for docker image build. + +package command + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "mime" + "net/http" + "os" + "path/filepath" + "strings" + + "github.com/hashicorp/go-multierror" + "github.com/urfave/cli/v2" + + errorpkg "github.com/peak/s5cmd/error" + "github.com/peak/s5cmd/log" + "github.com/peak/s5cmd/log/stat" + "github.com/peak/s5cmd/parallel" + "github.com/peak/s5cmd/storage" + "github.com/peak/s5cmd/storage/url" +) + +const ( + defaultCopyConcurrency = 5 + defaultPartSize = 50 // MiB + megabytes = 1024 * 1024 +) + +var copyHelpTemplate = `Name: + {{.HelpName}} - {{.Usage}} +Usage: + {{.HelpName}} [options] source destination +Options: + {{range .VisibleFlags}}{{.}} + {{end}} +Examples: + 01. Download an S3 object to working directory + > s5cmd {{.HelpName}} s3://bucket/prefix/object.gz . + 02. Download an S3 object and rename + > s5cmd {{.HelpName}} s3://bucket/prefix/object.gz myobject.gz + 03. Download all S3 objects to a directory + > s5cmd {{.HelpName}} s3://bucket/* target-directory/ + 04. Download an S3 object from a public bucket + > s5cmd --no-sign-request {{.HelpName}} s3://bucket/prefix/object.gz . + 05. Upload a file to S3 bucket + > s5cmd {{.HelpName}} myfile.gz s3://bucket/ + 06. Upload matching files to S3 bucket + > s5cmd {{.HelpName}} dir/*.gz s3://bucket/ + 07. Upload all files in a directory to S3 bucket recursively + > s5cmd {{.HelpName}} dir/ s3://bucket/ + 08. Copy S3 object to another bucket + > s5cmd {{.HelpName}} s3://bucket/object s3://target-bucket/prefix/object + 09. Copy matching S3 objects to another bucket + > s5cmd {{.HelpName}} s3://bucket/*.gz s3://target-bucket/prefix/ + 10. Copy files in a directory to S3 prefix if not found on target + > s5cmd {{.HelpName}} -n -s -u dir/ s3://bucket/target-prefix/ + 11. Copy files in an S3 prefix to another S3 prefix if not found on target + > s5cmd {{.HelpName}} -n -s -u s3://bucket/source-prefix/* s3://bucket/target-prefix/ + 12. Perform KMS Server Side Encryption of the object(s) at the destination + > s5cmd {{.HelpName}} --sse aws:kms s3://bucket/object s3://target-bucket/prefix/object + 13. Perform KMS-SSE of the object(s) at the destination using customer managed Customer Master Key (CMK) key id + > s5cmd {{.HelpName}} --sse aws:kms --sse-kms-key-id s3://bucket/object s3://target-bucket/prefix/object + 14. Force transfer of GLACIER objects with a prefix whether they are restored or not + > s5cmd {{.HelpName}} --force-glacier-transfer s3://bucket/prefix/* target-directory/ + 15. Upload a file to S3 bucket with public read s3 acl + > s5cmd {{.HelpName}} --acl "public-read" myfile.gz s3://bucket/ + 16. Upload a file to S3 bucket with expires header + > s5cmd {{.HelpName}} --expires "2024-10-01T20:30:00Z" myfile.gz s3://bucket/ + 17. 
Upload a file to S3 bucket with cache-control header + > s5cmd {{.HelpName}} --cache-control "public, max-age=345600" myfile.gz s3://bucket/ + 18. Copy all files to S3 bucket but exclude the ones with txt and gz extension + > s5cmd {{.HelpName}} --exclude "*.txt" --exclude "*.gz" dir/ s3://bucket + 19. Copy all files from S3 bucket to another S3 bucket but exclude the ones starts with log + > s5cmd {{.HelpName}} --exclude "log*" s3://bucket/* s3://destbucket + 20. Download an S3 object from a requester pays bucket + > s5cmd --request-payer=requester {{.HelpName}} s3://bucket/prefix/object.gz . +` + +func NewSharedFlags() []cli.Flag { + return []cli.Flag{ + &cli.BoolFlag{ + Name: "no-follow-symlinks", + Usage: "do not follow symbolic links", + }, + &cli.StringFlag{ + Name: "storage-class", + Usage: "set storage class for target ('STANDARD','REDUCED_REDUNDANCY','GLACIER','STANDARD_IA','ONEZONE_IA','INTELLIGENT_TIERING','DEEP_ARCHIVE')", + }, + &cli.IntFlag{ + Name: "concurrency", + Aliases: []string{"c"}, + Value: defaultCopyConcurrency, + Usage: "number of concurrent parts transferred between host and remote server", + }, + &cli.IntFlag{ + Name: "part-size", + Aliases: []string{"p"}, + Value: defaultPartSize, + Usage: "size of each part transferred between host and remote server, in MiB", + }, + &cli.StringFlag{ + Name: "sse", + Usage: "perform server side encryption of the data at its destination, e.g. aws:kms", + }, + &cli.StringFlag{ + Name: "sse-kms-key-id", + Usage: "customer master key (CMK) id for SSE-KMS encryption; leave it out if server-side generated key is desired", + }, + &cli.StringFlag{ + Name: "acl", + Usage: "set acl for target: defines granted accesses and their types on different accounts/groups, e.g. cp --acl 'public-read'", + }, + &cli.StringFlag{ + Name: "cache-control", + Usage: "set cache control for target: defines cache control header for object, e.g. cp --cache-control 'public, max-age=345600'", + }, + &cli.StringFlag{ + Name: "expires", + Usage: "set expires for target (uses RFC3339 format): defines expires header for object, e.g. 
cp --expires '2024-10-01T20:30:00Z'", + }, + &cli.BoolFlag{ + Name: "force-glacier-transfer", + Usage: "force transfer of glacier objects whether they are restored or not", + }, + &cli.BoolFlag{ + Name: "ignore-glacier-warnings", + Usage: "turns off glacier warnings: ignore errors encountered during copying, downloading and moving glacier objects", + }, + &cli.StringFlag{ + Name: "source-region", + Usage: "set the region of source bucket; the region of the source bucket will be automatically discovered if --source-region is not specified", + }, + &cli.StringFlag{ + Name: "destination-region", + Usage: "set the region of destination bucket: the region of the destination bucket will be automatically discovered if --destination-region is not specified", + }, + &cli.StringSliceFlag{ + Name: "exclude", + Usage: "exclude objects with given pattern", + }, + &cli.BoolFlag{ + Name: "raw", + Usage: "disable the wildcard operations, useful with filenames that contains glob characters", + }, + } +} + +func NewCopyCommandFlags() []cli.Flag { + copyFlags := []cli.Flag{ + &cli.BoolFlag{ + Name: "flatten", + Aliases: []string{"f"}, + Usage: "flatten directory structure of source, starting from the first wildcard", + }, + &cli.BoolFlag{ + Name: "no-clobber", + Aliases: []string{"n"}, + Usage: "do not overwrite destination if already exists", + }, + &cli.BoolFlag{ + Name: "if-size-differ", + Aliases: []string{"s"}, + Usage: "only overwrite destination if size differs", + }, + &cli.BoolFlag{ + Name: "if-source-newer", + Aliases: []string{"u"}, + Usage: "only overwrite destination if source modtime is newer", + }, + } + sharedFlags := NewSharedFlags() + return append(copyFlags, sharedFlags...) +} + +func NewCopyCommand() *cli.Command { + return &cli.Command{ + Name: "cp", + HelpName: "cp", + Usage: "copy objects", + Flags: NewCopyCommandFlags(), + CustomHelpTemplate: copyHelpTemplate, + Before: func(c *cli.Context) error { + err := validateCopyCommand(c) + if err != nil { + printError(commandFromContext(c), c.Command.Name, err) + } + return err + }, + Action: func(c *cli.Context) (err error) { + defer stat.Collect(c.Command.FullName(), &err)() + + // don't delete source + return NewCopy(c, false).Run(c.Context) + }, + } +} + +// Copy holds copy operation flags and states. +type Copy struct { + src string + dst string + op string + fullCommand string + + deleteSource bool + + // flags + noClobber bool + ifSizeDiffer bool + ifSourceNewer bool + flatten bool + followSymlinks bool + storageClass storage.StorageClass + encryptionMethod string + encryptionKeyID string + acl string + forceGlacierTransfer bool + ignoreGlacierWarnings bool + exclude []string + raw bool + cacheControl string + expires string + + // region settings + srcRegion string + dstRegion string + + // s3 options + concurrency int + partSize int64 + storageOpts storage.Options +} + +// NewCopy creates Copy from cli.Context. 
+func NewCopy(c *cli.Context, deleteSource bool) Copy { + return Copy{ + src: c.Args().Get(0), + dst: c.Args().Get(1), + op: c.Command.Name, + fullCommand: commandFromContext(c), + deleteSource: deleteSource, + // flags + noClobber: c.Bool("no-clobber"), + ifSizeDiffer: c.Bool("if-size-differ"), + ifSourceNewer: c.Bool("if-source-newer"), + flatten: c.Bool("flatten"), + followSymlinks: !c.Bool("no-follow-symlinks"), + storageClass: storage.StorageClass(c.String("storage-class")), + concurrency: c.Int("concurrency"), + partSize: c.Int64("part-size") * megabytes, + encryptionMethod: c.String("sse"), + encryptionKeyID: c.String("sse-kms-key-id"), + acl: c.String("acl"), + forceGlacierTransfer: c.Bool("force-glacier-transfer"), + ignoreGlacierWarnings: c.Bool("ignore-glacier-warnings"), + exclude: c.StringSlice("exclude"), + raw: c.Bool("raw"), + cacheControl: c.String("cache-control"), + expires: c.String("expires"), + // region settings + srcRegion: c.String("source-region"), + dstRegion: c.String("destination-region"), + + storageOpts: NewStorageOpts(c), + } +} + +const fdlimitWarning = ` +WARNING: s5cmd is hitting the max open file limit allowed by your OS. Either +increase the open file limit or try to decrease the number of workers with +'-numworkers' parameter. +` + +// Run starts copying given source objects to destination. +func (c Copy) Run(ctx context.Context) error { + srcurl, err := url.New(c.src, url.WithRaw(c.raw)) + if err != nil { + printError(c.fullCommand, c.op, err) + return err + } + + dsturl, err := url.New(c.dst, url.WithRaw(c.raw)) + if err != nil { + printError(c.fullCommand, c.op, err) + return err + } + + // override source region if set + if c.srcRegion != "" { + c.storageOpts.SetRegion(c.srcRegion) + } + + client, err := storage.NewClient(ctx, srcurl, c.storageOpts) + if err != nil { + printError(c.fullCommand, c.op, err) + return err + } + + objch, err := expandSource(ctx, client, c.followSymlinks, srcurl) + + if err != nil { + printError(c.fullCommand, c.op, err) + return err + } + + waiter := parallel.NewWaiter() + + var ( + merrorWaiter error + merrorObjects error + errDoneCh = make(chan bool) + ) + + go func() { + defer close(errDoneCh) + for err := range waiter.Err() { + if strings.Contains(err.Error(), "too many open files") { + fmt.Println(strings.TrimSpace(fdlimitWarning)) + fmt.Printf("ERROR %v\n", err) + + os.Exit(1) + } + printError(c.fullCommand, c.op, err) + merrorWaiter = multierror.Append(merrorWaiter, err) + } + }() + + isBatch := srcurl.IsWildcard() + if !isBatch && !srcurl.IsRemote() { + obj, _ := client.Stat(ctx, srcurl) + isBatch = obj != nil && obj.Type.IsDir() + } + + excludePatterns, err := createExcludesFromWildcard(c.exclude) + if err != nil { + printError(c.fullCommand, c.op, err) + return err + } + + for object := range objch { + if object.Type.IsDir() || errorpkg.IsCancelation(object.Err) { + continue + } + + if err := object.Err; err != nil { + merrorObjects = multierror.Append(merrorObjects, err) + printError(c.fullCommand, c.op, err) + continue + } + + if object.StorageClass.IsGlacier() && !c.forceGlacierTransfer { + if !c.ignoreGlacierWarnings { + err := fmt.Errorf("object '%v' is on Glacier storage", object) + merrorObjects = multierror.Append(merrorObjects, err) + printError(c.fullCommand, c.op, err) + } + continue + } + + if isURLExcluded(excludePatterns, object.URL.Path, srcurl.Prefix) { + continue + } + + srcurl := object.URL + var task parallel.Task + + switch { + case srcurl.Type == dsturl.Type: // local->local or 
remote->remote + task = c.prepareCopyTask(ctx, srcurl, dsturl, isBatch) + case srcurl.IsRemote(): // remote->local + task = c.prepareDownloadTask(ctx, srcurl, dsturl, isBatch) + case dsturl.IsRemote(): // local->remote + task = c.prepareUploadTask(ctx, srcurl, dsturl, isBatch) + default: + panic("unexpected src-dst pair") + } + + parallel.Run(task, waiter) + } + + waiter.Wait() + <-errDoneCh + + return multierror.Append(merrorWaiter, merrorObjects).ErrorOrNil() +} + +func (c Copy) prepareCopyTask( + ctx context.Context, + srcurl *url.URL, + dsturl *url.URL, + isBatch bool, +) func() error { + return func() error { + dsturl = prepareRemoteDestination(srcurl, dsturl, c.flatten, isBatch) + err := c.doCopy(ctx, srcurl, dsturl) + if err != nil { + return &errorpkg.Error{ + Op: c.op, + Src: srcurl, + Dst: dsturl, + Err: err, + } + } + return nil + } +} + +func (c Copy) prepareDownloadTask( + ctx context.Context, + srcurl *url.URL, + dsturl *url.URL, + isBatch bool, +) func() error { + return func() error { + dsturl, err := prepareLocalDestination(ctx, srcurl, dsturl, c.flatten, isBatch, c.storageOpts) + if err != nil { + return err + } + err = c.doDownload(ctx, srcurl, dsturl) + if err != nil { + return &errorpkg.Error{ + Op: c.op, + Src: srcurl, + Dst: dsturl, + Err: err, + } + } + return nil + } +} + +func (c Copy) prepareUploadTask( + ctx context.Context, + srcurl *url.URL, + dsturl *url.URL, + isBatch bool, +) func() error { + return func() error { + dsturl = prepareRemoteDestination(srcurl, dsturl, c.flatten, isBatch) + err := c.doUpload(ctx, srcurl, dsturl) + if err != nil { + return &errorpkg.Error{ + Op: c.op, + Src: srcurl, + Dst: dsturl, + Err: err, + } + } + return nil + } +} + +// doDownload is used to fetch a remote object and save as a local object. +func (c Copy) doDownload(ctx context.Context, srcurl *url.URL, dsturl *url.URL) error { + srcClient, err := storage.NewRemoteClient(ctx, srcurl, c.storageOpts) + if err != nil { + return err + } + + dstClient := storage.NewLocalClient(c.storageOpts) + + err = c.shouldOverride(ctx, srcurl, dsturl) + if err != nil { + // FIXME(ig): rename + if errorpkg.IsWarning(err) { + printDebug(c.op, err, srcurl, dsturl) + return nil + } + return err + } + + file, err := dstClient.Create(dsturl.Absolute()) + if err != nil { + return err + } + defer file.Close() + + size, err := srcClient.Get(ctx, srcurl, file, c.concurrency, c.partSize) + if err != nil { + _ = dstClient.Delete(ctx, dsturl) + return err + } + + if c.deleteSource { + _ = srcClient.Delete(ctx, srcurl) + } + + msg := log.InfoMessage{ + Operation: c.op, + Source: srcurl, + Destination: dsturl, + Object: &storage.Object{ + Size: size, + }, + } + log.Info(msg) + + return nil +} + +func (c Copy) doUpload(ctx context.Context, srcurl *url.URL, dsturl *url.URL) error { + srcClient := storage.NewLocalClient(c.storageOpts) + + file, err := srcClient.Open(srcurl.Absolute()) + if err != nil { + return err + } + defer file.Close() + + err = c.shouldOverride(ctx, srcurl, dsturl) + if err != nil { + if errorpkg.IsWarning(err) { + printDebug(c.op, err, srcurl, dsturl) + return nil + } + return err + } + + // override destination region if set + if c.dstRegion != "" { + c.storageOpts.SetRegion(c.dstRegion) + } + dstClient, err := storage.NewRemoteClient(ctx, dsturl, c.storageOpts) + if err != nil { + return err + } + + metadata := storage.NewMetadata(). + SetContentType("application/x-protobuf"). + SetContentEncoding("gzip"). + SetStorageClass(string(c.storageClass)). + SetSSE(c.encryptionMethod). 
+ SetSSEKeyID(c.encryptionKeyID). + SetACL(c.acl). + SetCacheControl(c.cacheControl). + SetExpires(c.expires) + + err = dstClient.Put(ctx, file, dsturl, metadata, c.concurrency, c.partSize) + if err != nil { + return err + } + + obj, _ := srcClient.Stat(ctx, srcurl) + size := obj.Size + + if c.deleteSource { + // close the file before deleting + file.Close() + if err := srcClient.Delete(ctx, srcurl); err != nil { + return err + } + } + + msg := log.InfoMessage{ + Operation: c.op, + Source: srcurl, + Destination: dsturl, + Object: &storage.Object{ + Size: size, + StorageClass: c.storageClass, + }, + } + log.Info(msg) + + return nil +} + +func (c Copy) doCopy(ctx context.Context, srcurl, dsturl *url.URL) error { + // override destination region if set + if c.dstRegion != "" { + c.storageOpts.SetRegion(c.dstRegion) + } + dstClient, err := storage.NewClient(ctx, dsturl, c.storageOpts) + if err != nil { + return err + } + + metadata := storage.NewMetadata(). + SetStorageClass(string(c.storageClass)). + SetSSE(c.encryptionMethod). + SetSSEKeyID(c.encryptionKeyID). + SetACL(c.acl). + SetCacheControl(c.cacheControl). + SetExpires(c.expires) + + err = c.shouldOverride(ctx, srcurl, dsturl) + if err != nil { + if errorpkg.IsWarning(err) { + printDebug(c.op, err, srcurl, dsturl) + return nil + } + return err + } + + err = dstClient.Copy(ctx, srcurl, dsturl, metadata) + if err != nil { + return err + } + + if c.deleteSource { + srcClient, err := storage.NewClient(ctx, srcurl, c.storageOpts) + if err != nil { + return err + } + if err := srcClient.Delete(ctx, srcurl); err != nil { + return err + } + } + + msg := log.InfoMessage{ + Operation: c.op, + Source: srcurl, + Destination: dsturl, + Object: &storage.Object{ + URL: dsturl, + StorageClass: c.storageClass, + }, + } + log.Info(msg) + + return nil +} + +// shouldOverride function checks if the destination should be overridden if +// the source-destination pair and given copy flags conform to the +// override criteria. For example; "cp -n -s " should not override +// the if and filenames are the same, except if the size +// differs. +func (c Copy) shouldOverride(ctx context.Context, srcurl *url.URL, dsturl *url.URL) error { + // if not asked to override, ignore. + if !c.noClobber && !c.ifSizeDiffer && !c.ifSourceNewer { + return nil + } + + srcClient, err := storage.NewClient(ctx, srcurl, c.storageOpts) + if err != nil { + return err + } + + srcObj, err := getObject(ctx, srcurl, srcClient) + if err != nil { + return err + } + + dstClient, err := storage.NewClient(ctx, dsturl, c.storageOpts) + if err != nil { + return err + } + + dstObj, err := getObject(ctx, dsturl, dstClient) + if err != nil { + return err + } + + // if destination not exists, no conditions apply. + if dstObj == nil { + return nil + } + + var stickyErr error + if c.noClobber { + stickyErr = errorpkg.ErrObjectExists + } + + if c.ifSizeDiffer { + if srcObj.Size == dstObj.Size { + stickyErr = errorpkg.ErrObjectSizesMatch + } else { + stickyErr = nil + } + } + + if c.ifSourceNewer { + srcMod, dstMod := srcObj.ModTime, dstObj.ModTime + + if !srcMod.After(*dstMod) { + stickyErr = errorpkg.ErrObjectIsNewer + } else { + stickyErr = nil + } + } + + return stickyErr +} + +// prepareRemoteDestination will return a new destination URL for +// remote->remote and local->remote copy operations. 
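// Note: shouldOverride above resolves the -n, -s and -u flags with a "sticky"
// error that later checks may clear. A self-contained sketch of that decision
// order, using stand-in error values instead of the errorpkg sentinels:

package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errObjectExists     = errors.New("object already exists")
	errObjectSizesMatch = errors.New("object size matches")
	errObjectIsNewer    = errors.New("object is newer or same age")
)

type objInfo struct {
	size    int64
	modTime time.Time
}

// shouldSkip returns a non-nil error when the destination must not be overwritten.
func shouldSkip(noClobber, ifSizeDiffer, ifSourceNewer bool, src, dst objInfo) error {
	var sticky error
	if noClobber {
		sticky = errObjectExists
	}
	if ifSizeDiffer {
		if src.size == dst.size {
			sticky = errObjectSizesMatch
		} else {
			sticky = nil
		}
	}
	if ifSourceNewer {
		if !src.modTime.After(dst.modTime) {
			sticky = errObjectIsNewer
		} else {
			sticky = nil
		}
	}
	return sticky
}

func main() {
	src := objInfo{size: 10, modTime: time.Now()}
	dst := objInfo{size: 10, modTime: time.Now().Add(-time.Hour)}

	// With -n -s and equal sizes, the size check keeps the sticky error and
	// the copy is skipped.
	fmt.Println(shouldSkip(true, true, false, src, dst))

	// Differing sizes clear the sticky error, so the copy proceeds despite -n.
	dst.size = 5
	fmt.Println(shouldSkip(true, true, false, src, dst))
}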
+func prepareRemoteDestination( + srcurl *url.URL, + dsturl *url.URL, + flatten bool, + isBatch bool, +) *url.URL { + objname := srcurl.Base() + if isBatch && !flatten { + objname = srcurl.Relative() + } + + if dsturl.IsPrefix() || dsturl.IsBucket() { + dsturl = dsturl.Join(objname) + } + return dsturl +} + +// prepareDownloadDestination will return a new destination URL for +// remote->local copy operations. +func prepareLocalDestination( + ctx context.Context, + srcurl *url.URL, + dsturl *url.URL, + flatten bool, + isBatch bool, + storageOpts storage.Options, +) (*url.URL, error) { + objname := srcurl.Base() + if isBatch && !flatten { + objname = srcurl.Relative() + } + + client := storage.NewLocalClient(storageOpts) + + if isBatch { + err := client.MkdirAll(dsturl.Absolute()) + if err != nil { + return nil, err + } + } + + obj, err := client.Stat(ctx, dsturl) + if err != nil && err != storage.ErrGivenObjectNotFound { + return nil, err + } + + if isBatch && !flatten { + dsturl = dsturl.Join(objname) + err := client.MkdirAll(dsturl.Dir()) + if err != nil { + return nil, err + } + } + + if err == storage.ErrGivenObjectNotFound { + err := client.MkdirAll(dsturl.Dir()) + if err != nil { + return nil, err + } + if strings.HasSuffix(dsturl.Absolute(), "/") { + dsturl = dsturl.Join(objname) + } + } else { + if obj.Type.IsDir() { + dsturl = obj.URL.Join(objname) + } + } + + return dsturl, nil +} + +// getObject checks if the object from given url exists. If no object is +// found, error and returning object would be nil. +func getObject(ctx context.Context, url *url.URL, client storage.Storage) (*storage.Object, error) { + obj, err := client.Stat(ctx, url) + if err == storage.ErrGivenObjectNotFound { + return nil, nil + } + + return obj, err +} + +func validateCopyCommand(c *cli.Context) error { + if c.Args().Len() != 2 { + return fmt.Errorf("expected source and destination arguments") + } + + ctx := c.Context + src := c.Args().Get(0) + dst := c.Args().Get(1) + + srcurl, err := url.New(src, url.WithRaw(c.Bool("raw"))) + if err != nil { + return err + } + + dsturl, err := url.New(dst, url.WithRaw(c.Bool("raw"))) + if err != nil { + return err + } + + // wildcard destination doesn't mean anything + if dsturl.IsWildcard() { + return fmt.Errorf("target %q can not contain glob characters", dst) + } + + // we don't operate on S3 prefixes for copy and delete operations. + if srcurl.IsBucket() || srcurl.IsPrefix() { + return fmt.Errorf("source argument must contain wildcard character") + } + + // 'cp dir/* s3://bucket/prefix': expect a trailing slash to avoid any + // surprises. 
+ if srcurl.IsWildcard() && dsturl.IsRemote() && !dsturl.IsPrefix() && !dsturl.IsBucket() { + return fmt.Errorf("target %q must be a bucket or a prefix", dsturl) + } + + switch { + case srcurl.Type == dsturl.Type: + return validateCopy(srcurl, dsturl) + case dsturl.IsRemote(): + return validateUpload(ctx, srcurl, dsturl, NewStorageOpts(c)) + default: + return nil + } +} + +func validateCopy(srcurl, dsturl *url.URL) error { + if srcurl.IsRemote() || dsturl.IsRemote() { + return nil + } + + // we don't support local->local copies + return fmt.Errorf("local->local copy operations are not permitted") +} + +func validateUpload(ctx context.Context, srcurl, dsturl *url.URL, storageOpts storage.Options) error { + srcclient := storage.NewLocalClient(storageOpts) + + if srcurl.IsWildcard() { + return nil + } + + obj, err := srcclient.Stat(ctx, srcurl) + if err != nil { + return err + } + + // 'cp dir/ s3://bucket/prefix-without-slash': expect a trailing slash to + // avoid any surprises. + if obj.Type.IsDir() && !dsturl.IsBucket() && !dsturl.IsPrefix() { + return fmt.Errorf("target %q must be a bucket or a prefix", dsturl) + } + + return nil +} + +// guessContentType gets content type of the file. +func guessContentType(file *os.File) string { + contentType := mime.TypeByExtension(filepath.Ext(file.Name())) + if contentType == "" { + defer file.Seek(0, io.SeekStart) + + const bufsize = 512 + buf, err := ioutil.ReadAll(io.LimitReader(file, bufsize)) + if err != nil { + return "" + } + + return http.DetectContentType(buf) + } + return contentType +} \ No newline at end of file diff --git a/tiles-generation/docker/vector-tile/modified-s5cmd/s3.go b/tiles-generation/docker/vector-tile/modified-s5cmd/s3.go new file mode 100644 index 0000000..f187b72 --- /dev/null +++ b/tiles-generation/docker/vector-tile/modified-s5cmd/s3.go @@ -0,0 +1,969 @@ +// Copyright OpenSearch Contributors +// SPDX-License-Identifier: Apache-2.0 + +// In order to support add metadata 'Content-Encoding=gzip' and 'Content-Type=application/x-protobuf' on copy command, +// modified this file from https://github.com/peak/s5cmd for docker image build. + +package storage + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + urlpkg "net/url" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface" + + "github.com/peak/s5cmd/log" + "github.com/peak/s5cmd/storage/url" +) + +var sentinelURL = urlpkg.URL{} + +const ( + // deleteObjectsMax is the max allowed objects to be deleted on single HTTP + // request. + deleteObjectsMax = 1000 + + // Amazon Accelerated Transfer endpoint + transferAccelEndpoint = "s3-accelerate.amazonaws.com" + + // Google Cloud Storage endpoint + gcsEndpoint = "storage.googleapis.com" +) + +// Re-used AWS sessions dramatically improve performance. 
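// Note: guessContentType above prefers the file extension and only sniffs the
// first 512 bytes when the extension is unknown. Because ".pbf" typically has
// no registered MIME type, the modified Put further below hard-codes
// application/x-protobuf rather than relying on detection. A small standalone
// illustration (file name and payload are made up):

package main

import (
	"fmt"
	"mime"
	"net/http"
	"path/filepath"
)

func main() {
	// Extension lookup; usually empty for .pbf on a stock mime table.
	fmt.Printf("by extension: %q\n", mime.TypeByExtension(filepath.Ext("tiles/0/0/0.pbf")))

	// Content sniffing fallback used when the extension gives no answer.
	fmt.Printf("by sniffing:  %q\n", http.DetectContentType([]byte(`{"hello":"world"}`)))
}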
+var globalSessionCache = &SessionCache{ + sessions: map[Options]*session.Session{}, +} + +// S3 is a storage type which interacts with S3API, DownloaderAPI and +// UploaderAPI. +type S3 struct { + api s3iface.S3API + downloader s3manageriface.DownloaderAPI + uploader s3manageriface.UploaderAPI + endpointURL urlpkg.URL + dryRun bool + useListObjectsV1 bool + requestPayer string +} + +func (s *S3) RequestPayer() *string { + if s.requestPayer == "" { + return nil + } + return &s.requestPayer +} + +func parseEndpoint(endpoint string) (urlpkg.URL, error) { + if endpoint == "" { + return sentinelURL, nil + } + // add a scheme to correctly parse the endpoint. Without a scheme, + // url.Parse will put the host information in path" + if !strings.HasPrefix(endpoint, "http") { + endpoint = "http://" + endpoint + } + u, err := urlpkg.Parse(endpoint) + if err != nil { + return sentinelURL, fmt.Errorf("parse endpoint %q: %v", endpoint, err) + } + + return *u, nil +} + +// NewS3Storage creates new S3 session. +func newS3Storage(ctx context.Context, opts Options) (*S3, error) { + endpointURL, err := parseEndpoint(opts.Endpoint) + if err != nil { + return nil, err + } + + awsSession, err := globalSessionCache.newSession(ctx, opts) + if err != nil { + return nil, err + } + + return &S3{ + api: s3.New(awsSession), + downloader: s3manager.NewDownloader(awsSession), + uploader: s3manager.NewUploader(awsSession), + endpointURL: endpointURL, + dryRun: opts.DryRun, + useListObjectsV1: opts.UseListObjectsV1, + requestPayer: opts.RequestPayer, + }, nil +} + +// Stat retrieves metadata from S3 object without returning the object itself. +func (s *S3) Stat(ctx context.Context, url *url.URL) (*Object, error) { + output, err := s.api.HeadObjectWithContext(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(url.Bucket), + Key: aws.String(url.Path), + RequestPayer: s.RequestPayer(), + }) + if err != nil { + if errHasCode(err, "NotFound") { + return nil, ErrGivenObjectNotFound + } + return nil, err + } + + etag := aws.StringValue(output.ETag) + mod := aws.TimeValue(output.LastModified) + return &Object{ + URL: url, + Etag: strings.Trim(etag, `"`), + ModTime: &mod, + Size: aws.Int64Value(output.ContentLength), + }, nil +} + +// List is a non-blocking S3 list operation which paginates and filters S3 +// keys. If no object found or an error is encountered during this period, +// it sends these errors to object channel. 
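// Note: parseEndpoint above prepends a scheme before calling net/url.Parse,
// since a scheme-less endpoint string does not parse into the Host field. A
// standalone sketch of that behaviour (the localhost endpoint is only an
// example of a custom S3-compatible service):

package main

import (
	"fmt"
	"log"
	urlpkg "net/url"
	"strings"
)

func main() {
	endpoint := "localhost:9000"
	if !strings.HasPrefix(endpoint, "http") {
		endpoint = "http://" + endpoint
	}
	u, err := urlpkg.Parse(endpoint)
	if err != nil {
		log.Fatalf("parse endpoint %q: %v", endpoint, err)
	}
	fmt.Println(u.Scheme, u.Host) // http localhost:9000
}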
+func (s *S3) List(ctx context.Context, url *url.URL, _ bool) <-chan *Object { + if isGoogleEndpoint(s.endpointURL) || s.useListObjectsV1 { + return s.listObjects(ctx, url) + } + + return s.listObjectsV2(ctx, url) +} + +func (s *S3) listObjectsV2(ctx context.Context, url *url.URL) <-chan *Object { + listInput := s3.ListObjectsV2Input{ + Bucket: aws.String(url.Bucket), + Prefix: aws.String(url.Prefix), + RequestPayer: s.RequestPayer(), + } + + if url.Delimiter != "" { + listInput.SetDelimiter(url.Delimiter) + } + + objCh := make(chan *Object) + + go func() { + defer close(objCh) + objectFound := false + + var now time.Time + + err := s.api.ListObjectsV2PagesWithContext(ctx, &listInput, func(p *s3.ListObjectsV2Output, lastPage bool) bool { + for _, c := range p.CommonPrefixes { + prefix := aws.StringValue(c.Prefix) + if !url.Match(prefix) { + continue + } + + newurl := url.Clone() + newurl.Path = prefix + objCh <- &Object{ + URL: newurl, + Type: ObjectType{os.ModeDir}, + } + + objectFound = true + } + // track the instant object iteration began, + // so it can be used to bypass objects created after this instant + if now.IsZero() { + now = time.Now().UTC() + } + + for _, c := range p.Contents { + key := aws.StringValue(c.Key) + if !url.Match(key) { + continue + } + + mod := aws.TimeValue(c.LastModified).UTC() + if mod.After(now) { + objectFound = true + continue + } + + var objtype os.FileMode + if strings.HasSuffix(key, "/") { + objtype = os.ModeDir + } + + newurl := url.Clone() + newurl.Path = aws.StringValue(c.Key) + etag := aws.StringValue(c.ETag) + + objCh <- &Object{ + URL: newurl, + Etag: strings.Trim(etag, `"`), + ModTime: &mod, + Type: ObjectType{objtype}, + Size: aws.Int64Value(c.Size), + StorageClass: StorageClass(aws.StringValue(c.StorageClass)), + } + + objectFound = true + } + + return !lastPage + }) + + if err != nil { + objCh <- &Object{Err: err} + return + } + + if !objectFound { + objCh <- &Object{Err: ErrNoObjectFound} + } + }() + + return objCh +} + +// listObjects is used for cloud services that does not support S3 +// ListObjectsV2 API. I'm looking at you GCS. 
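// Note: listObjectsV2 above streams results over a channel and reports
// problems in-band, so callers must check each Object's Err field. A sketch of
// a consumer, assuming the upstream module path github.com/peak/s5cmd and the
// NewRemoteClient/List signatures shown in the copied storage package; the
// bucket and prefix are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/peak/s5cmd/storage"
	"github.com/peak/s5cmd/storage/url"
)

func main() {
	ctx := context.Background()

	srcurl, err := url.New("s3://example-tile-bucket/tiles/*")
	if err != nil {
		log.Fatal(err)
	}

	client, err := storage.NewRemoteClient(ctx, srcurl, storage.Options{})
	if err != nil {
		log.Fatal(err)
	}

	// Errors (including ErrNoObjectFound) arrive on the same channel as
	// objects, so every item needs an Err check before use.
	for obj := range client.List(ctx, srcurl, false) {
		if obj.Err != nil {
			log.Printf("list error: %v", obj.Err)
			continue
		}
		fmt.Println(obj.URL, obj.Size)
	}
}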
+func (s *S3) listObjects(ctx context.Context, url *url.URL) <-chan *Object { + listInput := s3.ListObjectsInput{ + Bucket: aws.String(url.Bucket), + Prefix: aws.String(url.Prefix), + RequestPayer: s.RequestPayer(), + } + + if url.Delimiter != "" { + listInput.SetDelimiter(url.Delimiter) + } + + objCh := make(chan *Object) + + go func() { + defer close(objCh) + objectFound := false + + var now time.Time + + err := s.api.ListObjectsPagesWithContext(ctx, &listInput, func(p *s3.ListObjectsOutput, lastPage bool) bool { + for _, c := range p.CommonPrefixes { + prefix := aws.StringValue(c.Prefix) + if !url.Match(prefix) { + continue + } + + newurl := url.Clone() + newurl.Path = prefix + objCh <- &Object{ + URL: newurl, + Type: ObjectType{os.ModeDir}, + } + + objectFound = true + } + // track the instant object iteration began, + // so it can be used to bypass objects created after this instant + if now.IsZero() { + now = time.Now().UTC() + } + + for _, c := range p.Contents { + key := aws.StringValue(c.Key) + if !url.Match(key) { + continue + } + + mod := aws.TimeValue(c.LastModified).UTC() + if mod.After(now) { + objectFound = true + continue + } + + var objtype os.FileMode + if strings.HasSuffix(key, "/") { + objtype = os.ModeDir + } + + newurl := url.Clone() + newurl.Path = aws.StringValue(c.Key) + etag := aws.StringValue(c.ETag) + + objCh <- &Object{ + URL: newurl, + Etag: strings.Trim(etag, `"`), + ModTime: &mod, + Type: ObjectType{objtype}, + Size: aws.Int64Value(c.Size), + StorageClass: StorageClass(aws.StringValue(c.StorageClass)), + } + + objectFound = true + } + + return !lastPage + }) + + if err != nil { + objCh <- &Object{Err: err} + return + } + + if !objectFound { + objCh <- &Object{Err: ErrNoObjectFound} + } + }() + + return objCh +} + +// Copy is a single-object copy operation which copies objects to S3 +// destination from another S3 source. +func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) error { + if s.dryRun { + return nil + } + + // SDK expects CopySource like "bucket[/key]" + copySource := from.EscapedPath() + + input := &s3.CopyObjectInput{ + Bucket: aws.String(to.Bucket), + Key: aws.String(to.Path), + CopySource: aws.String(copySource), + RequestPayer: s.RequestPayer(), + } + + storageClass := metadata.StorageClass() + if storageClass != "" { + input.StorageClass = aws.String(storageClass) + } + + sseEncryption := metadata.SSE() + if sseEncryption != "" { + input.ServerSideEncryption = aws.String(sseEncryption) + sseKmsKeyID := metadata.SSEKeyID() + if sseKmsKeyID != "" { + input.SSEKMSKeyId = aws.String(sseKmsKeyID) + } + } + + acl := metadata.ACL() + if acl != "" { + input.ACL = aws.String(acl) + } + + cacheControl := metadata.CacheControl() + if cacheControl != "" { + input.CacheControl = aws.String(cacheControl) + } + + expires := metadata.Expires() + if expires != "" { + t, err := time.Parse(time.RFC3339, expires) + if err != nil { + return err + } + input.Expires = aws.Time(t) + } + + _, err := s.api.CopyObject(input) + return err +} + +// Read fetches the remote object and returns its contents as an io.ReadCloser. 
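// Note: Copy above and Put further below both parse the --expires value with
// the RFC 3339 layout before handing it to the SDK, so it must look like the
// example in the cp help text. A quick standalone check:

package main

import (
	"fmt"
	"log"
	"time"
)

func main() {
	t, err := time.Parse(time.RFC3339, "2024-10-01T20:30:00Z")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(t.UTC()) // 2024-10-01 20:30:00 +0000 UTC
}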
+func (s *S3) Read(ctx context.Context, src *url.URL) (io.ReadCloser, error) { + resp, err := s.api.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(src.Bucket), + Key: aws.String(src.Path), + RequestPayer: s.RequestPayer(), + }) + if err != nil { + return nil, err + } + return resp.Body, nil +} + +// Get is a multipart download operation which downloads S3 objects into any +// destination that implements io.WriterAt interface. +// Makes a single 'GetObject' call if 'concurrency' is 1 and ignores 'partSize'. +func (s *S3) Get( + ctx context.Context, + from *url.URL, + to io.WriterAt, + concurrency int, + partSize int64, +) (int64, error) { + if s.dryRun { + return 0, nil + } + + return s.downloader.DownloadWithContext(ctx, to, &s3.GetObjectInput{ + Bucket: aws.String(from.Bucket), + Key: aws.String(from.Path), + RequestPayer: s.RequestPayer(), + }, func(u *s3manager.Downloader) { + u.PartSize = partSize + u.Concurrency = concurrency + }) +} + +type SelectQuery struct { + ExpressionType string + Expression string + CompressionType string +} + +func (s *S3) Select(ctx context.Context, url *url.URL, query *SelectQuery, resultCh chan<- json.RawMessage) error { + if s.dryRun { + return nil + } + + input := &s3.SelectObjectContentInput{ + Bucket: aws.String(url.Bucket), + Key: aws.String(url.Path), + ExpressionType: aws.String(query.ExpressionType), + Expression: aws.String(query.Expression), + InputSerialization: &s3.InputSerialization{ + CompressionType: aws.String(query.CompressionType), + JSON: &s3.JSONInput{ + Type: aws.String("Lines"), + }, + }, + OutputSerialization: &s3.OutputSerialization{ + JSON: &s3.JSONOutput{}, + }, + } + + resp, err := s.api.SelectObjectContentWithContext(ctx, input) + if err != nil { + return err + } + + reader, writer := io.Pipe() + + go func() { + defer writer.Close() + + eventch := resp.EventStream.Reader.Events() + defer resp.EventStream.Close() + + for { + select { + case <-ctx.Done(): + return + case event, ok := <-eventch: + if !ok { + return + } + + switch e := event.(type) { + case *s3.RecordsEvent: + writer.Write(e.Payload) + } + } + } + }() + + decoder := json.NewDecoder(reader) + for { + var record json.RawMessage + err := decoder.Decode(&record) + if err == io.EOF { + break + } + if err != nil { + return err + } + resultCh <- record + } + + return resp.EventStream.Reader.Err() +} + +// Put is a multipart upload operation to upload resources, which implements +// io.Reader interface, into S3 destination. 
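// Note: Get above delegates to the SDK's s3manager downloader, tuning part
// size and concurrency through the optional configuration callback. A
// standalone sketch of the same call (region, bucket, key and output file are
// placeholders):

package main

import (
	"context"
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-east-1")))
	downloader := s3manager.NewDownloader(sess)

	f, err := os.Create("planet.osm.pbf")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	n, err := downloader.DownloadWithContext(context.Background(), f, &s3.GetObjectInput{
		Bucket: aws.String("example-bucket"),
		Key:    aws.String("planet/planet.osm.pbf"),
	}, func(d *s3manager.Downloader) {
		d.PartSize = 50 * 1024 * 1024 // 50 MiB parts
		d.Concurrency = 5
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("downloaded %d bytes", n)
}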
+func (s *S3) Put( + ctx context.Context, + reader io.Reader, + to *url.URL, + metadata Metadata, + concurrency int, + partSize int64, +) error { + if s.dryRun { + return nil + } + + contentType := metadata.ContentType() + if contentType == "" { + contentType = "application/octet-stream" + } + + input := &s3manager.UploadInput{ + Bucket: aws.String(to.Bucket), + Key: aws.String(to.Path), + Body: reader, + ContentType: aws.String("application/x-protobuf"), + ContentEncoding: aws.String("gzip"), + RequestPayer: s.RequestPayer(), + } + + storageClass := metadata.StorageClass() + if storageClass != "" { + input.StorageClass = aws.String(storageClass) + } + acl := metadata.ACL() + if acl != "" { + input.ACL = aws.String(acl) + } + + cacheControl := metadata.CacheControl() + if cacheControl != "" { + input.CacheControl = aws.String(cacheControl) + } + + expires := metadata.Expires() + if expires != "" { + t, err := time.Parse(time.RFC3339, expires) + if err != nil { + return err + } + input.Expires = aws.Time(t) + } + + sseEncryption := metadata.SSE() + if sseEncryption != "" { + input.ServerSideEncryption = aws.String(sseEncryption) + sseKmsKeyID := metadata.SSEKeyID() + if sseKmsKeyID != "" { + input.SSEKMSKeyId = aws.String(sseKmsKeyID) + } + } + + _, err := s.uploader.UploadWithContext(ctx, input, func(u *s3manager.Uploader) { + u.PartSize = partSize + u.Concurrency = concurrency + }) + + return err +} + +// chunk is an object identifier container which is used on MultiDelete +// operations. Since DeleteObjects API allows deleting objects up to 1000, +// splitting keys into multiple chunks is required. +type chunk struct { + Bucket string + Keys []*s3.ObjectIdentifier +} + +// calculateChunks calculates chunks for given URL channel and returns +// read-only chunk channel. +func (s *S3) calculateChunks(ch <-chan *url.URL) <-chan chunk { + chunkch := make(chan chunk) + + go func() { + defer close(chunkch) + + var keys []*s3.ObjectIdentifier + initKeys := func() { + keys = make([]*s3.ObjectIdentifier, 0) + } + + var bucket string + for url := range ch { + bucket = url.Bucket + + objid := &s3.ObjectIdentifier{Key: aws.String(url.Path)} + keys = append(keys, objid) + if len(keys) == deleteObjectsMax { + chunkch <- chunk{ + Bucket: bucket, + Keys: keys, + } + initKeys() + } + } + + if len(keys) > 0 { + chunkch <- chunk{ + Bucket: bucket, + Keys: keys, + } + } + }() + + return chunkch +} + +// Delete is a single object delete operation. +func (s *S3) Delete(ctx context.Context, url *url.URL) error { + chunk := chunk{ + Bucket: url.Bucket, + Keys: []*s3.ObjectIdentifier{ + {Key: aws.String(url.Path)}, + }, + } + + resultch := make(chan *Object, 1) + defer close(resultch) + + s.doDelete(ctx, chunk, resultch) + obj := <-resultch + return obj.Err +} + +// doDelete deletes the given keys given by chunk. Results are piggybacked via +// the Object container. 
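// Note: the hard-coded ContentType and ContentEncoding in Put above are the
// point of this fork: generated vector tiles are gzip-compressed protobuf
// files, and both headers are stored on the object so tile clients can decode
// the .pbf files served from S3. A standalone sketch of the equivalent
// s3manager upload (region, bucket, key and payload are placeholders):

package main

import (
	"bytes"
	"compress/gzip"
	"context"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-east-1")))
	uploader := s3manager.NewUploader(sess)

	// Gzip a fake tile payload, mirroring how generated tiles are stored.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("tile payload")); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	_, err := uploader.UploadWithContext(context.Background(), &s3manager.UploadInput{
		Bucket:          aws.String("example-tile-bucket"),
		Key:             aws.String("tiles/0/0/0.pbf"),
		Body:            &buf,
		ContentType:     aws.String("application/x-protobuf"),
		ContentEncoding: aws.String("gzip"),
	})
	if err != nil {
		log.Fatal(err)
	}
}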
+func (s *S3) doDelete(ctx context.Context, chunk chunk, resultch chan *Object) { + if s.dryRun { + for _, k := range chunk.Keys { + key := fmt.Sprintf("s3://%v/%v", chunk.Bucket, aws.StringValue(k.Key)) + url, _ := url.New(key) + resultch <- &Object{URL: url} + } + return + } + + bucket := chunk.Bucket + o, err := s.api.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(bucket), + Delete: &s3.Delete{Objects: chunk.Keys}, + RequestPayer: s.RequestPayer(), + }) + if err != nil { + resultch <- &Object{Err: err} + return + } + + for _, d := range o.Deleted { + key := fmt.Sprintf("s3://%v/%v", bucket, aws.StringValue(d.Key)) + url, _ := url.New(key) + resultch <- &Object{URL: url} + } + + for _, e := range o.Errors { + key := fmt.Sprintf("s3://%v/%v", bucket, aws.StringValue(e.Key)) + url, _ := url.New(key) + resultch <- &Object{ + URL: url, + Err: fmt.Errorf(aws.StringValue(e.Message)), + } + } +} + +// MultiDelete is a asynchronous removal operation for multiple objects. +// It reads given url channel, creates multiple chunks and run these +// chunks in parallel. Each chunk may have at most 1000 objects since DeleteObjects +// API has a limitation. +// See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html. +func (s *S3) MultiDelete(ctx context.Context, urlch <-chan *url.URL) <-chan *Object { + resultch := make(chan *Object) + + go func() { + sem := make(chan bool, 10) + defer close(sem) + defer close(resultch) + + chunks := s.calculateChunks(urlch) + + var wg sync.WaitGroup + for chunk := range chunks { + chunk := chunk + + wg.Add(1) + sem <- true + + go func() { + defer wg.Done() + s.doDelete(ctx, chunk, resultch) + <-sem + }() + } + + wg.Wait() + }() + + return resultch +} + +// ListBuckets is a blocking list-operation which gets bucket list and returns +// the buckets that match with given prefix. +func (s *S3) ListBuckets(ctx context.Context, prefix string) ([]Bucket, error) { + o, err := s.api.ListBucketsWithContext(ctx, &s3.ListBucketsInput{}) + if err != nil { + return nil, err + } + + var buckets []Bucket + for _, b := range o.Buckets { + bucketName := aws.StringValue(b.Name) + if prefix == "" || strings.HasPrefix(bucketName, prefix) { + buckets = append(buckets, Bucket{ + CreationDate: aws.TimeValue(b.CreationDate), + Name: bucketName, + }) + } + } + return buckets, nil +} + +// MakeBucket creates an S3 bucket with the given name. +func (s *S3) MakeBucket(ctx context.Context, name string) error { + if s.dryRun { + return nil + } + + _, err := s.api.CreateBucketWithContext(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(name), + }) + return err +} + +// RemoveBucket removes an S3 bucket with the given name. +func (s *S3) RemoveBucket(ctx context.Context, name string) error { + if s.dryRun { + return nil + } + + _, err := s.api.DeleteBucketWithContext(ctx, &s3.DeleteBucketInput{ + Bucket: aws.String(name), + }) + return err +} + +// SessionCache holds session.Session according to s3Opts and it synchronizes +// access/modification. +type SessionCache struct { + sync.Mutex + sessions map[Options]*session.Session +} + +// newSession initializes a new AWS session with region fallback and custom +// options. 
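// Note: MultiDelete above bounds the number of parallel DeleteObjects calls
// with a buffered channel used as a counting semaphore (at most 10 chunks of
// up to 1000 keys in flight). A self-contained sketch of that pattern, with a
// print standing in for the real delete:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxParallel = 10
	sem := make(chan struct{}, maxParallel)

	var wg sync.WaitGroup
	for chunk := 0; chunk < 25; chunk++ {
		chunk := chunk
		wg.Add(1)
		sem <- struct{}{} // blocks once maxParallel workers are in flight
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			fmt.Println("deleting chunk", chunk)
		}()
	}
	wg.Wait()
}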
+func (sc *SessionCache) newSession(ctx context.Context, opts Options) (*session.Session, error) { + sc.Lock() + defer sc.Unlock() + + if sess, ok := sc.sessions[opts]; ok { + return sess, nil + } + + awsCfg := aws.NewConfig() + + if opts.NoSignRequest { + // do not sign requests when making service API calls + awsCfg.Credentials = credentials.AnonymousCredentials + } + + endpointURL, err := parseEndpoint(opts.Endpoint) + if err != nil { + return nil, err + } + + // use virtual-host-style if the endpoint is known to support it, + // otherwise use the path-style approach. + isVirtualHostStyle := isVirtualHostStyle(endpointURL) + + useAccelerate := supportsTransferAcceleration(endpointURL) + // AWS SDK handles transfer acceleration automatically. Setting the + // Endpoint to a transfer acceleration endpoint would cause bucket + // operations fail. + if useAccelerate { + endpointURL = sentinelURL + } + + var httpClient *http.Client + if opts.NoVerifySSL { + httpClient = insecureHTTPClient + } + + awsCfg = awsCfg. + WithEndpoint(endpointURL.String()). + WithS3ForcePathStyle(!isVirtualHostStyle). + WithS3UseAccelerate(useAccelerate). + WithHTTPClient(httpClient) + + awsCfg.Retryer = newCustomRetryer(opts.MaxRetries) + + useSharedConfig := session.SharedConfigEnable + { + // Reverse of what the SDK does: if AWS_SDK_LOAD_CONFIG is 0 (or a + // falsy value) disable shared configs + loadCfg := os.Getenv("AWS_SDK_LOAD_CONFIG") + if loadCfg != "" { + if enable, _ := strconv.ParseBool(loadCfg); !enable { + useSharedConfig = session.SharedConfigDisable + } + } + } + + sess, err := session.NewSessionWithOptions( + session.Options{ + Config: *awsCfg, + SharedConfigState: useSharedConfig, + }, + ) + if err != nil { + return nil, err + } + + // get region of the bucket and create session accordingly. if the region + // is not provided, it means we want region-independent session + // for operations such as listing buckets, making a new bucket etc. + // only get bucket region when it is not specified. + if opts.region != "" { + sess.Config.Region = aws.String(opts.region) + } else { + if err := setSessionRegion(ctx, sess, opts.bucket); err != nil { + return nil, err + } + } + + sc.sessions[opts] = sess + + return sess, nil +} + +func (sc *SessionCache) clear() { + sc.Lock() + defer sc.Unlock() + sc.sessions = map[Options]*session.Session{} +} + +func setSessionRegion(ctx context.Context, sess *session.Session, bucket string) error { + region := aws.StringValue(sess.Config.Region) + + if region != "" { + return nil + } + + // set default region + sess.Config.Region = aws.String(endpoints.UsEast1RegionID) + + if bucket == "" { + return nil + } + + // auto-detection + region, err := s3manager.GetBucketRegion(ctx, sess, bucket, "", func(r *request.Request) { + // s3manager.GetBucketRegion uses Path style addressing and + // AnonymousCredentials by default, updating Request's Config to match + // the session config. + r.Config.S3ForcePathStyle = sess.Config.S3ForcePathStyle + r.Config.Credentials = sess.Config.Credentials + }) + if err != nil { + if errHasCode(err, "NotFound") { + return err + } + // don't deny any request to the service if region auto-fetching + // receives an error. Delegate error handling to command execution. 
+ err = fmt.Errorf("session: fetching region failed: %v", err) + msg := log.ErrorMessage{Err: err.Error()} + log.Error(msg) + } else { + sess.Config.Region = aws.String(region) + } + + return nil +} + +// customRetryer wraps the SDK's built in DefaultRetryer adding additional +// error codes. Such as, retry for S3 InternalError code. +type customRetryer struct { + client.DefaultRetryer +} + +func newCustomRetryer(maxRetries int) *customRetryer { + return &customRetryer{ + DefaultRetryer: client.DefaultRetryer{ + NumMaxRetries: maxRetries, + }, + } +} + +// ShouldRetry overrides SDK's built in DefaultRetryer, adding custom retry +// logics that are not included in the SDK. +func (c *customRetryer) ShouldRetry(req *request.Request) bool { + shouldRetry := errHasCode(req.Error, "InternalError") || errHasCode(req.Error, "RequestTimeTooSkewed") || strings.Contains(req.Error.Error(), "connection reset") || strings.Contains(req.Error.Error(), "connection timed out") + if !shouldRetry { + shouldRetry = c.DefaultRetryer.ShouldRetry(req) + } + + // Errors related to tokens + if errHasCode(req.Error, "ExpiredToken") || errHasCode(req.Error, "ExpiredTokenException") || errHasCode(req.Error, "InvalidToken") { + return false + } + + if shouldRetry && req.Error != nil { + err := fmt.Errorf("retryable error: %v", req.Error) + msg := log.DebugMessage{Err: err.Error()} + log.Debug(msg) + } + + return shouldRetry +} + +var insecureHTTPClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, +} + +func supportsTransferAcceleration(endpoint urlpkg.URL) bool { + return endpoint.Hostname() == transferAccelEndpoint +} + +func isGoogleEndpoint(endpoint urlpkg.URL) bool { + return endpoint.Hostname() == gcsEndpoint +} + +// isVirtualHostStyle reports whether the given endpoint supports S3 virtual +// host style bucket name resolving. If a custom S3 API compatible endpoint is +// given, resolve the bucketname from the URL path. +func isVirtualHostStyle(endpoint urlpkg.URL) bool { + return endpoint == sentinelURL || supportsTransferAcceleration(endpoint) || isGoogleEndpoint(endpoint) +} + +func errHasCode(err error, code string) bool { + if err == nil || code == "" { + return false + } + + var awsErr awserr.Error + if errors.As(err, &awsErr) { + if awsErr.Code() == code { + return true + } + } + + var multiUploadErr s3manager.MultiUploadFailure + if errors.As(err, &multiUploadErr) { + return errHasCode(multiUploadErr.OrigErr(), code) + } + + return false + +} + +// IsCancelationError reports whether given error is a storage related +// cancelation error. +func IsCancelationError(err error) bool { + return errHasCode(err, request.CanceledErrorCode) +} \ No newline at end of file diff --git a/tiles-generation/docker/vector-tile/modified-s5cmd/storage.go b/tiles-generation/docker/vector-tile/modified-s5cmd/storage.go new file mode 100644 index 0000000..7fbb84f --- /dev/null +++ b/tiles-generation/docker/vector-tile/modified-s5cmd/storage.go @@ -0,0 +1,282 @@ +// Copyright OpenSearch Contributors +// SPDX-License-Identifier: Apache-2.0 + +// In order to support add metadata 'Content-Encoding=gzip' and 'Content-Type=application/x-protobuf' on copy command, +// modified this file from https://github.com/peak/s5cmd for docker image build. + +// Package storage implements operations for s3 and fs. 
+package storage + +import ( + "context" + "encoding/json" + "fmt" + "os" + "time" + + "github.com/peak/s5cmd/storage/url" + "github.com/peak/s5cmd/strutil" +) + +var ( + // ErrGivenObjectNotFound indicates a specified object is not found. + ErrGivenObjectNotFound = fmt.Errorf("given object not found") + + // ErrNoObjectFound indicates there are no objects found from a given directory. + ErrNoObjectFound = fmt.Errorf("no object found") +) + +// Storage is an interface for storage operations that is common +// to local filesystem and remote object storage. +type Storage interface { + // Stat returns the Object structure describing object. If src is not + // found, ErrGivenObjectNotFound is returned. + Stat(ctx context.Context, src *url.URL) (*Object, error) + + // List the objects and directories/prefixes in the src. + List(ctx context.Context, src *url.URL, followSymlinks bool) <-chan *Object + + // Delete deletes the given src. + Delete(ctx context.Context, src *url.URL) error + + // MultiDelete deletes all items returned from given urls in batches. + MultiDelete(ctx context.Context, urls <-chan *url.URL) <-chan *Object + + // Copy src to dst, optionally setting the given metadata. Src and dst + // arguments are of the same type. If src is a remote type, server side + // copying will be used. + Copy(ctx context.Context, src, dst *url.URL, metadata Metadata) error +} + +func NewLocalClient(opts Options) *Filesystem { + return &Filesystem{dryRun: opts.DryRun} +} + +func NewRemoteClient(ctx context.Context, url *url.URL, opts Options) (*S3, error) { + newOpts := Options{ + MaxRetries: opts.MaxRetries, + Endpoint: opts.Endpoint, + NoVerifySSL: opts.NoVerifySSL, + DryRun: opts.DryRun, + NoSignRequest: opts.NoSignRequest, + UseListObjectsV1: opts.UseListObjectsV1, + RequestPayer: opts.RequestPayer, + bucket: url.Bucket, + region: opts.region, + } + return newS3Storage(ctx, newOpts) +} + +func NewClient(ctx context.Context, url *url.URL, opts Options) (Storage, error) { + if url.IsRemote() { + return NewRemoteClient(ctx, url, opts) + } + return NewLocalClient(opts), nil +} + +// Options stores configuration for storage. +type Options struct { + MaxRetries int + Endpoint string + NoVerifySSL bool + DryRun bool + NoSignRequest bool + UseListObjectsV1 bool + RequestPayer string + bucket string + region string +} + +func (o *Options) SetRegion(region string) { + o.region = region +} + +// Object is a generic type which contains metadata for storage items. +type Object struct { + URL *url.URL `json:"key,omitempty"` + Etag string `json:"etag,omitempty"` + ModTime *time.Time `json:"last_modified,omitempty"` + Type ObjectType `json:"type,omitempty"` + Size int64 `json:"size,omitempty"` + StorageClass StorageClass `json:"storage_class,omitempty"` + Err error `json:"error,omitempty"` +} + +// String returns the string representation of Object. +func (o *Object) String() string { + return o.URL.String() +} + +// JSON returns the JSON representation of Object. +func (o *Object) JSON() string { + return strutil.JSON(o) +} + +// ObjectType is the type of Object. +type ObjectType struct { + mode os.FileMode +} + +// String returns the string representation of ObjectType. +func (o ObjectType) String() string { + switch mode := o.mode; { + case mode.IsRegular(): + return "file" + case mode.IsDir(): + return "directory" + case mode&os.ModeSymlink != 0: + return "symlink" + } + return "" +} + +// MarshalJSON returns the stringer of ObjectType as a marshalled json. 
+func (o ObjectType) MarshalJSON() ([]byte, error) { + return json.Marshal(o.String()) +} + +// IsDir checks if the object is a directory. +func (o ObjectType) IsDir() bool { + return o.mode.IsDir() +} + +// IsSymlink checks if the object is a symbolic link. +func (o ObjectType) IsSymlink() bool { + return o.mode&os.ModeSymlink != 0 +} + +// ShouldProcessUrl returns true if follow symlinks is enabled. +// If follow symlinks is disabled we should not process the url. +// (this check is needed only for local files) +func ShouldProcessUrl(url *url.URL, followSymlinks bool) bool { + if followSymlinks { + return true + } + + if url.IsRemote() { + return true + } + fi, err := os.Lstat(url.Absolute()) + if err != nil { + return false + } + + // do not process symlinks + return fi.Mode()&os.ModeSymlink == 0 +} + +// dateFormat is a constant time template for the bucket. +const dateFormat = "2006/01/02 15:04:05" + +// Bucket is a container for storage objects. +type Bucket struct { + CreationDate time.Time `json:"created_at"` + Name string `json:"name"` +} + +// String returns the string representation of Bucket. +func (b Bucket) String() string { + return fmt.Sprintf("%s s3://%s", b.CreationDate.Format(dateFormat), b.Name) +} + +// JSON returns the JSON representation of Bucket. +func (b Bucket) JSON() string { + return strutil.JSON(b) +} + +// StorageClass represents the storage used to store an object. +type StorageClass string + +func (s StorageClass) IsGlacier() bool { + return s == "GLACIER" +} + +// notImplemented is a structure which is used on the unsupported operations. +type notImplemented struct { + apiType string + method string +} + +// Error returns the string representation of Error for notImplemented. +func (e notImplemented) Error() string { + return fmt.Sprintf("%q is not supported on %q storage", e.method, e.apiType) +} + +type Metadata map[string]string + +// NewMetadata will return an empty metadata object. +func NewMetadata() Metadata { + return Metadata{} +} + +func (m Metadata) ACL() string { + return m["ACL"] +} + +func (m Metadata) SetACL(acl string) Metadata { + m["ACL"] = acl + return m +} + +func (m Metadata) CacheControl() string { + return m["CacheControl"] +} + +func (m Metadata) SetCacheControl(cacheControl string) Metadata { + m["CacheControl"] = cacheControl + return m +} + +func (m Metadata) Expires() string { + return m["Expires"] +} + +func (m Metadata) SetExpires(expires string) Metadata { + m["Expires"] = expires + return m +} + +func (m Metadata) StorageClass() string { + return m["StorageClass"] +} + +func (m Metadata) SetStorageClass(class string) Metadata { + m["StorageClass"] = class + return m +} + +func (m Metadata) ContentType() string { + return m["ContentType"] +} + +func (m Metadata) SetContentType(contentType string) Metadata { + m["ContentType"] = contentType + return m +} + +func (m Metadata) SSE() string { + return m["EncryptionMethod"] +} + +func (m Metadata) SetSSE(sse string) Metadata { + m["EncryptionMethod"] = sse + return m +} + +func (m Metadata) SSEKeyID() string { + return m["EncryptionKeyID"] +} + +func (m Metadata) SetSSEKeyID(kid string) Metadata { + m["EncryptionKeyID"] = kid + return m +} + +func (m Metadata) ContentEncoding() string { + return m["ContentEncoding"] +} + +func (m Metadata) SetContentEncoding(contentEncoding string) Metadata { + m["ContentEncoding"] = contentEncoding + return m +} \ No newline at end of file
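// Note: the Metadata helpers above are a plain map with chainable setters;
// doUpload and doCopy in the modified copy.go rely on that chaining to stamp
// the protobuf/gzip headers on each upload. A tiny self-contained sketch of
// the same builder style (the metadata type here is a stand-in, not the
// package's own):

package main

import "fmt"

type metadata map[string]string

func (m metadata) set(k, v string) metadata { m[k] = v; return m }

func main() {
	// Each setter writes one well-known key and returns the map, enabling the
	// chained style used by doUpload: NewMetadata().SetContentType(...)... .
	m := metadata{}.
		set("ContentType", "application/x-protobuf").
		set("ContentEncoding", "gzip").
		set("StorageClass", "STANDARD")
	fmt.Println(m["ContentType"], m["ContentEncoding"], m["StorageClass"])
}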