Fan-out, fan-in — это мощный шаблон параллелизма, который обеспечивает эффективную параллельную обработку задач и эффективное использование системных ресурсов. Этот шаблон использует мощь горутин и каналов в Go для распределения рабочей нагрузки между несколькими работниками, тем самым повышая общую производительность приложения.

Разветвленная часть шаблона включает в себя распределение работы между несколькими рабочими горутинами. Эти горутины работают одновременно, каждая обрабатывает часть задач. Такой подход помогает увеличить пропускную способность и более эффективно обрабатывать большие наборы данных.

Вступительный аспект шаблона включает в себя сбор результатов рабочих горутин и объединение их в один вывод. Этот процесс обычно выполняется с помощью специальной горутины, которая прослушивает отдельные выходные каналы рабочих процессов, объединяет результаты и отправляет их в один выходной канал.

Шаблон разветвления-входа особенно полезен в ситуациях, когда задачи можно разделить на более мелкие независимые блоки и обрабатывать их одновременно. Этот шаблон не только повышает производительность приложения, но также повышает удобство сопровождения и удобочитаемость кода за счет разделения задач распределения задач и агрегирования результатов.

Пример на ходу

В этом примере у нас есть параллельная программа, которая загружает список файлов по URL-адресам, обрабатывает содержимое для подсчета количества слов в каждом файле, а затем объединяет результаты для получения общего количества слов.

Вот разбивка кода:

  1. Импортируйте необходимые пакеты.

2. Определите функцию simulateDownload(url string), которая имитирует загрузку файла с предоставленного URL-адреса и возвращает его содержимое в виде строки.

3. Определите функцию downloader(urls []string), которая берет фрагмент URL-адресов и возвращает канал, который отправляет содержимое каждого URL-адреса. Он запускает горутину, которая перебирает URL-адреса, имитирует загрузку и отправляет содержимое по каналу. Канал закрывается после обработки всех URL.

Это будет разветвленная часть: функция downloader создает один канал downloadStream, который отправляет содержимое каждого загруженного файла. Позже мы создадим несколько рабочих горутин, которые прослушивают этот общий канал, эффективно распределяя работу, которую нужно выполнять одновременно.

4. Определите функцию worker(in <-chan string), которая принимает на вход канал строк и возвращает канал целых чисел. Он запускает горутину, которая считывает содержимое из входного канала, печатает метку времени обработки и содержимое, подсчитывает количество слов в содержании и отправляет результат через выходной канал. Выходной канал закрывается после обработки всего содержимого.

5. Определите функцию merger(ins ...<-chan int), которая принимает вариативный параметр каналов с целыми значениями и возвращает канал с целыми значениями. Он объединяет входные каналы в один выходной канал. sync.WaitGroup используется для ожидания обработки всех входных каналов, после чего выходной канал закрывается.

Это будет входящая часть: функция merger объединяет результаты нескольких рабочих горутин, прослушивая их отдельные выходные каналы. Он использует sync.WaitGroup, чтобы гарантировать, что он ожидает завершения всех рабочих горутин перед закрытием своего выходного канала.

6. В функции main запустите генератор случайных чисел, определите часть URL-адресов и создайте поток загрузки, вызвав функцию downloader().

7. Определите количество рабочих процессов, создайте срез рабочих каналов и запустите рабочие горутины с потоком загрузки в качестве входных данных.

8. Объединить рабочие каналы с помощью функции merger().

9. Повторите объединенный канал, чтобы вычислить общее количество слов.

10. Распечатайте общее количество слов.

В этом примере показано, как использовать каналы и горутины для одновременной загрузки, обработки и объединения данных эффективным и организованным образом.

package main

import (
 "fmt"
 "math/rand"
 "strings"
 "sync"
 "time"
)

// simulateDownload simulates downloading a file and returns its content.
func simulateDownload(url string) string {
 time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 return fmt.Sprintf("Content of %s", url)
}

// downloader downloads a list of URLs and returns the content.
func downloader(urls []string) <-chan string {
 out := make(chan string)
 go func() {
  defer close(out)
  for _, url := range urls {
   out <- simulateDownload(url)
  }
 }()
 return out
}

// worker processes the content and returns the number of words.
func worker(in <-chan string) <-chan int {
 out := make(chan int)
 go func() {
  defer close(out)
  for content := range in {
   fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000000 -0700 MST"))
   fmt.Printf("Processing content: %s\n\n", content)
   words := strings.Fields(content)
   out <- len(words)
  }
 }()
 return out
}

// merger merges the results from multiple workers.
func merger(ins ...<-chan int) <-chan int {
 out := make(chan int)
 var wg sync.WaitGroup
 wg.Add(len(ins))

 for _, in := range ins {
  go func(in <-chan int) {
   defer wg.Done()
   for n := range in {
    fmt.Printf("Merging result: %d\n", n)
    out <- n
   }
  }(in)
 }

 go func() {
  wg.Wait()
  close(out)
 }()

 return out
}

func main() {
 rand.Seed(time.Now().UnixNano())

 urls := []string{
  "https://example.com/file1.txt",
  "https://example.com/file2.txt",
  "https://example.com/file3.txt",
  "https://example.com/file4.txt",
  "https://example.com/file5.txt",
 }

 downloadStream := downloader(urls)
 numWorkers := 3

 workerChannels := make([]<-chan int, numWorkers)
 for i := 0; i < numWorkers; i++ {
  workerChannels[i] = worker(downloadStream)
 }

 merged := merger(workerChannels...)

 totalWordCount := 0
 for count := range merged {
  totalWordCount += count
 }

 fmt.Printf("Total word count: %d\n", totalWordCount)
}

Заключение

Мы продемонстрировали, как мощную модель параллелизма Go, включая горутины и каналы, можно использовать для создания эффективных и параллельных программ. В нашем примере мы загрузили несколько файлов из списка URL-адресов, обработали их содержимое, чтобы подсчитать количество слов, и объединили результаты, используя методы параллельного программирования. В этом примере показано, как можно использовать Go для эффективного управления несколькими задачами одновременно, что приводит к повышению производительности и использованию ресурсов в реальных приложениях.

Если вам нравится читать статьи на Medium и вы заинтересованы в том, чтобы стать участником, я буду рад поделиться с вами своей реферальной ссылкой!

https://medium.com/@adamszpilewicz/membership