diff --git a/components/gcp/bigquery/query/bigquery_query_to_gcs_op.yaml b/components/gcp/bigquery/query/bigquery_query_to_gcs_op.yaml new file mode 100644 index 00000000000..459fd25d78a --- /dev/null +++ b/components/gcp/bigquery/query/bigquery_query_to_gcs_op.yaml @@ -0,0 +1,83 @@ +# Export to bucket in gcs + +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Bigquery - Query +description: | + A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery + service and dump outputs to a Google Cloud Storage blob. +metadata: + labels: + add-pod-env: 'true' +inputs: + - name: query + description: 'The query used by Bigquery service to fetch the results.' + type: String + - name: project_id + description: 'The project to execute the query job.' + type: GCPProjectID + - name: dataset_id + description: 'The ID of the persistent dataset to keep the results of the query.' + default: '' + type: String + - name: table_id + description: >- + The ID of the table to keep the results of the query. If absent, the operation + will generate a random id for the table. + default: '' + type: String + - name: output_gcs_path + description: 'The path to the Cloud Storage bucket to store the query output.' + default: '' + type: GCSPath + - name: dataset_location + description: 'The location to create the dataset. Defaults to `US`.' + default: 'US' + type: String + - name: job_config + description: >- + The full config spec for the query job.See + [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) + for details. + default: '' + type: Dict + - name: output_kfp_path + description: 'The path to where the file should be stored.' + default: '' + type: String +outputs: + - name: output_gcs_path + description: 'The path to the Cloud Storage bucket containing the query output in CSV format.' 
+ type: GCSPath + - name: MLPipeline UI metadata + type: UI metadata +implementation: + container: + image: gcr.io/ds-production-259110/ml-pipeline-gcp + args: [ + --ui_metadata_path, {outputPath: MLPipeline UI metadata}, + kfp_component.google.bigquery, query, + --query, {inputValue: query}, + --project_id, {inputValue: project_id}, + --dataset_id, {inputValue: dataset_id}, + --table_id, {inputValue: table_id}, + --dataset_location, {inputValue: dataset_location}, + --output_gcs_path, {inputValue: output_gcs_path}, + --job_config, {inputValue: job_config}, + ] + env: + KFP_POD_NAME: "{{pod.name}}" + fileOutputs: + output_gcs_path: /tmp/kfp/output/bigquery/query-output-path.txt diff --git a/components/gcp/bigquery/query/biqquery_query_op.yaml b/components/gcp/bigquery/query/biqquery_query_op.yaml new file mode 100644 index 00000000000..a2bb2d2fccf --- /dev/null +++ b/components/gcp/bigquery/query/biqquery_query_op.yaml @@ -0,0 +1,61 @@ +# Export to file for next processing step in pipeline + +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Bigquery - Query +description: | + A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery + service. +metadata: + labels: + add-pod-env: 'true' +inputs: + - name: query + description: 'The query used by Bigquery service to fetch the results.' + type: String + - name: project_id + description: 'The project to execute the query job.' + type: GCPProjectID + - name: output_kfp_path + description: 'The path to where the file should be stored.' + default: '' + type: String + - name: dataset_location + description: 'The location to create the dataset. Defaults to `US`.' + default: 'US' + type: String + - name: job_config + description: >- + The full config spec for the query job.See + [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig) + for details. + default: '' + type: Dict +outputs: + - name: MLPipeline UI metadata + type: UI metadata +implementation: + container: + image: gcr.io/ds-production-259110/ml-pipeline-gcp + args: [ + --ui_metadata_path, {outputPath: MLPipeline UI metadata}, + kfp_component.google.bigquery, query_only, + --query, {inputValue: query}, + --project_id, {inputValue: project_id}, + --dataset_location, {inputValue: dataset_location}, + --job_config, {inputValue: job_config}, + ] + env: + KFP_POD_NAME: "{{pod.name}}" diff --git a/components/gcp/bigquery/query/biqquery_query_to_table.yaml b/components/gcp/bigquery/query/biqquery_query_to_table.yaml new file mode 100644 index 00000000000..a59ced5c8f8 --- /dev/null +++ b/components/gcp/bigquery/query/biqquery_query_to_table.yaml @@ -0,0 +1,69 @@ +# export to new table. + +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Bigquery - Query
+description: |
+  A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery
+  service and dump outputs to a new table.
+metadata:
+  labels:
+    add-pod-env: 'true'
+inputs:
+  - name: query
+    description: 'The query used by Bigquery service to fetch the results.'
+    type: String
+  - name: project_id
+    description: 'The project to execute the query job.'
+    type: GCPProjectID
+  - name: dataset_id
+    description: 'The ID of the persistent dataset to keep the results of the query.'
+    default: ''
+    type: String
+  - name: table_id
+    description: >-
+      The ID of the table to keep the results of the query. If absent, the operation
+      will generate a random id for the table.
+    default: ''
+    type: String
+  - name: dataset_location
+    description: 'The location to create the dataset. Defaults to `US`.'
+    default: 'US'
+    type: String
+  - name: job_config
+    description: >-
+      The full config spec for the query job. See
+      [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig)
+      for details.
+    default: ''
+    type: Dict
+outputs:
+  - name: MLPipeline UI metadata
+    type: UI metadata
+implementation:
+  container:
+    image: gcr.io/ds-production-259110/ml-pipeline-gcp
+    args: [
+      --ui_metadata_path, {outputPath: MLPipeline UI metadata},
+      kfp_component.google.bigquery, query,
+      --query, {inputValue: query},
+      --project_id, {inputValue: project_id},
+      --dataset_id, {inputValue: dataset_id},
+      --table_id, {inputValue: table_id},
+      --dataset_location, {inputValue: dataset_location},
+      --job_config, {inputValue: job_config},
+    ]
+    env:
+      KFP_POD_NAME: "{{pod.name}}"
diff --git a/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/__init__.py b/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/__init__.py
index 174efe9afc9..0bc32a9cc6f 100644
--- a/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/__init__.py
+++ b/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/__init__.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._query import query
\ No newline at end of file
+from ._query import query, query_only
\ No newline at end of file
diff --git a/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/_query.py b/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/_query.py
index c94c81a8ea2..9292580305e 100644
--- a/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/_query.py
+++ b/components/gcp/container/component_sdk/python/kfp_component/google/bigquery/_query.py
@@ -24,6 +24,39 @@
 # TODO(hongyes): make this path configurable as a environment variable
 KFP_OUTPUT_PATH = '/tmp/kfp/output/'
 
+
+def query_only(query, project_id, dataset_location='US', job_config=None):
+    """Submit a query to Bigquery service without exporting the results to a
+    persistent table or a GCS blob.
+
+    Args:
+        query (str): The query used by Bigquery service to fetch the results.
+        project_id (str): The project to execute the query job.
+        dataset_location (str): The location to run the query job. Defaults to `US`.
+        job_config (dict): The full config spec for the query job.
+    Returns:
+        The API representation of the completed query job.
+    """
+    client = bigquery.Client(project=project_id, location=dataset_location)
+    if not job_config:
+        job_config = bigquery.QueryJobConfig()
+    else:
+        job_config = bigquery.QueryJobConfig.from_api_repr(job_config)
+    def cancel():
+        if job_id:
+            client.cancel_job(job_id)
+    with KfpExecutionContext(on_cancel=cancel) as ctx:
+        job_id = 'query_' + ctx.context_id()
+        query_job = _get_job(client, job_id)
+        if not query_job:
+            query_job = client.query(query, job_config, job_id=job_id)
+        _display_job_link(project_id, job_id)
+        query_job.result() # Wait for query to finish
+        _dump_outputs(query_job, None, None)
+        return query_job.to_api_repr()
+
+
+
 def query(query, project_id, dataset_id=None, table_id=None, 
     output_gcs_path=None, dataset_location='US', job_config=None):
     """Submit a query to Bigquery service and dump outputs to Bigquery table or 
@@ -59,8 +92,8 @@ def cancel():
         query_job = _get_job(client, job_id)
         table_ref = None
         if not query_job:
-            dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path, 
-                dataset_location)
+            dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path,
+                dataset_location)
             if dataset_ref:
                 if not table_id:
                     table_id = job_id
@@ -79,6 +112,30 @@ def cancel():
         _dump_outputs(query_job, output_gcs_path, table_ref)
     return query_job.to_api_repr()
 
+def create_table_from():
+    from google.cloud import bigquery
+
+    # TODO(developer): Construct a BigQuery client object.
+    client = bigquery.Client()
+
+    # TODO(developer): Set table_id to the ID of the destination table.
+    table_id = "your-project.your_dataset.your_table_name"
+
+    job_config = bigquery.QueryJobConfig(destination=table_id)
+
+    sql = """
+        SELECT corpus
+        FROM `bigquery-public-data.samples.shakespeare`
+        GROUP BY corpus;
+    """
+
+    # Start the query, passing in the extra configuration.
+    query_job = client.query(sql, job_config=job_config)  # Make an API request.
+    query_job.result()  # Wait for the job to complete.
+
+    print("Query results loaded to the table {}".format(table_id))
+
+
 def _get_job(client, job_id):
     try:
         return client.get_job(job_id)
@@ -88,7 +145,6 @@ def _get_job(client, job_id):
 def _prepare_dataset_ref(client, dataset_id, output_gcs_path, dataset_location):
     if not output_gcs_path and not dataset_id:
         return None
-
     if not dataset_id:
         dataset_id = 'kfp_tmp_dataset'
     dataset_ref = client.dataset(dataset_id)
diff --git a/frontend/server/aws-helper.test.ts b/frontend/server/aws-helper.test.ts
index e5580ff5150..b1bf739be2f 100644
--- a/frontend/server/aws-helper.test.ts
+++ b/frontend/server/aws-helper.test.ts
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
import fetch from 'node-fetch'; -import { awsInstanceProfileCredentials } from './aws-helper'; +import { awsInstanceProfileCredentials, isS3Endpoint } from './aws-helper'; // mock node-fetch module jest.mock('node-fetch'); @@ -104,3 +104,25 @@ describe('awsInstanceProfileCredentials', () => { }); }); }); + +describe('isS3Endpoint', () => { + it('checks a valid s3 endpoint', () => { + expect(isS3Endpoint('s3.amazonaws.com')).toBe(true); + }); + + it('checks a valid s3 regional endpoint', () => { + expect(isS3Endpoint('s3.dualstack.us-east-1.amazonaws.com')).toBe(true); + }); + + it('checks a valid s3 cn endpoint', () => { + expect(isS3Endpoint('s3.cn-north-1.amazonaws.com.cn')).toBe(true); + }); + + it('checks an invalid s3 endpoint', () => { + expect(isS3Endpoint('amazonaws.com')).toBe(false); + }); + + it('checks non-s3 endpoint', () => { + expect(isS3Endpoint('minio.kubeflow')).toBe(false); + }); +}); diff --git a/frontend/server/aws-helper.ts b/frontend/server/aws-helper.ts index 693445eccb2..029d3559a86 100644 --- a/frontend/server/aws-helper.ts +++ b/frontend/server/aws-helper.ts @@ -46,6 +46,15 @@ async function getIAMInstanceProfile(): Promise { } } +/** + * Check if the provided string is an S3 endpoint (can be any region). + * + * @param endpoint minio endpoint to check. + */ +export function isS3Endpoint(endpoint: string = ''): boolean { + return !!endpoint.match(/s3.{0,}\.amazonaws\.com\.?.{0,}/i); +} + /** * Class to handle the session credentials for AWS ec2 instance profile. */ diff --git a/frontend/src/lib/AwsHelper.test.ts b/frontend/src/lib/AwsHelper.test.ts new file mode 100644 index 00000000000..6093c99b090 --- /dev/null +++ b/frontend/src/lib/AwsHelper.test.ts @@ -0,0 +1,38 @@ +/* + * Copyright 2018-2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { isS3Endpoint } from './AwsHelper'; + +describe('isS3Endpoint', () => { + it('checks a valid s3 endpoint', () => { + expect(isS3Endpoint('s3.amazonaws.com')).toBe(true); + }); + + it('checks a valid s3 regional endpoint', () => { + expect(isS3Endpoint('s3.dualstack.us-east-1.amazonaws.com')).toBe(true); + }); + + it('checks a valid s3 cn endpoint', () => { + expect(isS3Endpoint('s3.cn-north-1.amazonaws.com.cn')).toBe(true); + }); + + it('checks an invalid s3 endpoint', () => { + expect(isS3Endpoint('amazonaws.com')).toBe(false); + }); + + it('checks non-s3 endpoint', () => { + expect(isS3Endpoint('minio.kubeflow')).toBe(false); + }); +}); diff --git a/frontend/src/lib/AwsHelper.ts b/frontend/src/lib/AwsHelper.ts new file mode 100644 index 00000000000..3d14744e246 --- /dev/null +++ b/frontend/src/lib/AwsHelper.ts @@ -0,0 +1,24 @@ +/* + * Copyright 2018-2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Check if the provided string is an S3 endpoint (can be any region). + * + * @param endpoint minio endpoint to check. + */ +export function isS3Endpoint(endpoint: string = ''): boolean { + return !!endpoint.match(/s3.{0,}\.amazonaws\.com\.?.{0,}/i); +} diff --git a/frontend/src/lib/WorkflowParser.test.ts b/frontend/src/lib/WorkflowParser.test.ts index 7dc0cb8ac15..95ff3c58890 100644 --- a/frontend/src/lib/WorkflowParser.test.ts +++ b/frontend/src/lib/WorkflowParser.test.ts @@ -887,7 +887,7 @@ describe('WorkflowParser', () => { ]); }); - it('returns the right bucket and key for a correct metadata artifact', () => { + it('returns the right bucket, key and source eq `minio` for a correct metadata artifact', () => { expect( WorkflowParser.loadNodeOutputPaths({ outputs: { @@ -910,6 +910,31 @@ describe('WorkflowParser', () => { }, ]); }); + + it('returns the right bucket, key and source eq `s3` for a correct metadata artifact', () => { + expect( + WorkflowParser.loadNodeOutputPaths({ + outputs: { + artifacts: [ + { + name: 'mlpipeline-ui-metadata', + s3: { + endpoint: 's3.amazonaws.com', + bucket: 'test bucket', + key: 'test key', + }, + }, + ], + }, + } as any), + ).toEqual([ + { + bucket: 'test bucket', + key: 'test key', + source: 's3', + }, + ]); + }); }); describe('loadAllOutputPaths', () => { diff --git a/frontend/src/lib/WorkflowParser.ts b/frontend/src/lib/WorkflowParser.ts index 0fd1b150775..9a6f46fb251 100644 --- a/frontend/src/lib/WorkflowParser.ts +++ b/frontend/src/lib/WorkflowParser.ts @@ -29,6 +29,7 @@ import { Constants } from './Constants'; import { KeyValue } from './StaticGraphParser'; import { hasFinished, NodePhase, statusToBgColor, parseNodePhase } from './StatusUtils'; import { parseTaskDisplayName } from './ParserUtils'; +import { isS3Endpoint } from './AwsHelper'; export enum StorageService { GCS = 'gcs', @@ -292,11 +293,10 @@ export default class WorkflowParser { outputPaths.push({ bucket: a.s3!.bucket, key: a.s3!.key, - source: StorageService.MINIO, + source: isS3Endpoint(a.s3!.endpoint) ? StorageService.S3 : StorageService.MINIO, }), ); } - return outputPaths; }
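Usage note (not part of the diff): a minimal sketch of how the new component specs could be wired into a pipeline with the KFP v1 SDK. The component file paths come from this PR; the project ID, bucket path, and query below are placeholders.

```python
# Sketch only: assumes the KFP v1 SDK (kfp.components / kfp.dsl / kfp.compiler).
import kfp
from kfp import components, dsl

COMPONENT_DIR = 'components/gcp/bigquery/query'
# Load the two variants added in this PR (the to-table variant,
# biqquery_query_to_table.yaml, is analogous but takes dataset_id/table_id).
bigquery_query_to_gcs_op = components.load_component_from_file(
    f'{COMPONENT_DIR}/bigquery_query_to_gcs_op.yaml')
bigquery_query_op = components.load_component_from_file(
    f'{COMPONENT_DIR}/biqquery_query_op.yaml')

PROJECT_ID = 'my-project'                      # placeholder
OUTPUT_GCS_PATH = 'gs://my-bucket/query.csv'   # placeholder
QUERY = ('SELECT corpus FROM `bigquery-public-data.samples.shakespeare` '
         'GROUP BY corpus')

@dsl.pipeline(name='bigquery-query-example',
              description='Run a BigQuery query and export the result to GCS.')
def bigquery_pipeline():
    # Variant 1: run the query and dump the rows to a GCS blob; downstream
    # steps can consume to_gcs.outputs['output_gcs_path'].
    to_gcs = bigquery_query_to_gcs_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_gcs_path=OUTPUT_GCS_PATH,
        dataset_location='US')

    # Variant 2: run the query only (e.g. DML, or a job_config that already
    # names a destination table); the component itself exports nothing.
    bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        dataset_location='US')

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(bigquery_pipeline, 'bigquery_pipeline.yaml')
```

All three specs run the same launcher image; they differ only in which `kfp_component.google.bigquery` entry point they invoke (`query` vs. the new `query_only`) and in which inputs and outputs they expose.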
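Also not part of the diff: an illustrative `job_config` value for the `query_only` variant, assuming the dict follows the REST job-configuration shape that `bigquery.QueryJobConfig.from_api_repr()` consumes (all IDs below are placeholders). With a `destinationTable` set, `query_only` ends up doing what the pasted `create_table_from()` sample shows: the query result lands in a table even though the component exports nothing itself.

```python
# Hypothetical job_config for the query_only component input (type: Dict).
# The nesting mirrors the BigQuery jobs.configuration REST resource; check the
# client-library docs before relying on the exact field names.
job_config = {
    "query": {
        "destinationTable": {
            "projectId": "my-project",    # placeholder
            "datasetId": "my_dataset",    # placeholder
            "tableId": "corpus_counts",   # placeholder
        },
        "writeDisposition": "WRITE_TRUNCATE",
        "useLegacySql": False,
    }
}
```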
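For the frontend change, the new `isS3Endpoint()` helper plus the `WorkflowParser` tweak route any artifact whose `s3.endpoint` looks like an AWS host (`s3.amazonaws.com`, regional or dualstack variants, and `.amazonaws.com.cn`) to the `s3` storage service, while everything else, such as the in-cluster `minio.kubeflow` endpoint, keeps the `minio` source. A sketch of that rule restated in Python for illustration only; the shipped helper is the TypeScript above.

```python
# Illustration of the endpoint rule encoded by isS3Endpoint(); the regex
# mirrors the TypeScript pattern /s3.{0,}\.amazonaws\.com\.?.{0,}/i.
import re

_S3_ENDPOINT_RE = re.compile(r's3.*\.amazonaws\.com\.?.*', re.IGNORECASE)

def artifact_source(endpoint: str = '') -> str:
    """Return 's3' for AWS-style endpoints, 'minio' for everything else."""
    return 's3' if _S3_ENDPOINT_RE.search(endpoint) else 'minio'

if __name__ == '__main__':
    # Same cases as the new unit tests.
    assert artifact_source('s3.amazonaws.com') == 's3'
    assert artifact_source('s3.dualstack.us-east-1.amazonaws.com') == 's3'
    assert artifact_source('s3.cn-north-1.amazonaws.com.cn') == 's3'
    assert artifact_source('amazonaws.com') == 'minio'
    assert artifact_source('minio.kubeflow') == 'minio'
```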