Статьи

Как реализовать производителя / потребителя с помощью System.Threading.Channels

Что это за « производитель / потребитель »? Это вокруг нас, везде. Каждый раз, когда вы видите какой-то рабочий процесс с несколькими последовательными шагами, это пример. Производственная линия на автомобильном заводе, кухня быстрого питания, даже почтовая служба.

Так почему мы заботимся об этом? Ну, это легко: почти в каждом программном обеспечении, которое мы пишем, есть конвейер, который нужно выполнить. И, как и каждый конвейер, после завершения шага вывод перенаправляется на следующий в строке, освобождая место для другого выполнения.

По сути, это означает, что каждый шаг в цепочке должен выполняться в полной изоляции, получать данные, обрабатывать их и передавать их следующему блоку.

Как следствие, каждый блок должен выполняться в своем собственном потоке, чтобы обеспечить правильную инкапсуляцию. Конечно, есть целый мир, который нужно учитывать, включая все проблемы параллелизма, которые могут возникнуть при обмене данными между потоками.

Именно здесь на помощь приходит библиотека System.Threading.Channels . Но что такое «канал»? Это средство для достижения цели.

Канал представляет собой способ безопасного обмена данными между двумя сторонами (производитель и потребитель), позволяя в то же время уведомлений и обеспечение потокобезопасности. Это в основном потокобезопасная очередь.


Вам также может понравиться:
Эволюция проблемы производителя-потребителя в Java .

Теперь канал может быть ограничен или неограничен:

  • Ограниченные каналы имеют ограниченную емкость для входящих сообщений, что означает, что источник может публиковать только определенное количество раз перед заполнением пространства. Затем он должен будет дождаться, пока потребители выполнят свою работу, и освободит место для новых сообщений.
  • У неограниченных каналов  вместо этого нет этого ограничения, что означает, что издатели могут публиковать столько раз, сколько захотят, надеясь, что потребители смогут следить.

Выбор правильного типа канала, конечно, чрезвычайно важен и сильно зависит от контекста. Также имейте в виду, что, хотя это правда, что неограниченные каналы действительно «неограничены», память на машине обычно нет.

Таким образом, если ваше приложение заполняет канал данными, а потребители не могут выполнять свою работу достаточно быстро, вы можете столкнуться с проблемами.

С другой стороны, когда заполненный канал заполнен, входящие сообщения не будут добавляться в очередь, что замедляет работу системы. Простым решением может быть просто добавление большего числа потребителей, но, опять же, не думайте, что ресурсы бесконечны.

Как обычно, я создал небольшой репозиторий на GitHub, в котором показаны некоторые варианты использования. Код в основном симулирует обмен кучей сообщений между:

  • один производитель и один потребитель.
  • один производитель и несколько потребителей.
  • несколько производителей и несколько потребителей.

Я структурировал это, чтобы было очень просто добавлять больше случаев.

Теперь несколько вещей, чтобы отметить здесь.

Класс Producer просто вызывает WriteAsync()публикацию сообщения. Этот метод внутренне использует интересный шаблон, что-то вроде этого:


C #