Статьи

Использование Java WebSockets, JSR 356 и JSON, сопоставленных с POJO

Поэтому я поигрался с Tyrus , эталонной реализацией спецификации JSR 356 WebSocket для Java. Поскольку я смотрел на инструменты тестирования, мне было интересно запускать как клиентскую, так и серверную часть в Java. Так что нет HTML5 в этом посте, я боюсь.

В этом примере мы хотим отправлять JSON туда и обратно, и, поскольку я старомоден, я хочу иметь возможность связываться с объектом POJO. Я собираюсь использовать Джексона для этого, поэтому мой файл maven выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<dependencies>
    <dependency>
        <groupId>javax.websocket</groupId>
        <artifactId>javax.websocket-api</artifactId>
        <version>1.0-rc3</version>
    </dependency>
 
    <dependency>
        <groupId>org.glassfish.tyrus</groupId>
        <artifactId>tyrus-client</artifactId>
        <version>1.0-rc3</version>
    </dependency>
 
    <dependency>
        <groupId>org.glassfish.tyrus</groupId>
        <artifactId>tyrus-server</artifactId>
        <version>1.0-rc3</version>
    </dependency>
 
    <dependency>
        <groupId>org.glassfish.tyrus</groupId>
        <artifactId>tyrus-container-grizzly</artifactId>
        <version>1.0-rc3</version>
    </dependency>
 
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.2.0</version>
    </dependency>
 
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.2.0</version>
    </dependency>
 
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.2.0</version>
    </dependency>
 
  </dependencies>

Итак, первое, что нам нужно сделать, — это определить реализации интерфейсов Encode / Decoder, чтобы сделать эту работу за нас. Это даст некоторое простое размышление о том, что такое класс бобов. Как и в случае с JAX-WS, их проще размещать в одном классе. Обратите внимание, что мы используем потоковую версию интерфейса и обрабатываем только текстовое содержимое. (Игнорируя возможность отправлять двоичные данные на данный момент)

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package websocket;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
 
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
 
import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
 
public abstract class JSONCoder<T>
  implements Encoder.TextStream<T>, Decoder.TextStream<T>{
 
    private Class<T> _type;
 
    // When configured my read in that ObjectMapper is not thread safe
    //
    private ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
 
        @Override
        protected ObjectMapper initialValue() {
            return new ObjectMapper();
        }
    };
 
    @Override
    public void init(EndpointConfig endpointConfig) {
 
        ParameterizedType $thisClass = (ParameterizedType) this.getClass().getGenericSuperclass();
        Type $T = $thisClass.getActualTypeArguments()[0];
        if ($T instanceof Class) {
            _type = (Class<T>)$T;
        }
        else if ($T instanceof ParameterizedType) {
            _type = (Class<T>)((ParameterizedType)$T).getRawType();
        }
    }
 
    @Override
    public void encode(T object, Writer writer) throws EncodeException, IOException {
        _mapper.get().writeValue(writer, object);
    }
 
    @Override
    public T decode(Reader reader) throws DecodeException, IOException {
        return _mapper.get().readValue(reader, _type);
    }
 
    @Override
    public void destroy() {
 
    }
 
}

Класс bean действительно довольно прост со статическим подклассом Coder, который мы можем использовать позже.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package websocket;
 
public class EchoBean {
 
    public static class EchoBeanCode extends
       JSONCoder<EchoBean> {
 
    }
 
    private String _message;
    private String _reply;
 
    public EchoBean() {
 
    }
 
    public EchoBean(String _message) {
        super();
        this._message = _message;
    }
 
    public void setMessage(String _message) {
        this._message = _message;
    }
 
    public String getMessage() {
        return _message;
    }
 
    public void setReply(String _reply) {
        this._reply = _reply;
    }
 
    public String getReply() {
        return _reply;
    }
 
}

Таким образом, мы должны реализовать конечную точку нашего сервера, поэтому вы можете пойти одним из двух способов: аннотировать POJO или расширять конечную точку. Я иду с первым для сервера и вторым для клиента. На самом деле все, что делает эта служба — это отправляет сообщение обратно клиенту. Обратите внимание на регистрацию кодирования и декодирования. Тот же класс в этом случае.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package websocket;
 
import java.io.IOException;
 
import javax.websocket.EncodeException;
import javax.websocket.EndpointConfig;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import static java.lang.System.out;
 
@ServerEndpoint(value="/echo",
                encoders = {EchoBean.EchoBeanCode.class},
                decoders = {EchoBean.EchoBeanCode.class})
public class EchoBeanService
{
 
