обзор
Одной из уникальных особенностей Go является использование каналов для безопасного общения между программами. В этой статье вы узнаете, что такое каналы, как их эффективно использовать, а также некоторые общие шаблоны.
Что такое канал?
Канал — это синхронизированная очередь в памяти, которую могут использовать программы и обычные функции для отправки и получения введенных значений. Связь сериализуется через канал.
Вы создаете канал с помощью make()
и указываете тип значений, которые принимает канал:
ch := make(chan int)
Go предоставляет приятный синтаксис стрелок для отправки и получения в / из каналов:
1
2
3
4
5
|
// send value to a channel
ch <- 5
// receive value from a channel
x := <- ch
|
Вам не нужно потреблять стоимость. Это нормально, просто вывести значение из канала:
<-ch
Каналы блокируются по умолчанию. Если вы отправите значение в канал, вы заблокируете его, пока его не получат. Точно так же, если вы получаете от канала, вы будете блокировать, пока кто-то не отправит значение в канал.
Следующая программа демонстрирует это. Функция main()
создает канал и запускает вызываемую процедуру go, которая печатает «start», считывает значение из канала и печатает тоже. Затем main()
запускает другую процедуру, которая печатает тире («-«) каждую секунду. Затем он спит в течение 2,5 секунд, отправляет значение в канал и спит еще 3 секунды, чтобы завершить выполнение всех процедур.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import (
«fmt»
«time»
)
func main() {
ch := make(chan int)
// Start a goroutine that reads a value from a channel and prints it
go func(ch chan int) {
fmt.Println(«start»)
fmt.Println(<-ch)
}(ch)
// Start a goroutine that prints a dash every second
go func() {
for i := 0;
time.Sleep(time.Second)
fmt.Println(«-«)
}
}()
// Sleep for two seconds
time.Sleep(2500 * time.Millisecond)
// Send a value to the channel
ch <- 5
// Sleep three more seconds to let all goroutines finish
time.Sleep(3 * time.Second)
}
|
Эта программа очень хорошо демонстрирует блокирующий характер канала. Первая программа распечатывает «start» сразу, но затем блокируется при попытке получить от канала функцию main()
, которая спит в течение 2,5 секунд и отправляет значение. Другая программа просто обеспечивает визуальное представление о потоке времени, регулярно печатая тире каждую секунду.
Вот вывод:
1
2
3
4
5
6
7
|
start
—
—
5
—
—
—
|
Буферизованные каналы
Такое поведение тесно связывает отправителей с получателями, а иногда это не то, что вы хотите. Go предоставляет несколько механизмов для решения этой проблемы.
Буферизованные каналы — это каналы, которые могут содержать определенное (предопределенное) количество значений, чтобы отправители не блокировали до тех пор, пока буфер не будет заполнен, даже если никто не получает.
Чтобы создать буферизованный канал, просто добавьте емкость в качестве второго аргумента:
ch := make(chan int, 5)
Следующая программа иллюстрирует поведение буферизованных каналов. Программа main()
определяет буферизованный канал с пропускной способностью 3. Затем она запускает одну процедуру, которая каждую секунду читает буфер из канала и печатает, и другую программу, которая просто печатает тире каждую секунду, чтобы визуально показать ход выполнения. времени. Затем он отправляет пять значений в канал.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
import (
«fmt»
«time»
)
func main() {
ch := make(chan int, 3)
// Start a goroutine that reads a value from the channel every second and prints it
go func(ch chan int) {
for {
time.Sleep(time.Second)
fmt.Printf(«Goroutine received: %d\n», <-ch)
}
}(ch)
// Start a goroutine that prints a dash every second
go func() {
for i := 0;
time.Sleep(time.Second)
fmt.Println(«-«)
}
}()
// Push values to the channel as fast as possible
for i := 0;
ch <- i
fmt.Printf(«main() pushed: %d\n», i)
}
// Sleep five more seconds to let all goroutines finish
time.Sleep(5 * time.Second)
}
|
Что происходит во время выполнения? Первые три значения сразу буферизуются каналом и функциональными блоками main()
. Через секунду значение получено программой, и функция main()
может выдвинуть другое значение. Проходит еще одна секунда, программа получает другое значение, и функция main()
может выдвинуть последнее значение. На этом этапе программа продолжает получать значения из канала каждую секунду.
Вот вывод:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
main() pushed: 0
main() pushed: 1
main() pushed: 2
—
Goroutine received: 0
main() pushed: 3
—
Goroutine received: 1
main() pushed: 4
—
Goroutine received: 2
—
Goroutine received: 3
—
Goroutine received: 4
|
Выбрать
Буферизованные каналы (при условии, что буфер достаточно большой) могут решить проблему временных колебаний, когда не хватает приемников для обработки всех отправленных сообщений. Но есть и противоположная проблема заблокированных получателей, ожидающих обработки сообщений. Иди тебя прикрыл.
Что если вы хотите, чтобы ваша программа выполняла что-то еще, когда в канале нет сообщений для обработки? Хороший пример, если ваш приемник ожидает сообщений от нескольких каналов. Вы не хотите блокировать на канале A, если канал B имеет сообщения прямо сейчас. Следующая программа пытается вычислить сумму 3 и 5, используя полную мощность машины.
Идея состоит в том, чтобы моделировать сложную операцию (например, удаленный запрос к распределенной БД) с избыточностью. Функция sum()
(обратите внимание, как она определена как вложенная функция внутри main()
) принимает два параметра int и возвращает канал int. Внутренняя анонимная программа неактивна некоторое время до одной секунды, а затем записывает сумму в канал, закрывает ее и возвращает.
Теперь основные вызовы sum(3, 5)
четыре раза и сохраняют полученные каналы в переменных от ch1 до ch4. Четыре вызова sum()
возвращаются немедленно, потому что случайный спящий процесс происходит внутри программы, которую вызывает каждая функция sum()
.
Здесь начинается классная часть. Оператор select
позволяет функции main()
ждать на всех каналах и отвечать на первый, который возвращается. Оператор select
работает немного как оператор switch
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
func main() {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
sum := func(a int, b int) <-chan int {
ch := make(chan int)
go func() {
// Random time up to one second
delay := time.Duration(r.Int()%1000) * time.Millisecond
time.Sleep(delay)
ch <- a + b
close(ch)
}()
return ch
}
// Call sum 4 times with the same parameters
ch1 := sum(3, 5)
ch2 := sum(3, 5)
ch3 := sum(3, 5)
ch4 := sum(3, 5)
// wait for the first goroutine to write to its channel
select {
case result := <-ch1:
fmt.Printf(«ch1: 3 + 5 = %d», result)
case result := <-ch2:
fmt.Printf(«ch2: 3 + 5 = %d», result)
case result := <-ch3:
fmt.Printf(«ch3: 3 + 5 = %d», result)
case result := <-ch4:
fmt.Printf(«ch4: 3 + 5 = %d», result)
}
}
|
Иногда вы не хотите, чтобы функция main()
блокировала ожидание даже до завершения первой процедуры. В этом случае вы можете добавить вариант по умолчанию, который будет выполняться, если все каналы заблокированы.
Пример веб-сканера
В моей предыдущей статье я продемонстрировал решение упражнения для веб-сканера Tour of Go . Я использовал goroutines и синхронизированную карту. Я также решил упражнение, используя каналы. Полный исходный код для обоих решений доступен на GitHub .
Давайте посмотрим на соответствующие части. Во-первых, вот структура, которая будет отправляться на канал всякий раз, когда программа обрабатывает страницу. Он содержит текущую глубину и все URL-адреса, найденные на странице.
1
2
3
4
|
type links struct {
urls []string
depth int
}
|
Функция fetchURL()
принимает URL, глубину и канал вывода. Он использует сборщик (предоставленный упражнением) для получения URL-адресов всех ссылок на странице. Он отправляет список URL-адресов в виде одного сообщения на канал кандидата в виде структуры links
с уменьшенной глубиной. Глубина показывает, насколько дальше мы должны ползти. Когда глубина достигает 0, дальнейшая обработка не производится.
01
02
03
04
05
06
07
08
09
10
|
func fetchURL(url string, depth int, candidates chan links) {
body, urls, err := fetcher.Fetch(url)
fmt.Printf(«found: %s %q\n», url, body)
if err != nil {
fmt.Println(err)
}
candidates <- links{urls, depth — 1}
}
|
Функция ChannelCrawl()
координирует все. Он отслеживает все URL, которые уже были извлечены на карте. Нет необходимости синхронизировать доступ, потому что никакая другая функция или процедура не касаются. Он также определяет канал-кандидат, в который все программы будут записывать свои результаты.
Затем он запускает parseUrl
качестве parseUrl
для каждого нового URL. Логика отслеживает, сколько гоу-рутин было запущено с помощью счетчика. Всякий раз, когда значение считывается из канала, счетчик уменьшается (так как отправляющая процедура завершается после отправки), и всякий раз, когда запускается новая процедура, счетчик увеличивается. Если глубина становится равной нулю, никакие новые подпрограммы не будут запущены, и основная функция будет продолжать чтение из канала, пока все подпрограммы не будут выполнены.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// ChannelCrawl crawls links from a seed url
func ChannelCrawl(url string, depth int, fetcher Fetcher) {
candidates := make(chan links, 0)
fetched := make(map[string]bool)
counter := 1
// Fetch initial url to seed the candidates channel
go fetchURL(url, depth, candidates)
for counter > 0 {
candidateLinks := <-candidates
counter—
depth = candidateLinks.depth
for _, candidate := range candidateLinks.urls {
// Already fetched.
if fetched[candidate] {
continue
}
// Add to fetched mapped
fetched[candidate] = true
if depth > 0 {
counter++
go fetchURL(candidate, depth, candidates)
}
}
}
|
Вывод
Каналы Go предоставляют множество возможностей для безопасной связи между программами. Поддержка синтаксиса является как краткой, так и иллюстративной. Это настоящее благо для выражения параллельных алгоритмов. Существует гораздо больше каналов, чем я представил здесь. Я рекомендую вам погрузиться и ознакомиться с различными шаблонами параллелизма, которые они допускают.