Talk: Python + Celery

Meetup Python Grenoble - 2019/10/30

Manage your asynchronous tasks with Celery!

About me

Romain Clement

  • Meetup Python Grenoble co-organizer
  • Freelance Software Engineer
  • CTO @ Sylha (FinTech startup)
  • Website
  • GitHub

Why asynchronous tasks?

More and more use cases every day call for non-blocking processes:

  • Sending e-mails (single, batch, scheduled)
  • Data pipelines (ETL)
  • Machine learning
  • Micro-services communication
  • Logging
  • ...

Is it that hard?

Mmmh, kinda, see for yourself:

  • Concurrency model (threading, etc.)
  • Result storage (databases, etc.)
  • Scaling strategy (pool/cluster of workers)
  • Scheduling (cron-jobs)
  • Retry on failure strategies
  • Distributing tasks across multiple services
  • Task broadcasting
  • ...
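
To give a feel for a few of the points above, here is a minimal hand-rolled sketch using only the standard library: a thread pool as the concurrency model, an in-memory future as result storage, and a naive retry loop. The names `run_with_retries` and `FlakyJob` are hypothetical and only serve the illustration; this is roughly the plumbing a task queue takes off your hands, not how Celery is implemented.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(func: Callable[[], T], max_retries: int = 3, base_delay: float = 0.01) -> T:
    """Call func, retrying with exponential backoff on any exception."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: propagate the last error
            time.sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


class FlakyJob:
    """Hypothetical job that fails twice before succeeding."""

    def __init__(self) -> None:
        self.calls = 0

    def __call__(self) -> str:
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("temporary failure")
        return "done"


job = FlakyJob()
# Concurrency model: a thread pool. Result storage: an in-memory future.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(run_with_retries, job)
    result = future.result(timeout=5)

print(result, job.calls)
```

Even this toy version has no persistence, no scheduling and no way to spread work across machines, which is where a broker-backed tool starts to pay off.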

Why Celery?

Celery is a Python package abstracting task definitions and invocations, using a message-broker and a result-backend behind the scenes:

  • Choose a message broker (Redis, RabbitMQ, etc.) and a result backend (Redis, SQLAlchemy, Mongo, etc.), if any
  • Define your tasks using Python code
  • Define your cron-jobs using Python code
  • Define retry on failure strategies for each task
  • Call your tasks (almost) as a function call!

Celery Overview

Show me some code!

Initialize Celery

In [ ]:
import os

from typing import Optional
from celery import Celery

celery_broker_url: Optional[str] = os.environ.get("CELERY_BROKER_URL")
celery_result_backend: Optional[str] = os.environ.get("CELERY_RESULT_BACKEND")

celery_app: Celery = Celery("example")

celery_app.conf.update(
    broker_url=celery_broker_url,
    result_backend=celery_result_backend,
)

Define a simple task

In [ ]:
def say_hi(name: str) -> None:
    print(f"Hi, {name}!")
In [ ]:
celery_app.task(
    say_hi,
    name="tasks.say_hi",
    ignore_result=True,
)

Call a simple task

Start a Celery worker beforehand:

celery worker \
    --app example.celery:celery_app \
    --loglevel=info
In [ ]:
celery_app.send_task("tasks.say_hi", args=["Joe"])

Get a result from a task

In [ ]:
from datetime import datetime

def talking_clock() -> datetime:
    return datetime.now()
In [ ]:
celery_app.task(
    talking_clock,
    name="tasks.talking_clock",
    ignore_result=False
)
In [ ]:
from datetime import datetime
from celery.result import AsyncResult

talking_clock_task: AsyncResult = celery_app.send_task("tasks.talking_clock")
# store `talking_clock_task.id` somewhere for further use
In [ ]:
talking_clock_task_result = AsyncResult(talking_clock_task.id, app=celery_app)
now: datetime = talking_clock_task_result.get()

print(f"result: {now}")

Auto-retry failing tasks

In [ ]:
def failing_task() -> None:
    raise RuntimeError
In [ ]:
celery_app.task(
    failing_task,
    name="tasks.failing_task",
    ignore_result=True,
    autoretry_for=(RuntimeError,),
    max_retries=5,
    retry_backoff=True,
    retry_jitter=True,
)
In [ ]:
celery_app.send_task("tasks.failing_task")
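
To make the `retry_backoff` and `retry_jitter` options more concrete, here is a rough standard-library model of the delays they are documented to produce: the delay doubles on each retry, is capped (the 600-second cap below assumes the documented default of `retry_backoff_max`), and with jitter enabled the computed value becomes an upper bound for a random delay. The function `backoff_delay` is a hypothetical name for this sketch, not a Celery API.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600, jitter: bool = True) -> int:
    """Seconds to wait before retry number `retries` (0-based)."""
    delay = min(maximum, factor * 2 ** retries)
    if jitter:
        # Full jitter: pick a random delay between 0 and the computed value.
        delay = random.randrange(delay + 1)
    return delay


# Without jitter, five retries wait 1, 2, 4, 8 and 16 seconds.
delays = [backoff_delay(n, jitter=False) for n in range(5)]
print(delays)
```

Jitter spreads the retries of many failing tasks over time, so they do not all hit a recovering service at the same instant.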

Scheduling tasks

In [ ]:
def send_report(email: str) -> None:
    print(f"Sending report to {email}")
In [ ]:
celery_app.task(send_report, name="tasks.send_report", ignore_result=True)
In [ ]:
from celery import Celery, schedules

celery_app.conf.update(
    enable_utc=True,
    timezone="UTC",
    beat_schedule={
        "send-report-minutely": {
            "task": "tasks.send_report",
            "kwargs": {"email": "[email protected]"},
            "options": {"queue" : "default"},
            "schedule": schedules.crontab(
                minute="*",
                hour="*",
                day_of_week="*",
                day_of_month="*",
                month_of_year="*",
            ),
        },
    }
)
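
For a plain fixed interval, a crontab expression is not strictly needed: a `beat_schedule` entry also accepts a number of seconds or a `timedelta` as its `schedule`. The fragment below is a sketch of such an entry; the entry name and the e-mail address are placeholders chosen for the illustration.

```python
from datetime import timedelta

# Hypothetical alternative entry for `beat_schedule`: run every 60 seconds,
# counted from when beat starts rather than aligned on the clock minute.
interval_schedule = {
    "send-report-every-60s": {
        "task": "tasks.send_report",
        "kwargs": {"email": "reports@example.com"},  # placeholder address
        "schedule": timedelta(seconds=60),
    },
}

# It would be applied with:
#   celery_app.conf.update(beat_schedule=interval_schedule)
print(interval_schedule["send-report-every-60s"]["schedule"].total_seconds())
```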

Start a Celery beat process (acts as an autonomous producer):

celery beat \
    --app example.celery:celery_app \
    --loglevel=info

Warning: if beat is replicated, make sure only one replica actually dispatches the scheduled tasks, otherwise they run more than once! (use alternative implementations)

Splitting tasks in queues

When starting a Celery worker, specify which queues to listen to:

celery worker \
    --app example.celery:celery_app \
    --queues=mailing,broadcast \
    --loglevel=info
In [ ]:
celery_app.send_task(
    "mailing.send_email",
    queue="mailing",
    kwargs={"to": "[email protected]"}
)
In [ ]:
celery_app.conf.update(
    task_routes={
        "mailing.*": "mailing",
    },
)
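
As a mental model of what the glob pattern in `task_routes` does, the sketch below resolves a task name to a queue with the standard library's `fnmatch`. It is a simplified illustration, not Celery's router (which handles more route formats); `resolve_queue` is a hypothetical helper, and the fallback assumes the default queue name `celery`.

```python
from fnmatch import fnmatchcase
from typing import Dict

# Same shape as the `task_routes` mapping above: glob pattern -> queue name.
task_routes: Dict[str, str] = {
    "mailing.*": "mailing",
}


def resolve_queue(task_name: str, routes: Dict[str, str], default: str = "celery") -> str:
    """Return the queue of the first pattern matching task_name."""
    for pattern, queue in routes.items():
        if fnmatchcase(task_name, pattern):
            return queue
    return default  # no route matched: fall back to the default queue


print(resolve_queue("mailing.send_email", task_routes))
print(resolve_queue("tasks.say_hi", task_routes))
```

With routes configured this way, callers no longer need to pass `queue="mailing"` explicitly on every `send_task` call.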

Pipelining tasks

Lots of possible workflows using the "canvas" feature, but the main ones are:

  • chain: chain tasks (with or without passing arguments)
  • group: parallelize tasks
  • chord: parallelize tasks, then execute a finalizing task

Eliminates the problem of a task needing to call or wait on other tasks!
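
The three primitives can be pictured with a local, plain-Python analogy. This is only a sketch of their semantics: in Celery the steps are task signatures executed by workers through the broker, whereas here they are ordinary functions, and `run_chain`, `run_group` and `run_chord` are hypothetical helper names.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Any, Callable, List


def run_chain(value: Any, *steps: Callable[[Any], Any]) -> Any:
    """chain: feed each step's result into the next step."""
    return reduce(lambda acc, step: step(acc), steps, value)


def run_group(func: Callable[[Any], Any], items: List[Any]) -> List[Any]:
    """group: run the same step on every item in parallel."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, items))


def run_chord(func: Callable[[Any], Any], items: List[Any],
              callback: Callable[[List[Any]], Any]) -> Any:
    """chord: run a group, then pass all its results to a finalizing step."""
    return callback(run_group(func, items))


def double(x: int) -> int:
    return x * 2


def increment(x: int) -> int:
    return x + 1


chained = run_chain(3, double, increment)   # double, then increment
grouped = run_group(double, [1, 2, 3])      # each item doubled in parallel
chorded = run_chord(double, [1, 2, 3], sum)  # doubled in parallel, then summed
print(chained, grouped, chorded)
```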

In [ ]:
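from dataclasses import dataclass
from typing import List

# Note: task results travel through the result backend, so they must be
# serializable by the configured serializer (plain dicts are the safest choice).
@dataclass
class User:
    name: str
    email: str

def get_all_users() -> List[User]:
    return [
        User(name="John Doe", email="[email protected]"),
        User(name="Jane Doe", email="[email protected]"),
    ]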

celery_app.task(
    get_all_users,
    name="users.get_all",
    ignore_result=False
)
In [ ]:
def send_newsletter_to_all(all_users: List[User]) -> None:
    for u in all_users:
        print(f"Sending newsletter to {u.name}: {u.email}")
    
celery_app.task(
    send_newsletter_to_all,
    name="mailing.send_newsletter_to_all",
    ignore_result=True
)
In [ ]:
from celery import chain, signature

def send_newsletter():
    user_sig = signature(
        "users.get_all",
        queue="users"
    )
    newsletter_sig = signature(
        "mailing.send_newsletter_to_all",
        queue="mailing"
    )
    # The result of users.get_all is passed as first argument to the next task
    task_chain = chain(user_sig, newsletter_sig)
    task_chain.apply_async()

celery_app.task(
    send_newsletter,
    name="tasks.send_newsletter",
    ignore_result=True
)

Things to consider

  • Python only (experimental Node.js client with node-celery)
  • No asyncio (async / await) support for task results (coming to celery >= 5.0.0)
  • Some regressions were recently introduced in either celery or kombu, so be careful when upgrading

Follow-up

  • Storing task results in SQL/NoSQL database
  • Testing Celery tasks (mocks)
  • Monitoring (flower)

Alternatives

Q&A