Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions README.markdown
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
Tornado Thread-Pool
===================

### 说明:

`我在这个版本已经修复,给作者发了pull requests`
* 以前的代码在Blocking处理阶段是有问题,首先不需要判断thread_locals.thread_pool是否存在,另外在异常处理阶段有出BUG.

`正在写`
* ThreadPool不是预先Prefork的,是根据队列的大小来增加减少. 这样带来的问题是不断的new thread,带来不必要的系统开销.

`正在写,在保留任务异步化的基础上,加入同步非堵塞逻辑`
* thread_pool做了相当于celery那种任务推送,由线程组去消费, 而没有做到把当前用户的操作同步化,不能合理的拿到结果.

-----
##### example.py 是我写的一个tornado thread_pool的使用实例,可以方便测试线程池的功能.

-----

Tornado Thread-Pool is a library for [Tornado](http://www.tornadoweb.org/) that lets you make sure that your blocking code and your non-blocking code don't interfere with each other.

You don't have to remember to call `IOLoop.add_callback` at the appropriate time, or worry about whether that database query will block other web requests.
Expand Down Expand Up @@ -32,14 +48,3 @@ Usage
# Guarunteed to run in a tornado IOLoop.

```

Dependencies
------------

* Python (in theory any version >= 2.3, but I only tested it on 2.7)
* Tornado (only tested on version 2.2, but should work on earlier versions)

Running Tests
-------------

python -m tornado.testing tests
66 changes: 66 additions & 0 deletions example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#coding:utf-8
import time
import logging

import tornado.httpserver
import tornado.ioloop
import tornado.web
from thread_pool import in_thread_pool, in_ioloop, blocking
from tornado.options import define, options

define("port", default=8002, help="run on the given port", type=int)

class TestThreadPool(tornado.web.RequestHandler):
def get(self):
self.blocking_method(self.sleep)
logging.info('end')
self.write('go')

@in_thread_pool
def blocking_method(self,callback):
logging.info('pass thread_pool')
callback()

def sleep(self):
time.sleep(5)
logging.info('this func is sleep')

class TestIoloop(tornado.web.RequestHandler):
def get(self):
self.blocking_method(self.sleep)
logging.info('end')
self.write('go')

@in_ioloop
def blocking_method(self,callback):
logging.info('pass in_ioloop')
callback()

def sleep(self):
time.sleep(5)
logging.info('this func is sleep')

class TestBlocking(tornado.web.RequestHandler):
def get(self):
self.blocking_method(self.sleep)
logging.info('end')
self.write('TestBlocking')

@blocking
def blocking_method(self,callback):
logging.info('pass blocking')
callback()

def sleep(self):
time.sleep(5)
logging.info('this func is sleep')

if __name__ == "__main__":
tornado.options.parse_command_line()
app = tornado.web.Application(handlers=[
(r"/threadpool", TestThreadPool),
(r"/ioloop",TestIoloop ),
(r"/blocking", TestBlocking), ])
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
15 changes: 1 addition & 14 deletions thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def flag_ioloop():
get_ioloop().add_callback(flag_ioloop)

def in_ioloop(fn):

@wraps(fn)
def res(*args, **kwargs):
try:
Expand All @@ -75,7 +74,6 @@ def res(*args, **kwargs):
return res

def in_thread_pool(fn):

@wraps(fn)
def res(*args, **kwargs):
try:
Expand All @@ -92,21 +90,10 @@ def res(*args, **kwargs):
def blocking_warning(fn):
warning_string = 'Blocking call to %s not in thread pool' % fn.__name__
warnings.warn(warning_string, RuntimeWarning)
traceback.print_last()
traceback.print_exc()

def blocking(fn):


@wraps(fn)
def res(*args, **kwargs):
while 1:
try:
if thread_locals.thread_pool:
break
except AttributeError:
pass
blocking_warning(fn)
break
return fn(*args, **kwargs)

return res