Статьи

Асинхронный ввод-вывод с Python 3

В этом руководстве вы познакомитесь с возможностями асинхронного ввода-вывода, представленными в Python 3.4 и улучшенными в Python 3.5 и 3.6.

Ранее в Python было мало отличных вариантов асинхронного программирования. Новая поддержка Async I / O, наконец, обеспечивает первоклассную поддержку, которая включает в себя как высокоуровневые API, так и стандартную поддержку, которая нацелена на объединение нескольких сторонних решений (Twisted, Gevent, Tornado, asyncore и т. Д.).

Важно понимать, что изучение асинхронного ввода-вывода Python не является тривиальным из-за быстрой итерации, области действия и необходимости предоставить путь миграции к существующим асинхронным средам. Я сосредоточусь на последних и самых простых, чтобы немного упростить.

Существует множество движущихся частей, которые интересным образом взаимодействуют через границы потоков, границы процессов и удаленные машины. Существуют отличия и ограничения для конкретной платформы. Давайте прыгнем прямо в.

Основной концепцией асинхронного ввода-вывода является цикл обработки событий. В программе может быть несколько циклов событий. Каждый поток будет иметь не более одного активного цикла обработки событий. Цикл обработки событий предоставляет следующие возможности:

  • Регистрация, выполнение и отмена отложенных вызовов (с таймаутами).
  • Создание клиентских и серверных транспортов для различных видов связи.
  • Запуск подпроцессов и связанных транспортов для связи с внешней программой.
  • Делегирование дорогостоящих вызовов функций в пул потоков.

Вот небольшой пример, который запускает две сопрограммы и вызывает функцию с задержкой. Он показывает, как использовать цикл обработки событий для питания вашей программы:

1
import asyncio async def foo(delay): for i in range(10): print(i) await asyncio.sleep(delay) def stopper(loop): loop.stop() loop = asyncio.get_event_loop() # Schedule a call to foo() loop.create_task(foo(0.5)) loop.create_task(foo(1)) loop.call_later(12, stopper, loop) # Block until loop.stop() is called() loop.run_forever() loop.close()

Класс AbstractEventLoop предоставляет базовый контракт для циклов событий. Есть много вещей, которые должен поддерживать цикл обработки событий:

  • Планирование функций и сопрограмм для выполнения
  • Создание фьючерсов и задач
  • Управление TCP-серверами
  • Обработка сигналов (в Unix)
  • Работа с трубами и подпроцессами

Вот методы, связанные с выполнением и остановкой события, а также планирование функций и сопрограмм:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class AbstractEventLoop:
   «»»Abstract event loop.»»»
 
   # Running and stopping the event loop.
 
   def run_forever(self):
       «»»Run the event loop until stop() is called.»»»
       raise NotImplementedError
 
   def run_until_complete(self, future):
       «»»Run the event loop until a Future is done.
 
       Return the Future’s result, or raise its exception.
       «»»
       raise NotImplementedError
 
   def stop(self):
       «»»Stop the event loop as soon as reasonable.
 
       Exactly how soon that is may depend on the implementation, but
       no more I/O callbacks should be scheduled.
       «»»
       raise NotImplementedError
 
   def is_running(self):
       «»»Return whether the event loop is currently running.»»»
       raise NotImplementedError
 
   def is_closed(self):
       «»»Returns True if the event loop was closed.»»»
       raise NotImplementedError
 
   def close(self):
       «»»Close the loop.
 
       The loop should not be running.
 
       This is idempotent and irreversible.
 
       No other methods should be called after this one.
       «»»
       raise NotImplementedError
 
   def shutdown_asyncgens(self):
       «»»Shutdown all active asynchronous generators.»»»
       raise NotImplementedError
 
   # Methods scheduling callbacks.
 
   def _timer_handle_cancelled(self, handle):
       «»»Notification that a TimerHandle has been cancelled.»»»
       raise NotImplementedError
 
   def call_soon(self, callback, *args):
       return self.call_later(0, callback, *args)
 
   def call_later(self, delay, callback, *args):
       raise NotImplementedError
 
   def call_at(self, when, callback, *args):
       raise NotImplementedError
 
   def time(self):
       raise NotImplementedError
 
   def create_future(self):
       raise NotImplementedError
 
   # Method scheduling a coroutine object: create a task.
 
   def create_task(self, coro):
       raise NotImplementedError
 
   # Methods for interacting with threads.
 
   def call_soon_threadsafe(self, callback, *args):
       raise NotImplementedError
 
   def run_in_executor(self, executor, func, *args):
       raise NotImplementedError
 
   def set_default_executor(self, executor):
       raise NotImplementedError

