2024年1月21日 15:05 by wst
异步处理今天部署一个包含异步任务处理的项目,其中有个任务是20秒后给用户发信息。踩过的坑包括:
1. 部署celery worker为守护进程的时候,需要在两个地方设置:/etc/default/celeryd 和 /etc/init.d/celeryd, 其中/etc/default/celeryd中还要包含设置环境变量的行:
export DJANGO_SETTINGS_MODULE='wanhuqianjia.settings'
然后设置/etc/init.d/celeryd为可执行文件:
sudo chmod +x /etc/init.d/celeryd
2. 在部署环境的时候,需要设置包的初始化文件:wanhuqianjia/__init__.py
import pymysql
from .celery import app as celery_app
pymysql.install_as_MySQLdb()
__all__ = ('celery_app',)
3. 安装完django-celery-results之后,需要在配置文件wanhuqianjia/settings.py中添加以下内容:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'source',
'bills',
'authen',
'tools.apps.ToolsConfig',
'rights.apps.RightsConfig',
'business.apps.BusninessConfig',
'django_celery_results', # 注意这行
]
然后做数据库的迁移:
python manage.py migrate django_celery_results
项目wanhuqianjia目录结构如下:
.
├── authen
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ ├── models.py
│ ├── __pycache__
│ ├── tasks.py
│ ├── tests.py
│ ├── urls.py
│ ├── views.py
│ └── wxlib
├── business
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ ├── models.py
│ ├── __pycache__
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── db.sqlite3
├── manage.py
├── README.md
├── requirements.txt
├── restart.sh
├── start.sh
├── stop.sh
├── templates
│ ├── authen
│ ├── business
│ └── tools
├── tools
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ ├── models.py
│ ├── __pycache__
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── utils
│ ├── base_model.py
│ ├── __init__.py
│ ├── migrate_gitlab.py
│ ├── migrate.py
│ └── __pycache__
├── wanhuqianjia
│ ├── asgi.py
│ ├── celery.py
│ ├── __init__.py
│ ├── __pycache__
│ ├── settings.py
│ ├── urls.py
│ ├── views.py
│ └── wsgi.py
└── whqj_uwsgi.ini
列出目录结构之后,下面阐述具体步骤:
1. 配置Django项目的配置文件wanhuqianjia/settings.py,这里只列出redis和celery相关的配置
# celery 设置-------------------------------------
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_BROKER_URL = REDIS # Broker配置,使用Redis作为消息中间件
CELERY_ACCEPT_CONTENT = ['json'] # 可接受的内容格式
CELERY_TASK_SERIALIZER = 'json' # 结果序列化方案
CELERY_RESULT_BACKEND = 'django-db' # 使用django orm 作为结果存储
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_WORKER_CONCURRENCY = 4
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
2. 配置celery为守护进程
文件内容:/etc/default/celeryd
export DJANGO_SETTINGS_MODULE='wanhuqianjia.settings'
# Names of nodes to start
# most people will only start one node:
CELERYD_NODES="worker1"
# but you can also start multiple and configure settings
# for each in CELERYD_OPTS
#CELERYD_NODES="worker1 worker2 worker3"
# alternatively, you can specify the number of nodes to start:
#CELERYD_NODES=10
# Absolute or relative path to the 'celery' command:
CELERY_BIN="/home/ubuntu/miniconda3/envs/whqj/bin/celery"
#CELERY_BIN="/virtualenvs/def/bin/celery"
# App instance to use
# comment out this line if you don't use an app
CELERY_APP="wanhuqianjia"
# or fully qualified:
#CELERY_APP="proj.tasks:app"
# Where to chdir at start.
CELERYD_CHDIR="/home/ubuntu/workspace/wanhuqianjia/"
# Extra command-line arguments to the worker
CELERYD_OPTS="--time-limit=300 --concurrency=4"
# Configure node-specific settings by appending node name to arguments:
#CELERYD_OPTS="--time-limit=300 -c 8 -c:worker2 4 -c:worker3 2 -Ofair:worker1"
# Set logging level to DEBUG
CELERYD_LOG_LEVEL="DEBUG"
# %n will be replaced with the first part of the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n%I.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
# Workers should run as an unprivileged user.
# You need to create this user manually (or you can choose
# a user/group combination that already exists (e.g., nobody).
CELERYD_USER="ubuntu"
CELERYD_GROUP="ubuntu"
# If enabled pid and log directories will be created if missing,
# and owned by the userid/group configured.
CELERY_CREATE_DIRS=1
文件内容:/etc/init.d/celeryd
#!/bin/sh -e
# ============================================
# celeryd - Starts the Celery worker daemon.
# ============================================
#
# :Usage: /etc/init.d/celeryd {start|stop|force-reload|restart|try-restart|status}
# :Configuration file: /etc/default/celeryd (or /usr/local/etc/celeryd on BSD)
#
# See https://docs.celeryq.dev/en/latest/userguide/daemonizing.html#generic-init-scripts
### BEGIN INIT INFO
# Provides: celeryd
# Required-Start: $network $local_fs $remote_fs
# Required-Stop: $network $local_fs $remote_fs
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: celery task worker daemon
### END INIT INFO
#
#
# To implement separate init-scripts, copy this script and give it a different
# name. That is, if your new application named "little-worker" needs an init,
# you should use:
#
# cp /etc/init.d/celeryd /etc/init.d/little-worker
#
# You can then configure this by manipulating /etc/default/little-worker.
#
VERSION=5.3.6
echo "celery init v${VERSION}."
if [ $(id -u) -ne 0 ]; then
echo "Error: This program can only be used by the root user."
echo " Unprivileged users must use the 'celery multi' utility, "
echo " or 'celery worker --detach'."
exit 1
fi
origin_is_runlevel_dir () {
set +e
dirname $0 | grep -q "/etc/rc.\.d"
echo $?
}
# Can be a runlevel symlink (e.g., S02celeryd)
if [ $(origin_is_runlevel_dir) -eq 0 ]; then
SCRIPT_FILE=$(readlink "$0")
else
SCRIPT_FILE="$0"
fi
SCRIPT_NAME="$(basename "$SCRIPT_FILE")"
DEFAULT_USER="wst"
DEFAULT_PID_FILE="/var/run/celery/%n.pid"
DEFAULT_LOG_FILE="/var/log/celery/%n%I.log"
DEFAULT_LOG_LEVEL="INFO"
DEFAULT_NODES="celery"
DEFAULT_CELERYD="-m celery worker --detach"
if [ -d "/etc/default" ]; then
CELERY_CONFIG_DIR="/etc/default"
else
CELERY_CONFIG_DIR="/usr/local/etc"
fi
CELERY_DEFAULTS=${CELERY_DEFAULTS:-"$CELERY_CONFIG_DIR/${SCRIPT_NAME}"}
# Make sure executable configuration script is owned by root
_config_sanity() {
local path="$1"
local owner=$(ls -ld "$path" | awk '{print $3}')
local iwgrp=$(ls -ld "$path" | cut -b 6)
local iwoth=$(ls -ld "$path" | cut -b 9)
if [ "$(id -u $owner)" != "0" ]; then
echo "Error: Config script '$path' must be owned by root!"
echo
echo "Resolution:"
echo "Review the file carefully, and make sure it hasn't been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change ownership of the script:"
echo " $ sudo chown root '$path'"
exit 1
fi
if [ "$iwoth" != "-" ]; then # S_IWOTH
echo "Error: Config script '$path' cannot be writable by others!"
echo
echo "Resolution:"
echo "Review the file carefully, and make sure it hasn't been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
if [ "$iwgrp" != "-" ]; then # S_IWGRP
echo "Error: Config script '$path' cannot be writable by group!"
echo
echo "Resolution:"
echo "Review the file carefully, and make sure it hasn't been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
}
if [ -f "$CELERY_DEFAULTS" ]; then
_config_sanity "$CELERY_DEFAULTS"
echo "Using config script: $CELERY_DEFAULTS"
. "$CELERY_DEFAULTS"
fi
# Sets --app argument for CELERY_BIN
CELERY_APP_ARG=""
if [ ! -z "$CELERY_APP" ]; then
CELERY_APP_ARG="--app=$CELERY_APP"
fi
# Options to su
# can be used to enable login shell (CELERYD_SU_ARGS="-l"),
# or even to use start-stop-daemon instead of su.
CELERYD_SU=${CELERY_SU:-"su"}
CELERYD_SU_ARGS=${CELERYD_SU_ARGS:-""}
CELERYD_USER=${CELERYD_USER:-$DEFAULT_USER}
# Set CELERY_CREATE_DIRS to always create log/pid dirs.
CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0}
CELERY_CREATE_RUNDIR=$CELERY_CREATE_DIRS
CELERY_CREATE_LOGDIR=$CELERY_CREATE_DIRS
if [ -z "$CELERYD_PID_FILE" ]; then
CELERYD_PID_FILE="$DEFAULT_PID_FILE"
CELERY_CREATE_RUNDIR=1
fi
if [ -z "$CELERYD_LOG_FILE" ]; then
CELERYD_LOG_FILE="$DEFAULT_LOG_FILE"
CELERY_CREATE_LOGDIR=1
fi
CELERYD_LOG_LEVEL=${CELERYD_LOG_LEVEL:-${CELERYD_LOGLEVEL:-$DEFAULT_LOG_LEVEL}}
CELERY_BIN=${CELERY_BIN:-"celery"}
CELERYD_MULTI=${CELERYD_MULTI:-"$CELERY_BIN multi"}
CELERYD_NODES=${CELERYD_NODES:-$DEFAULT_NODES}
export CELERY_LOADER
if [ -n "$2" ]; then
CELERYD_OPTS="$CELERYD_OPTS $2"
fi
CELERYD_LOG_DIR=`dirname $CELERYD_LOG_FILE`
CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE`
# Extra start-stop-daemon options, like user/group.
if [ -n "$CELERYD_CHDIR" ]; then
DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR"
fi
check_dev_null() {
if [ ! -c /dev/null ]; then
echo "/dev/null is not a character device!"
exit 75 # EX_TEMPFAIL
fi
}
maybe_die() {
if [ $? -ne 0 ]; then
echo "Exiting: $* (errno $?)"
exit 77 # EX_NOPERM
fi
}
create_default_dir() {
if [ ! -d "$1" ]; then
echo "- Creating default directory: '$1'"
mkdir -p "$1"
maybe_die "Couldn't create directory $1"
echo "- Changing permissions of '$1' to 02755"
chmod 02755 "$1"
maybe_die "Couldn't change permissions for $1"
if [ -n "$CELERYD_USER" ]; then
echo "- Changing owner of '$1' to '$CELERYD_USER'"
chown "$CELERYD_USER" "$1"
maybe_die "Couldn't change owner of $1"
fi
if [ -n "$CELERYD_GROUP" ]; then
echo "- Changing group of '$1' to '$CELERYD_GROUP'"
chgrp "$CELERYD_GROUP" "$1"
maybe_die "Couldn't change group of $1"
fi
fi
}
check_paths() {
if [ $CELERY_CREATE_LOGDIR -eq 1 ]; then
create_default_dir "$CELERYD_LOG_DIR"
fi
if [ $CELERY_CREATE_RUNDIR -eq 1 ]; then
create_default_dir "$CELERYD_PID_DIR"
fi
}
create_paths() {
create_default_dir "$CELERYD_LOG_DIR"
create_default_dir "$CELERYD_PID_DIR"
}
export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
_get_pidfiles () {
# note: multi < 3.1.14 output to stderr, not stdout, hence the redirect.
${CELERYD_MULTI} expand "${CELERYD_PID_FILE}" ${CELERYD_NODES} 2>&1
}
_get_pids() {
found_pids=0
my_exitcode=0
for pidfile in $(_get_pidfiles); do
local pid=`cat "$pidfile"`
local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
echo "bad pid file ($pidfile)"
one_failed=true
my_exitcode=1
else
found_pids=1
echo "$pid"
fi
if [ $found_pids -eq 0 ]; then
echo "${SCRIPT_NAME}: All nodes down"
exit $my_exitcode
fi
done
}
_chuid () {
${CELERYD_SU} ${CELERYD_SU_ARGS} "$CELERYD_USER" -c "$CELERYD_MULTI $*"
}
start_workers () {
if [ ! -z "$CELERYD_ULIMIT" ]; then
ulimit $CELERYD_ULIMIT
fi
_chuid $* start $CELERYD_NODES $DAEMON_OPTS \
--pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \
$CELERY_APP_ARG \
$CELERYD_OPTS
}
dryrun () {
(C_FAKEFORK=1 start_workers --verbose)
}
stop_workers () {
_chuid stopwait $CELERYD_NODES $DAEMON_OPTS --pidfile="$CELERYD_PID_FILE"
}
restart_workers () {
_chuid restart $CELERYD_NODES $DAEMON_OPTS \
--pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \
$CELERY_APP_ARG \
$CELERYD_OPTS
}
kill_workers() {
_chuid kill $CELERYD_NODES $DAEMON_OPTS --pidfile="$CELERYD_PID_FILE"
}
restart_workers_graceful () {
echo "WARNING: Use with caution in production"
echo "The workers will attempt to restart, but they may not be able to."
local worker_pids=
worker_pids=`_get_pids`
[ "$one_failed" ] && exit 1
for worker_pid in $worker_pids; do
local failed=
kill -HUP $worker_pid 2> /dev/null || failed=true
if [ "$failed" ]; then
echo "${SCRIPT_NAME} worker (pid $worker_pid) could not be restarted"
one_failed=true
else
echo "${SCRIPT_NAME} worker (pid $worker_pid) received SIGHUP"
fi
done
[ "$one_failed" ] && exit 1 || exit 0
}
check_status () {
my_exitcode=0
found_pids=0
local one_failed=
for pidfile in $(_get_pidfiles); do
if [ ! -r $pidfile ]; then
echo "${SCRIPT_NAME} down: no pidfiles found"
one_failed=true
break
fi
local node=`basename "$pidfile" .pid`
local pid=`cat "$pidfile"`
local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
echo "bad pid file ($pidfile)"
one_failed=true
else
local failed=
kill -0 $pid 2> /dev/null || failed=true
if [ "$failed" ]; then
echo "${SCRIPT_NAME} (node $node) (pid $pid) is down, but pidfile exists!"
one_failed=true
else
echo "${SCRIPT_NAME} (node $node) (pid $pid) is up..."
fi
fi
done
[ "$one_failed" ] && exit 1 || exit 0
}
case "$1" in
start)
check_dev_null
check_paths
start_workers
;;
stop)
check_dev_null
check_paths
stop_workers
;;
reload|force-reload)
echo "Use restart"
;;
status)
check_status
;;
restart)
check_dev_null
check_paths
restart_workers
;;
graceful)
check_dev_null
restart_workers_graceful
;;
kill)
check_dev_null
kill_workers
;;
dryrun)
check_dev_null
dryrun
;;
try-restart)
check_dev_null
check_paths
restart_workers
;;
create-paths)
check_dev_null
create_paths
;;
check-paths)
check_dev_null
check_paths
;;
*)
echo "Usage: /etc/init.d/${SCRIPT_NAME} {start|stop|restart|graceful|kill|dryrun|create-paths}"
exit 64 # EX_USAGE
;;
esac
exit 0
3. 启动守护进程
sudo /etc/init.d/celeryd start
4. 抛出任务给守护进程
wanhuqianjia/authen/tasks.py
from celery import shared_task
@shared_task
def send_message_test(a):
print("这是一个异步测试任务。",a)
return "success"
wanhuqianjia/authen/views.py
from authen.tasks import send_message_test
from authen.wxlib.utils import eta_second
def test_celery(request):
"测试celery"
res = send_message_test.apply_async(args=('nihao',),eta=eta_second(20))
print("res:", res)
return HttpResponse("测试异步任务已发出。")
5. 浏览器中请求服务器
https://www.wanhuqianjia.com/authen/celery_test
后来通过"ps aux|grep celery"观察,实际的启动命令为:
/home/ubuntu/miniconda3/envs/jianke/bin/python -m celery --app=jianke --workdir=/home/ubuntu/workspace/jianke/ worker --pidfile=/var/run/celery/worker2.pid --logfile=/var/log/celery/worker2%I.log --loglevel=DEBUG --time-limit=300 --concurrency=4 -n worker2@VM-8-12-ubuntu --executable=/home/ubuntu/miniconda3/envs/jianke/bin/python
除了用init.d去做启动命令,还是可以使用supervision实现对进程的监控。
监控脚本(配置文件)大致如下:
[program:jianke]
directory=/home/ubuntu/workspace/jianke/
command=/home/ubuntu/miniconda3/envs/jianke/bin/python -m celery --app=jianke --workdir=/home/ubuntu/workspace/jianke/ worker --pidfile=/var/run/celery/worker2.pid --logfile=/var/log/celery/worker2%I.log --loglevel=DEBUG --time-limit=300 --concurrency=4 -n worker2@VM-8-12-ubuntu --executable=/home/ubuntu/miniconda3/envs/jianke/bin/python
autostart=true
autorestart=true
startsecs=5
stderr_logifle=/tmp/jianke_stderr.log
stdout_logfile=/tmp/jinake_stdout.log
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=10
environment=DEBUG=false