Статьи

Неблокируемый (NIO) сервер Push и сервлет 3

В моей предыдущей публикации в блоге я писал о том, что, как я ожидаю, будет делать node.js, чтобы стать зрелым Я ввел идею иметь структуру, которая позволяет вам определять протокол и некоторые обработчики, чтобы позволить разработчику сконцентрироваться на написании полезного делового программного обеспечения, а не технического кода, очень похоже на то, как это делает Java EE. И благодаря этой публикации я узнал о комете. Я заявил, что использование неблокирующего сервера на самом деле не будет более полезным, чем блокирование в типичном веб-приложении (HTTP), и поэтому я создал пример на основе моего собственного протокола и сервера VOIP для потоковой передачи двоичного файла. данные для многих одновременно подключенных клиентов.

Теперь я прочитал о Comet и понял, что действительно есть хороший пример наличия неблокирующего сервера в сети. В этом случае данные передаются обратно клиенту, например, для непрерывной публикации последних цен на акции. Хотя этот пример может быть решен с помощью опроса, истинная Comet использует длинный опрос или, что еще лучше, полное нажатие. Большое введение я прочитал здесь . Идея состоит в том, что клиент делает вызов серверу, и вместо того, чтобы сервер немедленно возвращал данные, он сохраняет соединение открытым и возвращает данные в будущем, возможно, много раз. Это не новая идея — термин «комета», по-видимому, был изобретен примерно в 2006 году, а статья, на которую я ссылаюсь выше, относится к 2009 году. Я думаю, что я пришел на эту вечеринку очень поздно 🙂

Мой новый найденный случай, когда сервер настаивал на неблокирующем HTTP, и сильное любопытство заставило меня опустить руки и начать кодировать. Вскоре у меня была грубая реализация протокола HTTP для моей маленькой платформы, и я смог написать приложение для своего сервера, используя обработчики, которые подклассифицировали HttpHandler, который для всех намерений и целей был сервлетом.

Чтобы Comet push работал правильно, вы заставляете клиента «войти» и зарегистрироваться на сервере. В моей демонстрации я не проверял авторизации для базы данных, как вы могли бы сделать для реального приложения, но у меня была концепция канала, на который мог подписаться любой клиент браузера. Во время этого входа клиент сообщает, на какой канал он хочет подписаться, и сервер добавляет неблокирующее соединение клиентов в свою модель. Сервер отвечает, используякодировка передачи по частям , потому что в этом случае соединение остается открытым, и вам не нужно заранее указывать , сколько данных вы отправите обратно. В какой-то момент в будущем, когда кто-то что-то публикует, сервер может использовать соединение, которое все еще открыто, и содержится в его модели, чтобы отправить эти опубликованные данные обратно подписанному клиенту, отправив другой кусок данных.

Реализация сервера не была слишком сложной, но клиент представлял несколько проблем, пока я не понял, что данные поступают в клиент ajax с состоянием готовности 3, а не с более обычным 4. Функция обратного вызова onreadystatechange клиента ajax была Кроме того, каждый байт данных содержится в его responseText, а не только в новом материале, поэтому мне пришлось немного повозиться, пока я не смог заставить браузер просто добавить новый материал в атрибут innerHTML элемента div на моей странице. В любом случае, через несколько часов у меня появилось приложение, которое работало довольно хорошо. Но это было не совсем удовлетворительно, отчасти потому, что, как я уже говорил в предыдущем сообщении, сервер все еще немного глючит, особенно когда клиент разрывает соединение, потому что, например, страница браузера закрыта. Я также закончил реализацию протокола HTTP для моей платформы,который, казалось, заново изобретал технологию колесо-сервлет, уже все это делает, и гораздо лучше, чем я могу надеяться сделать это. Одна из причин, по которой мне не нравится node.js, заключается в том, что все заново изобретается.

Таким образом, как и в статье, на которую я ссылался выше, в сервлетах версии 3.0 должна быть возможность обрабатывать Comet. Я скачал Tomcat 7.0 с контейнером Servlet 3.0 и перенес код своего приложения на соответствующие сервлеты. Потребовалось некоторое время, чтобы понять, как именно использовать новые асинхронные части сервлетов, потому что там не так много точных руководств. Спецификации сервлета (JSR 315) очень помогли. После того, как я понял, как правильно использовать асинхронные компоненты, у меня появилось по-настоящему удовлетворительное решение для моих требований push

Первым шагом была перенастройка Tomcat, чтобы он использовал неблокирующую (NIO) для своего протокола соединителя. Суть в том, что я хочу держать соединение открытым для клиента, чтобы передавать данные на него. Я не могу полагаться на парадигму «один поток на запрос», потому что переключение контекста и требования к памяти потоков, скорее всего, приведут к снижению производительности. В файле Tomcat server.xml я настроил протокол соединителя:

