Статьи

I / O Демистифицированный

Учитывая всю шумиху по поводу масштабируемой конструкции сервера и ярость за nodejs, я собирался сфокусироваться на чтении шаблонов проектирования ввода-вывода, в которые до сих пор не мог найти достаточно времени для инвестирования. Теперь, проведя некоторое исследование, я подумал, что лучше записать материал, с которым я столкнулся, в качестве будущего справочника для меня и любого, кто может встретить этот пост. Хорошо, тогда .. Давайте сядем на автобус ввода / вывода и покататься.

Типы ввода / вывода

Существует четыре различных способа ввода-вывода в соответствии с блокирующим или неблокирующим характером операций и синхронным или асинхронным характером уведомлений о готовности / завершении ввода-вывода.

Синхронный блокирующий ввод / вывод

Именно здесь операция ввода-вывода блокирует приложение до его завершения, что составляет основу типичного потока для каждой модели подключения, найденного на большинстве веб-серверов.

Когда вызывается блокировка read () или write (), происходит переключение контекста в ядро, где происходит операция ввода-вывода и данные копируются в буфер ядра. После этого буфер ядра будет перенесен в буфер уровня приложения пользовательского пространства, а поток приложения будет помечен как работоспособный.
после чего приложение будет разблокировать и считывать данные в буфере пространства пользователя.

Модель потока на соединение пытается ограничить эффект этой блокировки, ограничивая соединение потоком, чтобы обработка других одновременных соединений не была заблокирована операцией ввода-вывода для одного соединения. Это нормально, если соединения недолговечны и задержки передачи данных не так уж и плохи. Однако в
в случае соединений с длительным сроком службы или с высокой задержкой существует вероятность того, что потоки будут задерживаться этими соединениями в течение длительного времени, что приведет к истощению ресурсов для новых соединений, если используется пул потоков фиксированного размера, поскольку заблокированные потоки не могут повторно использоваться для обслуживания новых соединений, пока в состояние блокировки или это
приведет к тому, что в системе будет создано большое количество потоков, если каждое соединение обслуживается с использованием нового потока, что может стать довольно ресурсоемким с высокими затратами на переключение контекста для высокой одновременной нагрузки.

1
2
3
4
5
ServerSocket server = new ServerSocket(port);
while(true) {
  Socket connection = server.accept();
  spawn-Thread-and-process(connection);
}

Синхронный неблокирующий ввод / вывод

В этом режиме устройство или соединение настроено как неблокирующее, поэтому операции read () и write () не будут заблокированы. Обычно это означает, что если операция не может быть выполнена немедленно, она вернется с кодом ошибки, указывающим, что операция заблокируется (EWOULDBLOCK в POSIX) или устройство
временно недоступен (EAGAIN в POSIX). Приложение должно опросить, пока устройство не будет готово и все данные не будут прочитаны. Однако это не очень эффективно, поскольку каждый из этих вызовов вызывает переключение контекста в ядро ​​и обратно независимо от того, были ли прочитаны некоторые данные или нет.

Асинхронный неблокирующий ввод-вывод с событиями готовности

Проблема с более ранним режимом состояла в том, что приложение должно было опросить и занято ждать, чтобы выполнить работу. Разве не было бы лучше, если бы приложение получало уведомление, когда устройство готово для чтения / записи? Это именно то, что обеспечивает этот режим. Используя специальный системный вызов (зависит от платформы — выберите () / poll () / epoll () для Linux, kqueue () для BSD, / dev / poll для Solaris), приложение регистрирует интерес к готовности ввода-вывода информация для определенной операции ввода / вывода (чтение или запись) с определенного устройства (дескриптор файла на языке Linux, поскольку все сокеты абстрагируются с использованием файловых дескрипторов). После этого вызывается этот системный вызов, который блокируется, пока, по крайней мере, один из зарегистрированных файловых дескрипторов не станет готовым. Когда это так, файловые дескрипторы, готовые для ввода / вывода, будут выбраны как
возврат системного вызова и может обслуживаться последовательно в цикле в потоке приложения.

