使用Django和Docker,在Celery beat上实现作业的定期执行

Celery是一个在Django中非常方便的库,可以用于异步处理和定期执行任务。然而,很遗憾的是,关于Celery的日文文章非常少,更找不到关于结合Docker的文章,让我感到非常困惑。

本文将分享关于Celery功能中可在Mac本地环境上实现定期执行任务的Celery Beat的实现方法和遇到的问题。由于这是一篇由Django初学者撰写的文章,如果有任何遗漏之处,请指出。

补充记录

我在Heroku上实现了Celery beat。
请参考此处,生产环境的实现与本地环境略有不同。

在Heroku的免费计划(1 Dyno)上部署Celery beat的方法

环境 is the Chinese paraphrase of “environment.”

mac OS 10.14
python3.7.3
Docker(Desktop for mac) 18.09.2
Django 2.2.4
Celery相关
Celery 4.3.0(Windows不支持)
django-celery-beat 1.5.0
django-celery-results 1.1.2

梵高定制版10.14
蟒蛇3.7.3
Docker(mac版)18.09.2
戴恩戈2.2.4
Celery相关
Celery 4.3.0(不支持Windows)
戴恩戈-莎莉-心跳1.5.0
戴恩戈-莎莉结果1.1.2

西芹和西芹节拍

我首先查看了官方文档,但对于第一次接触的人来说,可能会稍微难以把握整体情况。我也曾经有过同样的经历,可能也会有人直接跳到django-celery-beat这个步骤。重要的是要明白,仅仅通过pip安装django-celery-beat并不能使其工作。即使只是要定期执行任务,首先还是需要安装Celery本身。

另一个重要的事情是定期执行有两种方法。

    • パターン(1)(django-celery-beatを使わないやり方)

 

    パターン(2)(django-celery-beatを使うやり方)

在公式文件中有如下所述。

默认的调度器是celery.beat.PersistentScheduler,它仅在本地shelve数据库文件中跟踪最后运行的时间。
还有django-celery-beat扩展,它将任务调度存储在Django数据库中,并提供一个方便的管理界面来运行时管理周期性任务。

模式(1)是将最后执行日期保存在本地的shelve中的方法。模式(2)是将计划保存在Django数据库中,以便可以在管理员仪表板上控制定期任务。

如果先记住这件事,我觉得会更快理解。虽然尝试了两种模式,但第二种模式绝对是推荐的。

