Meetup Python Grenoble - 2019/10/30
Manage your asynchronous tasks with Celery!
More and more use cases every day for non-blocking processes.
Mmmh, kinda, see for yourself:
Celery is a Python package abstracting task definitions and invocations, using a message broker and a result backend behind the scenes:
import os
from typing import Optional

from celery import Celery

# broker and result-backend URLs are read from the environment
celery_broker_url: Optional[str] = os.environ.get("CELERY_BROKER_URL")
celery_result_backend: Optional[str] = os.environ.get("CELERY_RESULT_BACKEND")

celery_app: Celery = Celery("example")
celery_app.conf.update(
    broker_url=celery_broker_url,
    result_backend=celery_result_backend,
)
def say_hi(name: str) -> None:
    print(f"Hi, {name}!")

# register the plain function as a Celery task, addressable by its name
celery_app.task(
    say_hi,
    name="tasks.say_hi",
    ignore_result=True,
)
Start a Celery worker beforehand:
celery worker \
    --app example.celery:celery_app \
    --loglevel=info
celery_app.send_task("tasks.say_hi", args=["Joe"])
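send_task also accepts the usual apply_async options; for instance, a minimal sketch delaying execution with countdown:
celery_app.send_task("tasks.say_hi", args=["Joe"], countdown=5)  # runs ~5 seconds from now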
from datetime import datetime

def talking_clock() -> datetime:
    return datetime.now()

celery_app.task(
    talking_clock,
    name="tasks.talking_clock",
    ignore_result=False,  # we want to fetch the return value later
)
from datetime import datetime
from celery.result import AsyncResult
talking_clock_task: AsyncResult = celery_app.send_task("tasks.talking_clock")
# store `talking_clock_task.id` somewhere for further use
talking_clock_task_result = AsyncResult(talking_clock_task.id, app=celery_app)
now: datetime = talking_clock_task_result.get()
print(f"result: {now}")
def failing_task() -> None:
    raise RuntimeError

celery_app.task(
    failing_task,
    name="tasks.failing_task",
    ignore_result=True,
    autoretry_for=(RuntimeError,),  # retry automatically on this exception
    max_retries=5,
    retry_backoff=True,  # exponential delay between attempts
    retry_jitter=True,  # randomize delays to avoid thundering herds
)
celery_app.send_task("tasks.failing_task")
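The autoretry_* options cover the common case; for finer control, a task can also retry itself manually when registered with bind=True. A minimal sketch (flaky_task is a hypothetical name):
def flaky_task(self) -> None:
    try:
        raise RuntimeError
    except RuntimeError as exc:
        # re-enqueue this very task, 5 seconds from now
        raise self.retry(exc=exc, countdown=5)

celery_app.task(
    flaky_task,
    name="tasks.flaky_task",
    bind=True,  # pass the task instance as the first argument
    ignore_result=True,
    max_retries=5,
)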
def send_report(email: str) -> None:
    print(f"Sending report to {email}")

celery_app.task(send_report, name="tasks.send_report", ignore_result=True)
from celery import schedules

celery_app.conf.update(
    enable_utc=True,
    timezone="UTC",
    beat_schedule={
        "send-report-minutely": {
            "task": "tasks.send_report",
            "kwargs": {"email": "admin@domain.com"},
            "options": {"queue": "default"},
            "schedule": schedules.crontab(
                minute="*",
                hour="*",
                day_of_week="*",
                day_of_month="*",
                month_of_year="*",
            ),
        },
    },
)
Start a Celery beat process (it acts as an autonomous producer):
celery beat \
    --app example.celery:celery_app \
    --loglevel=info
Warning: beware of running beat on several replicas; make sure only one instance is scheduling tasks! (or use an alternative implementation)
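One such alternative, sketched here assuming the third-party celery-redbeat package, stores the schedule in Redis and takes a lock so that only one beat instance is active at a time:
celery beat \
    --app example.celery:celery_app \
    --scheduler redbeat.RedBeatScheduler \
    --loglevel=info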
When starting a Celery worker, specify which queues to listen to:
celery worker \
    --app example.celery:celery_app \
    --queues=mailing,broadcast \
    --loglevel=info
celery_app.send_task(
    "mailing.send_email",
    queue="mailing",
    kwargs={"to": "joe@doe.com"},
)
celery_app.conf.update(
    task_routes={
        # every task whose name matches "mailing.*" goes to the "mailing" queue
        "mailing.*": "mailing",
    },
)
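With that route in place, producers no longer need to pass the queue explicitly:
# matched by the "mailing.*" pattern, so it lands on the "mailing" queue
celery_app.send_task("mailing.send_email", kwargs={"to": "joe@doe.com"})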
Lots of possible workflows using the "canvas" feature, but the main ones are:
- chain: chain tasks (with or without passing arguments)
- group: parallelize tasks
- chord: parallelize, then execute a finalizing task
This eliminates the problem of a task wanting to call/wait on other tasks!
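A minimal sketch of group and chord, reusing tasks.say_hi from earlier (tasks.square and tasks.sum_numbers are hypothetical task names):
from celery import chord, group, signature

# group: fan out independent task invocations in parallel
group(
    signature("tasks.say_hi", args=[name]) for name in ["Joe", "Jane"]
).apply_async()

# chord: run a group, then feed the list of its results to a callback task
header = [signature("tasks.square", args=[n]) for n in range(10)]
chord(header)(signature("tasks.sum_numbers"))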
from dataclasses import dataclass
from typing import List

@dataclass
class User:
    name: str
    email: str

def get_all_users() -> List[User]:
    return [
        User(name="John Doe", email="john@doe.com"),
        User(name="Jane Doe", email="jane@doe.com"),
    ]

# note: shipping User objects between tasks requires a serializer
# that can handle them (e.g. pickle); the default JSON one cannot
celery_app.task(
    get_all_users,
    name="users.get_all",
    ignore_result=False,
)
def send_newsletter_to_all(all_users: List[User]) -> None:
    for u in all_users:
        print(f"Sending newsletter to {u.name}: {u.email}")

celery_app.task(
    send_newsletter_to_all,
    name="mailing.send_newsletter_to_all",
    ignore_result=True,
)
from celery import chain, signature

def send_newsletter() -> None:
    user_sig = signature("users.get_all", queue="users")
    newsletter_sig = signature("mailing.send_newsletter_to_all", queue="mailing")
    # the return value of users.get_all is passed as the first
    # argument of mailing.send_newsletter_to_all
    task_chain = chain(user_sig, newsletter_sig)
    task_chain.apply_async()

celery_app.task(
    send_newsletter,
    name="tasks.send_newsletter",
    ignore_result=True,
)
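The workflow is itself a task, so it can be triggered like any other:
celery_app.send_task("tasks.send_newsletter")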
Going further:
- clients exist in other languages (e.g. node-celery)
- asyncio (async / await) support for task results (coming to celery >= 5.0.0)
- some issues may come from celery or kombu themselves, be careful
- monitoring tools are available (e.g. flower)
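For the monitoring part, a minimal sketch launching flower (assuming the flower package is installed):
celery flower \
    --app example.celery:celery_app \
    --port=5555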