How to Run Celery Workers for Multiple Projects

April 12, 2024 09:44 by wst

Async Processing

Celery is a powerful tool for asynchronous task processing and keeps gaining popularity; for Python developers it is practically a must-have skill.

You may already be used to starting Celery for a single project, but how do you handle the multi-project case?

Below I'll walk through my thinking and how I worked through it.

1 Task Center

My initial idea was to build a task center: all projects would hand their async/delayed/scheduled tasks over to this one center.

Here is the experiment:

Create folders for several projects and check whether the worker automatically discovers their tasks on startup.

The directory layout:

demo
├── main.py
├── project1
│   └── tasks.py
├── project2
│   └── tasks.py
├── readme.md
└── tasks.py

The code is as follows:

### demo/main.py #####################################
# Main application
from celery import Celery

# Create the Celery application instance
app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
# Additional settings can be applied through app.conf (e.g. app.conf.timezone = 'Asia/Hong_Kong'),
# or kept in a separate config module; in this demo everything is passed to Celery() directly.


# Register the tasks.
# The argument must be a list of package paths; each listed package is scanned for a tasks.py.
# With a layout such as my_celery/task1/tasks.py and my_celery/task2/tasks.py this would be
# app.autodiscover_tasks(["my_celery.task1", "my_celery.task2"]).
app.autodiscover_tasks(["demo", "demo.project1", "demo.project2"])

### demo/tasks.py ########################################
# By default autodiscover_tasks only looks for a module named tasks.py in each
# listed package; tasks defined in files with other names will not be discovered.
from demo.main import app
from celery import Task
import time
import logging

logging.basicConfig(format='[%(filename)s:%(lineno)d] %(asctime)s - %(levelname)s : %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S %p',
                    level=logging.INFO)


# Custom base task class; the on_failure/on_success hooks log the task outcome
class MyTask(Task):
    abstract = True

    def run(self, *args, **kwargs):
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logging.error(f'Task {task_id} failed')

    def on_success(self, retval, task_id, args, kwargs):
        logging.info(f'Task {task_id} succeeded')


@app.task(base=MyTask)
def send_num(**kwargs):
    print(f"Received {kwargs}")
    time.sleep(1)
    return f"{kwargs} OK"


@app.task(base=MyTask)  # a name= argument would set the task name; by default the function name is used
def task_start(**kwargs):
    logging.info(f'Async task received a request with arguments {kwargs}')
    start_time = time.time()
    # Actual business logic goes here
    time.sleep(7)
    print('Running the task')
    logging.info(f"Task finished, elapsed: {time.time() - start_time}")
    return 'done'

### demo/project1/tasks.py ##################
from demo.main import app
@app.task
def add_num(a, b):
    c = a + b
    return c

### demo/project2/tasks.py ###################
from demo.main import app
@app.task
def mutiply_num(a, b):
    c = a * b
    return c
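
As mentioned in the main.py comment, the configuration does not have to be passed to Celery() inline: it could also live in a separate settings module and be loaded with app.config_from_object. A minimal sketch, assuming a hypothetical demo/config.py (not part of the directory tree above):

### demo/config.py (hypothetical sketch) #############
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
timezone = 'Asia/Hong_Kong'

### demo/main.py would then become ###################
from celery import Celery

app = Celery('demo')
app.config_from_object('demo.config')
app.autodiscover_tasks(["demo", "demo.project1", "demo.project2"])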

Start command, run from the directory that contains demo:

celery -A demo.main worker  --loglevel=info -c 1

Output:

/usr/local/lib/python3.10/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

 -------------- celery@wst-comp-01-0 v5.3.6 (emerald-rush)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1160.42.2.el7.x86_64-x86_64-with-glibc2.36 2024-04-12 08:59:54
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         demo:0x7f8558a9bd90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . demo.project1.tasks.add_num
  . demo.project2.tasks.mutiply_num
  . demo.tasks.send_num
  . demo.tasks.task_start

[2024-04-12 08:59:54,579: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-04-12 08:59:54,585: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-04-12 08:59:54,585: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-04-12 08:59:54,586: INFO/MainProcess] mingle: searching for neighbors
[2024-04-12 08:59:55,592: INFO/MainProcess] mingle: all alone
[2024-04-12 08:59:55,599: INFO/MainProcess] celery@wst-comp-01-0 ready.

Conclusion: judging from the log output, the tasks under project1 and project2 were successfully discovered.

[tasks]
  . demo.project1.tasks.add_num
  . demo.project2.tasks.mutiply_num
  . demo.tasks.send_num
  . demo.tasks.task_start

Problem: for this to work, the business code from every project has to be copied into the task center, which immediately creates a lot of duplication and makes the copies easy to get out of sync.

Could each project simply start its own worker instead?

2 Each Project Starts Its Own Worker

To test this idea, I made two copies of the demo folder, named funer and lucy. Be careful to update the file contents: every occurrence of demo becomes funer in the first copy, and lucy in the second.

The start command and corresponding output for each are shown below.

Note that "project" here now means something bigger than in the previous section: the whole demo folder counts as one project.

1. The demo project

root@wst-comp-01-0: celery -A demo.main worker  --loglevel=info -c 1

(The output is identical to the log already shown in section 1: the worker registers demo.project1.tasks.add_num, demo.project2.tasks.mutiply_num, demo.tasks.send_num and demo.tasks.task_start.)

2. The funer project

root@wst-comp-01-0: celery -A funer.main worker  --loglevel=info -c 1
/usr/local/lib/python3.10/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

 -------------- celery@wst-comp-01-0 v5.3.6 (emerald-rush)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1160.42.2.el7.x86_64-x86_64-with-glibc2.36 2024-04-12 09:03:21
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         funer:0x7f7bdc43ee60
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . funer.project1.tasks.add_num
  . funer.project2.tasks.mutiply_num
  . funer.tasks.send_num
  . funer.tasks.task_start

[2024-04-12 09:03:21,999: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-04-12 09:03:22,004: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-04-12 09:03:22,004: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-04-12 09:03:22,005: INFO/MainProcess] mingle: searching for neighbors
[2024-04-12 09:03:23,011: INFO/MainProcess] mingle: all alone
[2024-04-12 09:03:23,019: INFO/MainProcess] celery@wst-comp-01-0 ready.

3. The lucy project

root@wst-comp-01-0: celery -A lucy.main worker  --loglevel=info -c 1
/usr/local/lib/python3.10/site-packages/celery/platforms.py:829: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(

 -------------- celery@wst-comp-01-0 v5.3.6 (emerald-rush)
--- ***** ----- 
-- ******* ---- Linux-3.10.0-1160.42.2.el7.x86_64-x86_64-with-glibc2.36 2024-04-12 09:05:28
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         lucy:0x7f4be7093790
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/1
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . lucy.project1.tasks.add_num
  . lucy.project2.tasks.mutiply_num
  . lucy.tasks.send_num
  . lucy.tasks.task_start

[2024-04-12 09:05:28,058: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-04-12 09:05:28,063: INFO/MainProcess] Connected to redis://localhost:6379/0
[2024-04-12 09:05:28,063: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:507: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(

[2024-04-12 09:05:28,064: INFO/MainProcess] mingle: searching for neighbors
[2024-04-12 09:05:29,071: WARNING/MainProcess] /usr/local/lib/python3.10/site-packages/celery/app/control.py:56: DuplicateNodenameWarning: Received multiple replies from node name: celery@wst-comp-01-0.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.
  warnings.warn(DuplicateNodenameWarning(

[2024-04-12 09:05:29,071: INFO/MainProcess] mingle: all alone
[2024-04-12 09:05:29,079: INFO/MainProcess] celery@wst-comp-01-0 ready.
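
Note the DuplicateNodenameWarning in the lucy log above: because all three workers were started with the default node name, they all report themselves as celery@wst-comp-01-0. As the warning itself suggests, each worker should be given a unique node name with the -n option, for example:

celery -A lucy.main worker -n lucy@%h --loglevel=info -c 1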

3 How to Call the Tasks

Everything above is about starting the workers; how do you actually call the tasks? Feel free to share your own approach.
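
For reference, here is a minimal sketch of the straightforward way to call one of the demo tasks, assuming the demo package is importable on the caller's side and the worker from section 1 is running:

### caller script (sketch) ###########################
from demo.project1.tasks import add_num

# delay() pushes the task onto the broker; get() blocks until the worker stores the result
result = add_num.delay(1, 2)
print(result.get(timeout=10))   # -> 3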

4 Calling in Practice

When I actually started calling tasks, I found that tasks meant for funer could get picked up by the demo worker, which would then report that the task was not registered. This happens because all of the workers above consume from the same default queue, named celery.

The fix: give each project its own queue so the workers are isolated from one another.

Using funer as the example (the others work the same way), there are two parts: starting the worker and calling the task.

Starting the worker

Use the -Q option to make the worker listen only on the funer queue.

celery -A funer.main worker -Q funer  --loglevel=info -c 1

Calling the task

When calling the task, specify that it should go to the funer queue.

# Reuse funer's app instance (any app pointed at the same broker would also work)
from main import app

# queue="funer" routes the task to the queue the funer worker is listening on
r1 = app.send_task("funer.project2.tasks.mutiply_num", args=(3, 4), queue="funer")
r1.get()   # blocks until the worker returns the result (3 * 4 = 12)
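
As an alternative, the routing could be configured once on the app so callers do not have to pass queue= on every call. A rough sketch, assuming it is added to funer/main.py (the worker is still started with -Q funer):

# Route every task whose name starts with "funer." to the funer queue
app.conf.task_routes = {"funer.*": {"queue": "funer"}}

# Callers can then omit the explicit queue argument
r1 = app.send_task("funer.project2.tasks.mutiply_num", args=(3, 4))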

 

5 Summary

All of the workers above share the same Redis configuration, yet they do not interfere with one another because each consumes from a different queue.

Each project can start its own worker and send tasks to its own queue. That turns out to be more convenient as well: there is no need to build a central task center at all.

