Статьи

Сокеты TCP / IP для мониторинга процессов демона.

Когда я закончил свою предыдущую статью несколько недель назад, я подумал, что было бы неплохо поговорить о сокетах. У процессов-демонов есть проблемы, связанные с отсутствием видимости их результатов, и во многих случаях трудно получить от них информацию, поэтому было бы идеально связать обе статьи.

Я помню еще в конце 1990-х годов, когда я обнаружил Socket-связь и то, как это может помочь внутренним процессам предоставлять информацию внешнему миру. Я начал использовать это очень часто, и это был полный успех, так как компания, в которой я работал, не знала, что это возможно. Мониторинг этих процессов был очень утомительным и занимал много времени, поэтому это имело огромное значение для нас. Через полгода мы контролировали все наши процессы не только через сокет-клиенты, но и смогли увеличить количество потоков, беззвучно остановить демон, сократить время ожидания, в течение которого производитель просыпается для выполнения задач, и даже изменять поведенческие настройки того, как процесс автоматически замедляется или ускоряется на основе статистики по требованию задачи.

Сокеты использовались слишком долго, и они всегда являются замечательным и мощным инструментом во многих отношениях. Мы обязательно используем их сотни раз в день, даже не подозревая об этом.

Давайте начнем писать код сейчас. Я буду использовать код в  моей предыдущей статье,  поэтому нам не придется тратить время на написание и понимание другого внутреннего процесса. 

Первое, что мы собираемся сделать, — это позаботиться о сокете сервера, который будет находиться внутри внутреннего процесса. У демона должен быть другой отдельный поток, чтобы получать петиции о соединении путем прослушивания одного конкретного сокета TCP / IP, а затем создает новый поток для входящего соединения, который будет обслуживать его запрос. Таким образом, мы можем сказать, что у каждого клиента, подключенного к нашему процессу, будет свой собственный поток, живущий в процессе демона, посвященном ему (клиенту).

Класс ServerSocketConnection.java:

package com.test.multithread;

import java.net.*;
import java.io.*;

public class ServerSocketConnection extends Thread {
	boolean listening = true;
	ServerSocket serverSocket = null;
	TaskProducer taskProducer;

	@Override
	public void run() {
		try {
			serverSocket = new ServerSocket(3232);

			while (listening) {
				new SocketConnectionThread(serverSocket.accept(), taskProducer).start();
			}

		} catch (final IOException e) {
			System.err.println("Could not listen on port: 3232.");
			System.exit(-1);
		}
	}

	public void setProcessingThread(final TaskProducer ep) {
		this.taskProducer = ep;
	}

	public boolean isListening() {
		return listening;
	}

	public void setListening(final boolean listening) {
		this.listening = listening;
	}

	public void close() {
		try {
			serverSocket.close();
		} catch (final IOException e) {
			e.printStackTrace();
		}
	}
}

В методе run происходит важное действие, и поток будет заблокирован в 17 строке в ожидании нового клиента, запрашивающего соединение. Как только соединение установится, поток разблокируется и будет создан новый SocketConnectionThread. Этот новый объект теперь предназначен для прослушивания запросов этого конкретного клиента и будет использовать ссылку на taskProducer для обработки своих команд.

Теперь давайте удостоверимся, что этот новый класс используется главным классом демона:

Класс ProducerConsumerPattern.java:

