Skip to content

Commit

Permalink
Pipeline launch label support (#322)
Browse files Browse the repository at this point in the history
* pipeline launch (simple) label support

* fix: reflection config

* fix: follow UI behaviour

* fix: launch user can only use same labels as pipeline if any

* fix: typo
  • Loading branch information
JaimeSeqLabs authored Sep 22, 2023
1 parent 2893bdb commit 654deaa
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 0 deletions.
5 changes: 5 additions & 0 deletions conf/reflect-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2809,6 +2809,11 @@
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"io.seqera.tower.model.PipelineOptimizationStatus",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true
},
{
"name":"io.seqera.tower.model.PipelineSecret",
"allDeclaredFields":true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ protected String apiUrl() {
return app().url;
}

protected String token() {
return app().token;
}

protected OrgAndWorkspaceDbDto findOrgAndWorkspaceByName(String organizationName, String workspaceName) throws ApiException {
ListWorkspacesAndOrgResponse workspacesAndOrgResponse = api().listWorkspacesUser(userId());

Expand Down
113 changes: 113 additions & 0 deletions src/main/java/io/seqera/tower/cli/commands/LaunchCmd.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,21 @@

package io.seqera.tower.cli.commands;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.seqera.tower.ApiException;
import io.seqera.tower.cli.commands.enums.OutputType;
import io.seqera.tower.cli.commands.global.WorkspaceOptionalOptions;
import io.seqera.tower.cli.exceptions.InvalidResponseException;
import io.seqera.tower.cli.responses.Response;
import io.seqera.tower.cli.responses.runs.RunSubmited;
import io.seqera.tower.model.ComputeEnvResponseDto;
import io.seqera.tower.model.CreateLabelRequest;
import io.seqera.tower.model.CreateLabelResponse;
import io.seqera.tower.model.LabelDbDto;
import io.seqera.tower.model.LabelType;
import io.seqera.tower.model.Launch;
import io.seqera.tower.model.ListLabelsResponse;
import io.seqera.tower.model.ListPipelinesResponse;
import io.seqera.tower.model.PipelineDbDto;
import io.seqera.tower.model.SubmitWorkflowLaunchRequest;
Expand All @@ -31,10 +38,23 @@
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import javax.annotation.Nullable;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static io.seqera.tower.cli.utils.FilesHelper.readString;
import static io.seqera.tower.cli.utils.ModelHelper.coalesce;
Expand Down Expand Up @@ -75,6 +95,9 @@ public class LaunchCmd extends AbstractRootCmd {
@Option(names = {"--wait"}, description = "Wait until given status or fail. Valid options: ${COMPLETION-CANDIDATES}.")
public WorkflowStatus wait;

@Option(names = {"-l", "--labels"}, split = ",", description = "Comma-separated list of labels for the pipeline.")
List<String> labels;

@ArgGroup(heading = "%nAdvanced options:%n", validate = false)
AdvancedOptions adv;

Expand All @@ -98,9 +121,12 @@ protected Response exec() throws ApiException, IOException {
protected Response runNextflowPipeline(Long wspId) throws ApiException, IOException {
// Retrieve the provided computeEnv or use the primary if not provided
ComputeEnvResponseDto ce = computeEnv != null ? computeEnvByRef(wspId, computeEnv) : primaryComputeEnv(wspId);
// Retrieve the IDs for the labels specified by the user if any
List<Long> labels = obtainLabelIDs(wspId);

return submitWorkflow(updateLaunchRequest(new WorkflowLaunchRequest()
.pipeline(pipeline)
.labelIds(labels.isEmpty() ? null : labels)
.computeEnvId(ce.getId())
.workDir(ce.getConfig().getWorkDir())
.preRunScript(ce.getConfig().getPreRunScript())
Expand All @@ -112,6 +138,7 @@ private WorkflowLaunchRequest updateLaunchRequest(WorkflowLaunchRequest base) th
return new WorkflowLaunchRequest()
.id(base.getId())
.pipeline(base.getPipeline())
.labelIds(base.getLabelIds())
.computeEnvId(base.getComputeEnvId())
.runName(coalesce(name, base.getRunName()))
.workDir(coalesce(workDir, base.getWorkDir()))
Expand All @@ -131,6 +158,7 @@ private WorkflowLaunchRequest updateLaunchRequest(WorkflowLaunchRequest base) th
}

protected Response runTowerPipeline(Long wspId) throws ApiException, IOException {

ListPipelinesResponse pipelines = api().listPipelines(Collections.emptyList(), wspId, 50, 0, pipeline, "all");
if (pipelines.getTotalSize() == 0) {
throw new InvalidResponseException(String.format("Pipeline '%s' not found on this workspace.", pipeline));
Expand Down Expand Up @@ -168,6 +196,9 @@ protected Response runTowerPipeline(Long wspId) throws ApiException, IOException
launchRequest.workDir(ce.getConfig().getWorkDir());
}

List<Long> labels = obtainLabelIDs(wspId);
launchRequest.labelIds(labels.isEmpty() ? null : labels);

return submitWorkflow(updateLaunchRequest(launchRequest), wspId, sourceWorkspaceId);
}

Expand Down Expand Up @@ -209,6 +240,88 @@ private WorkflowStatus checkWorkflowStatus(String workflowId, Long workspaceId)
}
}

private List<Long> obtainLabelIDs(@Nullable Long workspaceId) throws ApiException {

if (labels == null || labels.isEmpty()) {
return Collections.emptyList();
}

// retrieve labels for the workspace and check if we need to create new ones
List<LabelDbDto> wspLabels = new ArrayList<>();

ListLabelsResponse res = api().listLabels(workspaceId, null, null, null, LabelType.SIMPLE, null);
if (res.getLabels() != null) {
wspLabels.addAll(res.getLabels());
}

Map<String, Long> nameToID = wspLabels
.stream()
.collect(Collectors.toMap(LabelDbDto::getName, LabelDbDto::getId));

// get label names not registered in workspace (names are unique per wspID)
List<String> newLabels = labels
.stream()
.filter(labelName -> !nameToID.containsKey(labelName))
.collect(Collectors.toList());

if (!newLabels.isEmpty() && !labelPermission(workspaceId)) {
throw new ApiException("User does not have permission to modify pipeline labels");
}

// create the new ones via POST /labels
for (String labelName: newLabels) {
CreateLabelResponse created = api().createLabel(
new CreateLabelRequest()
.name(labelName)
.resource(false)
.isDefault(false),
workspaceId
);
nameToID.put(created.getName(), created.getId());
}

// map requested label names to label IDs
return labels
.stream()
.map(nameToID::get)
.collect(Collectors.toList());
}

private boolean labelPermission(@Nullable Long wspId) throws ApiException {

// personal workspace
if (wspId == null) return true;

var client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();

var uri = UriBuilder
.fromUri(URI.create(apiUrl() + "/permissions"))
.queryParam("workspaceId", wspId.toString())
.build();

var req = HttpRequest.newBuilder()
.GET()
.uri(uri)
.header("Authorization", String.format("Bearer %s", token()))
.build();

try {
HttpResponse<String> response = client.send(req, HttpResponse.BodyHandlers.ofString()); // sync

JsonNode json = new ObjectMapper().readTree(response.body());

var roleSet = new HashSet<String>();

json.get("workspace").get("roles").forEach(role -> roleSet.add(role.textValue()));

return roleSet.contains("owner") || roleSet.contains("admin") || roleSet.contains("maintain");

} catch (Throwable exception) {
throw new ApiException("Unable to reach API");
}
}

private AdvancedOptions adv() {
if (adv == null) {
Expand Down
88 changes: 88 additions & 0 deletions src/test/java/io/seqera/tower/cli/LaunchCmdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import static org.mockserver.matchers.Times.exactly;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
import static org.mockserver.model.JsonBody.json;


class LaunchCmdTest extends BaseCmdTest {

Expand Down Expand Up @@ -217,6 +219,92 @@ void testSubmitLaunchpadPipelineWithCustomName(OutputType format, MockServerClie
assertOutput(format, out, new RunSubmited("35aLiS0bIM5efd", null, baseUserUrl(mock, "jordi"), USER_WORKSPACE_NAME));
}

@ParameterizedTest
@EnumSource(OutputType.class)
void testSubmitLaunchpadPipelineWithLabels(OutputType format, MockServerClient mock) {

// labels endpoint mock
mock.when(
request()
.withMethod("GET")
.withPath("/labels")
.withQueryStringParameter("type", "simple"),
exactly(1)
).respond(
response()
.withStatusCode(200)
.withBody(loadResource("labels_user"))
.withContentType(MediaType.APPLICATION_JSON)
);
mock.when(
request()
.withMethod("POST")
.withPath("/labels")
.withBody(json(" {\n" +
" \"name\": \"LabelThree\",\n" +
" \"resource\": false,\n" +
" \"isDefault\": false\n" +
" }\n")),
exactly(1)
).respond(
response()
.withStatusCode(200)
.withBody(json("{\n" +
" \"id\": 3,\n" +
" \"name\": \"LabelThree\",\n" +
" \"resource\": false,\n" +
" \"isDefault\": false\n" +
"}\n"))
.withContentType(MediaType.APPLICATION_JSON)
);

// pipelines endpoint mock
mock.when(
request().withMethod("GET").withPath("/pipelines"), exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("pipelines_sarek")).withContentType(MediaType.APPLICATION_JSON)
);

mock.when(
request().withMethod("GET").withPath("/pipelines/250911634275687/launch"), exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("pipeline_launch_describe")).withContentType(MediaType.APPLICATION_JSON)
);

// launch endpoint mock
mock.when(
request()
.withMethod("POST")
.withPath("/workflow/launch")
.withBody(json(" {\n" +
" \"launch\":{\n" +
" \"id\":\"5nmCvXcarkvv8tELMF4KyY\",\n" +
" \"computeEnvId\":\"4X7YrYJp9B1d1DUpfur7DS\",\n" +
" \"pipeline\":\"https://github.com/nf-core/sarek\",\n" +
" \"workDir\":\"/efs\",\n" +
" \"pullLatest\":false,\n" +
" \"stubRun\":false,\n" +
" \"labelIds\": [2, 3]\n" +
" }\n" +
" }\n")),
exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("workflow_launch")).withContentType(MediaType.APPLICATION_JSON)
);

mock.when(
request().withMethod("GET").withPath("/user-info"), exactly(1)
).respond(
response().withStatusCode(200).withBody(loadResource("user")).withContentType(MediaType.APPLICATION_JSON)
);

// Run the command
ExecOut out = exec(format, mock, "launch", "sarek", "-l", "LabelTwo,LabelThree");

// Assert results
assertOutput(format, out, new RunSubmited("35aLiS0bIM5efd", null, baseUserUrl(mock, "jordi"), USER_WORKSPACE_NAME));
}

@Test
void testSubmitToAWorkspace(MockServerClient mock) {

Expand Down
20 changes: 20 additions & 0 deletions src/test/resources/runcmd/labels_user.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"labels": [

{
"id": 1,
"name": "LabelOne",
"value": null,
"resource": false,
"isDefault": false
},
{
"id": 2,
"name": "LabelTwo",
"value": null,
"resource": false,
"isDefault": false
}
],
"totalsize": 2
}

0 comments on commit 654deaa

Please sign in to comment.