Статьи

Параллелизм — последовательный и необработанный поток

Некоторое время назад я работал над проектом, в котором поток отчетов был примерно таким:

  1. Пользователь будет запрашивать отчет
  2. Запрос отчета будет переведен на более мелкие части / разделы
  3. Отчет по каждой части, основанный на типе части / секции, будет сгенерирован генератором отчетов.
  4. Составляющие части отчета будут собраны в окончательный отчет и возвращены пользователю.

Моя цель — показать, как я перешел от плохой реализации к довольно хорошей реализации:
Некоторые из основных строительных блоков, которые у меня есть, лучше всего демонстрируются модульным тестом: Это помощник по тестированию, который генерирует пример запроса отчета с составными частями запроса отчета:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class FixtureGenerator {
    public static ReportRequest generateReportRequest(){
        List<ReportRequestPart> requestParts = new ArrayList<ReportRequestPart>();
        Map<String, String> attributes = new HashMap<String, String>();
        attributes.put("user","user");
        Context context = new Context(attributes );
     
        ReportRequestPart part1 = new ReportRequestPart(Section.HEADER, context);
        ReportRequestPart part2 = new ReportRequestPart(Section.SECTION1, context);
        ReportRequestPart part3 = new ReportRequestPart(Section.SECTION2, context);
        ReportRequestPart part4 = new ReportRequestPart(Section.SECTION3, context);
        ReportRequestPart part5 = new ReportRequestPart(Section.FOOTER, context);  
         
        requestParts.add(part1);       
        requestParts.add(part2);
        requestParts.add(part3);
        requestParts.add(part4);
        requestParts.add(part5);
         
        ReportRequest reportRequest  = new ReportRequest(requestParts );
        return reportRequest;
    }
 
}

И тест для генерации отчета:

1
2
3
4
5
6
7
8
9
public class FixtureGenerator {
 @Test
 public void testSequentialReportGeneratorTime(){
  long startTime = System.currentTimeMillis();
  Report report = this.reportGenerator.generateReport(FixtureGenerator.generateReportRequest());
  long timeForReport = System.currentTimeMillis()-startTime;
  assertThat(report.getSectionReports().size(), is (5));
  logger.error(String.format("Sequential Report Generator : %s ms", timeForReport));
 }

Компонент, который генерирует часть отчета, является фиктивной реализацией с задержкой в ​​2 секунды для имитации интенсивного вызова ввода-вывода:

01
02
03
04
05
06
07
08
09
10
11
12
13
public class DummyReportPartGenerator implements ReportPartGenerator{
 
 @Override
 public ReportPart generateReportPart(ReportRequestPart reportRequestPart) {
  try {
   //Deliberately introduce a delay
   Thread.sleep(2000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  return new ReportPart(reportRequestPart.getSection(), "Report for " + reportRequestPart.getSection());
 }
}

Последовательная реализация
 
Учитывая этот базовый набор классов, моя первая наивная последовательная реализация заключается в следующем:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class SequentialReportGenerator implements ReportGenerator {
 private ReportPartGenerator reportPartGenerator;
 
 @Override
 public Report generateReport(ReportRequest reportRequest){
  List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
  List<ReportPart> reportSections = new ArrayList<ReportPart>();
  for (ReportRequestPart reportRequestPart: reportRequestParts){
   reportSections.add(reportPartGenerator.generateReportPart(reportRequestPart));
  }
  return new Report(reportSections);
 }
  
  
......
}

Очевидно, что для запроса отчета с 5 частями, каждая часть которого занимает 2 секунды, для выполнения этого отчета требуется около 10 секунд для его возврата пользователю.

Прошу сделать это одновременно.

Реализация на основе необработанных потоков
 
Первая параллельная реализация, не очень хорошая, но лучше, чем последовательная, заключается в следующем, где поток создается для каждой части запроса отчета, ожидая формирования частей отчета (используя метод thread.join ()) и агрегируя части по мере их поступления. в.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class RawThreadBasedReportGenerator implements ReportGenerator {
    private static final Logger logger = LoggerFactory.getLogger(RawThreadBasedReportGenerator.class);
 
    private ReportPartGenerator reportPartGenerator;
 
    @Override
    public Report generateReport(ReportRequest reportRequest) {
        List<ReportRequestPart> reportRequestParts = reportRequest.getRequestParts();
        List<Thread> threads = new ArrayList<Thread>();
        List<ReportPartRequestRunnable> runnablesList = new ArrayList<ReportPartRequestRunnable>();
        for (ReportRequestPart reportRequestPart : reportRequestParts) {
            ReportPartRequestRunnable reportPartRequestRunnable = new ReportPartRequestRunnable(reportRequestPart, reportPartGenerator);
            runnablesList.add(reportPartRequestRunnable);
            Thread thread = new Thread(reportPartRequestRunnable);
            threads.add(thread);
            thread.start();
        }
 
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
 
        List<ReportPart> reportParts = new ArrayList<ReportPart>();
 
        for (ReportPartRequestRunnable reportPartRequestRunnable : runnablesList) {
            reportParts.add(reportPartRequestRunnable.getReportPart());
        }
 
        return new Report(reportParts);
 
    }   
    .....
}

Опасность такого подхода заключается в том, что для каждой части отчета создается новый поток, поэтому в реальном сценарии, если при каждом запросе 100 запросов одновременно появляется 5 потоков, это может в конечном итоге создать 500 дорогостоящих потоков в виртуальной машине. !!

Таким образом, создание потока должно быть каким-то образом ограничено. Я расскажу еще о двух подходах к управлению потоками в следующей записи блога.

Ссылка: параллелизм — последовательная и необработанная тема от нашего партнера по JCG Биджу Кунджуммен в блоге all and sundry.