package com.test.multithread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerPattern {

	private final int queueCapacity = 200;
	private int numberOfThreads = 10;

	public static void main(final String args[]) {
		new ProducerConsumerPattern(20);
	

	public ProducerConsumerPattern(final int numberOfThreads) {
		if (numberOfThreads <= 0 || numberOfThreads > 100) 
			throw new IllegalArgumentException("The number of threads should be a number between 1 and 100");

		this.numberOfThreads = numberOfThreads;

		// Creating shared object
		final BlockingQueue<Long> sharedQueue = new LinkedBlockingQueue<Long>(queueCapacity);

		// Creating and starting the Consumer Threads
		final List<TaskConsumer> consumers = new ArrayList<TaskConsumer>();
		for (int i = 0; i <= this.numberOfThreads; i++) {
			final TaskConsumer consThread = new TaskConsumer(i, sharedQueue);
			consThread.start();
			consumers.add(consThread);
		}

		final ServerSocketConnection socketServer = new ServerSocketConnection();

		// Creating and starting the Producer Thread
		final TaskProducer prodThread = new TaskProducer(sharedQueue, socketServer, consumers);
		prodThread.start();

		socketServer.setProcessingThread(prodThread);
		socketServer.start();
	}
}

Как видите, строки 34, 40 и 41 были добавлены в исходный код. Поскольку TaskProducer также потребуется ссылка на код ServerSocketConnection, его необходимо создать непосредственно перед созданием TaskProducer, чтобы передать ссылку ServerSocketConnection в конструктор. Затем, поскольку эти два класса имеют перекрестные ссылки, нам нужно установить TaskProducer в класс ServerSocketConnection с помощью метода setter.

Теперь, когда у нас есть класс сокета сервера, мы должны написать класс SocketConnectionThread, который будет прослушивать информационные запросы (команды) от клиентов и отвечать данными, поступающими из статистики, накопленной TaskProducer.

Класс SocketConnectionThread.java:

package com.test.multithread;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class SocketConnectionThread extends Thread {
	private Socket socket = null;
	private final TaskProducer taskProducer;

	public SocketConnectionThread(final Socket socket, final TaskProducer ep) {
		super("SocketConnectionThread");
		this.socket = socket;
		this.taskProducer = ep;
	}

	@Override
	public void run() {
		try {
			final PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
			final BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

			String inputLine, outputLine;
			final CommunicationProtocol kkp = new CommunicationProtocol();
			while ((inputLine = in.readLine()) != null) {
				outputLine = kkp.processInput(inputLine, taskProducer);
				out.println(outputLine);

				if (outputLine.equals("Bye"))
					break;
			}
			out.close();
			in.close();
			socket.close();
		} catch (final IOException e) {
			e.printStackTrace();
		}
	}
}

Как и в ServerSocketConnection, логика сосредоточена на методе run, и здесь блокировка происходит в другом месте. Теперь OutputStream, исходящий из сокета, блокирует поток, ожидающий запросы. Каждый раз, когда от клиента приходит новая строка (команда), поток просыпается и обрабатывает запрос.

Здесь мы должны принять решение. Как мы собираемся обрабатывать текст, поступающий от клиента? Мы собираемся написать этот код в этот класс? Что я обычно делаю в этих сценариях, так это инкапсулирую логику интерпретации команд, поступающих от клиента, в класс, который действует как синтаксический анализатор протокола связи. Этот класс может использоваться как на сервере, так и в объектах обслуживания нескольких клиентов. Вот почему у нас здесь третий класс:

Класс CommunicationProtocol.java:

package com.test.multithread;

public class CommunicationProtocol {

	public String processInput(final String theInput, final TaskProducer taskProducer) {
		final String output = "Unknown command";

		if (theInput.equalsIgnoreCase("exit")) {
			return "Bye";
		} else if (theInput.equalsIgnoreCase("stop")) {
			taskProducer.stopProcessing();
			return "Stopping";
		} else if (theInput.equalsIgnoreCase("totalQueuedItems")) {
			return Long.toString(taskProducer.getTotalQueuedItems());
		} else if (theInput.equalsIgnoreCase("totalProcessedItems")) {
			return Long.toString(taskProducer.getTotalProcessedItems());
		} else if (theInput.equalsIgnoreCase("averageProcessingTime")) {
			return Long.toString(taskProducer.getAverageProcessingTime());
		}

		return output;
	}
}

Код в этом классе действительно прост, но он также может быть очень сложным в зависимости от уровня сложности, который потребуется вашим командам. Идея здесь заключается в добавлении еще одного уровня абстракции, чтобы класс SocketConnectionThread не сталкивался с проблемами, связанными с протоколом.

Как вы можете видеть, я предоставлю процессу 5 простых команд, так как цель этой статьи не в том, чтобы исследовать идеи о том, как манипулировать захватом данных из процессов демона. Эти простые три статистических фрагмента данных и две поведенческие команды будут служить примерами того, что можно сделать с помощью этого подхода.

Чтобы предоставить эту статистическую информацию нашим клиентам, нам необходимо отслеживать, сколько задач было поставлено в очередь, сколько было обработано и сколько времени занимает выполнение каждой задачи. Но где эта информация принадлежит?

Естественным местом для общего количества элементов в очереди является TaskProducer, и общее или среднее время обработки будет зависеть от TaskConsumer, поэтому нам потребуется добавить некоторые изменения в оба класса.

Класс TaskProducer.java:

package com.test.multithread;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskProducer extends Thread {
	private boolean blnExit = false;
	private final List<TaskConsumer> consumers;
	private final BlockingQueue<Long> sharedQueue;
	private final ServerSocketConnection socketServer;
	public long totalQueuedItems = 0;

	public TaskProducer(final BlockingQueue<Long> sharedQueue, final ServerSocketConnection socketServer, final List<TaskConsumer> consumers) {
		this.sharedQueue = sharedQueue;
		this.consumers = consumers;
		this.socketServer = socketServer;
	}

	@Override
	public void run() {
		long i = 0;

		////////////////////////////////////////////
		// PRODUCING THE OBJECTS TO BE CONSUMED
		////////////////////////////////////////////
		while (!blnExit) {
			try {
            			i++;
				totalQueuedItems++;
				sharedQueue.put(Long.valueOf(i)); 

			} catch (final InterruptedException ex) {
				Logger.getLogger(TaskProducer.class.getName()).log(Level.SEVERE, null, ex);
			}
		}

		/////////////////////////////////
		// WAIT UNTIL THE QUEUE IS EMPTY
		/////////////////////////////////
		while (sharedQueue.size() > 0) {
			try {
				Thread.sleep(200);
				System.out.println("Producer waiting to end.");

			} catch (final InterruptedException e) {
				break;
			}
		}

		////////////////////////////////////////////
		// SEND TO ALL CONSUMERS THE EXIT CONDITION
		////////////////////////////////////////////
		for (final TaskConsumer consumer : consumers) {
			consumer.setExitCondition(true);
		}
		socketServer.close();

	}

	public void setExitCondition(final boolean blnDoExit) {
		blnExit = blnDoExit;
	}

	public long getTotalQueuedItems() {
		return totalQueuedItems;
	}

	public long getTotalProcessedItems() {
		long total = 0;
		for (final TaskConsumer consumer : consumers) {
			total += consumer.getTotalConsumedItems();
		}
		return total;
	}

	public void stopProcessing() {
		blnExit = true;
		socketServer.setListening(false);
	}

	public long getAverageProcessingTime() {
		long total = 0;
		for (final TaskConsumer consumer : consumers) {
			total += consumer.getAverageProcessingTime();
		}
		return total / consumers.size();
	}
}

Здесь мы добавили метод для каждой из данных, которые будут интересны клиентам, и две из них, которые будут выполнять действие. GetAverageProcessingTime и getTotalProcessingTime рассчитываются путем сбора информации, хранящейся в каждом из объектов TaskConsumer, а getTotalQueuedItems как раз возвращает значение атрибута класса.

Класс TaskConsumer будет хранить статистическую информацию о процессе, чтобы иметь возможность передавать его в TaskProducer, когда это необходимо.

Класс TaskConsumer.java:

package com.test.multithread;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TaskConsumer extends Thread {
	boolean blnExit = false;
	private final int id;
	private long totalConsumedItems = 0;
	private long averageProcessingTime = 0;
	private final BlockingQueue<Long> sharedQueue;

	public TaskConsumer(final int id, final BlockingQueue<Long> sharedQueue) {
		this.id = id;
		this.sharedQueue = sharedQueue;
	}

	public void setExitCondition(final boolean blnDoExit) {
		blnExit = blnDoExit;
	}

	@Override
	public void run() {
		final Random generator = new Random();

		while (!blnExit) {
			try {
				if (sharedQueue.size() > 0) {
					System.out.println("Consumer id " + id + " has worked task "+ sharedQueue.take());

					// TO BE REMOVED (ONLY SIMULATES RANDOM WORKING TIME)
					final long start = System.currentTimeMillis();
					Thread.sleep(generator.nextInt(1000) + 1000);
					final long end = System.currentTimeMillis();

					totalConsumedItems++;
					averageProcessingTime = (averageProcessingTime + (end - start)) / 2;

				} else
					Thread.sleep(500);

			} catch (final InterruptedException ex) {
				Logger.getLogger(TaskConsumer.class.getName()).log(Level.SEVERE, null, ex);
			}
		}

		System.out.println("Consumer " + id + " exiting");
	}

	public long getTotalConsumedItems() {
		return totalConsumedItems;
	}

	public long getAverageProcessingTime() {
		return averageProcessingTime;
	}
}

Поскольку мы не хотим хранить исторические данные для каждой потребляемой задачи, я решил предоставить только среднее время выполнения. Эта информация может быть гораздо более точной, но это выходит за рамки данной статьи.

Теперь все на своем месте. Нам просто нужно приложение GUI клиентского сокета, но мы не собираемся его здесь писать. Это было написано тысячи раз, поэтому давайте не будем изобретать велосипед. А не ___ ли нам? ?

Я использовал SocketTest в этой статье, и он прекрасно работает, но вы можете свободно использовать любой другой стандартный сокет-клиент, какой захотите.

Загрузите SocketTest отсюда http://sockettest.sourceforge.net/ и вы готовы к работе!

Поиграйте с этим инструментом, чтобы чувствовать себя комфортно с ним, и как только вы будете готовы протестировать наше приложение, запустите демон и затем подключите клиент сокетов, используя хост по умолчанию (127.0.0.1, localhost, также известный как loopback) и порт 3232 (или тот, твой выбор).

Теперь вы можете запросить демона следующим образом:

SocketTestProducerConsumer.png

Помните, что процесс-демон постоянно обрабатывает приложения для кредитных карт, и это дополнение будет просто предоставлять информацию о выполнении процесса удаленным клиентам.

Получайте удовольствие и не стесняйтесь задавать вопросы, если вам нужно.