Статьи

Пойдем: параллелизм Голанга, часть 2

Одной из уникальных особенностей Go является использование каналов для безопасного общения между программами. В этой статье вы узнаете, что такое каналы, как их эффективно использовать, а также некоторые общие шаблоны.

Канал — это синхронизированная очередь в памяти, которую могут использовать программы и обычные функции для отправки и получения введенных значений. Связь сериализуется через канал.

Вы создаете канал с помощью make() и указываете тип значений, которые принимает канал:

ch := make(chan int)

Go предоставляет приятный синтаксис стрелок для отправки и получения в / из каналов:

1
2
3
4
5
// send value to a channel
   ch <- 5
 
   // receive value from a channel
   x := <- ch

Вам не нужно потреблять стоимость. Это нормально, просто вывести значение из канала:

<-ch

Каналы блокируются по умолчанию. Если вы отправите значение в канал, вы заблокируете его, пока его не получат. Точно так же, если вы получаете от канала, вы будете блокировать, пока кто-то не отправит значение в канал.

Следующая программа демонстрирует это. Функция main() создает канал и запускает вызываемую процедуру go, которая печатает «start», считывает значение из канала и печатает тоже. Затем main() запускает другую процедуру, которая печатает тире («-«) каждую секунду. Затем он спит в течение 2,5 секунд, отправляет значение в канал и спит еще 3 секунды, чтобы завершить выполнение всех процедур.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import (
    «fmt»
    «time»
)
 
func main() {
    ch := make(chan int)
 
    // Start a goroutine that reads a value from a channel and prints it
    go func(ch chan int) {
        fmt.Println(«start»)
        fmt.Println(<-ch)
    }(ch)
 
    // Start a goroutine that prints a dash every second
    go func() {
        for i := 0;
            time.Sleep(time.Second)
            fmt.Println(«-«)
        }
    }()
 
    // Sleep for two seconds
    time.Sleep(2500 * time.Millisecond)
 
    // Send a value to the channel
    ch <- 5
 
    // Sleep three more seconds to let all goroutines finish
    time.Sleep(3 * time.Second)
}

Эта программа очень хорошо демонстрирует блокирующий характер канала. Первая программа распечатывает «start» сразу, но затем блокируется при попытке получить от канала функцию main() , которая спит в течение 2,5 секунд и отправляет значение. Другая программа просто обеспечивает визуальное представление о потоке времени, регулярно печатая тире каждую секунду.

Вот вывод:

1
2
3
4
5
6
7
start
5

Такое поведение тесно связывает отправителей с получателями, а иногда это не то, что вы хотите. Go предоставляет несколько механизмов для решения этой проблемы.

Буферизованные каналы — это каналы, которые могут содержать определенное (предопределенное) количество значений, чтобы отправители не блокировали до тех пор, пока буфер не будет заполнен, даже если никто не получает.

Чтобы создать буферизованный канал, просто добавьте емкость в качестве второго аргумента:

ch := make(chan int, 5)

Следующая программа иллюстрирует поведение буферизованных каналов. Программа main() определяет буферизованный канал с пропускной способностью 3. Затем она запускает одну процедуру, которая каждую секунду читает буфер из канала и печатает, и другую программу, которая просто печатает тире каждую секунду, чтобы визуально показать ход выполнения. времени. Затем он отправляет пять значений в канал.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import (
    «fmt»
    «time»
)
 
 
func main() {
    ch := make(chan int, 3)
 
    // Start a goroutine that reads a value from the channel every second and prints it
    go func(ch chan int) {
        for {
            time.Sleep(time.Second)
            fmt.Printf(«Goroutine received: %d\n», <-ch)
        }
 
    }(ch)
 
    // Start a goroutine that prints a dash every second
    go func() {
        for i := 0;
            time.Sleep(time.Second)
            fmt.Println(«-«)
        }
    }()
 
    // Push values to the channel as fast as possible
    for i := 0;
        ch <- i
        fmt.Printf(«main() pushed: %d\n», i)
    }
 
    // Sleep five more seconds to let all goroutines finish
    time.Sleep(5 * time.Second)
}

Что происходит во время выполнения? Первые три значения сразу буферизуются каналом и функциональными блоками main() . Через секунду значение получено программой, и функция main() может выдвинуть другое значение. Проходит еще одна секунда, программа получает другое значение, и функция main() может выдвинуть последнее значение. На этом этапе программа продолжает получать значения из канала каждую секунду.

Вот вывод:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
main() pushed: 0
main() pushed: 1
main() pushed: 2
Goroutine received: 0
main() pushed: 3
Goroutine received: 1
main() pushed: 4
Goroutine received: 2
Goroutine received: 3
Goroutine received: 4

Буферизованные каналы (при условии, что буфер достаточно большой) могут решить проблему временных колебаний, когда не хватает приемников для обработки всех отправленных сообщений. Но есть и противоположная проблема заблокированных получателей, ожидающих обработки сообщений. Иди тебя прикрыл.

Что если вы хотите, чтобы ваша программа выполняла что-то еще, когда в канале нет сообщений для обработки? Хороший пример, если ваш приемник ожидает сообщений от нескольких каналов. Вы не хотите блокировать на канале A, если канал B имеет сообщения прямо сейчас. Следующая программа пытается вычислить сумму 3 и 5, используя полную мощность машины.

