Статьи

Введение в параллельное и параллельное программирование на Python

Python является одним из самых популярных языков для обработки и обработки данных в целом. Экосистема предоставляет множество библиотек и сред, которые способствуют высокопроизводительным вычислениям. Однако параллельное программирование на Python может оказаться довольно сложным.

В этом уроке мы собираемся изучить, почему параллелизм сложен, особенно в контексте Python, и для этого мы рассмотрим следующее:

  • Почему параллелизм сложен в Python (подсказка: это из-за GIL — глобальной блокировки интерпретатора).
  • Потоки против процессов : различные способы достижения параллелизма. Когда использовать один над другим?
  • Параллельный или параллельный : почему в некоторых случаях мы можем согласиться на параллелизм, а не на параллелизм
  • Создание простого, но практического примера с использованием различных обсуждаемых методов .

Global Interpreter Lock (GIL) — одна из самых противоречивых тем в мире Python. В CPython, самой популярной реализации Python, GIL является мьютексом, который делает работу с потоками безопасной. GIL облегчает интеграцию с внешними библиотеками, которые не являются поточно-ориентированными, и ускоряет работу непараллельного кода. Это происходит за плату, хотя. Благодаря GIL мы не можем достичь истинного параллелизма с помощью многопоточности. По сути, два разных собственных потока одного и того же процесса не могут запускать код Python одновременно.

Однако все не так плохо, и вот почему: вещи, происходящие вне сферы GIL, могут быть параллельными. В эту категорию попадают такие длительные задачи, как ввод / вывод и, к счастью, библиотеки вроде numpy .

Так что Python не является действительно многопоточным. Но что это за нить? Давайте сделаем шаг назад и посмотрим на вещи в перспективе.

Процесс — это базовая абстракция операционной системы. Это программа, которая выполняется — другими словами, код, который выполняется. На компьютере всегда выполняется несколько процессов, и они выполняются параллельно.

Процесс может иметь несколько потоков. Они выполняют один и тот же код, принадлежащий родительскому процессу. В идеале они работают параллельно, но не обязательно. Причина, по которой процессов недостаточно, заключается в том, что приложения должны реагировать и прислушиваться к действиям пользователя при обновлении дисплея и сохранении файла.

Если это все еще немного неясно, вот чит-лист:

ПРОЦЕССЫ ПОТОКИ
Процессы не разделяют память Потоки разделяют память
Процессы нереста / переключения дороги Нерест / переключение потоков дешевле
Процессы требуют больше ресурсов Потоки требуют меньше ресурсов (иногда их называют легковесными процессами)
Не требуется синхронизация памяти Вы должны использовать механизмы синхронизации, чтобы убедиться, что вы правильно обрабатываете данные

Нет ни одного рецепта, который бы подходил всем. Выбор одного зависит от контекста и задачи, которую вы пытаетесь достичь.

Теперь мы сделаем еще один шаг и погрузимся в параллелизм. Параллелизм часто неправильно понимают и ошибочно принимают за параллелизм. Это не тот случай. Параллельность подразумевает планирование независимого кода для совместного выполнения. Воспользуйтесь тем фактом, что часть кода ожидает выполнения операций ввода-вывода, и в течение этого времени запускайте другую, но независимую часть кода.

В Python мы можем добиться легкого параллельного поведения через гринлеты. С точки зрения распараллеливания использование потоков или гринлетов эквивалентно, поскольку ни один из них не работает параллельно. Гринлеты создавать даже дешевле, чем нитки. Из-за этого гринлеты активно используются для выполнения огромного количества простых задач ввода-вывода, подобных тем, которые обычно используются в сетевых и веб-серверах.

Теперь, когда мы знаем разницу между потоками и процессами, параллельными и параллельными, мы можем проиллюстрировать, как разные задачи выполняются в двух парадигмах. Вот что мы собираемся сделать: мы будем многократно запускать задачу вне GIL и одну внутри нее. Мы запускаем их последовательно, используя потоки и процессы. Давайте определим задачи:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import os
import time
import threading
import multiprocessing
 
NUM_WORKERS = 4
 
