Original article: http://vincenttide.com/blog/1/django-channels-and-celery-example/
In this tutorial, I will go over how to setup a Django Channels project to work with Celery and have instant notification when task starts and completes. Django Channels uses WebSockets to enable two-way communication between the server and browser client. It is assumed that the reader is comfortable with how to setup a normal Django project and we will only cover the parts relating to Channels and Celery.
You can find the Github repository here and a similar deployment at http://tasker.vincenttide.com. Note that this deployment contains some extra stuff not covered in this tutorial such as a cancel functionality. The front end of the sample deployment is also running the React library whereas we will only be using JavaScript in this demo.
To get started let’s install some dependencies which we will need. We will need a Channels layer backend which is what Channels uses to pass and store messages. We will also need a Celery broker transport backend. As it turns out, we can use Redis for both of these tasks so that is what we will use.
# Add Chris Lea’s redis ppa - he maintains the ppa for many open source projects
$ sudo add-apt-repository ppa:chris-lea/redis-server
$ sudo apt-get update
$ sudo apt-get install redis-server
# Now check that redis-server is up and running
$ redis-cli ping
# PONG
Setup a new Django project in a virtualenv and install the following libraries:
$ pip install django
$ pip install channels # the channels library
$ pip install asgi_redis # the redis channel layer we are using
$ pip install celery # Celery task queue
Let’s take a look at the settings.py file first.
# Add our new app to installed apps
INSTALLED_APPS = [
    # ...
    'jobs',
]

# Channels settings
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgi_redis.RedisChannelLayer",  # use redis backend
        "CONFIG": {
            "hosts": [os.environ.get('REDIS_URL', 'redis://localhost:6379')],  # set redis address
        },
        "ROUTING": "django_channels_celery_tutorial.routing.channel_routing",  # load routing from our routing.py file
    },
}
# Celery settings
BROKER_URL = 'redis://localhost:6379/0' # our redis address
# use json format for everything
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
First we add our new app to the INSTALLED_APPS list. The Channels setting simply tells Channels what backend we are using, in this case Redis. The ROUTING option tells Channels where to look for our WebSockets routes which will be found in a file called routing.py. The Celery setting tells Celery where to look for our broker and that we want to use json format for everything.
Next let’s look at the routing.py file:
from channels import route
from jobs import consumers
channel_routing = [
    # Wire up websocket channels to our consumers:
    route("websocket.connect", consumers.ws_connect),
    route("websocket.receive", consumers.ws_receive),
]
Here we simply hook up what functions we want to handle the connect and receive messages. We could also add a function to handle a disconnect message but for our purposes, that is not needed. We tell Channels to look for our functions in our jobs/consumers.py file.
Here are the main parts of the consumers.py file:
import json
import logging

from channels.sessions import channel_session

log = logging.getLogger(__name__)


@channel_session
def ws_connect(message):
    message.reply_channel.send({
        "text": json.dumps({
            "action": "reply_channel",
            "reply_channel": message.reply_channel.name,
        })
    })


@channel_session
def ws_receive(message):
    try:
        data = json.loads(message['text'])
    except ValueError:
        log.debug("ws message isn't json text=%s", message['text'])
        return

    if data:
        reply_channel = message.reply_channel.name
        if data['action'] == "start_sec3":
            start_sec3(data, reply_channel)
In our ws_connect function, we will simply echo back to the client what their reply channel address is. The reply channel is the unique address that gets assigned to every browser client that connects to our websockets server. This value which can be retrieved from message.reply_channel.name can be saved or passed on to a different function such as a Celery task so that they can also send a message back. In fact this is what we will be doing. message.reply_channel.send is a convenient shortcut that Channels provides for us to reply back to the same client. If you only have the reply_channel name, you will have to use the below method to send a message:
Channel(reply_channel_name).send({
    "text": json.dumps({
        "action": "started",
        "job_id": job.id,
        "job_name": job.name,
        "job_status": job.status,
    })
})
In our ws_receive function, we look at the action parameter to check what the client wants us to do. If you wanted to do different things, you could have multiple action commands. In our example, we only have the one command which is to run a function called start_sec3. start_sec3 simply sleeps for 3 seconds and then sends a reply back to the client that it has completed. Note that we pass the reply_channel address so it knows where to send the response.
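The body of start_sec3 is not shown above, so here is a minimal sketch of the idea. The payload helper and the injected send callable are illustrative stand-ins (in the real project this would be a Celery task that sends via Channel(reply_channel_name).send); only the message shape is taken from the tutorial's JavaScript handler.

```python
import json
import time

def completed_payload(job_id, job_name):
    # Build the "completed" message shape the client-side JS expects.
    return json.dumps({
        "action": "completed",
        "job_id": job_id,
        "job_name": job_name,
        "job_status": "completed",
    })

def start_sec3(job_id, job_name, send):
    # Simulate three seconds of work, then notify the reply channel.
    # In the real project, send would be Channel(reply_channel_name).send.
    time.sleep(3)
    send({"text": completed_payload(job_id, job_name)})
```

Factoring the send callable out like this also makes the task easy to exercise without a running channel layer.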
The last important piece is the JavaScript handling the client-side functions.
$(function() {
    // When we are using HTTPS, use WSS too.
    var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
    var ws_path = ws_scheme + '://' + window.location.host + '/dashboard/';
    console.log("Connecting to " + ws_path);
    var socket = new ReconnectingWebSocket(ws_path);

    socket.onmessage = function(message) {
        console.log("Got message: " + message.data);
        var data = JSON.parse(message.data);

        // if action is started, add new item to table
        if (data.action == "started") {
            var task_status = $("#task_status");
            var ele = $('<tr></tr>');
            ele.attr("id", data.job_id);
            var item_id = $("<td></td>").text(data.job_id);
            ele.append(item_id);
            var item_name = $("<td></td>").text(data.job_name);
            ele.append(item_name);
            var item_status = $("<td></td>");
            item_status.attr("id", "item-status-" + data.job_id);
            var span = $('<span class="label label-primary"></span>').text(data.job_status);
            item_status.append(span);
            ele.append(item_status);
            task_status.append(ele);
        }
        // if action is completed, just update the status
        else if (data.action == "completed") {
            var item = $('#item-status-' + data.job_id + ' span');
            item.attr("class", "label label-success");
            item.text(data.job_status);
        }
    };

    $("#taskform").on("submit", function(event) {
        var message = {
            action: "start_sec3",
            job_name: $('#task_name').val()
        };
        socket.send(JSON.stringify(message));
        $("#task_name").val('').focus();
        return false;
    });
});
Here we first create the WebSocket object, then assign the socket.onmessage function to handle each incoming WebSocket message. If the action parameter is "started", we add a new entry to the table. If the action is "completed", we simply change the corresponding column's status to completed.
The one form that we have is wired up to send a websockets message to the server that tells it to run the action “start_sec3”.
To see the entire project files, visit the Github repository. To run the Github repository code, first make sure you have Redis installed then run the following commands:
pip install -r requirements.txt
python manage.py makemigrations
python manage.py migrate
python manage.py runserver # Start daphne and workers
celery worker -A example -l info # Start celery workers
That should start the development server on http://localhost:8000. Again, you can find a similar deployment on http://tasker.vincenttide.com.
Original article: http://www.john2x.com/blog/celery-production-notes.html
I've been using Celery a lot on several projects at work lately, and I think I've finally gotten to a point where I am more or less comfortable working with it. It was painful getting started with Celery. The official docs and guides were useful of course, but when it came time to deploy to production, I found them to be quite lacking.
Celery gets a lot of flak for being too big, or too difficult to work with. Despite the complaints, its mindshare, track record, and library support with Django make going with the alternatives1 a bit less tempting. You'll need a better reason for choosing an alternative than just for the sake of using something else. At the end of the day, Celery works and is good enough for most projects.
This post will focus on using Celery (3.1) with Django (1.9).
The official guides already cover choosing the right broker, but I’ll quickly mention it here for completeness.
Fortunately the choice is not difficult, as there are only two: RabbitMQ and Redis.
There are other supported brokers, of course, but those aren’t considered “stable” yet. Never use the Django ORM as your broker. That would be like using manage.py runserver on production. (On development, using the ORM is a nice and easy way to get started, and avoids the overhead of setting up Redis/RabbitMQ on your machine).
From what I’ve read, Redis is good enough for most cases, but you should probably only use it if you have a good reason to not use RabbitMQ and understand the caveats that come with using Redis2.
RabbitMQ is the officially recommended broker and is a good default choice. The official guide has a nice walkthrough on how to configure RabbitMQ.
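For reference, pointing Celery at RabbitMQ is a one-line change in settings.py. The user, password, and vhost below are placeholders for whatever you create during that walkthrough, not values from this post:

```python
# Celery broker pointing at RabbitMQ; credentials, host, and vhost are
# placeholders -- substitute the vhost user you configured.
BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'
```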
We also enable RabbitMQ’s management web UI. It becomes especially useful when your job count starts to get high and/or when something goes wrong with the queues. Just make sure you’ve set a strong password for the vhost user before turning it on.
It is a good idea to have a dedicated instance for RabbitMQ, as the message database could get quite big, and you’ll soon find yourself running out of disk space if it’s running on the same machine as the webserver or data storage.
By default, Celery routes all your tasks to the default queue, named celery. This is good enough for small apps that have 10 or fewer different task functions.
Personally I like to have each Django “app” and its tasks have a dedicated queue. This makes it easy to adjust the workers for each app without affecting others.
For example, let’s say we have an app called customers with the following structure:
+ project/
|- core/
| |- models.py
| |- tasks.py
| `- views.py
|- customers/
| |- models.py
| |- views.py
| `- tasks.py
|- settings.py
|- celery_config.py
`- manage.py
And we want all customers tasks to be processed by a dedicated worker (could be running on a separate machine), and have all other tasks (e.g. core tasks) to be processed by a lower frequency/priority worker.
We’ll need a router for the customers app. Create a new file customers/routers.py:
# customers/routers.py

class CeleryTaskRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('customers.'):
            return 'customers'
        return None
And specify our routers in settings.py:
CELERY_ROUTES = (
    'customers.routers.CeleryTaskRouter',
)
You can have as many routers as you want, or one main router with a bunch of if statements.
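A sketch of that second option, assuming the same return convention as the CeleryTaskRouter above; the core queue name is illustrative:

```python
# One main router with a bunch of if statements, instead of one router
# per app. Returning None lets the task fall through to the default
# "celery" queue.
class MainCeleryRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('customers.'):
            return 'customers'
        if task.startswith('core.'):
            return 'core'
        return None
```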
Now, all tasks defined in the customers.tasks module will be routed to a queue named "customers". This is because Celery task names are derived from their module name and function name (if you don't specify the name parameter in @app.task).
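To make the naming convention concrete, here is a plain-Python sketch of the rule (not Celery's actual implementation): the default task name is just the module path joined to the function name, which is exactly what the router's startswith check relies on.

```python
def default_task_name(module, func_name):
    # Mirrors Celery's default naming: "<module path>.<function name>"
    # when no name= argument is passed to @app.task.
    return "%s.%s" % (module, func_name)

# A function send_invoice defined in customers/tasks.py is therefore
# named "customers.tasks.send_invoice", which the router matches.
```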
If we start a worker like so:
$ ./manage.py celery worker --app=celery_config:app --queue=customers -n "customers.%%h"
The worker will have a name “customers.hostname” and will only process tasks defined in the customers.tasks module.
Any other tasks (e.g. those defined in core.tasks) will be ignored, unless we start a separate worker process to process tasks going to the “default” queue (named “celery” by default).
$ ./manage.py celery worker --app=celery_config:app --queue=celery
This decouples our apps’ workers and makes monitoring and managing them easier.
Notice that all we needed to set in our settings.py is the CELERY_ROUTES variable. Celery automatically creates the necessary queues when needed. So only define CELERY_QUEUES3 when you absolutely need to fine tune your routes and queues.
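If you do reach the point of fine-tuning, an explicit definition might look like the following. This is a sketch using kombu (the messaging library Celery is built on); the queue and routing-key names are illustrative, not required by anything above:

```python
from kombu import Queue

# Explicit queue definitions -- only needed for fine tuning, since
# Celery auto-creates queues named by CELERY_ROUTES otherwise.
CELERY_DEFAULT_QUEUE = 'celery'
CELERY_QUEUES = (
    Queue('celery', routing_key='celery'),
    Queue('customers', routing_key='customers'),
)
```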
We use Supervisor at work to manage our app processes (e.g. gunicorn, flower, celery).
Here’s what a typical entry in supervisor.conf for the two workers above would look like:
[program:celery_customers]
command=/path/to/project/venv/bin/celery_supervisor --app=celery_config:app worker --loglevel=INFO --concurrency=8 --queue=customers -n "customers.%%h"
directory = /path/to/project
stdout_logfile=/path/to/project/logs/celery_customers.out.log
stderr_logfile=/path/to/project/logs/celery_customers.err.log
autostart=true
autorestart=true
user=user
startsecs=10
stopwaitsecs=60
stopasgroup=true
killasgroup=true
[program:celery_default]
command=/path/to/project/venv/bin/celery_supervisor --app=celery_config:app worker --loglevel=INFO --concurrency=8 --queue=celery
directory = /path/to/project
stdout_logfile=/path/to/project/logs/celery_default.out.log
stderr_logfile=/path/to/project/logs/celery_default.err.log
autostart=true
autorestart=true
user=user
startsecs=10
stopwaitsecs=60
stopasgroup=true
killasgroup=true
The celery_supervisor script is a custom script that wraps manage.py celery. It activates the virtualenv and the project variables (since doing that with Supervisor never seemed to work). It looks like this:
#!/bin/sh
# Wrapper script for running celery with virtualenv and env variables set

DJANGODIR=/path/to/project

echo "Starting celery with args $@"

# Activate the virtual environment.
cd $DJANGODIR
. venv/bin/activate

# Set additional environment variables.
. venv/bin/postactivate

# Programs meant to be run under supervisor should not daemonize
# themselves (do not use --daemon).
exec celery "$@"
Now, when a worker goes down for whatever reason, Supervisor will automatically restart them.
Currently we have our workers' concurrency level set at 8. With the default prefork pool, Celery will spawn 8 child processes for each worker (a total of 16 for our example). For low concurrency levels, this is fine, but when you start needing to process a few hundred or more tasks concurrently, you will soon find that your machine will have a hard time keeping up.
Celery has an alternative worker pool implementation that uses Eventlet. This allows for significantly higher concurrency levels than possible with the default prefork/threads, but with the requirement that your task must be non-blocking.
I had a hard time determining what makes a task non-blocking, as the official guide only mentions it in passing. Eventlet docs state that you will need to monkeypatch libraries you are using for it to work, so I was left to wonder on where should I insert this monkeypatching code or if I even needed to? The official examples seem to suggest that no explicit monkeypatching is needed, but I couldn’t be sure.
After asking on StackOverflow4 and the Google Group5, I got my answer. Celery does indeed do the monkeypatching for you, and no changes are needed for your tasks or app.
To use the eventlet pool implementation, you will need to install the eventlet library:
pip install eventlet
And specify the --pool option when starting the worker:
$ ./manage.py celery worker --app=celery_config:app --queue=customers -n "customers.%%h" --pool=eventlet --concurrency=256
Now that we have our workers up and running, we want to monitor them, check the tasks currently on queue and restart them when we push updates.
The recommended tool for this is Flower, a web UI for Celery. Install it as usual:
pip install flower
Then start the server (make sure your workers are running already or flower will have trouble picking them up):
$ flower -A celery_config:app --basic_auth=flower:secretpassword --port=5555
Visit the web UI at http://yourdomain.com:5555 and you can see a list of workers with their status. The UI should be pretty self explanatory.
One way to restart the workers is directly via supervisorctl, but this could get cumbersome if you have lots of worker processes, and if they are distributed across multiple servers.
My preferred method is to do it via the Flower UI. You can select the workers you want (note though that on the latest version of Flower, the UX for selecting workers has regressed6), and then click on the "Shut Down" button. This will cause the workers to gracefully perform a warm shutdown, which is what we want. And since the workers are supervised, they will automatically start up again, now using the latest version of the code.
Original article: http://www.craigderington.me/configure-redis-as-djangos-cache-engine/
Redis is an open-source, persistent, in-memory data structure server that stores its data in key-value pairs. Redis can be used as a database, cache, or message broker. Redis data structures support a variety of formats including strings, lists, sets, hashes, and sorted sets. More advanced data structures include bitmaps, HyperLogLogs, and geospatial indexes with radius queries. Redis has built-in replication, Lua scripting, and transactions. It offers different levels of on-disk persistence and provides high availability and automatic partitioning.
Original article: https://blog.heroku.com/in_deep_with_django_channels_the_future_of_real_time_apps_in_django
Today, we’re thrilled to host Jacob Kaplan-Moss. Jacob’s a former Herokai and long-time core contributor to Django, and he’s here to share an in-depth look at something that he believes will define the future of the framework.
Original article: http://www.craigderington.me/configure-django-task-queue-with-celery-and-redis/
Task Queue: a robust, asynchronous job handler and message broker enabling you to run and consume back-end functions based on system events rather than human intervention. A Django and Celery powered application can quickly respond to user requests while the task automation manager handles longer running requests through the use of a task queuing system.