To provide custom logic to the Camunda BPM workflows, we use External Service Tasks. These tasks are then implemented by a NestJS or Python worker. We prefer pull workers instead of “push” REST API endpoints because:
- it’s more scalable
- easier to debug (locally)
- potential for better (centralized) logging
- easier authentication mechanism (workers authenticating to Camunda REST API instead of Camunda authenticating to external task REST APIs)
- no need to worry about SSL etc. for individual workers
- does not require http-connector
Cons:
- a worker requires long-running operation, i.e. deployed in Kubernetes, cannot be deployed serverless to AWS Lambda. We reduce this overhead by bundling multiple workers into one project, and by ensuring workers are lightweight (does not use unnecessary dependencies).
NestJS (TypeScript)
It may seem overkill to use NestJS for a worker that typically does not serve HTTP API. However, NestJS ensures uniform project structure, Jest testing, standardized TypeScript configuration, and provides room to add more transports in the future (including HTTP with Fastify). Here’s how to setup minimal NestJS for Camunda worker.
sudo yarn global add @nestjs/cli
nest new --package-manager yarn lovia-[name]
Delete app.controller.spec.ts
and app.controller.ts
.
yarn remove @nestjs/platform-express @types/express
yarn add dotenv @sentry/node camunda-external-task-client-js axios
yarn add --dev @types/camunda-external-task-client-js
Prepend to top of .gitignore
:
/.env
Create files .env
and .env.dev
:
SENTRY_DSN=
CAMUNDA_API_URL=https://camunda.lovia.life/engine-rest
CAMUNDA_USERNAME=
CAMUNDA_PASSWORD=
Tweak app.module.ts
:
import { Module } from '@nestjs/common';
import { AppService } from './app.service';
@Module({
imports: [],
controllers: [],
providers: [AppService],
})
export class AppModule {}
Implement workers in app.service.ts
:
import { Injectable } from '@nestjs/common';
import { Client, logger, Variables, ClientConfig } from "camunda-external-task-client-js";
import * as camunda from "camunda-external-task-client-js";
const { BasicAuthInterceptor } = camunda as any;
// TODO: https://jira.camunda.com/browse/CAM-11830
// const { BasicAuthInterceptor } = require("camunda-external-task-client-js");
import axios from 'axios';
@Injectable()
export class AppService {
/**
* Subscribe Camunda workers.
*/
async subscribeWorkers() {
const basicAuthentication = new BasicAuthInterceptor({
username: process.env.CAMUNDA_USERNAME,
password: process.env.CAMUNDA_PASSWORD
});
const config: ClientConfig = {
baseUrl: process.env.CAMUNDA_API_URL,
interceptors: basicAuthentication,
use: logger,
// recommended to avoid issues, see https://forum.camunda.org/t/external-workers-scalability/11384/4
maxTasks: 1
};
const client = new Client(config);
// TODO: Change topic name
client.subscribe('infra.rocketchat.PostMessage', this.postMessage.bind(this));
}
/**
* Posts a message to Rocket.Chat chatroom.
* @param args Task and task service.
*/
async postMessage({task, taskService}: camunda.HandlerArgs) {
const userId: number = task.variables.get('userId');
try {
console.info(task.topicName, userId, '...');
// TODO: Implement
const processVariables = new Variables();
// processVariables.set('user', userDoc);
await taskService.complete(task, processVariables);
} catch (e) {
logger.error(`${task.topicName}: ${e}`);
await taskService.handleFailure(task, {errorMessage: e.toString()});
} finally {
// cleanup
}
}
}
Edit main.ts
:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { AppService } from './app.service';
import { config } from 'dotenv';
import * as Sentry from '@sentry/node';
config();
Sentry.init({ dsn: process.env.SENTRY_DSN });
async function bootstrap() {
// https://www.petermorlion.com/nestjs-aws-lambda-without-http/
const app = await NestFactory.createApplicationContext(AppModule);
const appService = app.get(AppService);
appService.subscribeWorkers();
}
bootstrap();
Now you can start the workers (in development):
yarn start:dev
The worker user in Camunda needs to be authorized in the Process Definition for: TASK_WORK, READ_INSTANCE, READ_TASK, UPDATE_TASK_VARIABLE, UPDATE_INSTANCE_VARIABLE, READ_INSTANCE_VARIABLE, READ_TASK_VARIABLE, UPDATE_INSTANCE, UPDATE_TASK. (this may be more than needed, but too few authorizations will fail silently)
GitLab CI/CD
To deploy this worker to production, you’ll need:
- .dockerignore
- Dockerfile
- kube/*.yaml
- .gitlab-ci.yml
.dockerignore
.env
node_modules
Dockerfile
FROM node:lts-slim
WORKDIR /app
COPY ./package.json ./
COPY ./yarn.lock ./
RUN yarn install
COPY . .
ENV NODE_ENV production
RUN yarn build
CMD ["yarn", "start:prod"]
kube/[PROJECT].yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: lovia-infra # FIXME: change this
spec:
selector:
matchLabels:
app: lovia-infra # FIXME: change this
replicas: 1
template:
metadata:
labels:
app: lovia-infra # FIXME: change this
spec:
containers:
- name: lovia-infra # FIXME: change this
image: registry.gitlab.com/lovia/lovia-infra:latest # FIXME: change this
env:
- name: SENTRY_DSN
valueFrom:
secretKeyRef:
name: lovia-prod-infra # FIXME: change this
key: sentry-dsn
- name: CAMUNDA_API_URL
valueFrom:
secretKeyRef:
name: lovia-prod-camunda-worker
key: api-url
- name: CAMUNDA_USERNAME
value: worker
- name: CAMUNDA_PASSWORD
valueFrom:
secretKeyRef:
name: lovia-prod-camunda-worker
key: password
resources:
requests:
memory: 100Mi
cpu: 100m
limits:
memory: 100Mi
imagePullSecrets:
- name: regcred
.gitlab-ci.yml
image: docker:latest
services:
- docker:dind
variables:
# Use TLS https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
DOCKER_HOST: tcp://docker:2376
DOCKER_TLS_CERTDIR: "/certs"
build:
stage: build
script:
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
- docker pull $CI_REGISTRY_IMAGE:latest || true
- docker build --cache-from $CI_REGISTRY_IMAGE:latest --tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE:latest .
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE:latest
only:
- prod
deploy_prod:
stage: deploy
image:
name: bitnami/kubectl
# https://gitlab.com/gitlab-org/gitlab-foss/-/issues/65110#note_198073241
entrypoint: [""]
script:
# FIXME: change the "lovia-infra" below
#- kubectl apply -f kube/lovia-infra.yaml
# https://github.com/kubernetes/kubernetes/issues/27081
#- kubectl patch deployment lovia-infra -p \
# "{\"spec\":{\"template\":{\"metadata\":{\"annotations\":{\"date\":\"`date +'%s'`\"}}}}}"
environment:
name: production
only:
- prod
As we’re using GitLab Kubernetes integration for deploying to environments, this means that each environment is its own Kubernetes namespace. You need to create the secrets for both GitLab Private Docker Container Registry and all app secrets.
kubectl create secret docker-registry regcred -n NAMESPACE --docker-server=registry.gitlab.com --docker-username=DOCKER_USER --docker-password=PERSONAL_ACCESS_TOKEN
kubectl create secret generic lovia-prod-infra -n NAMESPACE --from-literal=sentry-dsn=...
Python Worker
Unfortunately there is no official Python Camunda Worker library yet. There are some discussions, and Camunda External Task Python module.
So based on camundacon2019’s code, here’s a template:
import requests
import json
import time
import uuid
class client:
def __init__(self, url, workerid = "defaultid"):
self.url = url
self.workerid = workerid
def subscribe(self, topic, lockDuration = 1000, longPolling = 5000):
# Define the endpoint for fetch and lock
endpoint = str(self.url) +"/external-task/fetchAndLock"
# Define unique ID for the worker
#global uid
#uid = uuid.uuid1()
#uid = str(uid)
workerid = str(self.workerid)
#Define the Json for the Request
task= {"workerId": workerid,
"maxTasks":1,
"usePriority":"true",
"asyncResponseTimeout": longPolling,
"topics":
[{"topicName": topic,
"lockDuration": lockDuration
}]
}
#Make the request
global engine
engine = True
try:
fetch_and_lock = requests.post(endpoint, json=task)
print(fetch_and_lock.status_code)
global body
body = fetch_and_lock.text
except:
engine = False
print("Engine is down")
if (engine == True):
while body == '[]':
print("polling")
fetch_and_lock = requests.post(endpoint, json=task)
body = fetch_and_lock.text
time.sleep(5)
if body != '[]':
break
data = json.loads(body)
# TODO: Do your processing here with 'data', then call self.complete(vars) when done
#Complete Call
def complete(self, **kwargs):
response_body = json.loads(body)
taskid = response_body[0]['id']
taskid = str(taskid)
endpoint = str(self.url) + "/external-task/" + taskid + "/complete"
#get workerid
workerid = response_body[0]['workerId']
workerid = str(workerid)
#puts the variables from the dictonary into the nested format for the json response
variables_for_response = {}
for key, val in kwargs.items():
variable_new = {key:{"value": val}}
variables_for_response.update(variable_new)
response= {"workerId": workerid,
"variables": variables_for_response
}
try:
complete = requests.post(endpoint, json =response)
body_complete = complete.text
print(body_complete)
print(complete.status_code)
except:
print('fail')
#BPMN Error
def error(self, bpmn_error, error_message = "not defined", **kwargs):
response_body = json.loads(body)
taskid = response_body[0]['id']
taskid = str(taskid)
endpoint = str(self.url) + "/external-task/"+ taskid + "/bpmnError"
workerid = response_body[0]['workerId']
workerid = str(workerid)
variables_for_response = {}
for key, val in kwargs.items():
variable_new = {key:{"value": val}}
variables_for_response.update(variable_new)
response = {
"workerId": workerid,
"errorCode": bpmn_error,
"errorMessage": error_message,
"variables": variables_for_response
}
try:
error = requests.post(endpoint, json = response)
print(error.status_code)
except:
print('fail')
#Create an incident
def fail(self, error_message, retries = 0, retry_timeout= 0):
response_body = json.loads(body)
taskid = response_body[0]['id']
taskid = str(taskid)
endpoint = str(self.url) + "/external-task/"+ taskid + "/failure"
workerid = response_body[0]['workerId']
workerid = str(workerid)
response = {
"workerId": workerid,
"errorMessage": error_message,
"retries": retries,
"retryTimeout": retry_timeout}
try:
fail = requests.post(endpoint, json = response)
print(fail.status_code)
except:
print('fail')
# New Lockduration
def new_lockduration(self, new_duration):
response_body = json.loads(body)
taskid = response_body[0]['id']
taskid = str(taskid)
endpoint = str(self.url) + "/external-task/"+ taskid + "/extendLock"
workerid = response_body[0]['workerId']
workerid = str(workerid)
response = {
"workerId": workerid,
"newDuration": new_duration
}
try:
newDuration = requests.post(endpoint, json = response)
print(newDuration.status_code)
print(workerid)
except:
print('fail')