Статьи

Gevent, ZeroMQ, WebSockets и Flot FTW!

В рамках работы я делал на Зарки я имел возможность поиграть с большим количеством холодных технологий, среди которых GEvent , ZeroMQ , WebSockets и ФЛОТ . Потребовалось некоторое время, чтобы добраться до того момента, когда я действительно смог добиться цели, но как только я оказался там, все стало невероятно простым. В этой статье я покажу вам, как использовать эти три технологии вместе для создания простого веб-приложения с данными push-запросов сервера в реальном времени.

Предпосылки

Прежде чем вы сможете запустить пример программы, вам необходимо установить следующие библиотеки Python:

  • pyzmq
  • GEvent
  • GEvent-ZeroMQ
  • GEvent-WebSocket
  • вставить (для простого статического файлового сервера WSGI http)

Самый простой способ сделать это:

pip install pyzmq gevent gevent_zeromq paste 

Вы можете увидеть сообщения об отсутствующих системных библиотеках; для сборки пакетов Python вам нужно установить libzmq-dev 2.1+ и libevent-dev.

Вам также нужно скачать плагин Flot JQuery и распаковать его в каталоге вашего проекта.

Greenlets

Прежде чем запускать код, вам нужно понять, что такое концепция зеленых потоков или гринлетов. Они ведут себя как потоки, за исключением двух предостережений:

  • они никогда не бегают одновременно
  • они уступают только друг другу в указанных точках

В конечном итоге это означает, что они очень легки для создания по сравнению с потоками, и вы можете быть немного неаккуратными с общими данными между гринлетами, поскольку вы можете точно определить, когда гринлет может уступить другому гринлету. Библиотека gevent обеспечивает нашу поддержку гринлетов и делает гринлеты еще более полезными, превращая блокирующие системные вызовы в точки доходности, так что другой гринлет может работать, пока один ожидает данные.

ZeroMQ

ZeroMQ звучит как очередь сообщений, как RabbitMQ, но на самом деле это не так. ZeroMQ — это библиотека, которая предоставляет множество функций, подобных очереди сообщений, но не требует посредника и очень небольшой настройки. Мы будем использовать ZeroMQ pub / sub сокеты для управления связью в этой демонстрации. Пуб / сокеты имеют приятную особенность, заключающуюся в том, что опубликованные сообщения распространяются среди всех подключенных подписчиков, или, если подписчики не подключены, сообщение просто отбрасывается.

ZeroMQ был разработан для работы с потоками, а не с гринлетами, поэтому нам также необходимо использовать отличную библиотеку gevent_zeromq для «зеленого» ZeroMQ. В этом примере мы напишем цикл, который выталкивает (x, y) значения точек на сокет zmq.PUB:

def zmq_producer(context): 
    '''Produce a nice time series sine wave''' 
    socket = context.socket(zmq.PUB) 
    socket.connect('tcp://127.0.0.1:5000') 
 
    while True: 
        x = time.time() 
        y = 2.5 * (1 + math.sin(x / 500)) 
        socket.send(json.dumps(dict(x=x, y=y))) 
        gevent.sleep(0.1) 

Передаваемый контекст создается в нашей функции main () с помощью вызова zmq.Context (). Контекст — это просто место для ZeroMQ для хранения некоторого глобального состояния; вы обычно создаете только одно приложение.

Хорошо, теперь, когда у нас есть наш производитель, давайте посмотрим на сервер ZeroMQ. Здесь мы будем просто ретранслировать сообщения, полученные на входящем сокете tcp zmq.SUB, и публиковать их на внешнем сокете inproc zmq.PUB:

def zmq_server(context): 
    '''Funnel messages coming from the external tcp socket to an inproc socket''' 
    sock_incoming = context.socket(zmq.SUB) 
    sock_outgoing = context.socket(zmq.PUB) 
    sock_incoming.bind('tcp://*:5000') 
    sock_outgoing.bind('inproc://queue') 
    sock_incoming.setsockopt(zmq.SUBSCRIBE, "") 
    while True: 
        msg = sock_incoming.recv() 
        sock_outgoing.send(msg)

Единственное, что следует здесь отметить, это то, что мы должны сообщить ZeroMQ, что мы хотим подписаться на все сообщения в сокете zmq.SUB.

Серверы Gevent WSGI

Gevent обеспечивает быструю реализацию стандарта WSGI в модуле gevent.pywsgi. В этой демонстрации мы будем использовать два экземпляра сервера (зеленые). Один будет просто обслуживать статические файлы, а другой обеспечит наше соединение с WebSocket. Вот код для настройки этих двух серверов:

# websocket server: copies inproc zmq messages to websocket 
ws_server = gevent.pywsgi.WSGIServer( 
    ('', 9999), WebSocketApp(context), 
    handler_class=WebSocketHandler) 
# http server: serves up static files 
http_server = gevent.pywsgi.WSGIServer( 
    ('', 8000), 
    paste.urlparser.StaticURLParser(os.path.dirname(__file__))) 

