Integrate Celery with Django to implement asynchronous and periodic tasks. Component versions:

Django                3.2
celery                4.4.7
djangorestframework   3.12.4
django-celery-beat    2.2.0
django-celery-results 2.0.1
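
The packages can be installed with pip, pinned to the versions in the table above:

pip install django==3.2 celery==4.4.7 djangorestframework==3.12.4 \
    django-celery-beat==2.2.0 django-celery-results==2.0.1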

Asynchronous tasks with Django and Celery

Step 0: Create a new Django project

django-admin startproject tutorial
cd tutorial
django-admin startapp feedback
# Install RabbitMQ; the default username/password is guest/guest
# or use Redis instead
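
If you prefer to run the broker in Docker, a minimal sketch (using the official rabbitmq and redis images):

docker run -d --name broker -p 5672:5672 rabbitmq:3
# or
docker run -d --name broker -p 6379:6379 redis:6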

Celery configuration

Step 1: Create celery.py in the same directory as settings.py

from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tutorial.settings')
app = Celery('tutorial')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

The namespace argument specifies the prefix for Celery-related settings in settings.py.
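
For example, with namespace='CELERY' the worker reads every setting that starts with CELERY_, strips the prefix, and maps it to Celery's lower-case option name:

# settings.py                                   # Celery option
CELERY_BROKER_URL = 'redis://localhost:6379/1'  # broker_url
CELERY_TASK_SERIALIZER = 'json'                 # task_serializer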

Modify __init__.py in the same package:

from __future__ import absolute_import
from .celery import app as celery_app

__all__ = ('celery_app',)

Step 2: Modify settings.py

# Celery settings
CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
CELERY_RESULT_BACKEND = 'rpc://'
#CELERY_RESULT_BACKEND = 'django-db'
#CELERY_BROKER_URL = 'redis://localhost:6379/1'
#CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = "Asia/Shanghai"

A simple asynchronous task

Modify settings.py

INSTALLED_APPS = [
    # ... existing apps ...
    'django_celery_results',

    'feedback.apps.FeedbackConfig',
]
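
django_celery_results stores task results in the database (used when CELERY_RESULT_BACKEND = 'django-db'); its tables are created by running the migrations:

python manage.py migrate django_celery_results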

Create tasks.py in the feedback app

import time
from celery import shared_task

@shared_task
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True
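
With the worker running, the task can be tried out from python manage.py shell; a quick sketch:

from feedback.tasks import create_task

result = create_task.delay(1)   # returns an AsyncResult immediately
result.status                   # 'PENDING', then 'SUCCESS' once the worker finishes
result.get(timeout=30)          # True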

Modify views.py

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import create_task

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)
    return JsonResponse({"error": "POST required"}, status=400)
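
The returned task_id can later be used to poll the task state (for reliable polling across processes, prefer the django-db or redis result backend over rpc://). A minimal status view; the view name and the tasks/<task_id>/ route it assumes are not part of the original example:

from celery.result import AsyncResult
from django.http import JsonResponse

def get_status(request, task_id):
    # Look up the task by id against the configured result backend
    result = AsyncResult(task_id)
    return JsonResponse({
        "task_id": task_id,
        "task_status": result.status,
        "task_result": result.result if result.ready() else None,
    })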

Modify urls.py

from django.contrib import admin
from django.urls import path
import feedback.views as feedback_views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tasks/', feedback_views.run_task),
]

Test

# Start the celery worker
celery -A tutorial worker -l info
# Test
curl -F type=1 http://localhost:8000/tasks/
# Output looks like: {"task_id": "bbb1a471-db36-442f-a751-4ddff9533149"}

Asynchronous task queues

Multiple asynchronous tasks, with each task assigned to its own queue.

The example below defines only a single addition task; you can add more queues and tasks in the same way.

settings.py

from kombu import Queue, Exchange
CELERY_TASK_QUEUES = (
    Queue("celery", Exchange("celery"), routing_key="celery"),
    Queue("add_queue", Exchange("compute_node"), routing_key="add_task"),
)
CELERY_TASK_ROUTES = {
    # The key may also be a glob such as 'feedback.tasks.*';
    # when several rules match, the longest match wins
    'feedback.tasks.add': {
        'queue': 'add_queue',
        'routing_key': 'add_task'
    }
}

tasks.py

@shared_task
def add(x, y):
    return x + y

views.py

from celery.result import AsyncResult
from django.http import JsonResponse

from .tasks import add

def async_add_task(request):
    arg1 = 1
    arg2 = 2
    result = add.apply_async(
        args=(
            arg1,
            arg2,
        ),
        # Besides the settings file, these options can also be set here in code
        # queue="add_queue",
        # routing_key="add_task",
        # priority=0,
        # exchange="compute_node",
    )
    task_status = AsyncResult(result.task_id)
    return JsonResponse(
        {
            "input_args": [arg1, arg2],
            "task_id": result.task_id,
            # .get() blocks until the worker returns the result
            "result": task_status.get(),
        },
        status=200
    )
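
Note that task_status.get() blocks the HTTP request until the worker has finished, which defeats the purpose for long-running jobs. A non-blocking variant (hypothetical, not part of the original) would return only the task id:

def async_add_task_nonblocking(request):
    # Enqueue and return immediately; poll the result separately
    result = add.apply_async(args=(1, 2), queue="add_queue", routing_key="add_task")
    return JsonResponse({"task_id": result.task_id}, status=202)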

The urls.py change (adding the async_add/ route) is omitted here; the complete urls.py is shown later.

# Run a dedicated worker for this queue
celery -A tutorial worker -l info -Q add_queue
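
If both queues should be processed, either list them in a single -Q argument or start one worker per queue (the -n node names below are arbitrary):

celery -A tutorial worker -l info -Q celery,add_queue
# or, in separate terminals:
celery -A tutorial worker -l info -Q celery -n default@%h
celery -A tutorial worker -l info -Q add_queue -n adder@%h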

Periodic tasks

Modify settings.py

ALLOWED_HOSTS = ['*']

INSTALLED_APPS = [
    'rest_framework',
    'django_celery_beat',
]

REST_FRAMEWORK = {
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 10
}

from celery.schedules import crontab
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SCHEDULE = {
    'crawl_currency': {
        'task': 'feedback.tasks.crawl_currency',
        'schedule': crontab(minute='*/1')
    }
}

Here we define a periodic task, feedback.tasks.crawl_currency, which runs every minute and fetches cryptocurrency data from https://coinranking.com (a proxy may be needed to reach the site).
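
crontab() takes minute, hour, day_of_week and similar keyword arguments, so other schedules can be expressed the same way; a few illustrative values:

from celery.schedules import crontab

crontab(minute='*/1')                               # every minute (as above)
crontab(minute=0, hour='*/3')                       # every three hours, on the hour
crontab(minute=30, hour=8, day_of_week='mon-fri')   # 08:30 on weekdays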

Add to tasks.py

import time
import requests
from celery import shared_task
from bs4 import BeautifulSoup

from .models import Cryptocurrency

@shared_task
def crawl_currency():
    # Do some heavy stuff: crawl the page and create objects in the database
    print("Crawling data and creating objects in database ..")
    proxy = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
    req = requests.get(
        "https://coinranking.com", headers={"User-Agent": "Mozilla/5.0"}, proxies=proxy
    )
    html = req.content
    bs = BeautifulSoup(html, "html.parser")
    # Find first 5 table rows
    rows = bs.find("tbody", class_="table__body").find_all("tr", class_="table__row")[
        0:5
    ]
    for row in rows:
        cryptocurrency = (
            row.find("span", class_="profile__name")
            .get_text()
            .strip()
            .replace("\n", "")
        )
        values = row.find_all("div", class_="valuta")
        price = values[0].get_text().strip().replace("\n", "")
        market_cap = values[1].get_text().strip().replace("\n", "")
        change = (
            row.find("div", class_="change")
            .find("span")
            .get_text()
            .strip()
            .replace("\n", "")
        )
        print(
            {
                "cryptocurrency": cryptocurrency,
                "price": price,
                "market_cap": market_cap,
                "change": change,
            }
        )
        # Create object in database from crawled data
        Cryptocurrency.objects.create(
            cryptocurrency=cryptocurrency,
            price=price,
            market_cap=market_cap,
            change=change,
        )
        # Sleep 3 seconds to avoid any errors
        time.sleep(3)
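
Before relying on beat, the task can be run once by hand to check the scraping and the proxy (this assumes the proxy at 127.0.0.1:7890 from the code above is reachable):

from feedback.tasks import crawl_currency

crawl_currency.delay()   # run via the worker
crawl_currency()         # or synchronously in the shell for debugging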

models.py

from django.db import models

# Create your models here.
class Cryptocurrency(models.Model): 
    cryptocurrency = models.CharField(max_length=100)
    price = models.CharField(max_length=100)
    market_cap = models.CharField(max_length=100)
    change = models.CharField(max_length=100)

    def __str__(self):
        return self.cryptocurrency
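
The new model (and the django_celery_beat tables) need migrations before the task can write to the database:

python manage.py makemigrations feedback
python manage.py migrate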

serializers.py

from rest_framework import serializers
from .models import Cryptocurrency


class CryptocurrencySerializer(serializers.ModelSerializer):
    class Meta:
        model = Cryptocurrency
        fields = ["cryptocurrency", "price", "market_cap", "change"]

views.py

from rest_framework import generics
from .models import Cryptocurrency
from .serializers import CryptocurrencySerializer


class ListCryptocurrencyView(generics.ListAPIView):
    """
    Provides a get method handler.
    """

    queryset = Cryptocurrency.objects.all()
    serializer_class = CryptocurrencySerializer

urls.py

from django.contrib import admin
from django.urls import path
import feedback.views as feedback_views

urlpatterns = [
    path('admin/', admin.site.urls),
    path('tasks/', feedback_views.run_task),
    path('async_add/', feedback_views.async_add_task),
    path('currency/', feedback_views.ListCryptocurrencyView.as_view()),
]

Test

# Start celery beat (the celery worker and django runserver must also be running)
celery -A tutorial beat -l info
# Then visit http://localhost:8000/currency/ in a browser

References: