Django에서 Celery + SQS 사용시 간단한 테스트 방법을 정리합니다.

테스트 환경 설정

.env 파일에서 celery 설정에 사용되는 환경변수를 수정합니다. 로컬에서 AWS 리소스를 직접 사용할수 있도록 credential 정보를 확인하고 입력합니다.

  • AWS CREDENTIAL - AWS 리소스 접근이 가능한 사용자의 액세스 키입니다.
  • SQUAD - 테스트 환경의 sqs 이름과 매칭하기 위해 사용할 키워드입니다.
  • SQS_URL - vpc 엔드포인트를 사용하고 있습니다.

임시 태스크 생성

from time import sleep
from uuid import uuid4
from celery import shared_task
...

@shared_task(queue=TESTFIFO)
def test_fifo_task():
    test()

@shared_task(queue=TEST)
def test_task():
    test()

def test():
    id = str(uuid4())
    CommonLogger.info(f'{id} start fifo')
    sleep(5)
    CommonLogger.info(f'{id} end fifo')

워커 실행을 위한 커맨드 작성

위에서 생성한 태스크를 10번 호출하는 Django 커맨드입니다.

import traceback
...

class Command(CommonBaseCommand):
    def run(self, *args, **options) -> None:
        try:
            for _ in range(10):
                test_task.delay()
        except:
            traceback.print_exc()

워커 실행

테스트를 위해 다음과 같이 워커를 실행했습니다. wait_time_seconds = 0 옵션을 명시하여 지연시간을 최소화 했습니다.

DJANGO_SETTINGS_MODULE='server.settings.local' celery -A server worker \
--loglevel=info \
-Q queue-express.fifo,queue-standard.fifo,dio-test.fifo,dio-test \
--concurrency=2