Asyncio разработан для поддержки нескольких реализаций циклов событий, которые придерживаются его API. Ключом является класс EventLoopPolicy который настраивает asyncio и позволяет управлять каждым аспектом цикла событий. Вот пример пользовательского цикла uvloop событий, называемого uvloop основанного на libuv, который должен быть намного быстрее, чем альтернативы (я сам не тестировал его):

1
2
3
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

Вот и все. Теперь, когда вы используете какую-либо функцию asyncio, она uvloop под одеялом.

Сопрограмма — это загруженный термин. Это и функция, которая выполняется асинхронно, и объект, который необходимо запланировать. Вы определяете их, добавляя ключевое слово async перед определением:

1
2
3
4
5
import asyncio
 
 
async def cool_coroutine():
    return «So cool…»

Если вы вызываете такую ​​функцию, она не запускается. Вместо этого он возвращает объект сопрограммы, и если вы не запланируете его выполнение, вы также получите предупреждение:

1
2
3
4
5
6
7
8
9
c = cool_coroutine()
print(c)
 
Output:
 
<coroutine object cool_coroutine at 0x108a862b0>
sys:1: RuntimeWarning: coroutine ‘cool_coroutine’ was never awaited
 
Process finished with exit code 0

Чтобы фактически выполнить сопрограмму, нам нужен цикл обработки событий:

1
2
3
4
5
6
7
8
r = loop.run_until_complete(c)
loop.close()
 
print(r)
 
Output:
 
So cool…

Это прямое планирование. Вы также можете цепочку сопрограмм. Обратите внимание, что вы должны вызывать await при вызове сопрограмм:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
import asyncio
 
async def compute(x, y):
    print(«Compute %s + %s …» % (x, y))
    await asyncio.sleep(1.0)
    return x + y
 
async def print_sum(x, y):
    result = await compute(x, y)
    print(«%s + %s = %s» % (x, y, result))
 
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

Класс asyncio Future аналогичен классу concurrent.future.Future . Он не защищен от потоков и поддерживает следующие функции:

  • добавление и удаление готовых обратных вызовов
  • отмена
  • настройка результатов и исключений

Вот как использовать будущее с циклом событий. take_your_time() принимает будущее и устанавливает его результат после сна на секунду.

Функция ensure_future() планирует сопрограмму, а wait_until_complete() ожидает выполнения будущего. За занавесом он добавляет готовый обратный вызов в будущее.

01
02
03
04
05
06
07
08
09
10
11
12
import asyncio
 