Логика обработки готовых соединений обычно содержится в предоставляемом пользователем обработчике событий, который все равно должен был бы выполнять неблокирующие вызовы read () / write () для извлечения данных с устройства в ядро ​​и, в конечном счете, в
буфер пространства пользователя, выполняющий переключение контекста к ядру. Более того, как правило, нет абсолютной гарантии того, что с устройством можно выполнить запланированные операции ввода-вывода, поскольку то, что обеспечивает операционная система, является лишь признаком того, что устройство может быть готово выполнить интересующую операцию ввода-вывода, но не блокировка read () или write () может помочь вам в таких ситуациях. Однако это должно быть исключением, чем норма.

Таким образом, общая идея состоит в том, чтобы получать события готовности асинхронным способом и регистрировать некоторые обработчики событий для обработки, когда такие уведомления о событиях запускаются. Таким образом, как вы можете видеть, все это может быть выполнено в одном потоке при одновременном мультиплексировании между различными соединениями, в первую очередь из-за природы select () (здесь я выбираю типичный системный вызов), который может возвращать готовность нескольких сокетов одновременно. Это является частью привлекательности этого режима работы, когда один поток может одновременно обслуживать большое количество соединений. Этот
Режим — это то, что обычно называют моделью «неблокирующий ввод / вывод».

Java абстрагировала различия между реализациями системных вызовов для конкретной платформы с помощью своего NIO API. Дескрипторы сокета / файла абстрагируются с использованием Channels, а Selector инкапсулирует системный вызов selection. Приложения, заинтересованные в получении событий готовности, регистрируют канал (обычно SocketChannel, полученный методом accept () на ServerSocketChannel) с помощью селектора и получают SelectionKey, который действует как дескриптор для хранения канала и регистрационной информации. Затем вызывается блокирующий вызов select () для Selector, который возвращает набор SelectionKeys, которые затем могут быть обработаны
один за другим, используя указанные приложением обработчики событий.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Selector selector = Selector.open();
 
channel.configureBlocking(false);
 
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
 
while(true) {
 
  int readyChannels = selector.select();
 
  if(readyChannels == 0) continue;
 
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
 
  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
 
  while(keyIterator.hasNext()) {
 
    SelectionKey key = keyIterator.next();
 
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
 
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
 
    } else if (key.isReadable()) {
        // a channel is ready for reading
 
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
 
    keyIterator.remove();
  }
}

Асинхронный и неблокирующий ввод-вывод с событиями завершения

События готовности только зашли так далеко, чтобы уведомить вас, что устройство / сокет готовы что-то сделать. Приложению по-прежнему приходится выполнять грязную работу по чтению данных с устройства / сокета (точнее, указав операционной системе сделать это через системный вызов) в буфер пространства пользователя на всем пути от устройства. Разве не было бы неплохо делегировать это задание операционной системе для выполнения в фоновом режиме и позволить ему проинформировать вас о завершении задания, передав все данные с устройства в буфер ядра и, наконец, в буфер уровня приложения? Это основная идея этого режима, обычно известного как режим «асинхронного ввода-вывода». Для этого требуется, чтобы операционная система поддерживала операции AIO. В Linux эта поддержка присутствует в aio POSIX API начиная с версии 2.6, а для Windows — в виде «портов завершения ввода / вывода».

С NIO2 Java усилила поддержку этого режима с помощью API AsynchronousChannel.

Поддержка операционной системы

Для поддержки уведомлений о готовности и завершении событий разные операционные системы предоставляют различные системные вызовы. Для событий готовности select () и poll () могут использоваться в системах на базе Linux. Однако более новый вариант epoll () предпочтителен из-за его эффективности по сравнению с select () или poll (). select () страдает от того факта, что время выбора увеличивается линейно с количеством отслеживаемых дескрипторов. Это очевидно печально известно перезаписью ссылок массива дескриптора файла. Таким образом, каждый раз, когда он вызывается, массив дескрипторов должен быть заново заполнен из отдельной копии. Во всяком случае, не элегантное решение.

Вариант epoll () можно настроить двумя способами. А именно, срабатывают по краям и по уровням. В случае запуска по фронту он будет отправлять уведомление только тогда, когда в соответствующем дескрипторе обнаружено событие. Скажем, во время событийного уведомления ваш обработчик приложения считывает только половину входного буфера ядра. Теперь он не получит уведомление об этом дескрипторе в следующий раз, даже когда есть какие-то данные для чтения, если устройство не готово отправить больше данных, вызывающих событие дескриптора файла. С другой стороны, конфигурация, управляемая уровнем, будет вызывать уведомление каждый раз, когда есть данные для чтения.

