Моя последняя пара блогов рассказывала о длинном опросе и методике Spring DeferredResult
, и чтобы продемонстрировать эти концепции, я включил код из моего проекта Producer Consumer в веб-приложение. Хотя код демонстрирует замечания блогов, он содержит большое количество пробелов в своей логике. Помимо того факта, что в реальном приложении вы не будете использовать простое LinkedBlockingQueue
, но выберете JMS или какой-либо другой промышленный сервис обмена сообщениями, а также тот факт, что только один пользователь может получить доступ к обновлениям соответствия, есть и проблема порождает потоки с плохим поведением, которые не закрываются при завершении JVM.
Вы можете задаться вопросом, почему это должно быть проблемой… для вас, как для разработчика, это на самом деле не проблема, это всего лишь немного небрежного программирования, но для одного из ваших сотрудников это может сделать жизнь излишне сложной. Причина этого в том, что если у вас слишком много потоков с плохим поведением, то ввод команды Tomcat shutdown.sh
будет иметь очень небольшой эффект, и вам придется жестоко убивать ваш веб-сервер, набрав что-то вроде:
1
|
ps -ef | grep java |
чтобы получить пид, а затем
1
|
kill - 9 <<pid>> |
… и когда у вас есть поле для веб-серверов Tomcat, чтобы перезапустить весь этот дополнительный беспорядок, который становится серьезной болью. Когда вы набираете shutdown.sh
вы хотите, чтобы Tomcat остановился.
В моей прошлой паре блогов созданные мной потоки с плохим поведением имели следующие методы run()
причем первый из них, показанный ниже, действительно плохо себя вел:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
@Override public void run() { while ( true ) { try { DeferredResult<Message> result = resultQueue.take(); Message message = queue.take(); result.setResult(message); } catch (InterruptedException e) { throw new UpdateException( "Cannot get latest update. " + e.getMessage(), e); } } } |
В этом коде я использовал бесконечное while(true)
, что означает, что поток просто продолжит работать и никогда не завершится.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
@Override public void run() { sleep( 5 ); // Sleep to allow the reset of the app to load logger.info( "The match has now started..." ); long now = System.currentTimeMillis(); List<Message> matchUpdates = match.getUpdates(); for (Message message : matchUpdates) { delayUntilNextUpdate(now, message.getTime()); logger.info( "Add message to queue: {}" , message.getMessageText()); queue.add(message); } start = true ; // Game over, can restart logger.warn( "GAME OVER" ); } |
Второй пример, приведенный выше, также довольно плохо себя ведет. Он будет продолжать принимать сообщения из списка MatchUpdates
и добавлять их в очередь сообщений в соответствующий момент. Их единственное спасительное преимущество заключается в том, что они могут генерировать InterruptedException
, которое при правильной обработке вызовет завершение потока; однако это не может быть гарантировано.
Есть быстрое решение для этого, действительно … все, что вам нужно сделать, это убедиться, что все созданные вами потоки являются потоками демонов. Определение потока демона — это поток, который не препятствует выходу JVM при завершении программы, но поток все еще работает. Обычным примером потока демона является поток сборки мусора JVM. Чтобы превратить ваши потоки в потоки демонов, вы просто вызываете:
1
|
thread.setDaemon( true ); |
… И когда вы напечатаете shutdown.sh
, WHAM , все ваши потоки исчезнут. Однако есть проблема с этим. Что, если один из ваших потоков демонов делал что-то важное, и прерывание его в самом начале приводило к потере некоторых довольно важных данных?
Что вам нужно сделать, так это убедиться, что все ваши потоки корректно завершены, завершив любую работу, которую они могут выполнять в данный момент. Остальная часть этого блога демонстрирует исправление для этих ошибочных потоков, изящно координируя их отключение с помощью ShutdownHook
. Согласно документации , «ловушка отключения» — это просто инициализированный, но незапущенный поток. Когда виртуальная машина начинает свою последовательность выключения, она запускает все зарегистрированные обработчики завершения работы в неустановленном порядке и позволяет им запускаться одновременно ». Итак, после прочтения последнего предложения вы, возможно, догадались, что вам нужно создать поток, который отвечает за отключение всех остальных потоков и передается в JVM в качестве ловушки завершения работы. Все это может быть реализовано в общем виде в нескольких небольших классах и с помощью некоторых jiggery-pokery в существующих методах run()
потока.
Два класса для создания — это ShutdownService
и Hook
. Класс Hook
, который я продемонстрирую первым, используется для связи ShutdownService
с вашими потоками. Код для Hook
выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class Hook { private static final Logger logger = LoggerFactory.getLogger(Hook. class ); private boolean keepRunning = true ; private final Thread thread; Hook(Thread thread) { this .thread = thread; } /** * @return True if the daemon thread is to keep running */ public boolean keepRunning() { return keepRunning; } /** * Tell the client daemon thread to shutdown and wait for it to close gracefully. */ public void shutdown() { keepRunning = false ; thread.interrupt(); try { thread.join(); } catch (InterruptedException e) { logger.error( "Error shutting down thread with hook" , e); } } } |
Hook
содержит две переменные экземпляра: keepRunning
и thread
. thread
— это ссылка на поток, за который этот экземпляр Hook
отвечает за завершение работы, в то время как keepRunning
сообщает потоку … продолжать работу.
У keepRunning()
есть два открытых метода: keepRunning()
и shutdown()
. keepRunning()
вызывается потоком, чтобы выяснить, должен ли он продолжать работать, а shutdown()
вызывается потоком ловушки shutdownService, чтобы завершить работу вашего потока. Это самый интересный из двух методов. Во-первых, он устанавливает для переменной keepRunning
значение false. Затем он вызывает thread.interrupt()
для прерывания потока, заставляя его генерировать thread.interrupt()
InterruptedException
. Наконец, он вызывает thread.join()
и ожидает завершения работы экземпляра thread
.
Обратите внимание, что эта техника опирается на взаимодействие всех ваших потоков. Если в миксе есть одна нить с плохим поведением, то все может зависнуть. Чтобы обойти эту проблему, добавьте таймаут в thread.join(…)
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Service public class ShutdownService { private static final Logger logger = LoggerFactory.getLogger(ShutdownService. class ); private final List<Hook> hooks; public ShutdownService() { logger.debug( "Creating shutdown service" ); hooks = new ArrayList<Hook>(); createShutdownHook(); } /** * Protected for testing */ @VisibleForTesting protected void createShutdownHook() { ShutdownDaemonHook shutdownHook = new ShutdownDaemonHook(); Runtime.getRuntime().addShutdownHook(shutdownHook); } protected class ShutdownDaemonHook extends Thread { /** * Loop and shutdown all the daemon threads using the hooks * * @see java.lang.Thread#run() */ @Override public void run() { logger.info( "Running shutdown sync" ); for (Hook hook : hooks) { hook.shutdown(); } } } /** * Create a new instance of the hook class */ public Hook createHook(Thread thread) { thread.setDaemon( true ); Hook retVal = new Hook(thread); hooks.add(retVal); return retVal; } @VisibleForTesting List<Hook> getHooks() { return hooks; } } |
ShutdownService
— это служба Spring, которая содержит список классов Hook
и, следовательно, посредством потоков вывода, которые она отвечает за завершение работы. Он также содержит внутренний класс ShutdownDaemonHook
, который расширяет Thread
. Экземпляр ShutdownDaemonHook
создается во время создания ShutdownService
, который затем передается в JVM в качестве ловушки завершения работы путем вызова
1
|
Runtime.getRuntime().addShutdownHook(shutdownHook); |
У ShutdownService
есть один открытый метод: createHook()
. Первое, что делает этот класс, — это гарантирует, что любой переданный ему поток преобразуется в поток демона. Затем он создает новый экземпляр Hook
, передавая поток в качестве аргумента, прежде чем, наконец, сохранить результат в списке и вернуть его вызывающей стороне.
Теперь осталось только интегрировать ShutdownService
в DeferredResultService
и MatchReporter
, два класса, которые содержат потоки с плохим поведением.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
@Service ( "DeferredService" ) public class DeferredResultService implements Runnable { private static final Logger logger = LoggerFactory.getLogger(DeferredResultService. class ); private final BlockingQueue<DeferredResult<Message>> resultQueue = new LinkedBlockingQueue<>(); private Thread thread; private volatile boolean start = true ; @Autowired private ShutdownService shutdownService; private Hook hook; @Autowired @Qualifier ( "theQueue" ) private LinkedBlockingQueue<Message> queue; @Autowired @Qualifier ( "BillSkyes" ) private MatchReporter matchReporter; public void subscribe() { logger.info( "Starting server" ); matchReporter.start(); startThread(); } private void startThread() { if (start) { synchronized ( this ) { if (start) { start = false ; thread = new Thread( this , "Studio Teletype" ); hook = shutdownService.createHook(thread); thread.start(); } } } } @Override public void run() { logger.info( "DeferredResultService - Thread running" ); while (hook.keepRunning()) { try { DeferredResult<Message> result = resultQueue.take(); Message message = queue.take(); result.setResult(message); } catch (InterruptedException e) { System.out.println( "Interrupted when waiting for latest update. " + e.getMessage()); } } System.out.println( "DeferredResultService - Thread ending" ); } public void getUpdate(DeferredResult<Message> result) { resultQueue.add(result); } } |
Первым изменением этого класса было автоматическое подключение в экземпляре службы Shutdown
. Следующее, что нужно сделать, это использовать ShutdownService
для создания экземпляра Hook
после создания потока, но до thread.start()
:
1
2
3
|
thread = new Thread( this , "Studio Teletype" ); hook = shutdownService.createHook(thread); thread.start(); |
Последнее изменение — заменить while(true)
на:
1
|
while (hook.keepRunning()) { |
… Сообщая нити, когда выходить из цикла while и выключаться.
Возможно, вы также заметили, что в приведенный выше код добавлено несколько вызовов System.out.println()
. Есть причина для этого, и это из-за неопределенного порядка, в котором выполняются потоки обработчика завершения работы. Помните, что не только ваши классы пытаются корректно завершить работу, но и другие подсистемы, а также завершают работу. Это означает, что мой оригинальный код, который назывался logger.info(…)
не logger.info(…)
следующее исключение:
01
02
03
04
05
06
07
08
09
10
11
|
Exception in thread "Studio Teletype" java.lang.NoClassDefFoundError: org/apache/log4j/spi/ThrowableInformation at org.apache.log4j.spi.LoggingEvent.(LoggingEvent.java: 159 ) at org.apache.log4j.Category.forcedLog(Category.java: 391 ) at org.apache.log4j.Category.log(Category.java: 856 ) at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java: 382 ) at com.captaindebug.longpoll.service.DeferredResultService.run(DeferredResultService.java: 75 ) at java.lang.Thread.run(Thread.java: 722 ) Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.ThrowableInformation at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java: 1714 ) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java: 1559 ) ... 6 more |
Это потому, что регистратор уже был выгружен, когда я пытаюсь вызвать его; отсюда и ошибка. Повторите, как указано в документации : «Завершающие перехваты выполняются в непростое время в жизненном цикле виртуальной машины и поэтому должны кодироваться с защитой. В частности, они должны быть написаны так, чтобы они были поточно-ориентированными и, по возможности, избегали взаимоблокировок. Они также не должны слепо полагаться на сервисы, которые могли зарегистрировать свои собственные крюки отключения и, следовательно, могут сами в процессе выключения. Попытки использовать другие основанные на потоке сервисы, такие как поток диспетчеризации событий AWT, например, могут привести к взаимоблокировкам ».
Класс MatchReport
имеет несколько очень похожих модификаций. Основное отличие состоит в том, что hook.keepRunning()
находится внутри run()
метода run()
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
public class MatchReporter implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MatchReporter. class ); private final Match match; private final Queue<Message> queue; private volatile boolean start = true ; @Autowired private ShutdownService shutdownService; private Hook hook; public MatchReporter(Match theBigMatch, Queue<Message> queue) { this .match = theBigMatch; this .queue = queue; } /** * Called by Spring after loading the context. Will "kick off" the match... */ public void start() { if (start) { synchronized ( this ) { if (start) { start = false ; logger.info( "Starting the Match Reporter..." ); String name = match.getName(); Thread thread = new Thread( this , name); hook = shutdownService.createHook(thread); thread.start(); } } } else { logger.warn( "Game already in progress" ); } } /** * The main run loop */ @Override public void run() { sleep( 5 ); // Sleep to allow the reset of the app to load logger.info( "The match has now started..." ); long now = System.currentTimeMillis(); List<Message> matchUpdates = match.getUpdates(); for (Message message : matchUpdates) { delayUntilNextUpdate(now, message.getTime()); if (!hook.keepRunning()) { break ; } logger.info( "Add message to queue: {}" , message.getMessageText()); queue.add(message); } start = true ; // Game over, can restart logger.warn( "GAME OVER" ); } private void sleep( int deplay) { try { TimeUnit.SECONDS.sleep( 10 ); } catch (InterruptedException e) { logger.info( "Sleep interrupted..." ); } } private void delayUntilNextUpdate( long now, long messageTime) { while (System.currentTimeMillis() < now + messageTime) { try { Thread.sleep( 100 ); } catch (InterruptedException e) { logger.info( "MatchReporter Thread interrupted..." ); } } } } |
Окончательная проверка этого кода — ввод команды Tomcat shutdown.sh
полпути через последовательность обновления соответствия. Когда JVM завершает работу, он вызывает хук отключения из класса ShutdownDaemonHook
. По мере run()
метода run()
этого класса он зацикливается по списку экземпляров Hook
сообщая им о необходимости закрыть их соответствующие потоки. Если вы tail -f
файл журнала вашего сервера (в моем случае catalina.out, но ваш Tomcat может быть настроен иначе, чем мой), вы увидите, как запись записей корректно завершает работу вашего сервера.
Код, сопровождающий этот блог, доступен на Github по адресу: https://github.com/roghughe/captaindebug/tree/master/long-poll .