Updated gcp components
DmitryBe authored and NikeNano committed Jun 4, 2020
1 parent 09a8689 commit df497d4
Showing 11 changed files with 395 additions and 8 deletions.
83 changes: 83 additions & 0 deletions components/gcp/bigquery/query/bigquery_query_to_gcs_op.yaml
@@ -0,0 +1,83 @@
# Exports the query results to a Cloud Storage (GCS) bucket.

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Bigquery - Query
description: |
  A Kubeflow Pipelines component that submits a query to the Google Cloud BigQuery
  service and writes the query results to a Google Cloud Storage blob.
metadata:
labels:
add-pod-env: 'true'
inputs:
- name: query
  description: 'The query executed by the BigQuery service to fetch the results.'
type: String
- name: project_id
description: 'The project to execute the query job.'
type: GCPProjectID
- name: dataset_id
description: 'The ID of the persistent dataset to keep the results of the query.'
default: ''
type: String
- name: table_id
description: >-
The ID of the table to keep the results of the query. If absent, the operation
will generate a random id for the table.
default: ''
type: String
- name: output_gcs_path
description: 'The path to the Cloud Storage bucket to store the query output.'
default: ''
type: GCSPath
- name: dataset_location
description: 'The location to create the dataset. Defaults to `US`.'
default: 'US'
type: String
- name: job_config
description: >-
    The full config spec for the query job. See
    [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig)
    for details.
default: ''
type: Dict
- name: output_kfp_path
  description: 'The path where the output file should be stored.'
default: ''
type: String
outputs:
- name: output_gcs_path
description: 'The path to the Cloud Storage bucket containing the query output in CSV format.'
type: GCSPath
- name: MLPipeline UI metadata
type: UI metadata
implementation:
container:
image: gcr.io/ds-production-259110/ml-pipeline-gcp
args: [
--ui_metadata_path, {outputPath: MLPipeline UI metadata},
kfp_component.google.bigquery, query,
--query, {inputValue: query},
--project_id, {inputValue: project_id},
--dataset_id, {inputValue: dataset_id},
--table_id, {inputValue: table_id},
--dataset_location, {inputValue: dataset_location},
--output_gcs_path, {inputValue: output_gcs_path},
--job_config, {inputValue: job_config},
]
env:
KFP_POD_NAME: "{{pod.name}}"
fileOutputs:
output_gcs_path: /tmp/kfp/output/bigquery/query-output-path.txt
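For reference, the component spec above can be consumed with the KFP SDK. The following is a minimal sketch (not part of this commit) that loads the file from the path shown in this diff and wires it into a one-step pipeline; the project ID, bucket path, and pipeline name are placeholders.

# Sketch only: load the new query-to-GCS component and use it in a pipeline.
import kfp
from kfp import components, dsl

bigquery_to_gcs_op = components.load_component_from_file(
    'components/gcp/bigquery/query/bigquery_query_to_gcs_op.yaml')

@dsl.pipeline(name='bigquery-to-gcs-example')
def bq_to_gcs_pipeline(project_id: str = 'my-project',
                       output_gcs_path: str = 'gs://my-bucket/query-results/out.csv'):
    bigquery_to_gcs_op(
        query='SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus',
        project_id=project_id,
        output_gcs_path=output_gcs_path,
        dataset_location='US')

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(bq_to_gcs_pipeline, 'bq_to_gcs_pipeline.yaml')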
61 changes: 61 additions & 0 deletions components/gcp/bigquery/query/biqquery_query_op.yaml
@@ -0,0 +1,61 @@
# Runs a query only; no results are exported (intended as a preparatory step in a pipeline).

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Bigquery - Query
description: |
  A Kubeflow Pipelines component that submits a query to the Google Cloud BigQuery
  service.
metadata:
labels:
add-pod-env: 'true'
inputs:
- name: query
  description: 'The query executed by the BigQuery service to fetch the results.'
type: String
- name: project_id
description: 'The project to execute the query job.'
type: GCPProjectID
- name: output_kfp_path
  description: 'The path where the output file should be stored.'
default: ''
type: String
- name: dataset_location
description: 'The location to create the dataset. Defaults to `US`.'
default: 'US'
type: String
- name: job_config
description: >-
    The full config spec for the query job. See
    [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig)
    for details.
default: ''
type: Dict
outputs:
- name: MLPipeline UI metadata
type: UI metadata
implementation:
container:
image: gcr.io/ds-production-259110/ml-pipeline-gcp
args: [
--ui_metadata_path, {outputPath: MLPipeline UI metadata},
kfp_component.google.bigquery, query_only,
--query, {inputValue: query},
--project_id, {inputValue: project_id},
--dataset_location, {inputValue: dataset_location},
--job_config, {inputValue: job_config},
]
env:
KFP_POD_NAME: "{{pod.name}}"
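All three query components accept an optional job_config dictionary, which the backing Python module (see the _query.py changes further down) converts with bigquery.QueryJobConfig.from_api_repr. A hedged sketch of passing one through this query-only component follows; the dictionary layout mirrors the BigQuery job API representation and should be verified against the QueryJobConfig documentation linked in the spec, and the project ID is a placeholder.

# Sketch: supply a job_config dict to the query-only component.
from kfp import components, dsl

bigquery_query_op = components.load_component_from_file(
    'components/gcp/bigquery/query/biqquery_query_op.yaml')

@dsl.pipeline(name='bigquery-query-only-example')
def bq_query_pipeline(project_id: str = 'my-project'):
    bigquery_query_op(
        query='SELECT 1 AS one',
        project_id=project_id,
        dataset_location='US',
        # Query-level options live under the "query" key of the job configuration.
        job_config={'query': {'useLegacySql': False}})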
69 changes: 69 additions & 0 deletions components/gcp/bigquery/query/biqquery_query_to_table.yaml
@@ -0,0 +1,69 @@
# Exports the query results to a new BigQuery table.

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Bigquery - Query
description: |
  A Kubeflow Pipelines component that submits a query to the Google Cloud BigQuery
  service and writes the results to a new BigQuery table.
metadata:
labels:
add-pod-env: 'true'
inputs:
- name: query
  description: 'The query executed by the BigQuery service to fetch the results.'
type: String
- name: project_id
description: 'The project to execute the query job.'
type: GCPProjectID
- name: dataset_id
description: 'The ID of the persistent dataset to keep the results of the query.'
default: ''
type: String
- name: table_id
description: >-
The ID of the table to keep the results of the query. If absent, the operation
will generate a random id for the table.
default: ''
type: String
- name: dataset_location
description: 'The location to create the dataset. Defaults to `US`.'
default: 'US'
type: String
- name: job_config
description: >-
    The full config spec for the query job. See
    [QueryJobConfig](https://googleapis.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig)
    for details.
default: ''
type: Dict
outputs:
- name: MLPipeline UI metadata
type: UI metadata
implementation:
container:
image: gcr.io/ds-production-259110/ml-pipeline-gcp
args: [
--ui_metadata_path, {outputPath: MLPipeline UI metadata},
kfp_component.google.bigquery, query,
--query, {inputValue: query},
--project_id, {inputValue: project_id},
--dataset_id, {inputValue: dataset_id},
--table_id, {inputValue: table_id},
--dataset_location, {inputValue: dataset_location},
--job_config, {inputValue: job_config},
]
env:
KFP_POD_NAME: "{{pod.name}}"
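For comparison with the GCS variant above, a minimal sketch (again, not part of this commit) of using the table-destination component; the dataset and table IDs are placeholders.

# Sketch: run a query and keep the results in a named BigQuery dataset/table.
from kfp import components, dsl

bigquery_to_table_op = components.load_component_from_file(
    'components/gcp/bigquery/query/biqquery_query_to_table.yaml')

@dsl.pipeline(name='bigquery-to-table-example')
def bq_to_table_pipeline(project_id: str = 'my-project'):
    bigquery_to_table_op(
        query='SELECT corpus FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus',
        project_id=project_id,
        dataset_id='my_dataset',
        table_id='shakespeare_corpora',
        dataset_location='US')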
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ._query import query
from ._query import query, query_only
@@ -24,6 +24,39 @@
# TODO(hongyes): make this path configurable as an environment variable
KFP_OUTPUT_PATH = '/tmp/kfp/output/'


def query_only(query, project_id, dataset_location='US', job_config=None):
"""Submit a query to Bigquery service and dump outputs to Bigquery table or
a GCS blob.
Args:
query (str): The query used by Bigquery service to fetch the results.
project_id (str): The project to execute the query job.
dataset_location (str): The dataset location.
job_config (dict): The full config spec for the query job.
Returns:
The API representation of the completed query job.
"""
client = bigquery.Client(project=project_id, location=dataset_location)
if not job_config:
job_config = bigquery.QueryJobConfig()
else:
job_config = bigquery.QueryJobConfig.from_api_repr(job_config)
def cancel():
if job_id:
client.cancel_job(job_id)
with KfpExecutionContext(on_cancel=cancel) as ctx:
job_id = 'query_' + ctx.context_id()
query_job = _get_job(client, job_id)
if not query_job:
query_job = client.query(query, job_config, job_id=job_id)
_display_job_link(project_id, job_id)
query_job.result() # Wait for query to finish
_dump_outputs(query_job, None, None)
return query_job.to_api_repr()



def query(query, project_id, dataset_id=None, table_id=None,
output_gcs_path=None, dataset_location='US', job_config=None):
"""Submit a query to Bigquery service and dump outputs to Bigquery table or
@@ -59,8 +92,8 @@ def cancel():
query_job = _get_job(client, job_id)
table_ref = None
if not query_job:
dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path,
dataset_location)
        dataset_ref = _prepare_dataset_ref(client, dataset_id, output_gcs_path,
            dataset_location)
if dataset_ref:
if not table_id:
table_id = job_id
@@ -79,6 +112,30 @@ def cancel():
_dump_outputs(query_job, output_gcs_path, table_ref)
return query_job.to_api_repr()

def create_table_from():
    """Example: run a query whose results are written to an explicit destination table."""
    # TODO(developer): Construct a BigQuery client object.
    client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the destination table.
table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.QueryJobConfig(destination=table_id)

sql = """
SELECT corpus
FROM `bigquery-public-data.samples.shakespeare`
GROUP BY corpus;
"""

# Start the query, passing in the extra configuration.
query_job = client.query(sql, job_config=job_config) # Make an API request.
query_job.result() # Wait for the job to complete.

print("Query results loaded to the table {}".format(table_id))


def _get_job(client, job_id):
try:
return client.get_job(job_id)
@@ -88,7 +145,6 @@ def _get_job(client, job_id):
def _prepare_dataset_ref(client, dataset_id, output_gcs_path, dataset_location):
if not output_gcs_path and not dataset_id:
return None

if not dataset_id:
dataset_id = 'kfp_tmp_dataset'
dataset_ref = client.dataset(dataset_id)
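Stripped of the KFP bookkeeping (KfpExecutionContext, job-id reuse, cancellation, and output dumping), the new query_only helper reduces to a handful of google-cloud-bigquery calls. A sketch, with the project ID, job ID, and SQL as placeholders:

# Core BigQuery calls wrapped by query_only (placeholders for project, SQL, and job id).
from google.cloud import bigquery

client = bigquery.Client(project='my-project', location='US')
job_config = bigquery.QueryJobConfig()  # or QueryJobConfig.from_api_repr({...})
query_job = client.query('SELECT 1 AS one', job_config, job_id='query_example_job')
query_job.result()                      # wait for the query to finish
print(query_job.to_api_repr())          # API representation of the completed job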
24 changes: 23 additions & 1 deletion frontend/server/aws-helper.test.ts
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import fetch from 'node-fetch';
import { awsInstanceProfileCredentials } from './aws-helper';
import { awsInstanceProfileCredentials, isS3Endpoint } from './aws-helper';

// mock node-fetch module
jest.mock('node-fetch');
@@ -104,3 +104,25 @@ describe('awsInstanceProfileCredentials', () => {
});
});
});

describe('isS3Endpoint', () => {
it('checks a valid s3 endpoint', () => {
expect(isS3Endpoint('s3.amazonaws.com')).toBe(true);
});

it('checks a valid s3 regional endpoint', () => {
expect(isS3Endpoint('s3.dualstack.us-east-1.amazonaws.com')).toBe(true);
});

it('checks a valid s3 cn endpoint', () => {
expect(isS3Endpoint('s3.cn-north-1.amazonaws.com.cn')).toBe(true);
});

it('checks an invalid s3 endpoint', () => {
expect(isS3Endpoint('amazonaws.com')).toBe(false);
});

it('checks non-s3 endpoint', () => {
expect(isS3Endpoint('minio.kubeflow')).toBe(false);
});
});
9 changes: 9 additions & 0 deletions frontend/server/aws-helper.ts
@@ -46,6 +46,15 @@ async function getIAMInstanceProfile(): Promise<string | undefined> {
}
}

/**
 * Check if the provided string is an AWS S3 endpoint (any region), e.g.
 * s3.amazonaws.com, s3.dualstack.us-east-1.amazonaws.com, or s3.cn-north-1.amazonaws.com.cn.
 *
 * @param endpoint endpoint string to check (typically the configured artifact store endpoint).
 */
export function isS3Endpoint(endpoint: string = ''): boolean {
  return !!endpoint.match(/s3.*\.amazonaws\.com\.?.*/i);
}

/**
* Class to handle the session credentials for AWS ec2 instance profile.
*/