Сопоставимые системные вызовы присутствуют в виде kqueue в вариантах BSD и / dev / poll или «Завершение события» в Solaris в зависимости от версии. Эквивалент Windows — «Порты завершения ввода / вывода».

Однако ситуация для режима AIO немного отличается, по крайней мере, в случае Linux. Поддержка aio для сокетов в Linux в лучшем случае кажется неясной, поскольку некоторые предполагают, что она фактически использует события готовности на уровне ядра, обеспечивая асинхронную абстракцию для событий завершения на уровне приложения. Однако Windows, похоже, снова поддерживает этот первый класс через «порты завершения ввода / вывода».

Design I / O Patterns 101

Там, где дело доходит до разработки программного обеспечения, есть шаблоны. Ввод / вывод не отличается. Есть несколько моделей ввода / вывода, связанных с моделями NIO и AIO, которые описаны ниже.

Образец реактора

В этом шаблоне участвуют несколько компонентов. Сначала я пройдусь по ним, чтобы было легче понять схему.

Reactor Initiator: это компонент, который запускает неблокирующий сервер путем настройки и запуска диспетчера. Сначала он связывает сокет сервера и регистрирует его в демультиплексоре для событий готовности клиента к соединению. Затем реализации обработчика событий для каждого типа событий готовности (чтение / запись / принятие и т. Д.) Будут зарегистрированы диспетчером. Затем будет вызван цикл событий диспетчера для обработки уведомлений о событиях.

Диспетчер: Определяет интерфейс для регистрации, удаления и отправки обработчиков событий, отвечающих за реакцию на события соединения, которые включают в себя принятие соединения, ввод / вывод данных и события тайм-аута для набора соединений. Для обслуживания клиентского соединения соответствующий обработчик события (например, обработчик события accept) регистрирует принятый клиентский канал (оболочку для основного клиентского сокета) в демультиплексоре вместе с типом событий готовности для прослушивания этого конкретного канала. После этого поток диспетчера вызовет операцию выбора готовности блокировки на демультиплексоре для набора зарегистрированных каналов. Как только один или несколько зарегистрированных каналов готовы к вводу / выводу, диспетчер будет обслуживать каждый возвращенный «дескриптор», связанный с каждым готовым каналом, один за другим, используя зарегистрированные обработчики событий. Важно, чтобы эти обработчики событий не задерживали поток диспетчера, поскольку это приведет к задержке диспетчера в обслуживании других готовых соединений. Поскольку обычная логика в обработчике событий включает в себя передачу данных в / из готового соединения, которое будет блокироваться до тех пор, пока все данные не будут переданы между пользовательским пространством и буферами данных пространства ядра, как правило, это тот случай, когда эти обработчики запускаются в разных потоках из потока. бассейн.

Дескриптор: дескриптор возвращается, когда канал зарегистрирован демультиплексором, который инкапсулирует канал соединения и информацию о готовности. Набор готовых дескрипторов будет возвращен операцией выбора готовности демультиплексора. Эквивалентом Java NIO является SelectionKey.

Демультиплексор: ожидает событий готовности в одном или нескольких зарегистрированных каналах соединения. Эквивалентом Java NIO является Селектор.

Обработчик событий: Определяет интерфейс, имеющий методы ловушки для отправки событий соединения. Эти методы должны быть реализованы реализациями обработчика событий для конкретного приложения.

Конкретный обработчик событий: содержит логику для чтения / записи данных из базового соединения и для выполнения необходимой обработки или запуска протокола приема клиентского соединения из переданного дескриптора.

Обработчики событий обычно запускаются в отдельных потоках из пула потоков, как показано на диаграмме ниже.

Простая реализация эхо-сервера для этого шаблона выглядит следующим образом (без пула потоков обработчика событий).

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
public class ReactorInitiator {
 
private static final int NIO_SERVER_PORT = 9993;
 
  public void initiateReactiveServer(int port) throws Exception {
 
    ServerSocketChannel server = ServerSocketChannel.open();
    server.socket().bind(new InetSocketAddress(port));
    server.configureBlocking(false);
 
    Dispatcher dispatcher = new Dispatcher();
    dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server);
 
    dispatcher.registerEventHandler(
      SelectionKey.OP_ACCEPT, new AcceptEventHandler(
      dispatcher.getDemultiplexer()));
 
