Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cli functionality to dataproc quickstart #2047

Merged
merged 7 commits into from
Feb 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion dataproc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@
</plugin>
</plugins>
</pluginManagement>
<!-- Bind exec-maven-plugin so the sample can be launched from the command line
     with `mvn exec:java -Dexec.args="..."`; Quickstart (default package) is the
     entry class invoked by the exec:java goal. -->
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<configuration>
<mainClass>Quickstart</mainClass>
</configuration>
</plugin>
</plugins>
</build>

<dependencyManagement>
Expand Down Expand Up @@ -65,5 +75,4 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>
51 changes: 36 additions & 15 deletions dataproc/src/main/java/Quickstart.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
*/

// [START dataproc_quickstart]
/* This quickstart sample walks a user through creating a Cloud Dataproc
* cluster, submitting a PySpark job from Google Cloud Storage to the
* cluster, reading the output of the job and deleting the cluster, all
* using the Java client library.
*
* Usage:
* mvn clean package -DskipTests
*
* mvn exec:java -Dexec.args="<PROJECT_ID> <REGION> <CLUSTER_NAME> <GCS_JOB_FILE_PATH>"
*
* You can also set these arguments in the main function instead of providing them via the CLI.
*/

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
Expand Down Expand Up @@ -60,15 +73,6 @@ public static Job waitForJobCompletion(
}
}

/**
 * Convenience overload that runs the quickstart with placeholder values.
 *
 * <p>TODO(developer): Replace the placeholder arguments below before running the sample.
 */
public static void quickstart() throws IOException, InterruptedException {
  quickstart(
      "your-project-id", // projectId
      "your-project-region", // region
      "your-cluster-name", // clusterName
      "your-job-file-path"); // jobFilePath
}

public static void quickstart(
String projectId, String region, String clusterName, String jobFilePath)
throws IOException, InterruptedException {
Expand All @@ -82,10 +86,10 @@ public static void quickstart(
JobControllerSettings jobControllerSettings =
JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

// Create both a cluster controller client and job controller client with the configured
// settings. The client only needs to be created once and can be reused for multiple requests.
// Using a try-with-resources closes the client, but this can also be done manually with
// the .close() method.
// Create both a cluster controller client and job controller client with the
// configured settings. The client only needs to be created once and can be reused for
// multiple requests. Using a try-with-resources closes the client, but this can also be done
// manually with the .close() method.
try (ClusterControllerClient clusterControllerClient =
ClusterControllerClient.create(clusterControllerSettings);
JobControllerClient jobControllerClient =
Expand Down Expand Up @@ -114,7 +118,8 @@ public static void quickstart(
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
clusterControllerClient.createClusterAsync(projectId, region, cluster);
Cluster response = createClusterAsyncRequest.get();
System.out.printf("Cluster created successfully: %s", response.getClusterName());
System.out.println(
String.format("Cluster created successfully: %s", response.getClusterName()));

// Configure the settings for our job.
JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
Expand All @@ -133,7 +138,7 @@ public static void quickstart(
int timeout = 10;
try {
Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
System.out.printf("Job %s finished successfully.", jobId);
System.out.println(String.format("Job %s finished successfully.", jobId));

// Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
Expand Down Expand Up @@ -163,5 +168,21 @@ public static void quickstart(
System.err.println(String.format("Error executing quickstart: %s ", e.getMessage()));
}
}

/**
 * Command-line entry point that forwards positional arguments to the quickstart.
 *
 * <p>Expected arguments, in order: PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH.
 *
 * @param args exactly four positional arguments as listed above
 * @throws IOException if the Dataproc clients cannot be created or job output cannot be read
 * @throws InterruptedException if waiting on the cluster or job operations is interrupted
 */
public static void main(String... args) throws IOException, InterruptedException {
  if (args.length != 4) {
    // The check rejects both too few and too many arguments, so the message must not
    // claim the count is merely "insufficient" (the original wording was wrong for
    // the too-many case).
    System.err.println(
        "Exactly four parameters are required. Please make sure a "
            + "PROJECT_ID, REGION, CLUSTER_NAME and JOB_FILE_PATH are provided, in this order.");
    return;
  }

  String projectId = args[0]; // project-id of project to create the cluster in
  String region = args[1]; // region to create the cluster
  String clusterName = args[2]; // name of the cluster
  String jobFilePath = args[3]; // location in GCS of the PySpark job

  quickstart(projectId, region, clusterName, jobFilePath);
}
}
// [END dataproc_quickstart]
2 changes: 1 addition & 1 deletion dataproc/src/test/java/QuickstartTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setUp() {

@Test
public void quickstartTest() throws IOException, InterruptedException {
Quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
Quickstart.main(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH);
String output = bout.toString();

assertThat(output, CoreMatchers.containsString("Cluster created successfully"));
Expand Down