实施步骤 (shí shī bù

假设除了Celery之外,Django在Docker上的基本设置已经完成,下面将详细列出步骤。
在官方教程的目录中,有一个“Django”项目,但Django的相关人员也首先建议查看“first-steps-with-celery”部分。

公式教程-入门教程-celery

作为最初的步骤,主要有以下四个步骤。

    1. 选择经纪人

 

    1. 安装Celery

 

    1. 启动工作器

 

    执行任务

在这篇日文文章中出现了经纪人和工人等词汇,但是这篇日文文章对Celery的整体理解非常清晰,图文并茂,非常易懂。

如何在Mac本地实现Django的异步处理方法(使用Celery和Redis)

使用pip命令安装相关模块。

使用pip安装celery和redis(作为消息代理)。如果不使用这两个功能,无需安装。

celery==4.3.0
redis==3.3.0
django_celery_beat==1.5.0     #使用しない場合はインストール不要
django-celery-results==1.1.2  #使用しない場合はインストール不要

Django的编辑(通用)

根据上述内容,Celery beat的定期执行有两种方法。首先,我们将共同设置Django的配置。

プロジェクトルート/
  |--manage.py
  config/
    |--__init__.py
    |--settings.py
    |--urls.py
    |--celery.py
  core(アプリ名)/
   |--__init__.py
   |--tasks.py
    |--models.py

在config文件夹下创建一个名为celery.py的新文件,并按照以下方式进行编辑。

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

需要单独更改设置的地方是config.settings和Celery(‘config’)。将”config”替换为存储settings.py文件的文件夹名称即可。

芹菜的拍打模式(1)的执行

以下是关于第一种模式(不使用django-celery-beat)的Docker设置。由于官方教程没有提供Docker的实施方法,我参考了以下文章。

今天我学到了- Celery和Django和Docker。

在Github上发布的django-celery-docker-example。

Docker Compose配置

version: "3"

services:
    db:
        image: postgres:10-alpine
        volumes:
            - postgres_data:/var/lib/postgresql/data/
        environment:
            - POSTGRES_DB=app
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=supersecretpassword
        ports:
            - "5432:5432"
    redis:
        image: "redis:alpine"
    app:
        build:
            context: .
        ports:
            - "8000:8000"
        volumes:
            - ./app:/app
        command: >
         sh -c "python manage.py wait_for_db &&
                python manage.py runserver 0.0.0.0:8000"
        environment:
            - DB_HOST=db
            - DB_NAME=app
            - DB_USER=postgres
            - DB_PASS=supersecretpassword
        depends_on:
            - db
            - redis
    celery:
        build: .
        command: celery -A config worker -l info
        volumes:
            - ./app:/app
        environment:
            - DB_HOST=db
            - DB_NAME=app
            - DB_USER=postgres
            - DB_PASS=supersecretpassword
        depends_on:
            - app
            - db
            - redis
    celery-beat:
        build: .
        command: celery -A config beat -l INFO --pidfile=
        volumes:
            - ./app:/app
        environment:
            - DB_HOST=db
            - DB_NAME=app
            - DB_USER=postgres
            - DB_PASS=supersecretpassword
        depends_on:
            - app
            - db
            - redis
volumes:
  postgres_data:

pid文件已经存在错误

连续启动Celery beat时出现以下错误。

celery-beat_1  | ERROR: Pidfile (celerybeat.pid) already exists.

当启动 celery beat 时,它会输出 celerybeat.pid 文件,但如果连续启动的话,文件已存在会导致错误。解决方法可以是每次事先删除 celerybeat.pid,或保存到 tmp 目录等。但我选择采用添加 –pidfile= 来避免输出 pid 的方法。

command: celery -A config beat -l INFO --pidfile=

Docker和Celery – 错误:Pidfile(celerybeat.pid)已经存在。

Django的settings.py配置

from celery.schedules import crontab
中略
# Celery config
CELERY_BROKER_URL = 'redis://redis:6379'
CELERY_RESULT_BACKEND = 'redis://redis:6379'
CELERY_BEAT_SCHEDULE = {
    'hello': {
        'task': 'core.tasks.hello',
        'schedule': crontab()  # execute every minute
    }
}

crontab()的默认设置是每分钟执行一次。crontab的配置在官方教程中有详细说明。

ExampleMeaningcrontab()1分毎に実行crontab(minute=0, hour=0)毎日0:00に実行crontab(minute=0, hour=’*/3′)3時間毎に実行: 0am, 3am, 6am, 9am, 正午, 3pm, 6pm, 9pm.crontab(minute=0, hour=’0,3,6,9,12,15,18,21′)同上crontab(day_of_week=’sunday’)日曜日に毎分ごとに実行

创建并执行任务

在应用程序文件夹(例如core文件夹)中创建一个名为tasks.py的文件,并注册要执行的任务。

from celery import shared_task


@shared_task
def hello():
    print('Hello!')

在@shared_task装饰器下注册要执行的任务。在此处设置的函数需要与settings.py的CELERY_BEAT_SCHEDULE中注册的任务匹配。

如果能够每隔一分钟输出以下结果,通过docker-compose up就算成功了。

celery-beat_1  | [2019-09-08 01:07:13,412: INFO/MainProcess] Writing entries...
celery-beat_1  | [2019-09-08 01:07:20,299: INFO/MainProcess] Scheduler: Sending due task hello (core.tasks.hello)
celery_1       | [2019-09-08 01:07:20,322: INFO/MainProcess] Received task: core.tasks.hello[91522887-c7a1-4334-a0a0-eb2587cff1f8]  
celery_1       | [2019-09-08 01:07:20,324: WARNING/ForkPoolWorker-2] Hello!
celery_1       | [2019-09-08 01:07:20,352: INFO/ForkPoolWorker-2] Task core.tasks.hello[91522887-c7a1-4334-a0a0-eb2587cff1f8] succeeded in 0.028256000019609928s: None

Celery beat 的第二种模式的实现

以下是使用django_celery_beat的方法。官方教程中有相关部分。

celery==4.3.0
redis==3.3.0
django_celery_beat==1.5.0
django-celery-results==1.1.2

如果要使用django_celery_beat,就需要安装Celery和redis。虽然django-celery-results不是必需的,但如后面所述,它可以方便地通过管理仪表板查看任务的执行结果。

Docker Compose的配置

虽然基本相同于上述模式(1),但将 celery-beat 命令更改为以下方式。

celery-beat:
        build: .
        command: celery -A config beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --pidfile=
        volumes:
            - ./app:/app
        environment:
            - DB_HOST=db
            - DB_NAME=app
            - DB_USER=postgres
            - DB_PASS=supersecretpassword
        depends_on:
            - app
            - db
            - redis

在命令行中添加参数「–scheduler django_celery_beat.schedulers:DatabaseScheduler」。-pidfile=应与模式(1)一样必需。

django-celery-results的教程如下:
django-celery-results的官方教程

Django的settings.py文件的配置

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'core',
    'django_celery_beat',   #追加
    'django_celery_results', #追加
]
中略

# Celery config
CELERY_BROKER_URL = 'redis://redis:6379'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'

执行迁移

docker-compose run app /bin/sh -c "python manage.py migrate"

注册并执行任务

スクリーンショット 2019-09-08 10.27.12.jpg

打开”定期任务”的选项卡,然后添加任务。

django_admin_minute.jpg

与模式(1)相同,我们将要执行的任务注册到tasks.py中。由于我想要进行数据库的定期更新,所以现在我们将添加对模型对象的访问。