Идея состоит в том, чтобы моделировать сложную операцию (например, удаленный запрос к распределенной БД) с избыточностью. Функция sum() (обратите внимание, как она определена как вложенная функция внутри main() ) принимает два параметра int и возвращает канал int. Внутренняя анонимная программа неактивна некоторое время до одной секунды, а затем записывает сумму в канал, закрывает ее и возвращает.

Теперь основные вызовы sum(3, 5) четыре раза и сохраняют полученные каналы в переменных от ch1 до ch4. Четыре вызова sum() возвращаются немедленно, потому что случайный спящий процесс происходит внутри программы, которую вызывает каждая функция sum() .

Здесь начинается классная часть. Оператор select позволяет функции main() ждать на всех каналах и отвечать на первый, который возвращается. Оператор select работает немного как оператор switch .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func main() {
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
 
    sum := func(a int, b int) <-chan int {
        ch := make(chan int)
        go func() {
            // Random time up to one second
            delay := time.Duration(r.Int()%1000) * time.Millisecond
            time.Sleep(delay)
            ch <- a + b
            close(ch)
        }()
        return ch
    }
 
    // Call sum 4 times with the same parameters
    ch1 := sum(3, 5)
    ch2 := sum(3, 5)
    ch3 := sum(3, 5)
    ch4 := sum(3, 5)
 
    // wait for the first goroutine to write to its channel
    select {
    case result := <-ch1:
        fmt.Printf(«ch1: 3 + 5 = %d», result)
    case result := <-ch2:
        fmt.Printf(«ch2: 3 + 5 = %d», result)
    case result := <-ch3:
        fmt.Printf(«ch3: 3 + 5 = %d», result)
    case result := <-ch4:
        fmt.Printf(«ch4: 3 + 5 = %d», result)
    }
}

Иногда вы не хотите, чтобы функция main() блокировала ожидание даже до завершения первой процедуры. В этом случае вы можете добавить вариант по умолчанию, который будет выполняться, если все каналы заблокированы.

В моей предыдущей статье я продемонстрировал решение упражнения для веб-сканера Tour of Go . Я использовал goroutines и синхронизированную карту. Я также решил упражнение, используя каналы. Полный исходный код для обоих решений доступен на GitHub .

Давайте посмотрим на соответствующие части. Во-первых, вот структура, которая будет отправляться на канал всякий раз, когда программа обрабатывает страницу. Он содержит текущую глубину и все URL-адреса, найденные на странице.

1
2
3
4
type links struct {
    urls []string
    depth int
}

Функция fetchURL() принимает URL, глубину и канал вывода. Он использует сборщик (предоставленный упражнением) для получения URL-адресов всех ссылок на странице. Он отправляет список URL-адресов в виде одного сообщения на канал кандидата в виде структуры links с уменьшенной глубиной. Глубина показывает, насколько дальше мы должны ползти. Когда глубина достигает 0, дальнейшая обработка не производится.

01
02
03
04
05
06
07
08
09
10
func fetchURL(url string, depth int, candidates chan links) {
    body, urls, err := fetcher.Fetch(url)
    fmt.Printf(«found: %s %q\n», url, body)
 
    if err != nil {
        fmt.Println(err)
    }
 
    candidates <- links{urls, depth — 1}
}

Функция ChannelCrawl() координирует все. Он отслеживает все URL, которые уже были извлечены на карте. Нет необходимости синхронизировать доступ, потому что никакая другая функция или процедура не касаются. Он также определяет канал-кандидат, в который все программы будут записывать свои результаты.

Затем он запускает parseUrl качестве parseUrl для каждого нового URL. Логика отслеживает, сколько гоу-рутин было запущено с помощью счетчика. Всякий раз, когда значение считывается из канала, счетчик уменьшается (так как отправляющая процедура завершается после отправки), и всякий раз, когда запускается новая процедура, счетчик увеличивается. Если глубина становится равной нулю, никакие новые подпрограммы не будут запущены, и основная функция будет продолжать чтение из канала, пока все подпрограммы не будут выполнены.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// ChannelCrawl crawls links from a seed url
func ChannelCrawl(url string, depth int, fetcher Fetcher) {
    candidates := make(chan links, 0)
    fetched := make(map[string]bool)
    counter := 1
 
    // Fetch initial url to seed the candidates channel
    go fetchURL(url, depth, candidates)
 
    for counter > 0 {
        candidateLinks := <-candidates
        counter—
        depth = candidateLinks.depth
        for _, candidate := range candidateLinks.urls {
            // Already fetched.
            if fetched[candidate] {
                continue
            }
 
            // Add to fetched mapped
            fetched[candidate] = true
 
            if depth > 0 {
                counter++
                go fetchURL(candidate, depth, candidates)
            }
        }
    }

Каналы Go предоставляют множество возможностей для безопасной связи между программами. Поддержка синтаксиса является как краткой, так и иллюстративной. Это настоящее благо для выражения параллельных алгоритмов. Существует гораздо больше каналов, чем я представил здесь. Я рекомендую вам погрузиться и ознакомиться с различными шаблонами параллелизма, которые они допускают.