Pretty simple; just specify the bind address for the server, the WSGI app to call, and in the case of the WebSocket server, the WebSocket handler class. The http_server above is mostly uninteresting, as it just serves up static files via paste. The ws_server has a (slightly) more interesting implementation:

class WebSocketApp(object): 
    '''Funnel messages coming from an inproc zmq socket to the websocket''' 
 
    def __init__(self, context): 
        self.context = context 
 
    def __call__(self, environ, start_response): 
        ws = environ['wsgi.websocket'] 
        sock = self.context.socket(zmq.SUB)/span> 
        sock.setsockopt(zmq.SUBSCRIBE, "") 
        sock.connect('inproc://queue') 
        while True: 
            msg = sock.recv() 
            ws.send(msg) 

What’s happening here is that when we get a connection to the websocket address (port 9999 in our example), we do the following:

  • Subscribe to the inproc socket that our zmq_server is publishing messages to
  • Grab the websocket from the environ
  • Relay messages from the zmq socket to the websocket

Client-side

So the server-side stuff is straightforward; what about the client-side? Well, it’s pretty easy as well. Here’s the (static) HTML page we’ll use:

<html> 
    <head> 
        <title>ZWS Example</title> 
        <script type="text/javascript" src="/flot/jquery.min.js"></script> 
        <script type="text/javascript" src="/flot/jquery.flot.min.js"></script> 
        <script type="text/javascript" src="graph.js"></script> 
    </head> 
    <body> 
        <h1>ZMQ - WebSocket Example</h1> 
        <div id="conn_status">Not Connected</div> 
        <div id="placeholder" style="width:600px;height:300px;"></div> 
    </body> 
</html>

Here, all we’re doing is pulling in the JQuery and Flot libraries as well as our custom graph.js and setting up a couple of placeholders. The Javascript is also pretty straightforward. I’ve tried to provide inline commentary:

$(function() { 
    // Open up a connection to our server 
    var ws = new WebSocket("ws://localhost:9999/"); 
    // Save our plot placeholder 
    var $placeholder = $('#placeholder'); 
    // Maximum # of data points to plot 
    var datalen = 100; 
    // This will be the plot object 
    var plot = null; 
    // Set up some options on our data series 
    var series = { 
        label: "Value", 
        lines: { 
            show: true, 
            fill: true 
        }, 
        points: { 
            show:true 
        }, 
        data: [] 
    }; 
    // What do we do when we get a message? 
    ws.onmessage = function(evt) { 
        var d = $.parseJSON(evt.data); 
        series.data.push([d.x, d.y]); 
        // Keep the data series a manageable length 
        while (series.data.length > datalen) { 
            series.data.shift(); 
        } 
        if(plot) { 
            // Create the plot if it's not there already 
            plot.setData([series]); 
            plot.setupGrid(); 
            plot.draw(); 
        } else if(series.data.length > 10) { 
            // Update the plot 
            plot = $.plot($placeholder, [series], { 
                xaxis:{ 
                    mode: "time", 
                    timeformat: "%H:%M:%S", 
                    minTickSize: [2, "second"], 
                }, 
                yaxis: { 
                    min: 0, 
                    max: 5 
                } 
            }); 
            plot.draw(); 
        } 
    } 
    // Just update our conn_status field with the connection status 
    ws.onopen = function(evt) { 
        $('#conn_status').html('<b>Connected</b>'); 
    } 
    ws.onerror = function(evt) { 
        $('#conn_status').html('<b>Error</b>'); 
    } 
    ws.onclose = function(evt) { 
        $('#conn_status').html('<b>Closed</b>'); 
    } 
}); 

Putting it all together

To put everything together, here’s the main function I used:

def main(): 
    '''Set up zmq context and greenlets for all the servers, then launch the web 
    browser and run the data producer''' 
    context = zmq.Context() 
 
    # zeromq: tcp to inproc gateway 
    gevent.spawn(zmq_server, context) 
    # websocket server: copies inproc zmq messages to websocket 
    ws_server = gevent.pywsgi.WSGIServer( 
        ('', 9999), WebSocketApp(context), 
        handler_class=WebSocketHandler) 
    # http server: serves up static files 
    http_server = gevent.pywsgi.WSGIServer( 
        ('', 8000), 
        paste.urlparser.StaticURLParser(os.path.dirname(__file__))) 
    # Start the server greenlets 
    http_server.start() 
    ws_server.start() 
    # Open a couple of webbrowsers 
    webbrowser.open('http://localhost:8000/graph.html') 
    webbrowser.open('http://localhost:8000/graph.html') 
    # Kick off the producer 
    zmq_producer(context) 

For fun, I threw a couple of webbrowser calls at the end so you can see the data getting distributed to all the clients that connect to our server. If you’d like to see the full program including flot et. al., here’s a download. Hope you enjoy playing around with gevent, ZeroMQ, and WebSockets as much as I did!

Source: http://blog.pythonisito.com/2011/07/gevent-zeromq-websockets-and-flot-ftw.html