Что это за « производитель / потребитель »? Это вокруг нас, везде. Каждый раз, когда вы видите какой-то рабочий процесс с несколькими последовательными шагами, это пример. Производственная линия на автомобильном заводе, кухня быстрого питания, даже почтовая служба.
Так почему мы заботимся об этом? Ну, это легко: почти в каждом программном обеспечении, которое мы пишем, есть конвейер, который нужно выполнить. И, как и каждый конвейер, после завершения шага вывод перенаправляется на следующий в строке, освобождая место для другого выполнения.
По сути, это означает, что каждый шаг в цепочке должен выполняться в полной изоляции, получать данные, обрабатывать их и передавать их следующему блоку.
Как следствие, каждый блок должен выполняться в своем собственном потоке, чтобы обеспечить правильную инкапсуляцию. Конечно, есть целый мир, который нужно учитывать, включая все проблемы параллелизма, которые могут возникнуть при обмене данными между потоками.
Именно здесь на помощь приходит библиотека System.Threading.Channels . Но что такое «канал»? Это средство для достижения цели.
Канал представляет собой способ безопасного обмена данными между двумя сторонами (производитель и потребитель), позволяя в то же время уведомлений и обеспечение потокобезопасности. Это в основном потокобезопасная очередь.
Вам также может понравиться:
Эволюция проблемы производителя-потребителя в Java .
Теперь канал может быть ограничен или неограничен:
- Ограниченные каналы имеют ограниченную емкость для входящих сообщений, что означает, что источник может публиковать только определенное количество раз перед заполнением пространства. Затем он должен будет дождаться, пока потребители выполнят свою работу, и освободит место для новых сообщений.
- У неограниченных каналов вместо этого нет этого ограничения, что означает, что издатели могут публиковать столько раз, сколько захотят, надеясь, что потребители смогут следить.
Выбор правильного типа канала, конечно, чрезвычайно важен и сильно зависит от контекста. Также имейте в виду, что, хотя это правда, что неограниченные каналы действительно «неограничены», память на машине обычно нет.
Таким образом, если ваше приложение заполняет канал данными, а потребители не могут выполнять свою работу достаточно быстро, вы можете столкнуться с проблемами.
С другой стороны, когда заполненный канал заполнен, входящие сообщения не будут добавляться в очередь, что замедляет работу системы. Простым решением может быть просто добавление большего числа потребителей, но, опять же, не думайте, что ресурсы бесконечны.
Как обычно, я создал небольшой репозиторий на GitHub, в котором показаны некоторые варианты использования. Код в основном симулирует обмен кучей сообщений между:
- один производитель и один потребитель.
- один производитель и несколько потребителей.
- несколько производителей и несколько потребителей.
Я структурировал это, чтобы было очень просто добавлять больше случаев.
Теперь несколько вещей, чтобы отметить здесь.
Класс Producer просто вызывает WriteAsync()
публикацию сообщения. Этот метод внутренне использует интересный шаблон, что-то вроде этого:
C #
xxxxxxxxxx
1
while (await _writer.WaitToWriteAsync(cancellationToken))
2
if (_writer.TryWrite(message))
3
return;
Есть несколько веских причин, почему он используется WaitToWriteAsync()
в цикле. Во-первых, потому что разные производители могут совместно использовать канал, поэтому WaitToWriteAsync()
могут сигнализировать о том, что мы можем продолжить запись, но затем происходит TryWrite()
сбой. Это вернет нас в петлю, ожидая следующего шанса.
Что касается чтения , все не так уж и отличается:
C #
xxxxxxxxxx
1
await foreach (var message in _reader.ReadAllAsync(cancellationToken))
2
DoSomething(message);
Здесь мы используем ReadAllAsync()
, что возвращает IAsyncEnumerable<>
, позволяя нам прочитать все доступные данные за один раз.
Этот метод внутренне ожидает данные, которые будут доступны, и использует yield return, чтобы вернуть их вызывающей стороне.
Всегда полезно взглянуть на источники библиотек, которые мы используем. Это помогает нам лучше понять инструменты в нашем поясе, давая нам возможность выбрать наиболее подходящий для работы под рукой.
Кроме того (и, вероятно, это еще более важно), чтение кода других людей является одним из лучших способов усовершенствования в качестве разработчиков программного обеспечения.
В целом, эта библиотека каналов очень полезна при разработке приложений с интенсивным использованием данных в многопоточных средах, особенно когда необходимо обмениваться сообщениями между работниками.
В веб-контексте это может быть удобно. например, при подписке на систему очередей, такую как RabbitMQ, при этом источник извлекает сообщения и отправляет их одному или нескольким получателям.
Здесь вы можете получить более подробное объяснение с примером реализации.