    dispatcher.registerEventHandler(
      SelectionKey.OP_READ, new ReadEventHandler(
      dispatcher.getDemultiplexer()));
 
    dispatcher.registerEventHandler(
    SelectionKey.OP_WRITE, new WriteEventHandler());
 
    dispatcher.run(); // Run the dispatcher loop
 
 }
 
  public static void main(String[] args) throws Exception {
    System.out.println('Starting NIO server at port : ' +
      NIO_SERVER_PORT);
    new ReactorInitiator().
      initiateReactiveServer(NIO_SERVER_PORT);
  }
 
}
 
public class Dispatcher {
 
  private Map<Integer, EventHandler> registeredHandlers =
    new ConcurrentHashMap<Integer, EventHandler>();
  private Selector demultiplexer;
 
  public Dispatcher() throws Exception {
    demultiplexer = Selector.open();
  }
 
  public Selector getDemultiplexer() {
    return demultiplexer;
  }
 
  public void registerEventHandler(
    int eventType, EventHandler eventHandler) {
    registeredHandlers.put(eventType, eventHandler);
  }
 
  // Used to register ServerSocketChannel with the
  // selector to accept incoming client connections
  public void registerChannel(
    int eventType, SelectableChannel channel) throws Exception {
    channel.register(demultiplexer, eventType);
  }
 
  public void run() {
    try {
      while (true) { // Loop indefinitely
        demultiplexer.select();
 
        Set<SelectionKey> readyHandles =
          demultiplexer.selectedKeys();
        Iterator<SelectionKey> handleIterator =
          readyHandles.iterator();
 
        while (handleIterator.hasNext()) {
          SelectionKey handle = handleIterator.next();
 
          if (handle.isAcceptable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_ACCEPT);
              handler.handleEvent(handle);
           // Note : Here we don't remove this handle from
           // selector since we want to keep listening to
           // new client connections
          }
 
          if (handle.isReadable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_READ);
            handler.handleEvent(handle);
            handleIterator.remove();
          }
 
          if (handle.isWritable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_WRITE);
            handler.handleEvent(handle);
            handleIterator.remove();
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
 
}
 
public interface EventHandler {
 
   public void handleEvent(SelectionKey handle) throws Exception;
 
}
 
public class AcceptEventHandler implements EventHandler {
  private Selector demultiplexer;
  public AcceptEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }
 
  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    ServerSocketChannel serverSocketChannel =
     (ServerSocketChannel) handle.channel();
    SocketChannel socketChannel = serverSocketChannel.accept();
    if (socketChannel != null) {
      socketChannel.configureBlocking(false);
      socketChannel.register(
        demultiplexer, SelectionKey.OP_READ);
    }
  }
 
}
 
public class ReadEventHandler implements EventHandler {
 
  private Selector demultiplexer;
  private ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
 
  public ReadEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }
 
  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
     (SocketChannel) handle.channel();
 
    socketChannel.read(inputBuffer); // Read data from client
 
    inputBuffer.flip();
    // Rewind the buffer to start reading from the beginning
 
    byte[] buffer = new byte[inputBuffer.limit()];
    inputBuffer.get(buffer);
 
    System.out.println('Received message from client : ' +
      new String(buffer));
    inputBuffer.flip();
    // Rewind the buffer to start reading from the beginning
    // Register the interest for writable readiness event for
    // this channel in order to echo back the message
 
    socketChannel.register(
      demultiplexer, SelectionKey.OP_WRITE, inputBuffer);
  }
 
}
 
public class WriteEventHandler implements EventHandler {
 
  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
      (SocketChannel) handle.channel();
    ByteBuffer inputBuffer = (ByteBuffer) handle.attachment();
    socketChannel.write(inputBuffer);
    socketChannel.close(); // Close connection
  }
 
}

Proactor Pattern

Этот шаблон основан на модели асинхронного ввода-вывода. Основные компоненты заключаются в следующем.

Проактивный инициатор: это объект, который инициирует асинхронную операцию, принимающую клиентские соединения. Обычно это основной поток серверного приложения. Регистрирует обработчик завершения вместе с диспетчером завершения для обработки асинхронного уведомления о событии принятия соединения.

