Skip to content

Commit

Permalink
camunda: release the go-libs camunda, a Camunda API wrapper (#70)
Browse files Browse the repository at this point in the history
This release adds a library for interacting with Camunda via the Camunda API. It supports starting process instances, sending messages and updating variables, subscribing to external task queues and basic authentication.

Co-authored-by: Stephan Detje <[email protected]>
Co-authored-by: Anderson Queiroz <[email protected]>
  • Loading branch information
3 people authored Nov 9, 2021
1 parent 5e175de commit c7d7b3a
Show file tree
Hide file tree
Showing 14 changed files with 978 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pipeline:
- cd logger
- make gitconfig test

test_camunda:
image: golang:latest
volumes:
- /var/run/docker.sock:/var/run/docker.sock
commands:
- export $(cat .ci-secrets | xargs)
- cd camunda
- make gitconfig test

test_x/events:
image: docker/compose:alpine-1.28.2
volumes:
Expand Down
21 changes: 21 additions & 0 deletions camunda/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
.PHONY: download lint

GO ?= go
GOLINT ?= golint
GOPRIVATE ?= github.com/blacklane/*
STATICCHECK ?= staticcheck

# Use a github token or similar to access private repos
# In this example it's got no effect rather than changing from ssh to https
gitconfig:
@git config --global url.https://${GITHUB_TOKEN}@github.aaakk.us.kg.insteadOf https://github.com

deps:
GOPRIVATE=${GOPRIVATE} ${GO} mod download

test: deps
${GO} test -race -cover ./...

lint:
${GOLINT} ./...
${STATICCHECK} ./...
37 changes: 37 additions & 0 deletions camunda/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Camunda API client

This library is a wrapper for the [camunda API](https://docs.camunda.org/manual/7.15/reference/rest/).
It can connect to any camunda engine deployment and supports a subset of operations from the official camunda API.

## Supported Operations
* starting a process instance
* sending a message (and updating variables)
* subscribing to an external task queue
* using basic authentication

## Installation
```shell
go get -u github.com/blacklane/go-libs/camunda
```

## Getting Started

Create a new API client:
```go
url := "http://localhost:8080"
processKey := "example-process"
credentials := camunda.BasicAuthCredentials{...}
client := camunda.NewClient(url, processKey, http.Client{}, credentials)
```

Starting a new process instance:
```go
businessKey := uuid.New().String()
variables := map[string]camunda.Variable{}
err := client.StartProcess(context.Background(), businessKey, variables)
if err != nil {
log.Err(err).Msg("Failed to start process")
}
```

For complete examples check out the [/examples](/examples) directory.
155 changes: 155 additions & 0 deletions camunda/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package camunda

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/blacklane/go-libs/camunda/internal"
)

type (
Client interface {
StartProcess(ctx context.Context, businessKey string, variables map[string]Variable) error
SendMessage(ctx context.Context, messageType string, businessKey string, updatedVariables map[string]Variable) error
Subscribe(topicName string, workerID string, handler TaskHandler, options ...func(*Subscription)) *Subscription
}
BasicAuthCredentials struct {
User string
Password string
}
client struct {
camundaURL string
credentials BasicAuthCredentials
processKey string
httpClient internal.HttpClient
}
)

func NewClient(url string, processKey string, httpClient http.Client, credentials BasicAuthCredentials) Client {
return &client{
camundaURL: url,
credentials: credentials,
processKey: processKey,
httpClient: &httpClient,
}
}

func (c *client) StartProcess(ctx context.Context, businessKey string, variables map[string]Variable) error {
variables[businessKeyJSONKey] = NewStringVariable(VarTypeString, businessKey)
params := processStartParams{
BusinessKey: businessKey,
Variables: variables,
}

buf := bytes.Buffer{}
err := json.NewEncoder(&buf).Encode(params)
if err != nil {
return fmt.Errorf("failed to send camunda message due to json error: %w", err)
}

url := fmt.Sprintf("process-definition/key/%s/start", c.processKey)
_, err = c.doPostRequest(ctx, &buf, url)
if err != nil {
return fmt.Errorf("failed to start process for business key [%s] due to: %w", params.BusinessKey, err)
}

return nil
}

func (c *client) SendMessage(ctx context.Context, messageType string, businessKey string, updatedVariables map[string]Variable) error {
buf := bytes.Buffer{}
url := "message"
newMessage := newMessage(messageType, businessKey, updatedVariables)
err := json.NewEncoder(&buf).Encode(newMessage)
if err != nil {
return fmt.Errorf("failed to send camunda message due to json error: %w", err)
}

_, err = c.doPostRequest(ctx, &buf, url)
if err != nil {
return fmt.Errorf("failed to send message for business key [%s] due to: %w", newMessage.BusinessKey, err)
}

return nil
}

func (c *client) Subscribe(topicName string, workerID string, handler TaskHandler, options ...func(*Subscription)) *Subscription {
sub := newSubscription(c, topicName, workerID)
sub.addHandler(handler)

for _, option := range options {
option(sub)
}

// run async fetch loop
go sub.schedule()

return sub
}

func (c *client) complete(ctx context.Context, taskId string, params taskCompletionParams) error {
buf := bytes.Buffer{}
err := json.NewEncoder(&buf).Encode(params)
if err != nil {
return fmt.Errorf("failed to complete camunda task due to json error: %w", err)
}

url := fmt.Sprintf("external-task/%s/complete", taskId)
_, err = c.doPostRequest(ctx, &buf, url)
if err != nil {
return err
}

return nil
}

func (c *client) fetchAndLock(param *fetchAndLock) ([]Task, error) {
var tasks []Task
buf := bytes.Buffer{}
err := json.NewEncoder(&buf).Encode(param)
if err != nil {
return tasks, fmt.Errorf("failed to fetch camunda tasks due to json error: %w", err)
}

url := "external-task/fetchAndLock"
body, err := c.doPostRequest(context.Background(), &buf, url)
if err != nil {
return tasks, err
}

err = json.Unmarshal(body, &tasks)
if err != nil {
return tasks, fmt.Errorf("could not unmarshal task due to: %w", err)
}

return tasks, nil
}

func (c *client) doPostRequest(ctx context.Context, params *bytes.Buffer, endpoint string) ([]byte, error) {
url := fmt.Sprintf("%s/%s", c.camundaURL, endpoint)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, params)
if err != nil {
return nil, fmt.Errorf("could not create POST request due to: %w", err)
}
req.Header.Add(internal.HeaderContentType, "application/json")

req.SetBasicAuth(c.credentials.User, c.credentials.Password)

resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("could not send POST request due to: %w", err)
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("camunda API returned Status %d with body: %v", resp.StatusCode, string(body))
}

return body, nil
}
Loading

0 comments on commit c7d7b3a

Please sign in to comment.