Статьи

Решение проблемы производитель-потребитель в Java

Проблема «производитель-потребитель» — одна из наиболее часто встречающихся проблем при попытке многопоточного программирования. Хотя это не так сложно, как некоторые другие проблемы многопоточного программирования, неправильная реализация этой проблемы может создать беспорядок в вашем приложении. Произведенные предметы останутся неиспользованными, начальные предметы будут пропущены, потребление зависит от того, началось ли производство раньше или позже, чем попытки потребления и т. Д. Чтобы добавить к этому, вы можете заметить аномалии спустя много времени после того, как это действительно произошло, и что наиболее важно, как почти все программы с многопоточностью, эту тоже сложно отлаживать и воспроизводить.
Поэтому в этом посте я подумал, что попытаюсь решить эту проблему в Java с помощью Java-пакета «java.util.concurrent» и его классов.
Прежде всего, давайте посмотрим на характеристики проблемы производитель-потребитель:
  • Производитель (и) производят товары.
  • Потребитель (и) потребляют товары, произведенные производителем (ами).
  • Производитель (и) заканчивают производство и дают потребителям знать, что они сделаны.
Обратите внимание, что в этой проблеме производитель-потребитель производитель работает в потоке, отличном от потока в потребителе. Эта настройка имеет смысл в двух случаях:
  • Шаги по потреблению товара производятся независимо и не зависят от других предметов.
  • Время на обработку предметов больше, чем на их изготовление.
Термин «больше» во втором пункте используется немного свободно. Рассмотрим случай, когда производитель читает строку из файла, а «потребление и обработка» заключается в том, чтобы просто записать строку в специальном формате обратно в файл, тогда использование решения проблемы производителя для потребителя можно считать случаем чрезмерного проектирования. решение. Однако если для каждой из этих строк этап «потребления и обработки» состоит в том, чтобы сделать HTTP-запрос GET / POST к веб-серверу, а затем вывести результат куда-нибудь, то мы должны выбрать решение «производитель-потребитель». В этом случае я предполагаю, что все данные для выполнения GET / POST доступны в самой строке (элементе), и мы не зависим от предыдущих / следующих строк.
Итак, давайте сначала посмотрим на характеристики решения проблемы производитель-потребитель, которые я опубликовал ниже:
  • Там может быть несколько производителей.
  • Там будет несколько потребителей.
  • Как только производство новых товаров будет завершено, производитель (и) сообщит потребителям, что потребитель выйдет после того, как последний товар будет израсходован и обработан.
Интересно отметить, что для решения этой проблемы на общем уровне мы можем обратиться только к стороне потребителя, а не к стороне производителя. Это потому, что производство предметов может быть сделано в любое время, и мы очень мало можем сделать общим способом, чтобы контролировать производство предметов. Однако мы можем контролировать поведение потребителя, принимая товары от производителя (ов). Изложив правила, давайте посмотрим на потребительский договор:
1
2
3
4
5
6
7
8
package com.maximus.producerconsumer;
 
public interface Consumer
{
 public boolean consume(Item j);
  
 public void finishConsumption();
}

Здесь потребитель может быть разделен между несколькими производителями похожих товаров; под подобными предметами я подразумеваю производителя, который производит объекты типа «Предмет». Определение, если Item является следующим:

1
2
3
4
5
6
package com.maximus.consumer;
 
public interface Item
{
 public void process();
}

Теперь рассмотрим реализацию интерфейса Consumer:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.maximus.consumer;
 
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
 
public class ConsumerImpl implements Consumer
{
 private BlockingQueue< Item > itemQueue =
  new LinkedBlockingQueue<Item>();
  
 private ExecutorService executorService =
  Executors.newCachedThreadPool();
  
 private List<ItemProcessor> jobList =
  new LinkedList<ItemProcessor>();
  
 private volatile boolean shutdownCalled = false;
   
 public ConsumerImpl(int poolSize)
 {
  for(int i = 0; i < poolSize; i++)
  {
   ItemProcessor jobThread =
    new ItemProcessor(itemQueue);
    
   jobList.add(jobThread);
   executorService.submit(jobThread);
  }
 }
  
 public boolean consume(Item j)
 {
  if(!shutdownCalled)
  {
   try
   {
    itemQueue.put(j);
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
    return false;
   }
   return true;
  }
  else
  {
   return false;
  }
 }
  
 public void finishConsumption()
 {
  for(ItemProcessor j : jobList)
  {
   j.cancelExecution();
  }
   
  executorService.shutdown();
 }
}

Теперь единственной достопримечательностью является ItemProcessor, который потребитель использует для обработки входящих элементов. ItemProcessor кодируется следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.maximus.consumer;
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
 
public class ItemProcessor implements Runnable
{
 private BlockingQueue<Item> jobQueue;
  
 private volatile boolean keepProcessing;
   
 public ItemProcessor(BlockingQueue<Item> queue)
 {
  jobQueue = queue;
  keepProcessing = true;
 }
  
 public void run()
 {
  while(keepProcessing || !jobQueue.isEmpty())
  {
   try
   {
    Item j = jobQueue.poll(10, TimeUnit.SECONDS);
     
    if(j != null)
    {
     j.process();
    }
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
    return;
   }
  }
 }
  
 public void cancelExecution()
 {
  this.keepProcessing = false;
 }
}
Единственная проблема выше — это условие в цикле while. Цикл while написан так, чтобы поддерживать продолжение потребления предметов даже после того, как производитель (ы) завершил производство, и уведомил потребителя, что производство закончено. Вышеуказанный цикл while гарантирует, что потребление всех элементов выполняется до выхода из потоков. Это будет иметь место, когда производители работают быстрее, чем потребители.
Вышеупомянутый потребитель является поточно-ориентированным и может совместно использоваться несколькими производителями, так что каждый производитель может одновременно вызывать customer.consume (), не заботясь о синхронизации и других предупреждениях о многопоточности. Производители просто должны представить реализацию интерфейса Item, метод process () которого будет содержать логику того, как будет выполняться потребление.
В качестве бонуса за чтение поста я выдвинул тестовую программу, которая демонстрирует, как использовать вышеупомянутые классы:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.maximus.consumer;
 
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
 
public class Test
{
 public static void main(String[] args) throws Exception
        {
         Consumer consumer = new ConsumerImpl(10);
          
         BufferedReader br =
          new BufferedReader(
          new InputStreamReader(
          new FileInputStream(
          new File(args[0]))));
          
         String line = "";
          
         while((line = br.readLine()) != null)
         {
          System.out.println(
           "Producer producing: " + line);
          consumer.consume(new PrintJob(line));
         }
          
         consumer.finishConsumption();
        }
}
 
class PrintJob implements Item
{
 private String line;
  
 public PrintJob(String s)
 {
  line = s;
 }
  
 public void process()
 {
  System.out.println(
   Thread.currentThread().getName() +
   " consuming :" + line);
 }
}
Вышеуказанный потребитель может быть настроен различными способами, чтобы сделать его более гибким. Мы можем определить, что будет делать потребитель, когда производство будет завершено. Это может быть изменено, чтобы разрешить пакетную обработку, но я оставляю это пользователю. Не стесняйтесь использовать его и крутить так, как вы хотите.
Удачного кодирования!

Ссылка: Решение проблемы производителя-потребителя в Java от нашего партнера по JCG Сармы Сваранга в блоге Java HotSpot .