Computer Science

[Python] Celery를 활용한 스케줄링 시스템

오늘은 파이썬 프레임워크인 Celery을 활용한 스케줄링에 대해 작성하고자 한다.

작년 말에 celery를 이용한 프로젝트를 진행했었다. 
여러 툴들을 비교하다가 celery를 스케줄링 도구로 사용했었는데, 시간도 지났고 당시 빠르게 구현해서 서비스화하는 것이 목표였어서
완벽히 이해하지 못하고 넘어갔었다. 이번 기회를 통해 제대로 정리를 해보고자 한다.

celery란?


[celery docs] Celery는 파이썬 프레임워크로, 비동기 테스크 큐 + 스케줄링 툴이다.

Celery의 역할

  • 요청에 대한 응답이 오래걸리는 task들이 있을 때, 해당 task를 비동기로 처리하도록 하고 바로 응답하게끔 처리할 수 있게 함
  • broker를 사용하여 적절한 worker들에게 job을 나누어 전달/할당
  • 원하는 시간에 작업을 수행 ( ex. 매일 정오에 api요청을 통해 데이터 업데이트하기 )
  • (요약 정리) Celery는 시스템 내에서 1. 메세지를 전달하는 역할과 2. 메세지를 message broker에서 가져와서 작업을 수행(worker, consumer)하는 역할을 함

Celery producer에서 task를 생성 ➡️ task queue에 차례로 쌓임 ➡️ broker가 task queue에 있는 task들을 적절한 worker에 전달하면, celery가 task들을 실행한다. 
즉, celery는 producer, broker, consumer를 포함하는 큰 개념의 프레임워크이다.

 

 

비동기란? 

 

그렇다면 celery의 장점이라는 비동기 방식은 무엇일까? 동기 방식과 비교를 통해 비동기 방식을 이해해보자.
 

출처: https://medium.com/@vivianyim/synchronous-vs-asynchronous-javascript-de4918e8ad62

- 동기: 요청에 대한 응답이 돌아와야 다음 동작 수행 가능. 
위의 그림을 보면, 1번 task가 완전히 수행된 후, 2번, 3번, 4번이 차례로 수행됨을 확인할 수 있다.
➡️ request & response가 바로 동시에 이뤄짐.
- 비동기: 요청에 대한 응답 상태와 상관없이, 다음 task를 수행 가능.
위의 그림처럼, 1번 task가 수행중이지만, 2번, 3번, 4번을 동시에 수행됨을 확인할 수 있다.
➡️ request & response가 바로 동시에 이뤄지지않음.
 
 

 

broker

 

message broker는 producer(요청을 보내는 주체)로부터 메세지를 받아서 전달하는 중간 역할을 한다.  ( messaging middleware)
이 때, 바로 메세지를 넘겨서 처리하는 것이 아니라, message queue에 메세지를 적재시킨다.  

message broker로 주로 redis, Apache kafka, rabbitMQ를 사용한다.

출처: https://velog.io/@mdy0102/MQ-비교-Kafka-RabbitMQ-Redis

 

필자의 경우에는, broker로 RabbitMQ를 선택했다. 

(선택의 이유) redis와 RabbitMQ, kafka를 고민했으나, 
kafka는 아직 프로젝트 초기 mvp단계인지라 over-spec이어서 패스했고, 
redis는 인메모리 DB라서 빠르지만 약간의 손실은 있을 수 있다고 한다. 
추가로, rabbitMQ가 큰 메세지 전달에 더 효과적이기때문에 선택하게 되었다.
( 아직 복잡하지않은 시스템이라서, redis로 했었어도 되었지 않았을지.. 이제와 생각해본다... )

message broker를 왜 사용하는 지에 대해서는 이 블로그에서 참고하길 바란다. 

 

code: how to?

 

# main.py

from celery import Celery
from celery.schedules import crontab
from src.dags import CustomDAG # 작성한 custom dag class

AWS_MQ_URL=""
AWS_MQ_USERNAME=""
AWS_MQ_PASSWORD=""

app = Celery(
    app="scheduler",
    broker=AWS_MQ_URL,
    BROKER_USER=AWS_MQ_USERNAME,
    BROKER_PASSWORD=AWS_MQ_PASSWORD,
)


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        crontab(hour=12, minute=0),
        run_custom_dag,
        name="Run '매일 정오에 customDAG크론탭'",
    )


@app.task
def run_custom_dag():
    dag = CustomDAG()
    dag.run()
# celery.sh
nohup celery --app main worker -B --loglevel=INFO &

 

주기를 설정하고, 실행하는 task를 설정해둔 main.py를 celery를 통해 전달하면, 
지정한 시간(매일 정오)에 beat scheduler가 task를 task queue에 쌓고,queue의 task들을 브로커가 적절히 worker에 전달하고, 
available한 worker가 이 task를 수행한다. 

즉, task queue에 차례로 쌓임 ➡️ broker가 task queue에 있는 task들을 적절한 worker에 전달하면, celery가 task들을 실행한다. 

진행하는 프로젝트에서는 매일 정오에 crontab으로 실행되도록 했기 때문에, 
세션이 종료되어도 프로세스가 백그라운드에서 계속 실행되도록 하기 위해 nohup으로 celery프로세스를 실행했다.
따라서 결과는 nohup.out에서 확인할 수 있다. 

 정확한 파라미터들과 사용법은 docs에서 확인 가능하다.

 


 

이번 프로젝트에서는 celery의 많은 기능중에 스케줄러만 활용했지만, 더 많은 기능은 차차 살펴보기로 하자!

 

-끝-