async def take_your_time(future):
    await asyncio.sleep(1)
    future.set_result(42)
 
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(take_your_time(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

Это довольно громоздко. Asyncio предлагает задачи, чтобы сделать работу с фьючерсами и сопрограммами более приятной. Задача — это подкласс Future, который оборачивает сопрограмму и которую можно отменить.

Сопрограмма не должна принимать явное будущее и устанавливать его результат или исключение. Вот как выполнить те же операции с задачей:

01
02
03
04
05
06
07
08
09
10
11
import asyncio
 
async def take_your_time():
    await asyncio.sleep(1)
    return 42
 
loop = asyncio.get_event_loop()
task = loop.create_task(take_your_time())
loop.run_until_complete(task)
print(task.result())
loop.close()

Транспорт — это абстракция канала связи. Транспорт всегда поддерживает определенный протокол. Asyncio предоставляет встроенные реализации для TCP, UDP, SSL и каналов подпроцесса.

Если вы знакомы с сетевым программированием на основе сокетов, вы будете чувствовать себя как дома с транспортными средствами и протоколами. С Asyncio вы получаете стандартное асинхронное сетевое программирование. Давайте посмотрим на печально известный эхо-сервер и клиент («привет мир» сетей).

Во-первых, клиент echo реализует класс EchoClient , производный от asyncio.Protocol . Он сохраняет свой цикл событий и сообщение, которое он отправит на сервер при подключении.

В обратном вызове connection_made() записывает свое сообщение в транспорт. В data_received() он просто печатает ответ сервера, а в методе connection_lost() останавливает цикл обработки событий. При передаче экземпляра класса EchoClient в метод цикла create_connection() результатом является сопрограмма, которую цикл выполняет до завершения.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
 
class EchoClient(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
 
    def connection_made(self, transport):
        transport.write(self.message.encode())
        print(‘Data sent: {!r}’.format(self.message))
 
    def data_received(self, data):
        print(‘Data received: {!r}’.format(data.decode()))
 
    def connection_lost(self, exc):
        print(‘The server closed the connection’)
        print(‘Stop the event loop’)
        self.loop.stop()
 
loop = asyncio.get_event_loop()
message = ‘Hello World!’
coro = loop.create_connection(lambda: EchoClient(message, loop),
                              ‘127.0.0.1’, 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

Сервер аналогичен, за исключением того, что он работает вечно, ожидая подключения клиентов. После отправки эхо-ответа он также закрывает соединение с клиентом и готов к подключению следующего клиента.

Новый экземпляр EchoServer создается для каждого соединения, поэтому даже при одновременном подключении нескольких клиентов проблем с атрибутом transport не возникнет.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
 
class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info(‘peername’)
        print(‘Connection from {}’.format(peername))
        self.transport = transport
 
    def data_received(self, data):
        message = data.decode()
        print(‘Data received: {!r}’.format(message))
 
        print(‘Send: {!r}’.format(message))
        self.transport.write(data)
 
        print(‘Close the client socket’)
        self.transport.close()
 
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServer, ‘127.0.0.1’, 8888)
server = loop.run_until_complete(coro)
print(‘Serving on {}’.format(server.sockets[0].getsockname()))
loop.run_forever()

Вот результат после подключения двух клиентов:

1
2
3
4
5
6
7
8
9
Serving on (‘127.0.0.1’, 8888)
Connection from (‘127.0.0.1’, 53248)
Data received: ‘Hello World!’
Send: ‘Hello World!’
Close the client socket
Connection from (‘127.0.0.1’, 53351)
Data received: ‘Hello World!’
Send: ‘Hello World!’
Close the client socket

Потоки предоставляют высокоуровневый API, который основан на сопрограммах и предоставляет абстракции Reader и Writer. Протоколы и транспорты скрыты, нет необходимости определять свои собственные классы и нет обратных вызовов. Вы просто ждете такие события, как соединение и полученные данные.

Клиент вызывает open_connection() которая возвращает объекты чтения и записи, использованные естественным образом. Чтобы закрыть соединение, он закрывает писателя.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
 
 
async def tcp_echo_client(message, loop):
    reader, writer = await asyncio.open_connection(
        ‘127.0.0.1’,
        8888,
        loop=loop)
 
    print(‘Send: %r’ % message)
    writer.write(message.encode())
 
    data = await reader.read(100)
    print(‘Received: %r’ % data.decode())
 
    print(‘Close the socket’)
    writer.close()
 
 
message = ‘Hello World!’
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()

Сервер также значительно упрощен.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
 
async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info(‘peername’)
    print(«Received %r from %r» % (message, addr))
 
    print(«Send: %r» % message)
    writer.write(data)
    await writer.drain()
 
    print(«Close the client socket»)
    writer.close()
 
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo,
                            ‘127.0.0.1’,
                            8888,
                            loop=loop)
server = loop.run_until_complete(coro)
print(‘Serving on {}’.format(server.sockets[0].getsockname()))
loop.run_forever()

Asyncio также охватывает взаимодействия с подпроцессами. Следующая программа запускает другой процесс Python и выполняет код «import this». Это одно из знаменитых пасхальных яиц Python, на котором напечатано «Zen of Python». Проверьте вывод ниже.

Процесс Python запускается в сопрограмме zen() с помощью функции create_subprocess_exec() и связывает стандартный вывод с каналом. Затем он перебирает стандартные выходные строки построчно, используя await чтобы дать возможность другим процессам или сопрограммам выполнить, если вывод еще не готов.

Обратите внимание, что в Windows вы должны установить цикл обработки событий в ProactorEventLoop потому что стандартный SelectorEventLoop не поддерживает каналы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import asyncio.subprocess
import sys
 
 
async def zen():
    code = ‘import this’
    create = asyncio.create_subprocess_exec(
        sys.executable,
        ‘-c’,
        code,
        stdout=asyncio.subprocess.PIPE)
    proc = await create
 
    data = await proc.stdout.readline()
    while data:
        line = data.decode(‘ascii’).rstrip()
        print(line)
        data = await proc.stdout.readline()
 
    await proc.wait()
 
if sys.platform == «win32»:
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
 
loop.run_until_complete(zen())
 
Output:
 
The Zen of Python, by Tim Peters
 
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren’t special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one— and preferably only one —obvious way to
do it.
Although that way may not be obvious at first unless you’re
Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it’s a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea — let’s do more of those!

Не стесняйтесь, чтобы увидеть, что у нас есть в наличии для продажи и для изучения на рынке , и не стесняйтесь задавать любые вопросы и оставить свой ценный отзыв, используя канал ниже.

Python asyncio — это комплексная среда для асинхронного программирования. Он имеет огромный охват и поддерживает как низкоуровневые, так и высокоуровневые API. Это все еще относительно молодой и не очень хорошо понятый сообществом.

Я уверен, что со временем появятся лучшие практики, и появится больше примеров, которые упростят использование этой мощной библиотеки.