def only_sleep():
    «»» Do nothing, wait for a timer to expire «»»
    print(«PID: %s, Process Name: %s, Thread Name: %s» % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    time.sleep(1)
 
 
def crunch_numbers():
    «»» Do some computations «»»
    print(«PID: %s, Process Name: %s, Thread Name: %s» % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    x = 0
    while x < 10000000:
        x += 1

Мы создали две задачи. Оба они работают долго, но только crunch_numbers активно выполняет вычисления. Давайте запустим only_sleep последовательно, многопоточно и с использованием нескольких процессов и сравним результаты:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
## Run tasks serially
start_time = time.time()
for _ in range(NUM_WORKERS):
    only_sleep()
end_time = time.time()
 
print(«Serial time=», end_time — start_time)
 
# Run tasks using threads
start_time = time.time()
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print(«Threads time=», end_time — start_time)
 
# Run tasks using processes
start_time = time.time()
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print(«Parallel time=», end_time — start_time)

Вот вывод, который я получил (ваш должен быть похожим, хотя PID и время будут немного отличаться):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.018089056015015
 
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 1.0047411918640137
 
PID: 95728, Process Name: Process-1, Thread Name: MainThread
PID: 95729, Process Name: Process-2, Thread Name: MainThread
PID: 95730, Process Name: Process-3, Thread Name: MainThread
PID: 95731, Process Name: Process-4, Thread Name: MainThread
Parallel time= 1.014023780822754

Вот некоторые наблюдения:

  • В случае последовательного подхода все довольно очевидно. Мы выполняем задачи одну за другой. Все четыре запуска выполняются одним и тем же потоком одного и того же процесса.

  • Используя процессы, мы сокращаем время выполнения до четверти исходного времени просто потому, что задачи выполняются параллельно. Обратите внимание, как каждая задача выполняется в отдельном процессе и в MainThread этого процесса.

  • Используя потоки, мы пользуемся тем фактом, что задачи могут выполняться одновременно. Время выполнения также сокращается до четверти, хотя параллельно ничего не происходит. Вот как это происходит: мы создаем первый поток, и он начинает ждать истечения таймера. Мы приостанавливаем его выполнение, даем ему подождать, пока истечет таймер, и за это время мы порождаем второй поток. Мы повторяем это для всех тем. В какой-то момент истекает таймер первого потока, поэтому мы переключаем на него выполнение и завершаем его. Алгоритм повторяется для второго и всех остальных потоков. В итоге получается, что все идет параллельно. Вы также заметите, что четыре разных потока ветвятся и живут в одном и том же процессе: MainProcess .

  • Вы можете даже заметить, что многопоточный подход быстрее, чем действительно параллельный. Это связано с накладными расходами на нерестовые процессы. Как мы отмечали ранее, процессы порождения и переключения являются дорогостоящей операцией.

Давайте crunch_numbers ту же процедуру, но на этот раз crunch_numbers задачу crunch_numbers :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
start_time = time.time()
for _ in range(NUM_WORKERS):
    crunch_numbers()
end_time = time.time()
 
print(«Serial time=», end_time — start_time)
 
start_time = time.time()
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print(«Threads time=», end_time — start_time)
 
 
start_time = time.time()
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print(«Parallel time=», end_time — start_time)

Вот результат, который я получил:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
Serial time= 2.705625057220459
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 2.6961309909820557
PID: 96289, Process Name: Process-1, Thread Name: MainThread
PID: 96290, Process Name: Process-2, Thread Name: MainThread
PID: 96291, Process Name: Process-3, Thread Name: MainThread
PID: 96292, Process Name: Process-4, Thread Name: MainThread
Parallel time= 0.8014059066772461

Основное отличие здесь заключается в многопоточном подходе. На этот раз он работает очень похоже на последовательный подход, и вот почему: поскольку он выполняет вычисления, а Python не выполняет настоящий параллелизм, потоки в основном работают один за другим, передавая выполнение друг другу, пока все не завершится.

Python имеет богатые API для параллельного / параллельного программирования. В этом уроке мы рассмотрим самые популярные из них, но вы должны знать, что для любых ваших потребностей в этой области, возможно, уже есть что-то, что может помочь вам в достижении вашей цели.

В следующем разделе мы создадим практическое приложение во многих формах, используя все представленные библиотеки. Без лишних слов, вот модули / библиотеки, которые мы собираемся охватить:

  • threading : стандартный способ работы с потоками в Python. Это высокоуровневая оболочка API для функциональности, _thread модулем _thread , который является низкоуровневым интерфейсом для реализации потоков операционной системы.

  • concurrent.futures : модульная часть стандартной библиотеки, которая обеспечивает еще более высокий уровень абстракции над потоками. Потоки моделируются как асинхронные задачи.

  • multiprocessing : аналогично модулю threading , с очень похожим интерфейсом, но использующим процессы вместо потоков.

  • gevent and greenlets : Greenlets, также называемые микропотоками, являются единицами выполнения, которые могут планироваться совместно и могут выполнять задачи одновременно без больших накладных расходов.

  • celery : распределенная очередь задач высокого уровня. Задачи ставятся в очередь и выполняются одновременно с использованием различных парадигм, таких как multiprocessing или gevent .

Знание теории — это хорошо и хорошо, но лучший способ научиться — это создать что-то практичное, верно? В этом разделе мы собираемся создать приложение классического типа, проходящее через все различные парадигмы.

Давайте создадим приложение, которое проверяет время работы веб-сайтов. Существует множество таких решений, наиболее известными из которых являются, вероятно, Jetpack Monitor и Uptime Robot . Цель этих приложений — уведомить вас, когда ваш сайт не работает, чтобы вы могли быстро принять меры. Вот как они работают:

  • Приложение очень часто просматривает список URL-адресов веб-сайтов и проверяет их работоспособность.
  • Каждый веб-сайт должен проверяться каждые 5-10 минут, чтобы время простоя не было значительным.
  • Вместо выполнения классического HTTP-запроса GET он выполняет запрос HEAD, чтобы он не оказал существенного влияния на ваш трафик.
  • Если статус HTTP находится в опасном диапазоне (400+, 500+), владелец уведомляется.
  • Владелец получает уведомление по электронной почте, текстовое сообщение или push-уведомление.

Вот почему важно использовать параллельный / параллельный подход к проблеме. Поскольку список веб-сайтов растет, последовательный просмотр списка не гарантирует нам, что каждый веб-сайт проверяется каждые пять минут или около того. Веб-сайты могут быть недоступны в течение нескольких часов, и владелец не будет уведомлен.

Давайте начнем с написания некоторых утилит:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# utils.py
 
import time
import logging
import requests
 
 
class WebsiteDownException(Exception):
    pass
 
 
def ping_website(address, timeout=20):
    «»»
    Check if a website is down.
    if either the status_code >= 400 or if the timeout expires
     
    Throw a WebsiteDownException if any of the website down conditions are met
    «»»
    try:
        response = requests.head(address, timeout=timeout)
        if response.status_code >= 400:
            logging.warning(«Website %s returned status_code=%s» % (address, response.status_code))
            raise WebsiteDownException()
    except requests.exceptions.RequestException:
        logging.warning(«Timeout expired for website %s» % address)
        raise WebsiteDownException()
         
 
def notify_owner(address):
    «»»
    Send the owner of the address a notification that their website is down
     
    For now, we’re just going to sleep for 0.5 seconds but this is where
    you would send an email, push notification or text-message
    «»»
    logging.info(«Notifying the owner of %s website» % address)
    time.sleep(0.5)
     
 
def check_website(address):
    «»»
    Utility function: check if a website is down, if so, notify the user
    «»»
    try:
        ping_website(address)
    except WebsiteDownException:
        notify_owner(address)

Нам действительно нужен список веб-сайтов, чтобы опробовать нашу систему. Создайте свой собственный список или используйте мой:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# websites.py
 
WEBSITE_LIST = [
    ‘http://envato.com’,
    ‘http://amazon.co.uk’,
    ‘http://amazon.com’,
    ‘http://facebook.com’,
    ‘http://google.com’,
    ‘http://google.fr’,
    ‘http://google.es’,
    ‘http://google.co.uk’,
    ‘http://internet.org’,
    ‘http://gmail.com’,
    ‘http://stackoverflow.com’,
    ‘http://github.com’,
    ‘http://heroku.com’,
    ‘http://really-cool-available-domain.com’,
    ‘http://djangoproject.com’,
    ‘http://rubyonrails.org’,
    ‘http://basecamp.com’,
    ‘http://trello.com’,
    ‘http://yiiframework.com’,
    ‘http://shopify.com’,
    ‘http://another-really-interesting-domain.co’,
    ‘http://airbnb.com’,
    ‘http://instagram.com’,
    ‘http://snapchat.com’,
    ‘http://youtube.com’,
    ‘http://baidu.com’,
    ‘http://yahoo.com’,
    ‘http://live.com’,
    ‘http://linkedin.com’,
    ‘http://yandex.ru’,
    ‘http://netflix.com’,
    ‘http://wordpress.com’,
    ‘http://bing.com’,
]

Обычно этот список хранится в базе данных вместе с контактной информацией владельца, чтобы вы могли с ним связаться. Поскольку это не основная тема этого урока, и для простоты мы просто собираемся использовать этот список Python.

Если бы вы обратили действительно хорошее внимание, вы могли бы заметить в списке два действительно длинных домена, которые не являются действительными веб-сайтами (я надеюсь, что никто не купил их к тому времени, когда вы читаете это, чтобы доказать, что я неправ!) Я добавил эти два домена, чтобы быть уверенным, что у нас есть некоторые сайты при каждом запуске. Также назовем наше приложение UptimeSquirrel .

Во-первых, давайте попробуем последовательный подход и посмотрим, насколько плохо он работает. Мы рассмотрим это как базовый уровень.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
# serial_squirrel.py
 
import time
 
 
start_time = time.time()
 
for address in WEBSITE_LIST:
    check_website(address)
         
end_time = time.time()
 
print(«Time for SerialSquirrel: %ssecs» % (end_time — start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for SerialSquirrel: 15.881232261657715secs

Мы собираемся стать немного более креативными с реализацией многопоточного подхода. Мы используем очередь, чтобы поместить адреса и создать рабочие потоки, чтобы вывести их из очереди и обработать их. Мы будем ждать, пока очередь будет пустой, а это означает, что все адреса были обработаны нашими рабочими потоками.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# threaded_squirrel.py
 
import time
from queue import Queue
from threading import Thread
 
NUM_WORKERS = 4
task_queue = Queue()
 
def worker():
    # Constantly check the queue for addresses
    while True:
        address = task_queue.get()
        check_website(address)
         
        # Mark the processed task as done
        task_queue.task_done()
 
start_time = time.time()
         
# Create the worker threads
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
 
# Add the websites to the task queue
[task_queue.put(item) for item in WEBSITE_LIST]
 
# Start all the workers
[thread.start() for thread in threads]
 
# Wait for all the tasks in the queue to be processed
task_queue.join()
 
         
end_time = time.time()
 
print(«Time for ThreadedSquirrel: %ssecs» % (end_time — start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for ThreadedSquirrel: 3.110753059387207secs

Как указывалось ранее, concurrent.futures — это высокоуровневый API для использования потоков. Подход, который мы здесь используем, подразумевает использование ThreadPoolExecutor . Мы собираемся отправить задачи в пул и вернуть фьючерсы, результаты которых будут нам доступны в будущем. Конечно, мы можем подождать, пока все фьючерсы станут фактическими результатами.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
# future_squirrel.py
 
import time
import concurrent.futures
 
NUM_WORKERS = 4
 
start_time = time.time()
 
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}
    concurrent.futures.wait(futures)
 
end_time = time.time()
 
print(«Time for FutureSquirrel: %ssecs» % (end_time — start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for FutureSquirrel: 1.812899112701416secs

multiprocessing библиотека предоставляет API-интерфейс для замены threading . В этом случае мы собираемся использовать подход, более похожий на подход concurrent.futures . Мы настраиваем multiprocessing.Pool и отправляем ему задачи, отображая функцию в список адресов (вспомним классическую функцию map Python).

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
# multiprocessing_squirrel.py
 
import time
import socket
import multiprocessing
 
NUM_WORKERS = 4
 
start_time = time.time()
 
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
    results = pool.map_async(check_website, WEBSITE_LIST)
    results.wait()
 
end_time = time.time()
 
print(«Time for MultiProcessingSquirrel: %ssecs» % (end_time — start_time))
 
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for MultiProcessingSquirrel: 2.8224599361419678secs

Gevent является популярной альтернативой для достижения массового параллелизма. Есть несколько вещей, которые вы должны знать, прежде чем использовать его:

  • Код, выполняемый одновременно гринлетами, является детерминированным. В отличие от других представленных альтернатив, эта парадигма гарантирует, что для любых двух идентичных прогонов вы всегда получите одинаковые результаты в одном и том же порядке.

  • Вам нужно соединить стандартные функции, чтобы они взаимодействовали с Gevent. Вот что я имею в виду под этим. Обычно операция сокета блокируется. Мы ждем окончания операции. Если бы мы находились в многопоточной среде, планировщик просто переключился бы на другой поток, в то время как другой ожидает ввода / вывода. Поскольку мы не в многопоточной среде, gevent исправляет стандартные функции, чтобы они стали неблокирующими, и возвращал управление планировщику gevent.

Чтобы установить gevent, запустите: pip install gevent

Вот как использовать gevent для выполнения нашей задачи с помощью gevent.pool.Pool :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# green_squirrel.py
 
import time
from gevent.pool import Pool
from gevent import monkey
 
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low
NUM_WORKERS = 4
 
# Monkey-Patch socket module for HTTP requests
monkey.patch_socket()
 
start_time = time.time()
 
pool = Pool(NUM_WORKERS)
for address in WEBSITE_LIST:
    pool.spawn(check_website, address)
 
# Wait for stuff to finish
pool.join()
         
end_time = time.time()
 
print(«Time for GreenSquirrel: %ssecs» % (end_time — start_time))
# Time for GreenSquirrel: 3.8395519256591797secs

Сельдерей — это подход, который в основном отличается от того, что мы видели до сих пор. Это боевые испытания в условиях очень сложных и высокопроизводительных сред. Настройка Celery потребует немного больше усилий, чем все вышеперечисленные решения.

Для начала нам нужно установить Celery:

pip install celery

Задачи являются центральными понятиями в рамках проекта Celery. Все, что вы захотите запустить в Celery, должно быть задачей. Celery предлагает большую гибкость для запуска задач: вы можете запускать их синхронно или асинхронно, в режиме реального времени или по расписанию, на одной и той же машине или на нескольких машинах, используя потоки, процессы, Eventlet или Gevent.

Расположение будет немного сложнее. Сельдерей использует другие сервисы для отправки и получения сообщений. Эти сообщения обычно являются задачами или результатами задач. Мы собираемся использовать Redis в этом руководстве для этой цели. Redis — отличный выбор, потому что его действительно легко установить и настроить, и вполне возможно, что вы уже используете его в своем приложении для других целей, таких как кэширование и публикация / публикация.

Вы можете установить Redis, следуя инструкциям на странице быстрого запуска Redis . Не забудьте установить библиотеку redis Python, pip install redis и пакет, необходимый для использования Redis и Celery: pip install celery[redis] .

Запустите сервер Redis следующим образом: $ redis-server

Чтобы начать создавать вещи с помощью Celery, сначала нам нужно создать приложение Celery. После этого Celery должен знать, какие задачи он может выполнять. Для этого нам нужно зарегистрировать задачи в приложении Celery. Мы сделаем это с @app.task декоратора @app.task :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# celery_squirrel.py
 
import time
from utils import check_website
from data import WEBSITE_LIST
from celery import Celery
from celery.result import ResultSet
 
app = Celery(‘celery_squirrel’,
             broker=’redis://localhost:6379/0′,
             backend=’redis://localhost:6379/0′)
 
@app.task
def check_website_task(address):
    return check_website(address)
 
if __name__ == «__main__»:
    start_time = time.time()
 
    # Using `delay` runs the task async
    rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST])
     
    # Wait for the tasks to finish
    rs.get()
 
    end_time = time.time()
 
    print(«CelerySquirrel:», end_time — start_time)
    # CelerySquirrel: 2.4979639053344727

Не паникуйте, если ничего не происходит. Помните, что Celery — это сервис, и нам нужно его запустить. До сих пор мы только помещали задачи в Redis, но не запускали Celery для их выполнения. Для этого нам нужно запустить эту команду в папке, где находится наш код:

celery worker -A do_celery --loglevel=debug --concurrency=4

Теперь перезапустите скрипт Python и посмотрите, что произойдет. Стоит обратить внимание на одну вещь: обратите внимание, как мы дважды передали адрес Redis нашему приложению Redis. Параметр broker указывает, где задачи передаются в Celery, а в backend Celery помещает результаты, чтобы мы могли использовать их в нашем приложении. Если мы не укажем backend результата, мы не сможем узнать, когда задача была обработана и каков был результат.

Также имейте в виду, что журналы теперь находятся в стандартном выводе процесса Celery, поэтому обязательно проверьте их в соответствующем терминале.

Я надеюсь, что это было интересное путешествие для вас и хорошее введение в мир параллельного / параллельного программирования на Python. Это конец пути, и мы можем сделать некоторые выводы:

  • Есть несколько парадигм, которые помогают нам достигать высокопроизводительных вычислений в Python.
  • Для многопоточной парадигмы у нас есть библиотеки threading и concurrent.futures .
  • multiprocessing обеспечивает очень похожий интерфейс на threading но для процессов, а не для потоков.
  • Помните, что процессы достигают истинного параллелизма, но создавать их дороже.
  • Помните, что в процессе может быть запущено больше потоков.
  • Не путайте параллель с параллельной. Помните, что только параллельный подход использует преимущества многоядерных процессоров, в то время как параллельное программирование интеллектуально планирует задачи так, что ожидание длительных операций выполняется при параллельном выполнении реальных вычислений.

Изучите Python с нашим полным руководством по питону, независимо от того, начинаете ли вы или начинающий программист, ищущий новые навыки.