<!-- NIO HTTP/1.1 connector -->        <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"                connectionTimeout="20000"                redirectPort="8443" />

Скорее нормальный

<Connector port="8080" protocol="HTTP/1.1"                connectionTimeout="20000"                redirectPort="8443" /> 

Все, что вам нужно сделать, чтобы Tomcat превратился в сервер NIO, — это изменить атрибут протокола на более длинное имя класса.

Вторым шагом было создание двух сервлетов. Первый LoginServlet обрабатывает клиент, «вход в систему» и подписку на канал. Этот сервлет выглядит так:

/*   * Copyright (c) 2011 Ant Kutschera *  * This file is part of Ant Kutschera's blog,  * http://blog.maxant.co.uk *  * This is free software: you can redistribute * it and/or modify it under the terms of the * Lesser GNU General Public License as published by * the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. *  * It is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE.  See the Lesser GNU General Public License for * more details.  *  * You should have received a copy of the * Lesser GNU General Public License along with this software. * If not, see http://www.gnu.org/licenses/. */package ch.maxant.blog.nio.servlet3;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.Map;import javax.servlet.AsyncContext;import javax.servlet.ServletContext;import javax.servlet.annotation.WebServlet;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import ch.maxant.blog.nio.servlet3.model.Subscriber;@WebServlet(name = "loginServlet", urlPatterns = { "/login" }, asyncSupported = true)public class LoginServlet extends HttpServlet {public static final String CLIENTS = "ch.maxant.blog.nio.servlet3.clients";private static final long serialVersionUID = 1L;public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {// dont set the content length in the response, and we will end up with chunked // encoding so that a) we can keep the connection open to the client, and b) send// updates to the client as chunks.// *********************// we use asyncSupported=true on the annotation for two reasons. first of all, // it means the connection to the client isn't closed by the container.  second // it means that we can pass the asyncContext to another thread (eg the publisher) // which can then send data back to that open connection.// so that we dont require a thread per client, we also use NIO, configured in the // connector of our app server (eg tomcat)// *********************// what channel does the user want to subscribe to?  // for production we would need to check authorisations here!String channel = request.getParameter("channel");// ok, get an async context which we can pass to another threadfinal AsyncContext aCtx = request.startAsync(request, response);// a little longer than default, to give us time to test.// TODO if we use a heartbeat, then time that to pulse at a similar rateaCtx.setTimeout(20000L); // create a data object for this new subscriptionSubscriber subscriber = new Subscriber(aCtx, channel);// get the application scope so that we can add our data to the modelServletContext appScope = request.getServletContext();// fetch the model from the app scope@SuppressWarnings("unchecked")Map

Встроенные комментарии описывают большинство вариантов, которые я сделал. Я добавил слушателя в контекст, чтобы мы получили событие в случае отключенного клиента, чтобы мы могли привести в порядок нашу модель — полный код приведен в ZIP-файле в конце этой статьи. Важно отметить, что этот сервлет не выполняет асинхронную обработку. Он просто готовит объекты запроса и ответа для доступа в будущем. Мы вставляем их (через асинхронный контекст) в модель, которая находится в области применения. Эта модель может затем использоваться сервлетом, который получает запрос на публикацию чего-либо в данный канал.

/*   * Copyright (c) 2011 Ant Kutschera *  * This file is part of Ant Kutschera's blog,  * http://blog.maxant.co.uk *  * This is free software: you can redistribute * it and/or modify it under the terms of the * Lesser GNU General Public License as published by * the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. *  * It is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE.  See the Lesser GNU General Public License for * more details.  *  * You should have received a copy of the * Lesser GNU General Public License along with this software. * If not, see http://www.gnu.org/licenses/. */package ch.maxant.blog.nio.servlet3;import java.io.IOException;import java.util.ArrayList;import java.util.Date;import java.util.List;import java.util.Map;import javax.servlet.AsyncContext;import javax.servlet.ServletContext;import javax.servlet.annotation.WebServlet;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import ch.maxant.blog.nio.servlet3.model.Subscriber;@WebServlet(name = "publishServlet", urlPatterns = { "/publish" }, asyncSupported = true)public class PublishServlet extends HttpServlet {private static final long serialVersionUID = 1L;public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {// *************************// this servlet simply spawns a thread to send its message to all subscribers.// this servlet keeps the connection to its client open long enough to tell it // that it has published to all subscribers.// *************************// add a pipe character, so that the client knows from where the newest model has started.// if messages are published really quick, its possible that the client gets two at// once, and we dont want it to be confused!  these messages also arrive at the // ajax client in readyState 3, where the responseText contains everything since login,// rather than just the latest chunk.  so, the client needs a way to work out the // latest part of the message, containing the newest version of the model it should // work with.  might be better to return XML or JSON here!final String msg = "|" + request.getParameter("message") + " " + new Date();// to which channel should it publish?  in prod, we would check authorisations here too!final String channel = request.getParameter("channel");// get the application scoped model, and copy the list of subscribers, so that the // long running task of publishing doesnt interfere with new loginsServletContext appScope = request.getServletContext();@SuppressWarnings("unchecked")final Map

 

Опять же, в коде много комментариев. В этом сервлете мы фактически выполняем некоторую (потенциально) длительную задачу. Во многих онлайн-примерах поддержки асинхронной поддержки Servlet 3.0 они показывают передачу работы исполнителю. Асинхронный контекст обеспечивает идеальный способ сделать это через контейнер, используя его метод start (Runnable). Реализация контейнера тогда решает, как справиться с задачей, вместо того, чтобы разработчику приходилось беспокоиться о порождающих потоках, что на серверах приложений, таких как WebSphere, недопустимо и приводит к ошибкам. В истинной манере Java EE разработчик может сосредоточиться на бизнес-коде, а не на технических аспектах.

Что-то еще, что может быть важным в приведенном выше коде, заключается в том, что публикация выполняется в другом потоке. Представьте себе публикацию данных десяти тысячам клиентов. Чтобы закончить в течение секунды, каждое нажатие должно завершиться менее чем за одну десятую миллисекунды. Это означает, что сделать что-то полезное, например, поиск в базе данных, невозможно. На неблокирующем сервере мы не можем позволить себе потратить секунду, чтобы что-то сделать, и многие утверждают, что секунда — это вечность в такой среде. Возможность передать такую ​​работу другому потоку неоценима, и, к сожалению, что-то, что не может сделать node.js в настоящее время, хотя вы можете передать задачу другому процессу, хотя и потенциально более сложному, чем показано здесь.

Теперь нам просто нужно создать клиент для подписки на сервер. Этот клиент является объектом запроса ajax, который создается при загрузке HTML-кода, который выполняет некоторый JavaScript в библиотеке, которую я написал. HTML выглядит так:

<div id="myDiv"></div></body><script language="Javascript" type="text/javascript">function callback(model){//simply append the model to a div, for demo purposesvar myDiv = document.getElementById("myDiv");myDiv.innerHTML = myDiv.innerHTML + "<br>" + model;}new PushClient("myChannel", callback).login();</script></html>

 

Как видите, ему просто нужно определить функцию обратного вызова, которая будет обрабатывать каждое сообщение, опубликованное с сервера. Опубликованное сообщение может быть текстовым, XML или JSON — по выбору издателя. JavaScript в этой библиотеке немного сложнее, но в основном создает XHR-запросчик, который отправляет запрос сервлету входа в систему. Любые данные, которые он получает от сервера, он анализирует и возвращает самую последнюю часть данных обратно в обратный вызов.

/*   * Copyright (c) 2011 Ant Kutschera *  * This file is part of Ant Kutschera's blog,  * http://blog.maxant.co.uk *  * This is free software: you can redistribute * it and/or modify it under the terms of the * Lesser GNU General Public License as published by * the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. *  * It is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE.  See the Lesser GNU General Public License for * more details.  *  * You should have received a copy of the * Lesser GNU General Public License along with this software. * If not, see http://www.gnu.org/licenses/. */function PushClient(ch, m){this.channel = ch;this.ajax = getAjaxClient();this.onMessage = m;// stick a reference to "this" into the ajax client, so that the handleMessage // function can access the push client - its "this" is an XMLHttpRequest object// rather than the push client, coz thats how javascript works!this.ajax.pushClient = this;function getAjaxClient(){/* * Gets the ajax client * http://en.wikipedia.org/wiki/XMLHttpRequest * http://www.w3.org/TR/XMLHttpRequest/#responsetext */    var client = null;    try{// Firefox, Opera 8.0+, Safariclient = new XMLHttpRequest();}catch (e){// Internet Explorertry{client = new ActiveXObject("Msxml2.XMLHTTP");}catch (e){client = new ActiveXObject("Microsoft.XMLHTTP");}}return client;};/**  * pass in a callback and a channel.   * the callback should take a string,  * which is the latest version of the model  */PushClient.prototype.login = function(){try{var params = escape("channel") + "=" + escape(this.channel);var url = "login?" + params;this.ajax.onreadystatechange = handleMessage;this.ajax.open("GET",url,true); //true means async, which is the safest way to do it// hint to the browser and server, that we are doing something long running// initial tests only seemed to work with this - dont know, perhaps now it // works without it?this.ajax.setRequestHeader("Connection", "Keep-Alive");this.ajax.setRequestHeader("Keep-Alive", "timeout=999, max=99");this.ajax.setRequestHeader("Transfer-Encoding", "chunked");//send the GET request to the serverthis.ajax.send(null);}catch(e){alert(e);}};function handleMessage() {//states are://0 (Uninitialized)The object has been created, but not initialized (the open method has not been called).//1 (Open)The object has been created, but the send method has not been called.//2 (Sent)The send method has been called. responseText is not available. responseBody is not available.//3 (Receiving)Some data has been received. responseText is not available. responseBody is not available.//4 (Loaded)try{if(this.readyState == 0){//this.pushClient.onMessage("0/-/-");}else if (this.readyState == 1){//this.pushClient.onMessage("1/-/-");}else if (this.readyState == 2){//this.pushClient.onMessage("2/-/-");}else if (this.readyState == 3){//for chunked encoding, we get the newest version of the entire response here, //rather than in readyState 4, which is more usual.if (this.status == 200){this.pushClient.onMessage("3/200/" + this.responseText.substring(this.responseText.lastIndexOf("|")));}else{this.pushClient.onMessage("3/" + this.status + "/-");}}else if (this.readyState == 4){if (this.status == 200){//the connection is now closed.this.pushClient.onMessage("4/200/" + this.responseText.substring(this.responseText.lastIndexOf("|")));//start again - we were just disconnected!this.pushClient.login();}else{this.pushClient.onMessage("4/" + this.status + "/-");}}}catch(e){alert(e);}};}

Важными моментами здесь являются то, что нам нужно прослушивать события как в состоянии готовности 3, так и в 4, а не в обычном 4. Пока соединение остается открытым, клиент получает данные только в состоянии готовности 3 и каждый раз, когда получает чанк, Атрибут ajax.responseText содержит все чанки с момента входа в систему, а не только самый новый чанк. Это может быть плохо, если соединение получает тонны данных — в конце концов браузеру не хватит памяти! Вы можете измерить количество байтов, отправленных любому клиенту на сервере, и, когда оно превысит заданное пороговое значение, принудительно отключить клиент, завершив поток (вызвать метод complete () в асинхронном контексте соответствующего клиента сразу после публикации). сообщение к нему). Приведенный выше клиент автоматически входит обратно на сервер, когда сервер отключается.

Вместо решения по повторному соединению для соединений с прерыванием / тайм-аутом (что очень похоже на длительный опрос) мы могли бы добавить тактовый сигнал, который сервер отправляет каждому подписанному клиенту. Период сердцебиения должен быть немного меньше времени ожидания. Точные детали могут запутаться — вы рассматриваете посылку сердцебиения каждую секунду, но делаете это только тем клиентам, которые в этом нуждаются? Или вы отправляете его каждому клиенту, скажем, каждые 25 секунд, если время ожидания составляет, скажем, 30 секунд? Вы можете использовать настройку производительности, чтобы определить, является ли это лучше, чем переподключение, которое я показал выше. С другой стороны, сердцебиение подходит для отбраковки закрытых соединений, потому что оно очень часто проверяет соединение и получает исключение в случае сбоя отправки. И затем снова контейнер сообщает слушателю, что мы добавили в асинхронный контекст входа в систему, если также происходит отключение,так что, возможно, нам не нужно сердцебиение — вы решаете 🙂

Теперь нам нужен способ публикации данных — это просто — я просто набираю следующий URL в браузере, и он отправляет запрос GET сервлету публикации:

http://localhost:8080/nio-servlet3/publish?channel=myChannel&message=javaIsAwesome

Продолжайте обновлять окно браузера этим протоколом, а другое окно почти мгновенно получает обновления, показывая последнее сообщение внизу. Я протестировал его, используя Firefox в качестве подписчика и Chrome в качестве издателя.

Я не проверил масштабируемость, потому что предположил, что разъем NIO Tomcat был хорошо протестирован и хорошо работает. Я позволю кому-то еще поиграть с масштабируемостью Servlet 3.0 для решения push. Эта статья рассказывает о том, как легко реализовать Comet push, используя сервлеты Java EE. Обратите внимание, что раньше это тоже было легко, потому что серверы, такие как Jetty и Tomcat и другие, предоставляли специальные интерфейсы Comet, но теперь, с появлением Servlet 3.0, появился стандартизированный способ сделать это.

Есть много профессиональных и открытых решений, которые делают то, что я сделал в этой статье. См эту статью , в которой перечислены многие, whackiest всего, APE — еще один проект , который ставит JavaScript на сервере !? Ну, лучше, чем PHP, я думаю 🙂

Полный код для этой демонстрации можно скачать здесь .

 

От http://blog.maxant.co.uk/pebble/2011/06/05/1307299200000.html