使用Django和Docker,在Celery beat上实现作业的定期执行
Celery是一个在Django中非常方便的库,可以用于异步处理和定期执行任务。然而,很遗憾的是,关于Celery的日文文章非常少,更找不到关于结合Docker的文章,让我感到非常困惑。
本文将分享关于Celery功能中可在Mac本地环境上实现定期执行任务的Celery Beat的实现方法和遇到的问题。由于这是一篇由Django初学者撰写的文章,如果有任何遗漏之处,请指出。
补充记录
我在Heroku上实现了Celery beat。
请参考此处,生产环境的实现与本地环境略有不同。
在Heroku的免费计划(1 Dyno)上部署Celery beat的方法
环境 is the Chinese paraphrase of “environment.”
mac OS 10.14
python3.7.3
Docker(Desktop for mac) 18.09.2
Django 2.2.4
Celery相关
Celery 4.3.0(Windows不支持)
django-celery-beat 1.5.0
django-celery-results 1.1.2
梵高定制版10.14
蟒蛇3.7.3
Docker(mac版)18.09.2
戴恩戈2.2.4
Celery相关
Celery 4.3.0(不支持Windows)
戴恩戈-莎莉-心跳1.5.0
戴恩戈-莎莉结果1.1.2
西芹和西芹节拍
我首先查看了官方文档,但对于第一次接触的人来说,可能会稍微难以把握整体情况。我也曾经有过同样的经历,可能也会有人直接跳到django-celery-beat这个步骤。重要的是要明白,仅仅通过pip安装django-celery-beat并不能使其工作。即使只是要定期执行任务,首先还是需要安装Celery本身。
另一个重要的事情是定期执行有两种方法。
-
- パターン(1)(django-celery-beatを使わないやり方)
- パターン(2)(django-celery-beatを使うやり方)
在公式文件中有如下所述。
默认的调度器是celery.beat.PersistentScheduler,它仅在本地shelve数据库文件中跟踪最后运行的时间。
还有django-celery-beat扩展,它将任务调度存储在Django数据库中,并提供一个方便的管理界面来运行时管理周期性任务。
模式(1)是将最后执行日期保存在本地的shelve中的方法。模式(2)是将计划保存在Django数据库中,以便可以在管理员仪表板上控制定期任务。
如果先记住这件事,我觉得会更快理解。虽然尝试了两种模式,但第二种模式绝对是推荐的。
实施步骤 (shí shī bù
假设除了Celery之外,Django在Docker上的基本设置已经完成,下面将详细列出步骤。
在官方教程的目录中,有一个“Django”项目,但Django的相关人员也首先建议查看“first-steps-with-celery”部分。
公式教程-入门教程-celery
作为最初的步骤,主要有以下四个步骤。
-
- 选择经纪人
-
- 安装Celery
-
- 启动工作器
- 执行任务
在这篇日文文章中出现了经纪人和工人等词汇,但是这篇日文文章对Celery的整体理解非常清晰,图文并茂,非常易懂。
如何在Mac本地实现Django的异步处理方法(使用Celery和Redis)
使用pip命令安装相关模块。
使用pip安装celery和redis(作为消息代理)。如果不使用这两个功能,无需安装。
celery==4.3.0
redis==3.3.0
django_celery_beat==1.5.0 #使用しない場合はインストール不要
django-celery-results==1.1.2 #使用しない場合はインストール不要
Django的编辑(通用)
根据上述内容,Celery beat的定期执行有两种方法。首先,我们将共同设置Django的配置。
プロジェクトルート/
|--manage.py
config/
|--__init__.py
|--settings.py
|--urls.py
|--celery.py
core(アプリ名)/
|--__init__.py
|--tasks.py
|--models.py
在config文件夹下创建一个名为celery.py的新文件,并按照以下方式进行编辑。
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
需要单独更改设置的地方是config.settings和Celery(‘config’)。将”config”替换为存储settings.py文件的文件夹名称即可。
芹菜的拍打模式(1)的执行
以下是关于第一种模式(不使用django-celery-beat)的Docker设置。由于官方教程没有提供Docker的实施方法,我参考了以下文章。
今天我学到了- Celery和Django和Docker。
在Github上发布的django-celery-docker-example。
Docker Compose配置
version: "3"
services:
db:
image: postgres:10-alpine
volumes:
- postgres_data:/var/lib/postgresql/data/
environment:
- POSTGRES_DB=app
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=supersecretpassword
ports:
- "5432:5432"
redis:
image: "redis:alpine"
app:
build:
context: .
ports:
- "8000:8000"
volumes:
- ./app:/app
command: >
sh -c "python manage.py wait_for_db &&
python manage.py runserver 0.0.0.0:8000"
environment:
- DB_HOST=db
- DB_NAME=app
- DB_USER=postgres
- DB_PASS=supersecretpassword
depends_on:
- db
- redis
celery:
build: .
command: celery -A config worker -l info
volumes:
- ./app:/app
environment:
- DB_HOST=db
- DB_NAME=app
- DB_USER=postgres
- DB_PASS=supersecretpassword
depends_on:
- app
- db
- redis
celery-beat:
build: .
command: celery -A config beat -l INFO --pidfile=
volumes:
- ./app:/app
environment:
- DB_HOST=db
- DB_NAME=app
- DB_USER=postgres
- DB_PASS=supersecretpassword
depends_on:
- app
- db
- redis
volumes:
postgres_data:
pid文件已经存在错误
连续启动Celery beat时出现以下错误。
celery-beat_1 | ERROR: Pidfile (celerybeat.pid) already exists.
当启动 celery beat 时,它会输出 celerybeat.pid 文件,但如果连续启动的话,文件已存在会导致错误。解决方法可以是每次事先删除 celerybeat.pid,或保存到 tmp 目录等。但我选择采用添加 –pidfile= 来避免输出 pid 的方法。
command: celery -A config beat -l INFO --pidfile=
Docker和Celery – 错误:Pidfile(celerybeat.pid)已经存在。
Django的settings.py配置
from celery.schedules import crontab
(中略)
# Celery config
CELERY_BROKER_URL = 'redis://redis:6379'
CELERY_RESULT_BACKEND = 'redis://redis:6379'
CELERY_BEAT_SCHEDULE = {
'hello': {
'task': 'core.tasks.hello',
'schedule': crontab() # execute every minute
}
}
crontab()的默认设置是每分钟执行一次。crontab的配置在官方教程中有详细说明。
创建并执行任务
在应用程序文件夹(例如core文件夹)中创建一个名为tasks.py的文件,并注册要执行的任务。
from celery import shared_task
@shared_task
def hello():
print('Hello!')
在@shared_task装饰器下注册要执行的任务。在此处设置的函数需要与settings.py的CELERY_BEAT_SCHEDULE中注册的任务匹配。
如果能够每隔一分钟输出以下结果,通过docker-compose up就算成功了。
celery-beat_1 | [2019-09-08 01:07:13,412: INFO/MainProcess] Writing entries...
celery-beat_1 | [2019-09-08 01:07:20,299: INFO/MainProcess] Scheduler: Sending due task hello (core.tasks.hello)
celery_1 | [2019-09-08 01:07:20,322: INFO/MainProcess] Received task: core.tasks.hello[91522887-c7a1-4334-a0a0-eb2587cff1f8]
celery_1 | [2019-09-08 01:07:20,324: WARNING/ForkPoolWorker-2] Hello!
celery_1 | [2019-09-08 01:07:20,352: INFO/ForkPoolWorker-2] Task core.tasks.hello[91522887-c7a1-4334-a0a0-eb2587cff1f8] succeeded in 0.028256000019609928s: None
Celery beat 的第二种模式的实现
以下是使用django_celery_beat的方法。官方教程中有相关部分。
celery==4.3.0
redis==3.3.0
django_celery_beat==1.5.0
django-celery-results==1.1.2
如果要使用django_celery_beat,就需要安装Celery和redis。虽然django-celery-results不是必需的,但如后面所述,它可以方便地通过管理仪表板查看任务的执行结果。
Docker Compose的配置
虽然基本相同于上述模式(1),但将 celery-beat 命令更改为以下方式。
celery-beat:
build: .
command: celery -A config beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --pidfile=
volumes:
- ./app:/app
environment:
- DB_HOST=db
- DB_NAME=app
- DB_USER=postgres
- DB_PASS=supersecretpassword
depends_on:
- app
- db
- redis
在命令行中添加参数「–scheduler django_celery_beat.schedulers:DatabaseScheduler」。-pidfile=应与模式(1)一样必需。
django-celery-results的教程如下:
django-celery-results的官方教程
Django的settings.py文件的配置
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'core',
'django_celery_beat', #追加
'django_celery_results', #追加
]
(中略)
# Celery config
CELERY_BROKER_URL = 'redis://redis:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
执行迁移
docker-compose run app /bin/sh -c "python manage.py migrate"
注册并执行任务
打开”定期任务”的选项卡,然后添加任务。
与模式(1)相同,我们将要执行的任务注册到tasks.py中。由于我想要进行数据库的定期更新,所以现在我们将添加对模型对象的访问。
from celery import shared_task
from .models import Repository
@shared_task
def hello():
print('Hello!')
print('count=' ,Repository.objects.count())
使用Docker Compose 启动
如果在docker-compose up之前安装了django_celery_results,您可以在admin仪表板的CELERY RESULTS中查看结果。
通过Celery beat访问model对象(postgresql)时出现错误。
我列举了一个成功的例子,但是在从Celery beat访问模型对象(postgresql)时遇到了错误,困扰了数天。
#エラーメッセージ
celery-beat_1 | [2019-09-05 14:05:00,017: INFO/MainProcess] Scheduler: Sending due task hello (core.tasks.hello)
celery_1 | [2019-09-05 14:05:00,040: INFO/MainProcess] Received task: core.tasks.hello[7ec020bf-bc75-4bef-818d-fbbe44801f38]
celery_1 | [2019-09-05 14:05:00,073: ERROR/ForkPoolWorker-2] Task core.tasks.hello[7ec020bf-bc75-4bef-818d-fbbe44801f38] raised unexpected: OperationalError('could not connect to server: No such file or directory\n\tIs the server running locally and accepting\n\tconnections on Unix domain socket "/tmp/.s.PGSQL.5432"?\n')
celery_1 | Traceback (most recent call last):
celery_1 | File "/usr/local/lib/python3.7/site-packages/django/db/backends/base/base.py", line 217, in ensure_connection
celery_1 | self.connect()
celery_1 | File "/usr/local/lib/python3.7/site-packages/django/db/backends/base/base.py", line 195, in connect
celery_1 | self.connection = self.get_new_connection(conn_params)
celery_1 | File "/usr/local/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 178, in get_new_connection
celery_1 | connection = Database.connect(**conn_params)
celery_1 | File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 130, in connect
celery_1 | conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
celery_1 | psycopg2.OperationalError: could not connect to server: No such file or directory
celery_1 | Is the server running locally and accepting
celery_1 | connections on Unix domain socket "/tmp/.s.PGSQL.5432"?
最后的错误信息如下所示:
celery_1 | psycopg2.OperationalError: could not connect to server: No such file or directory
celery_1 | Is the server running locally and accepting
celery_1 | connections on Unix domain socket "/tmp/.s.PGSQL.5432"?
由于”/tmp/.s.PGSQL.5432″的存在,可以推断出可能发生了某种与Postgre有关的错误,但在网上搜索后发现,出现”/tmp/.s.PGSQL.5432″可能是因为Postgre没有启动而导致的错误。然而,Postgre容器正常运行,原因不明。
最后我看了一下docker-compose并自己发现了问题,即Celery和Celery beat没有进行postgre环境的配置,导致Celery无法访问postgre,这就是问题所在。
一旦将以下的配置添加到Docker Compose的Celery和Celery beat中,然后即可实现从Celery beat的定期执行任务中对数据库进行更新。
#CeleryとCelery beatに以下の記載追加
environment:
- DB_HOST=db
- DB_NAME=app
- DB_USER=postgres
- DB_PASS=supersecretpassword
任务未注册时发生错误。
当我试图将在tasks.py中以@shared_task方式定义的函数替换为其他名称并进行执行时,遇到了以下情况:任务未注册并引发了错误。
celery-beat_1 | [2019-09-23 04:16:47,995: INFO/MainProcess] Scheduler: Sending due task update (core.tasks.update)
celery_1 | [2019-09-23 04:16:48,000: ERROR/MainProcess] Received unregistered task of type 'core.tasks.update'.
celery_1 | The message has been ignored and discarded.
celery_1 |
celery_1 | Did you remember to import the module containing this task?
celery_1 | Or maybe you're using relative imports?
celery_1 |
celery_1 | Please see
celery_1 | http://docs.celeryq.org/en/latest/internals/protocol.html
celery_1 | for more information.
celery_1 |
celery_1 | The full contents of the message body was:
celery_1 | b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
celery_1 | Traceback (most recent call last):
celery_1 | File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
celery_1 | strategy = strategies[type_]
celery_1 | KeyError: 'core.tasks.update'
通常情况下,Celery.py中的autodiscover_tasks()函数应该会自动检索@shared task的定义并注册任务,但似乎有时无法正常工作。
app.autodiscover_tasks()
只需一种方式,作为解决策,在settings.py中添加以下内容后,成功注册的问题得到了解决。
CELERY_IMPORTS = ('core.tasks') #tasks.pyを格納してるフォルダ
结束
我們這次重點介紹了Celery的定期執行功能,但將來我們也想嘗試非同步處理。有了Celery,就可以在Django中進行批處理的工作。
另外,由于我尚未将自己所开发的应用部署到正式环境中,因此在想要在正式环境中运行Celery时可能会遇到一些问题。在部署后,我将更新这篇文章。非常感谢您的阅读。