from celery import shared_task
from .models import Repository


@shared_task
def hello():
    print('Hello!')
    print('count=' ,Repository.objects.count())

使用Docker Compose 启动

如果在docker-compose up之前安装了django_celery_results,您可以在admin仪表板的CELERY RESULTS中查看结果。

task_result.jpg

通过Celery beat访问model对象(postgresql)时出现错误。

我列举了一个成功的例子,但是在从Celery beat访问模型对象(postgresql)时遇到了错误,困扰了数天。

#エラーメッセージ
celery-beat_1  | [2019-09-05 14:05:00,017: INFO/MainProcess] Scheduler: Sending due task hello (core.tasks.hello)
celery_1       | [2019-09-05 14:05:00,040: INFO/MainProcess] Received task: core.tasks.hello[7ec020bf-bc75-4bef-818d-fbbe44801f38]  
celery_1       | [2019-09-05 14:05:00,073: ERROR/ForkPoolWorker-2] Task core.tasks.hello[7ec020bf-bc75-4bef-818d-fbbe44801f38] raised unexpected: OperationalError('could not connect to server: No such file or directory\n\tIs the server running locally and accepting\n\tconnections on Unix domain socket "/tmp/.s.PGSQL.5432"?\n')
celery_1       | Traceback (most recent call last):
celery_1       |   File "/usr/local/lib/python3.7/site-packages/django/db/backends/base/base.py", line 217, in ensure_connection
celery_1       |     self.connect()
celery_1       |   File "/usr/local/lib/python3.7/site-packages/django/db/backends/base/base.py", line 195, in connect
celery_1       |     self.connection = self.get_new_connection(conn_params)
celery_1       |   File "/usr/local/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 178, in get_new_connection
celery_1       |     connection = Database.connect(**conn_params)
celery_1       |   File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 130, in connect
celery_1       |     conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
celery_1       | psycopg2.OperationalError: could not connect to server: No such file or directory
celery_1       |    Is the server running locally and accepting
celery_1       |    connections on Unix domain socket "/tmp/.s.PGSQL.5432"?

最后的错误信息如下所示:

celery_1       | psycopg2.OperationalError: could not connect to server: No such file or directory
celery_1       |    Is the server running locally and accepting
celery_1       |    connections on Unix domain socket "/tmp/.s.PGSQL.5432"?

由于”/tmp/.s.PGSQL.5432″的存在,可以推断出可能发生了某种与Postgre有关的错误,但在网上搜索后发现,出现”/tmp/.s.PGSQL.5432″可能是因为Postgre没有启动而导致的错误。然而,Postgre容器正常运行,原因不明。

最后我看了一下docker-compose并自己发现了问题,即Celery和Celery beat没有进行postgre环境的配置,导致Celery无法访问postgre,这就是问题所在。

一旦将以下的配置添加到Docker Compose的Celery和Celery beat中,然后即可实现从Celery beat的定期执行任务中对数据库进行更新。

#CeleryとCelery beatに以下の記載追加
       environment:
            - DB_HOST=db
            - DB_NAME=app
            - DB_USER=postgres
            - DB_PASS=supersecretpassword

任务未注册时发生错误。

当我试图将在tasks.py中以@shared_task方式定义的函数替换为其他名称并进行执行时,遇到了以下情况:任务未注册并引发了错误。

celery-beat_1  | [2019-09-23 04:16:47,995: INFO/MainProcess] Scheduler: Sending due task update (core.tasks.update)
celery_1       | [2019-09-23 04:16:48,000: ERROR/MainProcess] Received unregistered task of type 'core.tasks.update'.
celery_1       | The message has been ignored and discarded.
celery_1       | 
celery_1       | Did you remember to import the module containing this task?
celery_1       | Or maybe you're using relative imports?
celery_1       | 
celery_1       | Please see
celery_1       | http://docs.celeryq.org/en/latest/internals/protocol.html
celery_1       | for more information.
celery_1       | 
celery_1       | The full contents of the message body was:
celery_1       | b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
celery_1       | Traceback (most recent call last):
celery_1       |   File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
celery_1       |     strategy = strategies[type_]
celery_1       | KeyError: 'core.tasks.update'

通常情况下,Celery.py中的autodiscover_tasks()函数应该会自动检索@shared task的定义并注册任务,但似乎有时无法正常工作。

app.autodiscover_tasks()

只需一种方式,作为解决策,在settings.py中添加以下内容后,成功注册的问题得到了解决。

CELERY_IMPORTS = ('core.tasks') #tasks.pyを格納してるフォルダ

结束

我們這次重點介紹了Celery的定期執行功能,但將來我們也想嘗試非同步處理。有了Celery,就可以在Django中進行批處理的工作。

另外,由于我尚未将自己所开发的应用部署到正式环境中,因此在想要在正式环境中运行Celery时可能会遇到一些问题。在部署后,我将更新这篇文章。非常感谢您的阅读。

广告
将在 10 秒后关闭
bannerAds