Процессор асинхронной работы: он отвечает за асинхронное выполнение операций ввода-вывода и предоставление уведомлений о событиях завершения обработчику завершения уровня приложения. Обычно это интерфейс асинхронного ввода-вывода, предоставляемый операционной системой.

Асинхронная операция. Асинхронные операции выполняются процессором асинхронной операции в отдельных потоках ядра.

Диспетчер завершения: отвечает за обратный вызов обработчиков завершения приложения после завершения асинхронных операций. Когда процессор асинхронной операции завершает асинхронно инициированную операцию, диспетчер завершения выполняет обратный вызов приложения от своего имени. Обычно делегирует обработку уведомления о событии подходящему обработчику завершения в соответствии с типом события.

Обработчик завершения: это интерфейс, реализованный приложением для обработки асинхронных событий завершения события.

Давайте посмотрим, как этот шаблон может быть реализован (как простой эхо-сервер) с использованием нового API Java NIO.2, добавленного в Java 7.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public class ProactorInitiator {
  static int ASYNC_SERVER_PORT = 4333;
 
  public void initiateProactiveServer(int port)
    throws IOException {
 
    final AsynchronousServerSocketChannel listener =
      AsynchronousServerSocketChannel.open().bind(
        new InetSocketAddress(port));
     AcceptCompletionHandler acceptCompletionHandler =
       new AcceptCompletionHandler(listener);
 
     SessionState state = new SessionState();
     listener.accept(state, acceptCompletionHandler);
  }
 
  public static void main(String[] args) {
    try {
       System.out.println('Async server listening on port : ' +
         ASYNC_SERVER_PORT);
       new ProactorInitiator().initiateProactiveServer(
         ASYNC_SERVER_PORT);
    } catch (IOException e) {
     e.printStackTrace();
    }
 
    // Sleep indefinitely since otherwise the JVM would terminate
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
 
public class AcceptCompletionHandler
  implements
    CompletionHandler<AsynchronousSocketChannel, SessionState> {
 
  private AsynchronousServerSocketChannel listener;
 
  public AcceptCompletionHandler(
    AsynchronousServerSocketChannel listener) {
    this.listener = listener;
  }
 
  @Override
  public void completed(AsynchronousSocketChannel socketChannel,
    SessionState sessionState) {
   // accept the next connection
   SessionState newSessionState = new SessionState();
   listener.accept(newSessionState, this);
 
   // handle this connection
   ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
   ReadCompletionHandler readCompletionHandler =
     new ReadCompletionHandler(socketChannel, inputBuffer);
   socketChannel.read(
     inputBuffer, sessionState, readCompletionHandler);
  }
 
  @Override
  public void failed(Throwable exc, SessionState sessionState) {
   // Handle connection failure...
  }
 
}
 
public class ReadCompletionHandler implements
  CompletionHandler<Integer, SessionState> {
 
   private AsynchronousSocketChannel socketChannel;
   private ByteBuffer inputBuffer;
 
   public ReadCompletionHandler(
     AsynchronousSocketChannel socketChannel,
     ByteBuffer inputBuffer) {
     this.socketChannel = socketChannel;
     this.inputBuffer = inputBuffer;
   }
 
   @Override
   public void completed(
     Integer bytesRead, SessionState sessionState) {
 
     byte[] buffer = new byte[bytesRead];
     inputBuffer.rewind();
     // Rewind the input buffer to read from the beginning
 
     inputBuffer.get(buffer);
     String message = new String(buffer);
 
     System.out.println('Received message from client : ' +
       message);
 
     // Echo the message back to client
     WriteCompletionHandler writeCompletionHandler =
       new WriteCompletionHandler(socketChannel);
 
     ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);
 
     socketChannel.write(
       outputBuffer, sessionState, writeCompletionHandler);
  }
 
  @Override
  public void failed(Throwable exc, SessionState attachment) {
    //Handle read failure.....
   }
 
}
 
public class WriteCompletionHandler implements
  CompletionHandler<Integer, SessionState> {
 
  private AsynchronousSocketChannel socketChannel;
 
  public WriteCompletionHandler(
    AsynchronousSocketChannel socketChannel) {
    this.socketChannel = socketChannel;
  }
 
  @Override
  public void completed(
    Integer bytesWritten, SessionState attachment) {
    try {
      socketChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
 
  @Override
  public void failed(Throwable exc, SessionState attachment) {
   // Handle write failure.....
  }
 
}
 
public class SessionState {
 
  private Map<String, String> sessionProps =
    new ConcurrentHashMap<String, String>();
 
   public String getProperty(String key) {
     return sessionProps.get(key);
   }
 
   public void setProperty(String key, String value) {
     sessionProps.put(key, value);
   }
 
}

Каждый тип завершения события (принять / прочитать / записать) обрабатывается отдельным обработчиком завершения, реализующим интерфейс CompletionHandler (Accept / Read / WriteCompletionHandler и т. Д.). Переходы между состояниями управляются внутри этих обработчиков соединений. Дополнительный аргумент SessionState может быть использован для
удерживать конкретное состояние сеанса клиента через серию событий завершения.

NIO Frameworks (HTTPCore)

Если вы думаете о реализации HTTP-сервера на основе NIO, вам повезло. Apache HTTPCore библиотека обеспечивает отличную поддержку для обработки HTTP-трафика с NIO. API обеспечивает абстракции более высокого уровня поверх уровня NIO со встроенной обработкой HTTP-запросов. Ниже приведена минимальная неблокирующая реализация HTTP-сервера, которая возвращает фиктивный вывод для любого запроса GET.

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
public class NHttpServer {
 
  public void start() throws IOReactorException {
    HttpParams params = new BasicHttpParams();
    // Connection parameters
    params.
      setIntParameter(
        HttpConnectionParams.SO_TIMEOUT, 60000)
     .setIntParameter(
       HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
     .setBooleanParameter(
       HttpConnectionParams.STALE_CONNECTION_CHECK, true)
     .setBooleanParameter(
       HttpConnectionParams.TCP_NODELAY, true);
 
    final DefaultListeningIOReactor ioReactor =
      new DefaultListeningIOReactor(2, params);
    // Spawns an IOReactor having two reactor threads
    // running selectors. Number of threads here is
    // usually matched to the number of processor cores
    // in the system
 
    // Application specific readiness event handler
    ServerHandler handler = new ServerHandler();
 
    final IOEventDispatch ioEventDispatch =
      new DefaultServerIOEventDispatch(handler, params);
    // Default IO event dispatcher encapsulating the
    // event handler
 
    ListenerEndpoint endpoint = ioReactor.listen(
      new InetSocketAddress(4444));
 
    // start the IO reactor in a new separate thread
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          System.out.println('Listening in port 4444');
          ioReactor.execute(ioEventDispatch);
        } catch (InterruptedIOException ex) {
          ex.printStackTrace();
        } catch (IOException e) {
          e.printStackTrace();
        } catch (Exception e) {
          e.printStackTrace();
        }
    }
    });
    t.start();
 
    // Wait for the endpoint to become ready,
    // i.e. for the listener to start accepting requests.
    try {
      endpoint.waitFor();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
 
  public static void main(String[] args)
    throws IOReactorException {
    new NHttpServer().start();
  }
 
}
 
public class ServerHandler implements NHttpServiceHandler {
 
 private static final int BUFFER_SIZE = 2048;
 
 private static final String RESPONSE_SOURCE_BUFFER =
 'response-source-buffer';
 
 // the factory to create HTTP responses
 private final HttpResponseFactory responseFactory;
 
 // the HTTP response processor
 private final HttpProcessor httpProcessor;
 
 // the strategy to re-use connections
 private final ConnectionReuseStrategy connStrategy;
 
 // the buffer allocator
 private final ByteBufferAllocator allocator;
 
 public ServerHandler() {
   super();
   this.responseFactory = new DefaultHttpResponseFactory();
   this.httpProcessor = new BasicHttpProcessor();
   this.connStrategy = new DefaultConnectionReuseStrategy();
   this.allocator = new HeapByteBufferAllocator();
 }
 
 @Override
 public void connected(
   NHttpServerConnection nHttpServerConnection) {
   System.out.println('New incoming connection');
 }
 
 @Override
 public void requestReceived(
   NHttpServerConnection nHttpServerConnection) {
 
   HttpRequest request =
     nHttpServerConnection.getHttpRequest();
   if (request instanceof HttpEntityEnclosingRequest) {
     // Handle POST and PUT requests
   } else {
 
     ContentOutputBuffer outputBuffer =
       new SharedOutputBuffer(
         BUFFER_SIZE, nHttpServerConnection, allocator);
 
     HttpContext context =
       nHttpServerConnection.getContext();
     context.setAttribute(
       RESPONSE_SOURCE_BUFFER, outputBuffer);
     OutputStream os =
       new ContentOutputStream(outputBuffer);
 
     // create the default response to this request
     ProtocolVersion httpVersion =
     request.getRequestLine().getProtocolVersion();
     HttpResponse response =
       responseFactory.newHttpResponse(
         httpVersion, HttpStatus.SC_OK,
         nHttpServerConnection.getContext());
 
     // create a basic HttpEntity using the source
     // channel of the response pipe
     BasicHttpEntity entity = new BasicHttpEntity();
     if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
       entity.setChunked(true);
     }
     response.setEntity(entity);
 
     String method = request.getRequestLine().
       getMethod().toUpperCase();
 
     if (method.equals('GET')) {
       try {
         nHttpServerConnection.suspendInput();
         nHttpServerConnection.submitResponse(response);
         os.write(new String('Hello client..').
           getBytes('UTF-8'));
 
         os.flush();
         os.close();
     } catch (Exception e) {
       e.printStackTrace();
     }
    } // Handle other http methods
   }
 }
 
 @Override
 public void inputReady(
    NHttpServerConnection nHttpServerConnection,
    ContentDecoder contentDecoder) {
    // Handle request enclosed entities here by reading
    // them from the channel
 }
 
 @Override
 public void responseReady(
    NHttpServerConnection nHttpServerConnection) {
 
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 
 @Override
 public void outputReady(
   NHttpServerConnection nHttpServerConnection,
   ContentEncoder encoder) {
   HttpContext context = nHttpServerConnection.getContext();
   ContentOutputBuffer outBuf =
    (ContentOutputBuffer) context.getAttribute(
      RESPONSE_SOURCE_BUFFER);
 
   try {
     outBuf.produceContent(encoder);
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 
 @Override
 public void exception(
   NHttpServerConnection nHttpServerConnection,
   IOException e) {
   e.printStackTrace();
 }
 
 @Override
 public void exception(
   NHttpServerConnection nHttpServerConnection,
   HttpException e) {
   e.printStackTrace();
 }
 
 @Override
 public void timeout(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 
 @Override
 public void closed(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 
}

Класс IOReactor в основном обернет функциональность демультиплексора реализацией ServerHandler, обрабатывающей события готовности.

Apache Synapse (ESB с открытым исходным кодом) содержит хорошую реализацию HTTP-сервера на основе NIO, в котором NIO используется для масштабирования большого количества клиентов на экземпляр с довольно постоянным использованием памяти с течением времени. Реализация также содержит хорошие механизмы отладки и сбора статистики сервера, встроенные в интеграцию с транспортной средой Axis2. Его можно найти в [1].

Вывод

Существует несколько вариантов ввода / вывода, которые могут повлиять на масштабируемость и производительность серверов. Каждый из вышеперечисленных механизмов ввода / вывода имеет свои плюсы и минусы, поэтому необходимо принимать решения об ожидаемых характеристиках масштабируемости и производительности и простоте поддержки этих подходов. На этом я заканчиваю мою несколько длинную статью о вводе / выводе. Не стесняйтесь предоставлять предложения, исправления или комментарии, которые вы можете иметь. Полные исходные коды для серверов, описанных в посте, вместе с клиентами можно скачать здесь .

Ссылки по теме

Было много ссылок, через которые я прошел в процессе. Ниже приведены некоторые из интересных.

[1] http://www.ibm.com/developerworks/java/library/j-nio2-1/index.html

[2] http://www.ibm.com/developerworks/linux/library/l-async/

[3] http://lse.sourceforge.net/io/aionotes.txt

[4] http://wknight8111.blogspot.com/?tag=aio

[5] http://nick-black.com/dankwiki/index.php/Fast_UNIX_Servers

[6] http://today.java.net/pub/a/today/2007/02/13/architecture-of-highly-scalable-nio-server.html

[7] Java NIO Рона Хитченса

[8] http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

[9] http://www.cs.wustl.edu/~schmidt/PDF/proactor.pdf

[10] http://www.kegel.com/c10k.html

Ссылка: I / O Демистифицировано от нашего партнера JCG Буддики Чамит в блоге Source Open .