Skip to content

Commit

Permalink
Revert PortableRunnerTest changes
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas committed Aug 21, 2024
1 parent 48a01ec commit 94fa12e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ public class TestJobService extends JobServiceImplBase {
private final ApiServiceDescriptor stagingEndpoint;
private final String preparationId;
private final String jobId;
private JobState.Enum jobState;
private final JobState.Enum jobState;
private JobApi.MetricResults metrics;

public TestJobService(
ApiServiceDescriptor stagingEndpoint,
String preparationId,
String jobId,
JobState.Enum initialState,
JobState.Enum jobState,
JobApi.MetricResults metrics) {
this.stagingEndpoint = stagingEndpoint;
this.preparationId = preparationId;
this.jobId = jobId;
this.jobState = initialState;
this.jobState = jobState;
this.metrics = metrics;
}

Expand All @@ -68,10 +68,6 @@ public void prepare(
responseObserver.onCompleted();
}

public void setJobState(JobState.Enum state) {
this.jobState = state;
}

@Override
public void run(RunJobRequest request, StreamObserver<RunJobResponse> responseObserver) {
responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -105,13 +106,13 @@ public void stagesAndRunsJob() throws Exception {
assertThat(state, is(State.DONE));
}

@Ignore
@Test
public void extractsMetrics() throws Exception {
JobApi.MetricResults metricResults = generateMetricResults();
TestJobService testJobService = createJobServer(JobState.Enum.RUNNING, metricResults);
createJobServer(JobState.Enum.DONE, metricResults);
PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess());
PipelineResult result = runner.run(p);
testJobService.setJobState(JobState.Enum.DONE);
result.waitUntilFinish();
MetricQueryResults metricQueryResults = result.metrics().allMetrics();
assertThat(
Expand Down Expand Up @@ -180,7 +181,7 @@ private JobApi.MetricResults generateMetricResults() throws Exception {
.build();
}

private TestJobService createJobServer(JobState.Enum jobState, JobApi.MetricResults metricResults)
private void createJobServer(JobState.Enum jobState, JobApi.MetricResults metricResults)
throws IOException {
ArtifactStagingService stagingService =
new ArtifactStagingService(
Expand All @@ -197,16 +198,15 @@ public ArtifactStagingService.ArtifactDestination getDestination(
public void removeStagedArtifacts(String stagingToken) {}
});
stagingService.registerJob("TestStagingToken", ImmutableMap.of());
TestJobService testJobService =
new TestJobService(ENDPOINT_DESCRIPTOR, "prepId", "jobId", jobState, metricResults);
Server server =
grpcCleanupRule.register(
InProcessServerBuilder.forName(ENDPOINT_URL)
.addService(testJobService)
.addService(
new TestJobService(
ENDPOINT_DESCRIPTOR, "prepId", "jobId", jobState, metricResults))
.addService(stagingService)
.build());
server.start();
return testJobService;
}

private static PipelineOptions createPipelineOptions() {
Expand Down

0 comments on commit 94fa12e

Please sign in to comment.