Python является одним из самых популярных языков для обработки и обработки данных в целом. Экосистема предоставляет множество библиотек и сред, которые способствуют высокопроизводительным вычислениям. Однако параллельное программирование на Python может оказаться довольно сложным.
В этом уроке мы собираемся изучить, почему параллелизм сложен, особенно в контексте Python, и для этого мы рассмотрим следующее:
- Почему параллелизм сложен в Python (подсказка: это из-за GIL — глобальной блокировки интерпретатора).
- Потоки против процессов : различные способы достижения параллелизма. Когда использовать один над другим?
- Параллельный или параллельный : почему в некоторых случаях мы можем согласиться на параллелизм, а не на параллелизм
- Создание простого, но практического примера с использованием различных обсуждаемых методов .
Глобальная блокировка интерпретатора
Global Interpreter Lock (GIL) — одна из самых противоречивых тем в мире Python. В CPython, самой популярной реализации Python, GIL является мьютексом, который делает работу с потоками безопасной. GIL облегчает интеграцию с внешними библиотеками, которые не являются поточно-ориентированными, и ускоряет работу непараллельного кода. Это происходит за плату, хотя. Благодаря GIL мы не можем достичь истинного параллелизма с помощью многопоточности. По сути, два разных собственных потока одного и того же процесса не могут запускать код Python одновременно.
Однако все не так плохо, и вот почему: вещи, происходящие вне сферы GIL, могут быть параллельными. В эту категорию попадают такие длительные задачи, как ввод / вывод и, к счастью, библиотеки вроде numpy .
Потоки против процессов
Так что Python не является действительно многопоточным. Но что это за нить? Давайте сделаем шаг назад и посмотрим на вещи в перспективе.
Процесс — это базовая абстракция операционной системы. Это программа, которая выполняется — другими словами, код, который выполняется. На компьютере всегда выполняется несколько процессов, и они выполняются параллельно.
Процесс может иметь несколько потоков. Они выполняют один и тот же код, принадлежащий родительскому процессу. В идеале они работают параллельно, но не обязательно. Причина, по которой процессов недостаточно, заключается в том, что приложения должны реагировать и прислушиваться к действиям пользователя при обновлении дисплея и сохранении файла.
Если это все еще немного неясно, вот чит-лист:
| ПРОЦЕССЫ | ПОТОКИ |
|---|---|
| Процессы не разделяют память | Потоки разделяют память |
| Процессы нереста / переключения дороги | Нерест / переключение потоков дешевле |
| Процессы требуют больше ресурсов | Потоки требуют меньше ресурсов (иногда их называют легковесными процессами) |
| Не требуется синхронизация памяти | Вы должны использовать механизмы синхронизации, чтобы убедиться, что вы правильно обрабатываете данные |
Нет ни одного рецепта, который бы подходил всем. Выбор одного зависит от контекста и задачи, которую вы пытаетесь достичь.
Параллельный или Параллельный
Теперь мы сделаем еще один шаг и погрузимся в параллелизм. Параллелизм часто неправильно понимают и ошибочно принимают за параллелизм. Это не тот случай. Параллельность подразумевает планирование независимого кода для совместного выполнения. Воспользуйтесь тем фактом, что часть кода ожидает выполнения операций ввода-вывода, и в течение этого времени запускайте другую, но независимую часть кода.
В Python мы можем добиться легкого параллельного поведения через гринлеты. С точки зрения распараллеливания использование потоков или гринлетов эквивалентно, поскольку ни один из них не работает параллельно. Гринлеты создавать даже дешевле, чем нитки. Из-за этого гринлеты активно используются для выполнения огромного количества простых задач ввода-вывода, подобных тем, которые обычно используются в сетевых и веб-серверах.
Теперь, когда мы знаем разницу между потоками и процессами, параллельными и параллельными, мы можем проиллюстрировать, как разные задачи выполняются в двух парадигмах. Вот что мы собираемся сделать: мы будем многократно запускать задачу вне GIL и одну внутри нее. Мы запускаем их последовательно, используя потоки и процессы. Давайте определим задачи:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import os
import time
import threading
import multiprocessing
NUM_WORKERS = 4
def only_sleep():
«»» Do nothing, wait for a timer to expire «»»
print(«PID: %s, Process Name: %s, Thread Name: %s» % (
os.getpid(),
multiprocessing.current_process().name,
threading.current_thread().name)
)
time.sleep(1)
def crunch_numbers():
«»» Do some computations «»»
print(«PID: %s, Process Name: %s, Thread Name: %s» % (
os.getpid(),
multiprocessing.current_process().name,
threading.current_thread().name)
)
x = 0
while x < 10000000:
x += 1
|
Мы создали две задачи. Оба они работают долго, но только crunch_numbers активно выполняет вычисления. Давайте запустим only_sleep последовательно, многопоточно и с использованием нескольких процессов и сравним результаты:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
## Run tasks serially
start_time = time.time()
for _ in range(NUM_WORKERS):
only_sleep()
end_time = time.time()
print(«Serial time=», end_time — start_time)
# Run tasks using threads
start_time = time.time()
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
print(«Threads time=», end_time — start_time)
# Run tasks using processes
start_time = time.time()
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
print(«Parallel time=», end_time — start_time)
|
Вот вывод, который я получил (ваш должен быть похожим, хотя PID и время будут немного отличаться):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.018089056015015
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 1.0047411918640137
PID: 95728, Process Name: Process-1, Thread Name: MainThread
PID: 95729, Process Name: Process-2, Thread Name: MainThread
PID: 95730, Process Name: Process-3, Thread Name: MainThread
PID: 95731, Process Name: Process-4, Thread Name: MainThread
Parallel time= 1.014023780822754
|
Вот некоторые наблюдения:
-
В случае последовательного подхода все довольно очевидно. Мы выполняем задачи одну за другой. Все четыре запуска выполняются одним и тем же потоком одного и того же процесса.
-
Используя процессы, мы сокращаем время выполнения до четверти исходного времени просто потому, что задачи выполняются параллельно. Обратите внимание, как каждая задача выполняется в отдельном процессе и в
MainThreadэтого процесса. -
Используя потоки, мы пользуемся тем фактом, что задачи могут выполняться одновременно. Время выполнения также сокращается до четверти, хотя параллельно ничего не происходит. Вот как это происходит: мы создаем первый поток, и он начинает ждать истечения таймера. Мы приостанавливаем его выполнение, даем ему подождать, пока истечет таймер, и за это время мы порождаем второй поток. Мы повторяем это для всех тем. В какой-то момент истекает таймер первого потока, поэтому мы переключаем на него выполнение и завершаем его. Алгоритм повторяется для второго и всех остальных потоков. В итоге получается, что все идет параллельно. Вы также заметите, что четыре разных потока ветвятся и живут в одном и том же процессе:
MainProcess. -
Вы можете даже заметить, что многопоточный подход быстрее, чем действительно параллельный. Это связано с накладными расходами на нерестовые процессы. Как мы отмечали ранее, процессы порождения и переключения являются дорогостоящей операцией.
Давайте crunch_numbers ту же процедуру, но на этот раз crunch_numbers задачу crunch_numbers :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
start_time = time.time()
for _ in range(NUM_WORKERS):
crunch_numbers()
end_time = time.time()
print(«Serial time=», end_time — start_time)
start_time = time.time()
threads = [threading.Thread(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
print(«Threads time=», end_time — start_time)
start_time = time.time()
processes = [multiprocessing.Process(target=crunch_numbers) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
print(«Parallel time=», end_time — start_time)
|
Вот результат, который я получил:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
PID: 96285, Process Name: MainProcess, Thread Name: MainThread
Serial time= 2.705625057220459
PID: 96285, Process Name: MainProcess, Thread Name: Thread-1
PID: 96285, Process Name: MainProcess, Thread Name: Thread-2
PID: 96285, Process Name: MainProcess, Thread Name: Thread-3
PID: 96285, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 2.6961309909820557
PID: 96289, Process Name: Process-1, Thread Name: MainThread
PID: 96290, Process Name: Process-2, Thread Name: MainThread
PID: 96291, Process Name: Process-3, Thread Name: MainThread
PID: 96292, Process Name: Process-4, Thread Name: MainThread
Parallel time= 0.8014059066772461
|
Основное отличие здесь заключается в многопоточном подходе. На этот раз он работает очень похоже на последовательный подход, и вот почему: поскольку он выполняет вычисления, а Python не выполняет настоящий параллелизм, потоки в основном работают один за другим, передавая выполнение друг другу, пока все не завершится.
Экосистема параллельного / параллельного программирования Python
Python имеет богатые API для параллельного / параллельного программирования. В этом уроке мы рассмотрим самые популярные из них, но вы должны знать, что для любых ваших потребностей в этой области, возможно, уже есть что-то, что может помочь вам в достижении вашей цели.
В следующем разделе мы создадим практическое приложение во многих формах, используя все представленные библиотеки. Без лишних слов, вот модули / библиотеки, которые мы собираемся охватить:
-
threading: стандартный способ работы с потоками в Python. Это высокоуровневая оболочка API для функциональности,_threadмодулем_thread, который является низкоуровневым интерфейсом для реализации потоков операционной системы. -
concurrent.futures: модульная часть стандартной библиотеки, которая обеспечивает еще более высокий уровень абстракции над потоками. Потоки моделируются как асинхронные задачи. -
multiprocessing: аналогично модулюthreading, с очень похожим интерфейсом, но использующим процессы вместо потоков. -
gevent and greenlets: Greenlets, также называемые микропотоками, являются единицами выполнения, которые могут планироваться совместно и могут выполнять задачи одновременно без больших накладных расходов. -
celery: распределенная очередь задач высокого уровня. Задачи ставятся в очередь и выполняются одновременно с использованием различных парадигм, таких какmultiprocessingилиgevent.
Создание практического приложения
Знание теории — это хорошо и хорошо, но лучший способ научиться — это создать что-то практичное, верно? В этом разделе мы собираемся создать приложение классического типа, проходящее через все различные парадигмы.
Давайте создадим приложение, которое проверяет время работы веб-сайтов. Существует множество таких решений, наиболее известными из которых являются, вероятно, Jetpack Monitor и Uptime Robot . Цель этих приложений — уведомить вас, когда ваш сайт не работает, чтобы вы могли быстро принять меры. Вот как они работают:
- Приложение очень часто просматривает список URL-адресов веб-сайтов и проверяет их работоспособность.
- Каждый веб-сайт должен проверяться каждые 5-10 минут, чтобы время простоя не было значительным.
- Вместо выполнения классического HTTP-запроса GET он выполняет запрос HEAD, чтобы он не оказал существенного влияния на ваш трафик.
- Если статус HTTP находится в опасном диапазоне (400+, 500+), владелец уведомляется.
- Владелец получает уведомление по электронной почте, текстовое сообщение или push-уведомление.
Вот почему важно использовать параллельный / параллельный подход к проблеме. Поскольку список веб-сайтов растет, последовательный просмотр списка не гарантирует нам, что каждый веб-сайт проверяется каждые пять минут или около того. Веб-сайты могут быть недоступны в течение нескольких часов, и владелец не будет уведомлен.
Давайте начнем с написания некоторых утилит:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# utils.py
import time
import logging
import requests
class WebsiteDownException(Exception):
pass
def ping_website(address, timeout=20):
«»»
Check if a website is down.
if either the status_code >= 400 or if the timeout expires
Throw a WebsiteDownException if any of the website down conditions are met
«»»
try:
response = requests.head(address, timeout=timeout)
if response.status_code >= 400:
logging.warning(«Website %s returned status_code=%s» % (address, response.status_code))
raise WebsiteDownException()
except requests.exceptions.RequestException:
logging.warning(«Timeout expired for website %s» % address)
raise WebsiteDownException()
def notify_owner(address):
«»»
Send the owner of the address a notification that their website is down
For now, we’re just going to sleep for 0.5 seconds but this is where
you would send an email, push notification or text-message
«»»
logging.info(«Notifying the owner of %s website» % address)
time.sleep(0.5)
def check_website(address):
«»»
Utility function: check if a website is down, if so, notify the user
«»»
try:
ping_website(address)
except WebsiteDownException:
notify_owner(address)
|
Нам действительно нужен список веб-сайтов, чтобы опробовать нашу систему. Создайте свой собственный список или используйте мой:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# websites.py
WEBSITE_LIST = [
‘http://envato.com’,
‘http://amazon.co.uk’,
‘http://amazon.com’,
‘http://facebook.com’,
‘http://google.com’,
‘http://google.fr’,
‘http://google.es’,
‘http://google.co.uk’,
‘http://internet.org’,
‘http://gmail.com’,
‘http://stackoverflow.com’,
‘http://github.com’,
‘http://heroku.com’,
‘http://really-cool-available-domain.com’,
‘http://djangoproject.com’,
‘http://rubyonrails.org’,
‘http://basecamp.com’,
‘http://trello.com’,
‘http://yiiframework.com’,
‘http://shopify.com’,
‘http://another-really-interesting-domain.co’,
‘http://airbnb.com’,
‘http://instagram.com’,
‘http://snapchat.com’,
‘http://youtube.com’,
‘http://baidu.com’,
‘http://yahoo.com’,
‘http://live.com’,
‘http://linkedin.com’,
‘http://yandex.ru’,
‘http://netflix.com’,
‘http://wordpress.com’,
‘http://bing.com’,
]
|
Обычно этот список хранится в базе данных вместе с контактной информацией владельца, чтобы вы могли с ним связаться. Поскольку это не основная тема этого урока, и для простоты мы просто собираемся использовать этот список Python.
Если бы вы обратили действительно хорошее внимание, вы могли бы заметить в списке два действительно длинных домена, которые не являются действительными веб-сайтами (я надеюсь, что никто не купил их к тому времени, когда вы читаете это, чтобы доказать, что я неправ!) Я добавил эти два домена, чтобы быть уверенным, что у нас есть некоторые сайты при каждом запуске. Также назовем наше приложение UptimeSquirrel .
Последовательный подход
Во-первых, давайте попробуем последовательный подход и посмотрим, насколько плохо он работает. Мы рассмотрим это как базовый уровень.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
# serial_squirrel.py
import time
start_time = time.time()
for address in WEBSITE_LIST:
check_website(address)
end_time = time.time()
print(«Time for SerialSquirrel: %ssecs» % (end_time — start_time))
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for SerialSquirrel: 15.881232261657715secs
|
Подход потоков
Мы собираемся стать немного более креативными с реализацией многопоточного подхода. Мы используем очередь, чтобы поместить адреса и создать рабочие потоки, чтобы вывести их из очереди и обработать их. Мы будем ждать, пока очередь будет пустой, а это означает, что все адреса были обработаны нашими рабочими потоками.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# threaded_squirrel.py
import time
from queue import Queue
from threading import Thread
NUM_WORKERS = 4
task_queue = Queue()
def worker():
# Constantly check the queue for addresses
while True:
address = task_queue.get()
check_website(address)
# Mark the processed task as done
task_queue.task_done()
start_time = time.time()
# Create the worker threads
threads = [Thread(target=worker) for _ in range(NUM_WORKERS)]
# Add the websites to the task queue
[task_queue.put(item) for item in WEBSITE_LIST]
# Start all the workers
[thread.start() for thread in threads]
# Wait for all the tasks in the queue to be processed
task_queue.join()
end_time = time.time()
print(«Time for ThreadedSquirrel: %ssecs» % (end_time — start_time))
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for ThreadedSquirrel: 3.110753059387207secs
|
concurrent.futures
Как указывалось ранее, concurrent.futures — это высокоуровневый API для использования потоков. Подход, который мы здесь используем, подразумевает использование ThreadPoolExecutor . Мы собираемся отправить задачи в пул и вернуть фьючерсы, результаты которых будут нам доступны в будущем. Конечно, мы можем подождать, пока все фьючерсы станут фактическими результатами.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
# future_squirrel.py
import time
import concurrent.futures
NUM_WORKERS = 4
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}
concurrent.futures.wait(futures)
end_time = time.time()
print(«Time for FutureSquirrel: %ssecs» % (end_time — start_time))
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for FutureSquirrel: 1.812899112701416secs
|
Многопроцессорный подход
multiprocessing библиотека предоставляет API-интерфейс для замены threading . В этом случае мы собираемся использовать подход, более похожий на подход concurrent.futures . Мы настраиваем multiprocessing.Pool и отправляем ему задачи, отображая функцию в список адресов (вспомним классическую функцию map Python).
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# multiprocessing_squirrel.py
import time
import socket
import multiprocessing
NUM_WORKERS = 4
start_time = time.time()
with multiprocessing.Pool(processes=NUM_WORKERS) as pool:
results = pool.map_async(check_website, WEBSITE_LIST)
results.wait()
end_time = time.time()
print(«Time for MultiProcessingSquirrel: %ssecs» % (end_time — start_time))
# WARNING:root:Timeout expired for website http://really-cool-available-domain.com
# WARNING:root:Timeout expired for website http://another-really-interesting-domain.co
# WARNING:root:Website http://bing.com returned status_code=405
# Time for MultiProcessingSquirrel: 2.8224599361419678secs
|
GEvent
Gevent является популярной альтернативой для достижения массового параллелизма. Есть несколько вещей, которые вы должны знать, прежде чем использовать его:
-
Код, выполняемый одновременно гринлетами, является детерминированным. В отличие от других представленных альтернатив, эта парадигма гарантирует, что для любых двух идентичных прогонов вы всегда получите одинаковые результаты в одном и том же порядке.
-
Вам нужно соединить стандартные функции, чтобы они взаимодействовали с Gevent. Вот что я имею в виду под этим. Обычно операция сокета блокируется. Мы ждем окончания операции. Если бы мы находились в многопоточной среде, планировщик просто переключился бы на другой поток, в то время как другой ожидает ввода / вывода. Поскольку мы не в многопоточной среде, gevent исправляет стандартные функции, чтобы они стали неблокирующими, и возвращал управление планировщику gevent.
Чтобы установить gevent, запустите: pip install gevent
Вот как использовать gevent для выполнения нашей задачи с помощью gevent.pool.Pool :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# green_squirrel.py
import time
from gevent.pool import Pool
from gevent import monkey
# Note that you can spawn many workers with gevent since the cost of creating and switching is very low
NUM_WORKERS = 4
# Monkey-Patch socket module for HTTP requests
monkey.patch_socket()
start_time = time.time()
pool = Pool(NUM_WORKERS)
for address in WEBSITE_LIST:
pool.spawn(check_website, address)
# Wait for stuff to finish
pool.join()
end_time = time.time()
print(«Time for GreenSquirrel: %ssecs» % (end_time — start_time))
# Time for GreenSquirrel: 3.8395519256591797secs
|
Сельдерей
Сельдерей — это подход, который в основном отличается от того, что мы видели до сих пор. Это боевые испытания в условиях очень сложных и высокопроизводительных сред. Настройка Celery потребует немного больше усилий, чем все вышеперечисленные решения.
Для начала нам нужно установить Celery:
pip install celery
Задачи являются центральными понятиями в рамках проекта Celery. Все, что вы захотите запустить в Celery, должно быть задачей. Celery предлагает большую гибкость для запуска задач: вы можете запускать их синхронно или асинхронно, в режиме реального времени или по расписанию, на одной и той же машине или на нескольких машинах, используя потоки, процессы, Eventlet или Gevent.
Расположение будет немного сложнее. Сельдерей использует другие сервисы для отправки и получения сообщений. Эти сообщения обычно являются задачами или результатами задач. Мы собираемся использовать Redis в этом руководстве для этой цели. Redis — отличный выбор, потому что его действительно легко установить и настроить, и вполне возможно, что вы уже используете его в своем приложении для других целей, таких как кэширование и публикация / публикация.
Вы можете установить Redis, следуя инструкциям на странице быстрого запуска Redis . Не забудьте установить библиотеку redis Python, pip install redis и пакет, необходимый для использования Redis и Celery: pip install celery[redis] .
Запустите сервер Redis следующим образом: $ redis-server
Чтобы начать создавать вещи с помощью Celery, сначала нам нужно создать приложение Celery. После этого Celery должен знать, какие задачи он может выполнять. Для этого нам нужно зарегистрировать задачи в приложении Celery. Мы сделаем это с @app.task декоратора @app.task :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# celery_squirrel.py
import time
from utils import check_website
from data import WEBSITE_LIST
from celery import Celery
from celery.result import ResultSet
app = Celery(‘celery_squirrel’,
broker=’redis://localhost:6379/0′,
backend=’redis://localhost:6379/0′)
@app.task
def check_website_task(address):
return check_website(address)
if __name__ == «__main__»:
start_time = time.time()
# Using `delay` runs the task async
rs = ResultSet([check_website_task.delay(address) for address in WEBSITE_LIST])
# Wait for the tasks to finish
rs.get()
end_time = time.time()
print(«CelerySquirrel:», end_time — start_time)
# CelerySquirrel: 2.4979639053344727
|
Не паникуйте, если ничего не происходит. Помните, что Celery — это сервис, и нам нужно его запустить. До сих пор мы только помещали задачи в Redis, но не запускали Celery для их выполнения. Для этого нам нужно запустить эту команду в папке, где находится наш код:
celery worker -A do_celery --loglevel=debug --concurrency=4
Теперь перезапустите скрипт Python и посмотрите, что произойдет. Стоит обратить внимание на одну вещь: обратите внимание, как мы дважды передали адрес Redis нашему приложению Redis. Параметр broker указывает, где задачи передаются в Celery, а в backend Celery помещает результаты, чтобы мы могли использовать их в нашем приложении. Если мы не укажем backend результата, мы не сможем узнать, когда задача была обработана и каков был результат.
Также имейте в виду, что журналы теперь находятся в стандартном выводе процесса Celery, поэтому обязательно проверьте их в соответствующем терминале.
Выводы
Я надеюсь, что это было интересное путешествие для вас и хорошее введение в мир параллельного / параллельного программирования на Python. Это конец пути, и мы можем сделать некоторые выводы:
- Есть несколько парадигм, которые помогают нам достигать высокопроизводительных вычислений в Python.
- Для многопоточной парадигмы у нас есть библиотеки
threadingиconcurrent.futures. -
multiprocessingобеспечивает очень похожий интерфейс наthreadingно для процессов, а не для потоков. - Помните, что процессы достигают истинного параллелизма, но создавать их дороже.
- Помните, что в процессе может быть запущено больше потоков.
- Не путайте параллель с параллельной. Помните, что только параллельный подход использует преимущества многоядерных процессоров, в то время как параллельное программирование интеллектуально планирует задачи так, что ожидание длительных операций выполняется при параллельном выполнении реальных вычислений.
Выучить питон
Изучите Python с нашим полным руководством по питону, независимо от того, начинаете ли вы или начинающий программист, ищущий новые навыки.