1. Home
  2. Docs
  3. Infrastructure
  4. Camunda BPM Platform &#03...
  5. Camunda Workers (External Service Tasks)

Camunda Workers (External Service Tasks)

To provide custom logic to the Camunda BPM workflows, we use External Service Tasks. These tasks are then implemented by a NestJS or Python worker. We prefer pull workers instead of “push” REST API endpoints because:

  • it’s more scalable
  • easier to debug (locally)
  • potential for better (centralized) logging
  • easier authentication mechanism (workers authenticating to Camunda REST API instead of Camunda authenticating to external task REST APIs)
  • no need to worry about SSL etc. for individual workers
  • does not require http-connector

Cons:

  • a worker requires long-running operation, i.e. deployed in Kubernetes, cannot be deployed serverless to AWS Lambda. We reduce this overhead by bundling multiple workers into one project, and by ensuring workers are lightweight (does not use unnecessary dependencies).

NestJS (TypeScript)

It may seem overkill to use NestJS for a worker that typically does not serve HTTP API. However, NestJS ensures uniform project structure, Jest testing, standardized TypeScript configuration, and provides room to add more transports in the future (including HTTP with Fastify). Here’s how to setup minimal NestJS for Camunda worker.

sudo yarn global add @nestjs/cli
nest new --package-manager yarn lovia-[name]

Delete app.controller.spec.ts and app.controller.ts.

yarn remove @nestjs/platform-express @types/express
yarn add dotenv @sentry/node camunda-external-task-client-js axios
yarn add --dev @types/camunda-external-task-client-js

Prepend to top of .gitignore:

/.env

Create files .env and .env.dev:

SENTRY_DSN=
CAMUNDA_API_URL=https://camunda.lovia.life/engine-rest
CAMUNDA_USERNAME=
CAMUNDA_PASSWORD=

Tweak app.module.ts:

import { Module } from '@nestjs/common';
import { AppService } from './app.service';

@Module({
  imports: [],
  controllers: [],
  providers: [AppService],
})
export class AppModule {}

Implement workers in app.service.ts:

import { Injectable } from '@nestjs/common';
import { Client, logger, Variables, ClientConfig } from "camunda-external-task-client-js";
import * as camunda from "camunda-external-task-client-js";
const { BasicAuthInterceptor } = camunda as any;
// TODO: https://jira.camunda.com/browse/CAM-11830
// const { BasicAuthInterceptor } = require("camunda-external-task-client-js");
import axios from 'axios';

@Injectable()
export class AppService {

  /**
   * Subscribe Camunda workers.
   */
  async subscribeWorkers() {
    const basicAuthentication = new BasicAuthInterceptor({
      username: process.env.CAMUNDA_USERNAME,
      password: process.env.CAMUNDA_PASSWORD
    });
    const config: ClientConfig = {
      baseUrl: process.env.CAMUNDA_API_URL,
      interceptors: basicAuthentication,
      use: logger,
      // recommended to avoid issues, see https://forum.camunda.org/t/external-workers-scalability/11384/4
      maxTasks: 1
    };
    const client = new Client(config);

    // TODO: Change topic name
    client.subscribe('infra.rocketchat.PostMessage', this.postMessage.bind(this));
  }

  /**
   * Posts a message to Rocket.Chat chatroom.
   * @param args Task and task service. 
   */
  async postMessage({task, taskService}: camunda.HandlerArgs) {
    const userId: number = task.variables.get('userId');
    try {
      console.info(task.topicName, userId, '...');

      // TODO: Implement
      const processVariables = new Variables();
      // processVariables.set('user', userDoc);
      await taskService.complete(task, processVariables);
    } catch (e) {
      logger.error(`${task.topicName}: ${e}`);
      await taskService.handleFailure(task, {errorMessage: e.toString()});
    } finally {
      // cleanup
    }
  }

}

Edit main.ts:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { AppService } from './app.service';
import { config } from 'dotenv';
import * as Sentry from '@sentry/node';

config();
Sentry.init({ dsn: process.env.SENTRY_DSN });

async function bootstrap() {
  // https://www.petermorlion.com/nestjs-aws-lambda-without-http/
  const app = await NestFactory.createApplicationContext(AppModule);
  const appService = app.get(AppService);
  appService.subscribeWorkers();
}
bootstrap();

