基于tornado的publishsubscribe模块

    技术2022-05-19  21

    tornado是facebook的一个开源项目,使用python开发,由一个non-block的web服务器和一个类似web.py的精简开发框架构成。tornado的非阻塞机制使得其可以方便的用于server push(Comet)的实现。以下是我编写的一个简易publish/subscribe(发布/订阅)模块。

    该模块由服务器端代码和客户端的javascript文件subscribe.js构成

    首先是服务器端,定义Publisher类

     

     

    class Publisher(object):

        def __init__(self):

            self.listeners = []

        def subscribe(self, callback):

            self.listeners.append(callback)    

        def publish(self, messages):

             for callback in self.listeners:

                try:

                    callback(messages)

                except:

                    logging.error("Error in waiter callback", exc_info=True)

            self.listeners = []

     

     

    在application里定义publishers:

     

     

    class Application(tornado.web.Application):

        def __init__(self):

            handlers = [(r"/", MainHandler),

                        (r'/subscribe/(/w+)', SubscribeHandler)

                        ]

            self.publishers = {}

            self.publishers['hall'] = Publisher()

            tornado.web.Application.__init__(self, handlers, **settings)

     

    这里定义了一个名为hall的publisher, 同时在url配置时设置了suscribe请求的Handler

     

     

    class BaseHandler(tornado.web.RequestHandler):

        def get_current_user(self):

            #code to return current_user

        def publish(self, channel, message):

            self.application.publishers[channel].publish(message)

     

     

     

    class SubscribeHandler(BaseHandler):

        @tornado.web.authenticated

        @tornado.web.asynchronous

        def post(self, channel):

            publisher = self.application.publishers[channel]

            if publisher:

                publisher.subscribe(self.on_new_message)

            else:

                self.write('this channel does not exist!')

     

        def on_new_message(self, message):

            # Closed client connection

            if self.request.connection.stream.closed():

                return

            self.finish(dict(message=message))

     

     

     

    BaseHandler定义了current_user的获取方法和基本的publish方法,SubscribeHandler用于处理订阅请求

     

    这样,在服务器段发布消息通过继承BaseHandler,调用其publish方法实现,订阅在javascript端通过ajax请求实现,代码如下:

     

    function subscribe(channel, handleMessage){

      var _this = this;

      this.errorSleepTime = 500; 

      channel = "/subscribe/" + channel;

      handleMessage = handleMessage;

      poll = function() {

      var args = {"_xsrf": getCookie("_xsrf")};        

      $.ajax({url: channel, type: "POST", dataType: "text",

              data: $.param(args), success: onSuccess,

              error: onError});

      };

      onSuccess = function(response) {

        try {

            newMessages(eval("(" + response + ")"));

        } catch (e) {

            onError();

            return;

        }

        _this.errorSleepTime = 500;

        window.setTimeout(poll, 0);

      }

     

      onError = function(response) {

        _this.errorSleepTime *= 2;

        console.log("Poll error; sleeping for", _this.errorSleepTime, "ms");

        window.setTimeout(poll, _this.errorSleepTime);

      }

     

      newMessages = function(response) {

        if (!response.message) return;

        var message = response.message;

        console.log(message);

        handleMessage(message);

      }

      poll();

     

    };  

     

     

    使用时只需要调用subscribe函数,其中channel对应你在服务器端定义的channel,如例子中的self.publishers['hall'] = Publisher(),handleMessage是处理收到信息的函数,返回的message是dictionary的形式。

     

    这只是一个很简易的版本,我之后可能还会加入服务器端的timing和cache机制,也欢迎大家使用和讨论

     

     

     


    最新回复(0)