使用Django+Flower+Celery来引入 / 从Django的外部调用Celery

芹菜是一种蔬菜。

    非同期のタスクキューです。RubyだとActiveJob、Sidekiq、Delayed_jobに相当します。ざっくり言うと非同期でアプリケーションを動かすためのフレームワークです。

将Celery引入Django。

添加包裹

celery
flower                 # Celeryのダッシュボード
django-celery-results  # タスクの状況を見れる
django-redis           # タスクのキューをredisにする場合
cached-property 

/config/celery.py -> 配置/celery.py

import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xxxxxx') # Django側の設定と同じにする

app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY') # Celeryの設定ファイルの場所
app.autodiscover_tasks()

配置/__init__.py

    このファイルはceleryを起動するときに利用します。
from .celery import app as celery_app

__all__ = ('celery_app',)

配置/设置.py

DJANGO_APPS = [
    # 〜〜〜
    'django_celery_results',
]
CELERY_BROKER_URL = 'redis://redis:6379/0'
CELERY_TIMEZONE = 'Asia/Tokyo'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_TASK_TRACK_STARTED = True
CELERY_IMPORTS = ['worker.sample_task']  # タスクの場所

工作人员/示例任务.py

from celery import shared_task


@shared_task(name='sample_task_execute')   # 外部から呼び出ししたい場合は、nameをつける
def execute(something: str):
    print(f'============= something={something} =============')

app/views/sample_view.py 任务调用源

from worker.sample_task import execute

execute.delay('hogefuga') # メソッド名にdelayをつけて呼び出す

启动芹菜。

    CeleryはDjangoとは別に起動する必要があります。
# config/__init__.pyで定義したので、 -A config.celeryではなく、-A configと出来る
celery -A config worker -l INFO 

如果想要启动Flower的话

celery -A config flower --address=0.0.0.0 --port=5555

如果想要从Django外部调用Celery,

send_taskを使うことにより外部から呼び出せる

from celery import Celery

celery = Celery('webapi')
celery.conf.broker_url = 'redis://redis:6379/0'
celery.send_task('sample_task_execute', kwargs={'something': 'Hallo World'})

如果能够连接到DB,可以使用celery.conf.result_backend将结果保存到DB中。

广告
将在 10 秒后关闭
bannerAds