    @OnMessage
    public void echo (EchoBean bean, Session peer) throws IOException, EncodeException {
        //
        bean.setReply("Server says " + bean.getMessage());
        out.println("Sending message to client");
        peer.getBasicRemote().sendObject(bean);
    }
 
    @OnOpen
    public void onOpen(final Session session, EndpointConfig endpointConfig) {
        out.println("Server connected "  + session + " " + endpointConfig);
    }
}

Давайте посмотрим на клиентский бин, на этот раз расширив стандартный класс Endpoint и добавив определенный прослушиватель для сообщения. В этом случае, когда сообщение получено, соединение просто закрывается, чтобы упростить наш тестовый пример. В реальном мире управление этой связью, очевидно, будет более сложным.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package websocket;
 
import java.io.IOException;
 
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.EncodeException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
 
import static java.lang.System.out;
 
@ClientEndpoint(encoders = {EchoBean.EchoBeanCode.class},
                decoders = {EchoBean.EchoBeanCode.class})
public class EchoBeanClient
  extends Endpoint
{
    public void onOpen(final Session session, EndpointConfig endpointConfig) {
 
        out.println("Client Connection open "  + session + " " + endpointConfig);
 
        // Add a listener to capture the returning event
        //
 
        session.addMessageHandler(new MessageHandler.Whole() {
 
            @Override
            public void onMessage(EchoBean bean) {
                out.println("Message from server : " + bean.getReply());
 
                out.println("Closing connection");
                try {
                    session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, "All fine"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
 
        // Once we are connected we can now safely send out initial message to the server
        //
 
        out.println("Sending message to server");
        try {
            EchoBean bean = new EchoBean("Hello");
            session.getBasicRemote().sendObject(bean);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (EncodeException e) {
            e.printStackTrace();
        }
 
    }
}

Теперь запустить автономный WebSocket на Tyrus довольно просто: вы просто создаете экземпляр сервера и запускаете его. Имейте в виду, что это запускает потоки демона, поэтому вам нужно убедиться, что это основной метод, что вы делаете что-то для поддержки JVM.

1
2
3
4
import org.glassfish.tyrus.server.Server;
 
Server server = new Server("localhost", 8025, "/", EchoBeanService.class);
server.start();

Так что клиент относительно прост; но поскольку мы делаем декларативный метод, нам нужно явно регистрировать кодеры и декодеры при регистрации клиентского класса.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import javax.websocket.ClientEndpointConfig;
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Session;
 
import org.glassfish.tyrus.client.ClientManager;
 
// Right now we have to create a client, which will send a message then close
// when it has received a reply
//
 
ClientManager client = ClientManager.createClient();
EchoBeanClient beanClient = new EchoBeanClient();
 
Session session = client.connectToServer(
        beanClient,
        ClientEndpointConfig.Builder.create()
         .encoders(Arrays.<Class<? extends Encoder>>asList(EchoBean.EchoBeanCode.class))
         .decoders(Arrays.<Class<? extends Decoder>>asList(EchoBean.EchoBeanCode.class))
         .build(),
        URI.create("ws://localhost:8025/echo"));
 
// Wait until things are closed down
 
while (session.isOpen()) {
    out.println("Waiting");
    TimeUnit.MILLISECONDS.sleep(10);
}

Теперь результат этого выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
Server connected SessionImpl{uri=/echo, id='e7739cc8-1ce5-4c26-ad5f-88a24c688799', endpoint=EndpointWrapper{endpointClass=null, endpoint=org.glassfish.tyrus.core.AnnotatedEndpoint@1ce5bc9, uri='/echo', contextPath='/'}} javax.websocket.server.DefaultServerEndpointConfig@ec120d
Waiting
Client Connection open SessionImpl{uri=ws://localhost:8025/echo, id='7428be2b-6f8a-4c40-a0c4-b1c8b22e1338', endpoint=EndpointWrapper{endpointClass=null, endpoint=websocket.EchoBeanClient@404c85, uri='ws://localhost:8025/echo', contextPath='ws://localhost:8025/echo'}} javax.websocket.DefaultClientEndpointConfig@15fdf14
Sending message to server
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Waiting
Sending message to client
Message from server : Server says Hello
Closing connection
Waiting

Интересно, что в первый раз, когда это происходит, возникает пауза, я подозреваю, что это из-за того, что Джексон настраивается, но у меня не было времени для профилирования. Я обнаружил, что эта долгая задержка произошла в первом посте — хотя, очевидно, это будет медленнее, чем обычная передача текстовых сообщений. Важна ли для вас разница, зависит от вашего приложения.

Было бы интересно сравнить производительность простого текста с потоковым API-интерфейсом JSON, таким как предоставляемый новым JSR, и, конечно, версией, которая связывает эти значения с JSON POJO. Что-то на другой день, возможно.