PYTHON CELERY GRACEFULLY HANDLING TERMINATION

In this article we will discuss how we exit a celery long running task using another python external module.

We use the same requirements as we used in our previous article about celery and docker. And we use the same app or project to implement how we can gracefully exit the task.

Lets Code

To implement this, We use our previously created project. In our base directory we will create a new file workerAJobs.py apart from this we only need to add a few changes in our workerA.py file.

First we create a workerAJobs.py file and we create a long running process method. Add the following code in this file.

class WorkerAJobs:
   logger = False
   kill_now = False
   current_counter = 1


   def __init__(self, logger):
       self.logger = logger


   def exit_gracefully(self, signum, frame):
       self.logger.info(f'Breaking out of the loop')
       self.logger.info(f'Current state before exit {self.current_counter}')
       self.logger.info(f'Gracefully exiting')
       self.kill_now = True


   def counter_task(self):
       while True:
           value = '{}'.format(self.current_counter)
           self.logger.info(value)
           self.current_counter += 1
           if self.kill_now:
               return False

After creating this file and adding these lines in the file. Now we add a few more lines of code in our workerA.py file.

import signal
from celery import Celery
from jobs.workerAJobs import WorkerAJobs
import celstash
import logging


celstash.configure(logstash_host='logstash', logstash_port=9999)
logger = celstash.new_logger('flask-celery')
logger.setLevel(logging.INFO)


# Celery configuration
CELERY_BROKER_URL = 'amqp://rabbitmq:rabbitmq@rabbit:5672/'
CELERY_RESULT_BACKEND = 'rpc://'


# Initialize Celery
celery = Celery('workerB', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)


workerAjobs = WorkerAJobs(logger)


@celery.task()
def counter_task():
   # Handling External SIGTERM to communicate with external python module
   signal.signal(signal.SIGTERM, workerAjobs.exit_gracefully)


   # Calling external python module 
   result = workerAobs.counter_task()
   if not result:
       raise Exception("Exiting from here ")

After adding these lines in the code. We need to restart our docker containers.

Managing Python flask Services

After editing and writing all this code stop the containers.

docker-compose down

And again create a build and start all the containers.

docker-compose build
docker-compose up -d

To check the newly add containers.

docker-compose ps