Now you can start the workers (in development):

yarn start:dev

The worker user in Camunda needs to be authorized in the Process Definition for: TASK_WORK, READ_INSTANCE, READ_TASK, UPDATE_TASK_VARIABLE, UPDATE_INSTANCE_VARIABLE, READ_INSTANCE_VARIABLE, READ_TASK_VARIABLE, UPDATE_INSTANCE, UPDATE_TASK. (this may be more than needed, but too few authorizations will fail silently)

GitLab CI/CD

To deploy this worker to production, you’ll need:

  • .dockerignore
  • Dockerfile
  • kube/*.yaml
  • .gitlab-ci.yml

.dockerignore

.env
node_modules

Dockerfile

FROM node:lts-slim

WORKDIR /app

COPY ./package.json ./
COPY ./yarn.lock ./

RUN yarn install

COPY . .

ENV NODE_ENV production

RUN yarn build

CMD ["yarn", "start:prod"]

kube/[PROJECT].yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: lovia-infra # FIXME: change this
spec:
  selector:
    matchLabels:
      app: lovia-infra # FIXME: change this
  replicas: 1
  template:
    metadata:
      labels:
        app: lovia-infra # FIXME: change this
    spec:
      containers:
        - name: lovia-infra # FIXME: change this
          image: registry.gitlab.com/lovia/lovia-infra:latest # FIXME: change this
          env:
            - name: SENTRY_DSN
              valueFrom:
                secretKeyRef:
                  name: lovia-prod-infra # FIXME: change this
                  key: sentry-dsn
            - name: CAMUNDA_API_URL
              valueFrom:
                secretKeyRef:
                  name: lovia-prod-camunda-worker
                  key: api-url
            - name: CAMUNDA_USERNAME
              value: worker
            - name: CAMUNDA_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: lovia-prod-camunda-worker
                  key: password
          resources:
            requests:
              memory: 100Mi
              cpu: 100m
            limits:
              memory: 100Mi
      imagePullSecrets:
        - name: regcred

.gitlab-ci.yml

image: docker:latest

services:
  - docker:dind

variables:
  # Use TLS https://docs.gitlab.com/ee/ci/docker/using_docker_build.html#tls-enabled
  DOCKER_HOST: tcp://docker:2376
  DOCKER_TLS_CERTDIR: "/certs"

build:
  stage: build
  script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
    - docker pull $CI_REGISTRY_IMAGE:latest || true
    - docker build --cache-from $CI_REGISTRY_IMAGE:latest --tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA --tag $CI_REGISTRY_IMAGE:latest .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    - docker push $CI_REGISTRY_IMAGE:latest
  only:
    - prod

deploy_prod:
  stage: deploy
  image:
    name: bitnami/kubectl
    # https://gitlab.com/gitlab-org/gitlab-foss/-/issues/65110#note_198073241
    entrypoint: [""]
  script:
    # FIXME: change the "lovia-infra" below
    #- kubectl apply -f kube/lovia-infra.yaml
    # https://github.com/kubernetes/kubernetes/issues/27081
    #- kubectl patch deployment lovia-infra -p \
    #  "{\"spec\":{\"template\":{\"metadata\":{\"annotations\":{\"date\":\"`date +'%s'`\"}}}}}"
  environment:
    name: production
  only:
    - prod

As we’re using GitLab Kubernetes integration for deploying to environments, this means that each environment is its own Kubernetes namespace. You need to create the secrets for both GitLab Private Docker Container Registry and all app secrets.

kubectl create secret docker-registry regcred -n NAMESPACE --docker-server=registry.gitlab.com --docker-username=DOCKER_USER --docker-password=PERSONAL_ACCESS_TOKEN
kubectl create secret generic lovia-prod-infra -n NAMESPACE --from-literal=sentry-dsn=...

Python Worker

Unfortunately there is no official Python Camunda Worker library yet. There are some discussions, and Camunda External Task Python module.

So based on camundacon2019’s code, here’s a template:

import requests
import json
import time
import uuid


class client:
    def __init__(self, url, workerid = "defaultid"):  
        self.url = url
        self.workerid = workerid
        
    
    def subscribe(self, topic, lockDuration = 1000, longPolling = 5000):
        # Define the endpoint for fetch and lock
        endpoint = str(self.url) +"/external-task/fetchAndLock"
        # Define unique ID for the worker
    
        #global uid
        #uid = uuid.uuid1()
        #uid = str(uid)

        workerid = str(self.workerid)
    
        #Define the Json for the Request
        task= {"workerId": workerid,
               "maxTasks":1,
               "usePriority":"true",
               "asyncResponseTimeout": longPolling,
               "topics":
               [{"topicName": topic,
                 "lockDuration": lockDuration
                }]
              }
 
    
        #Make the request
        global engine
        engine = True
    
        try:
            fetch_and_lock = requests.post(endpoint, json=task)
            print(fetch_and_lock.status_code)
            global body
            body = fetch_and_lock.text
               
        except:
            engine = False
            print("Engine is down")
        
        if (engine == True):
            while body == '[]':
                print("polling")
                fetch_and_lock = requests.post(endpoint, json=task)
                body = fetch_and_lock.text
                time.sleep(5)
            
                if body != '[]':
                    break

        data = json.loads(body)
        # TODO: Do your processing here with 'data', then call self.complete(vars) when done

                
    #Complete Call
    def complete(self, **kwargs): 
        response_body = json.loads(body)
        taskid = response_body[0]['id']
        taskid = str(taskid)
        
        endpoint = str(self.url) + "/external-task/" + taskid + "/complete"
    
        #get workerid
        workerid = response_body[0]['workerId']
        workerid = str(workerid)
    
    
    
    
        #puts the variables from the dictonary into the nested format for the json response
        variables_for_response = {}
        for key, val in kwargs.items():
            variable_new = {key:{"value": val}}
            variables_for_response.update(variable_new)  

         
    
        response= {"workerId": workerid,
                  "variables": variables_for_response
                  }
    
       
        try:
            complete = requests.post(endpoint, json =response)
            body_complete = complete.text
            print(body_complete)
            print(complete.status_code)
        
        except:
            print('fail')
            
    
    #BPMN Error
    
    def error(self, bpmn_error, error_message = "not defined", **kwargs):
        
        response_body = json.loads(body)
        taskid = response_body[0]['id']
        taskid = str(taskid)
    
      
        endpoint = str(self.url) + "/external-task/"+ taskid + "/bpmnError"
    
    
        workerid = response_body[0]['workerId']
        workerid = str(workerid)
    
    
        variables_for_response = {}
        for key, val in kwargs.items():
            variable_new = {key:{"value": val}}
            variables_for_response.update(variable_new)  

    
        response =  {
          "workerId": workerid,
          "errorCode": bpmn_error,
          "errorMessage": error_message,
            "variables": variables_for_response
           }
       
    
        try:
            error = requests.post(endpoint, json = response)
            print(error.status_code)
        
        except:
            print('fail')
            
            
#Create an incident
            

    def fail(self, error_message, retries = 0, retry_timeout= 0):
        response_body = json.loads(body)
        taskid = response_body[0]['id']
        taskid = str(taskid)   

    
        endpoint = str(self.url) + "/external-task/"+ taskid + "/failure"
    
        workerid = response_body[0]['workerId']
        workerid = str(workerid)
    
    
        response = {
            "workerId": workerid,
            "errorMessage": error_message,
            "retries": retries, 
            "retryTimeout": retry_timeout}
                   
        try:
            fail = requests.post(endpoint, json = response)
            print(fail.status_code)        
        
        except:
            print('fail')
            
 # New Lockduration           
    
    def new_lockduration(self, new_duration):
        
        response_body = json.loads(body)
        taskid = response_body[0]['id']
        taskid = str(taskid)
        
        endpoint = str(self.url) + "/external-task/"+ taskid + "/extendLock"
        
        workerid = response_body[0]['workerId']
        workerid = str(workerid)
        
        response = {
            "workerId": workerid,
            "newDuration": new_duration
        }
        
        try:
            newDuration = requests.post(endpoint, json = response)
            print(newDuration.status_code)
            print(workerid)
        except:
            print('fail')


How can we help?

Leave a Reply

Your email address will not be published. Required fields are marked *