Учебники

Параллелизм в Python – пул процессов

Пул процесса может быть создан и использован так же, как мы создали и использовали пул потоков. Пул процессов может быть определен как группа предварительно созданных и незанятых процессов, которые готовы к работе. Создание пула процессов предпочтительнее, чем создание новых процессов для каждой задачи, когда нам нужно выполнить большое количество задач.

Модуль Python – Concurrent.futures

Стандартная библиотека Python имеет модуль, называемый concurrent.futures . Этот модуль был добавлен в Python 3.2 для предоставления разработчикам высокоуровневого интерфейса для запуска асинхронных задач. Это уровень абстракции в верхней части потокового и многопроцессорного модулей Python, обеспечивающий интерфейс для выполнения задач с использованием пула потоков или процессов.

В наших последующих разделах мы рассмотрим различные подклассы модуля concurrent.futures.

Executor Class

Executor – это абстрактный класс Python-модуля concurrent.futures . Его нельзя использовать напрямую, и нам нужно использовать один из следующих конкретных подклассов:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor – конкретный подкласс

Это один из конкретных подклассов класса Executor. Он использует мульти-обработку, и мы получаем пул процессов для отправки задач. Этот пул назначает задачи доступным процессам и планирует их запуск.

Как создать ProcessPoolExecutor?

С помощью модуля concurrent.futures и его конкретного подкласса Executor мы можем легко создать пул процессов. Для этого нам нужно создать ProcessPoolExecutor с количеством процессов, которые мы хотим в пуле. По умолчанию это число 5. Затем следует отправить задачу в пул процессов.

пример

Теперь мы рассмотрим тот же пример, который мы использовали при создании пула потоков, с той лишь разницей, что теперь мы будем использовать ProcessPoolExecutor вместо ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Выход

False
False
Completed

В приведенном выше примере Process PoolExecutor был создан с 5 потоками. Затем задача, которая будет ожидать сообщения в течение 2 секунд, передается исполнителю пула процессов. Как видно из выходных данных, задача не завершается до 2 секунд, поэтому первый вызов done () вернет False. Через 2 секунды задача будет выполнена, и мы получим результат будущего, вызвав для него метод result () .

Создание ProcessPoolExecutor – Менеджер контекста

Другой способ создания экземпляра ProcessPoolExecutor – с помощью диспетчера контекста. Он работает аналогично методу, использованному в приведенном выше примере. Основным преимуществом использования контекстного менеджера является то, что он выглядит синтаксически хорошо. Создание экземпляра может быть сделано с помощью следующего кода –

with ProcessPoolExecutor(max_workers = 5) as executor

пример

Для лучшего понимания мы берем тот же пример, который использовался при создании пула потоков. В этом примере нам нужно начать с импорта модуля concurrent.futures . Затем создается функция с именем load_url (), которая загружает запрошенный URL. Затем создается ProcessPoolExecutor с 5 числами потоков в пуле. Process PoolExecutor был использован как менеджер контекста. Мы можем получить результат будущего, вызвав для него метод result () .

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Выход

Приведенный выше скрипт Python сгенерирует следующий вывод:

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Использование функции Executor.map ()

Функция Python map () широко используется для выполнения ряда задач. Одна из таких задач – применить определенную функцию к каждому элементу итерируемых элементов. Точно так же мы можем отобразить все элементы итератора в функцию и отправить их как независимые задания в ProcessPoolExecutor . Рассмотрим следующий пример скрипта Python, чтобы понять это.

пример

Мы рассмотрим тот же пример, который мы использовали при создании пула потоков с помощью функции Executor.map () . В приведенном ниже примере функция map используется для применения функции square () к каждому значению в массиве значений.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Выход

Приведенный выше скрипт Python сгенерирует следующий вывод

4
9
16
25

Когда использовать ProcessPoolExecutor и ThreadPoolExecutor?

Теперь, когда мы изучили оба класса Executor – ThreadPoolExecutor и ProcessPoolExecutor, нам нужно знать, когда использовать какого исполнителя. Нам нужно выбрать ProcessPoolExecutor в случае связанных с CPU рабочих нагрузок и ThreadPoolExecutor в случае связанных с I / O рабочих нагрузок.

Если мы используем ProcessPoolExecutor , то нам не нужно беспокоиться о GIL, поскольку он использует многопроцессорность. Более того, время выполнения будет меньше по сравнению с ThreadPoolExecution . Рассмотрим следующий пример скрипта Python, чтобы понять это.

пример

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Выход

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Выход

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

Из выводов обеих программ, приведенных выше, мы видим разницу во времени выполнения при использовании ProcessPoolExecutor и ThreadPoolExecutor .