Skip to content

Commit

Permalink
feat: Implement custom trigger for webhooks (#9879)
Browse files Browse the repository at this point in the history
  • Loading branch information
gt2345 authored Sep 9, 2024
1 parent b6eb05e commit bfeb418
Show file tree
Hide file tree
Showing 18 changed files with 2,282 additions and 1,239 deletions.
84 changes: 84 additions & 0 deletions e2e_tests/tests/cluster/test_webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,90 @@ def test_log_pattern_send_webhook(should_match: bool) -> None:
test_agent_user_group._delete_workspace_and_check(sess, workspace)


@pytest.mark.e2e_cpu
@pytest.mark.parametrize("isSlack", [True, False])
def test_custom_webhook(isSlack: bool) -> None:
port = 5009 if isSlack else 5010
server = utils.WebhookServer(port, allow_dupes=True)
sess = api_utils.admin_session()
workspace = bindings.post_PostWorkspace(
sess, body=bindings.v1PostWorkspaceRequest(name=f"webhook-test{random.random()}")
).workspace
project = bindings.post_PostProject(
sess,
body=bindings.v1PostProjectRequest(
name=f"webhook-test{random.random()}",
workspaceId=workspace.id,
),
workspaceId=workspace.id,
).project

webhook = bindings.v1Webhook(
url=f"http://localhost:{port}",
webhookType=bindings.v1WebhookType.SLACK if isSlack else bindings.v1WebhookType.DEFAULT,
triggers=[
bindings.v1Trigger(
triggerType=bindings.v1TriggerType.CUSTOM,
)
],
mode=bindings.v1WebhookMode.WORKSPACE,
name=f"webhook_1{random.random()}",
workspaceId=workspace.id,
)
# custom triggers only work on webhook with mode specific
with pytest.raises(errors.APIException):
bindings.post_PostWebhook(sess, body=webhook)
webhook.mode = bindings.v1WebhookMode.SPECIFIC
w = bindings.post_PostWebhook(sess, body=webhook).webhook

experiment_id = exp.create_experiment(
sess,
conf.fixtures_path("core_api/11_generic_metrics.yaml"),
conf.fixtures_path("core_api"),
[
"--project_id",
f"{project.id}",
"--config",
f"integrations.webhooks.webhook_name=['{webhook.name}']",
],
)

# this experiment should not trigger webhook because the name does not match.
control_exp_id = exp.create_experiment(
sess,
conf.fixtures_path("core_api/11_generic_metrics.yaml"),
conf.fixtures_path("core_api"),
[
"--project_id",
f"{project.id}",
"--config",
"integrations.webhooks.webhook_name=['abc']",
],
)

exp.wait_for_experiment_state(
sess,
experiment_id,
bindings.experimentv1State.COMPLETED,
max_wait_secs=conf.DEFAULT_MAX_WAIT_SECS,
)
exp.wait_for_experiment_state(
sess,
control_exp_id,
bindings.experimentv1State.COMPLETED,
max_wait_secs=conf.DEFAULT_MAX_WAIT_SECS,
)

responses = server.close_and_return_responses()
assert len(responses) == 1
assert "end of main" in responses["/"]
assert "DEBUG" in responses["/"]
assert str(experiment_id) in responses["/"]

bindings.delete_DeleteWebhook(sess, id=w.id or 0)
test_agent_user_group._delete_workspace_and_check(sess, workspace)


@pytest.mark.e2e_cpu
def test_specific_webhook() -> None:
port1 = 5007
Expand Down
1 change: 1 addition & 0 deletions e2e_tests/tests/fixtures/core_api/11_generic_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def main(core_context: det.core.Context, increment_by: float):
core_context.train.report_metrics(
group=group, steps_completed=steps_completed, metrics=metrics
)
core_context.alert(title="end of main", level="debug")


if __name__ == "__main__":
Expand Down
101 changes: 101 additions & 0 deletions harness/determined/common/api/bindings.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 33 additions & 1 deletion harness/determined/core/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import threading
import traceback
import types
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Literal, Optional, Union

import appdirs

Expand Down Expand Up @@ -83,6 +83,37 @@ def __enter__(self) -> "Context":
self.start()
return self

def alert(
self,
title: Optional[str] = None,
description: Optional[str] = None,
level: Optional[Literal["warn", "info", "debug", "error"]] = "info",
) -> None:
if not isinstance(self._session, api.BaseSession):
raise ValueError("init() needs to be called before sending alert.")
if self.info is None:
raise ValueError("Workload alerting only works on determined-managed experiment.")
if self.info.trial is None:
raise ValueError("alert() only works for trial type of task.")
log_level: bindings.v1LogLevel = bindings.v1LogLevel.INFO
if level == "warn":
log_level = bindings.v1LogLevel.WARNING
elif level == "debug":
log_level = bindings.v1LogLevel.DEBUG
elif level == "error":
log_level = bindings.v1LogLevel.ERROR

bindings.post_PostWebhookEventData(
session=self._session,
body=bindings.v1PostWebhookEventDataRequest(
data=bindings.v1CustomWebhookEventData(
title=title or "", description=description or "", level=log_level
),
experimentId=self.info.trial.experiment_id,
trialId=self.info.trial.trial_id,
),
)

def close(
self,
exc_type: Optional[type] = None,
Expand Down Expand Up @@ -359,6 +390,7 @@ def init(
_metrics=metrics,
_tensorboard_manager=tensorboard_manager,
_session=session,
info=info,
)


Expand Down
33 changes: 33 additions & 0 deletions master/internal/webhooks/api_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func (a *WebhooksAPIServer) PostWebhook(
regexConditionKey, m)
}
}
if t.TriggerType == webhookv1.TriggerType_TRIGGER_TYPE_CUSTOM {
if req.Webhook.Mode != webhookv1.WebhookMode_WEBHOOK_MODE_SPECIFIC {
return nil, status.Errorf(codes.InvalidArgument,
"custom trigger only works on webhook with mode 'SPECIFIC'. Got %v",
req.Webhook.Mode)
}
}
}

w := WebhookFromProto(req.Webhook)
Expand Down Expand Up @@ -223,3 +230,29 @@ func (a *WebhooksAPIServer) TestWebhook(
}
return &apiv1.TestWebhookResponse{}, nil
}

// PostWebhookEventData handles data for custom trigger.
func (a *WebhooksAPIServer) PostWebhookEventData(
ctx context.Context, req *apiv1.PostWebhookEventDataRequest,
) (*apiv1.PostWebhookEventDataResponse, error) {
var res apiv1.PostWebhookEventDataResponse
_, _, err := grpcutil.GetUser(ctx)
if err != nil {
return &res, status.Errorf(codes.Internal, "failed to get the user: %s", err)
}

var data CustomTriggerData
if req.Data != nil {
data.Title = req.Data.Title
data.Description = req.Data.Description
data.Level = model.TaskLogLevelFromProto(req.Data.Level)
}
err = handleCustomTriggerData(ctx, data, int(req.ExperimentId), ptrs.Ptr(int(req.TrialId)))
if err != nil {
return &res, status.Errorf(codes.Internal,
"failed to handle custom trigger data: %+v experiment id: %d trial_id %d : %s",
data, req.ExperimentId, req.TrialId, err)
}

return &res, nil
}
Loading

0 comments on commit bfeb418

Please sign in to comment.