Fan-out, fan-in — это мощный шаблон параллелизма, который обеспечивает эффективную параллельную обработку задач и эффективное использование системных ресурсов. Этот шаблон использует мощь горутин и каналов в Go для распределения рабочей нагрузки между несколькими работниками, тем самым повышая общую производительность приложения.
Разветвленная часть шаблона включает в себя распределение работы между несколькими рабочими горутинами. Эти горутины работают одновременно, каждая обрабатывает часть задач. Такой подход помогает увеличить пропускную способность и более эффективно обрабатывать большие наборы данных.
Вступительный аспект шаблона включает в себя сбор результатов рабочих горутин и объединение их в один вывод. Этот процесс обычно выполняется с помощью специальной горутины, которая прослушивает отдельные выходные каналы рабочих процессов, объединяет результаты и отправляет их в один выходной канал.
Шаблон разветвления-входа особенно полезен в ситуациях, когда задачи можно разделить на более мелкие независимые блоки и обрабатывать их одновременно. Этот шаблон не только повышает производительность приложения, но также повышает удобство сопровождения и удобочитаемость кода за счет разделения задач распределения задач и агрегирования результатов.
Пример на ходу
В этом примере у нас есть параллельная программа, которая загружает список файлов по URL-адресам, обрабатывает содержимое для подсчета количества слов в каждом файле, а затем объединяет результаты для получения общего количества слов.
Вот разбивка кода:
- Импортируйте необходимые пакеты.
2. Определите функцию simulateDownload(url string)
, которая имитирует загрузку файла с предоставленного URL-адреса и возвращает его содержимое в виде строки.
3. Определите функцию downloader(urls []string)
, которая берет фрагмент URL-адресов и возвращает канал, который отправляет содержимое каждого URL-адреса. Он запускает горутину, которая перебирает URL-адреса, имитирует загрузку и отправляет содержимое по каналу. Канал закрывается после обработки всех URL.
Это будет разветвленная часть: функция downloader
создает один канал downloadStream
, который отправляет содержимое каждого загруженного файла. Позже мы создадим несколько рабочих горутин, которые прослушивают этот общий канал, эффективно распределяя работу, которую нужно выполнять одновременно.
4. Определите функцию worker(in <-chan string)
, которая принимает на вход канал строк и возвращает канал целых чисел. Он запускает горутину, которая считывает содержимое из входного канала, печатает метку времени обработки и содержимое, подсчитывает количество слов в содержании и отправляет результат через выходной канал. Выходной канал закрывается после обработки всего содержимого.
5. Определите функцию merger(ins ...<-chan int)
, которая принимает вариативный параметр каналов с целыми значениями и возвращает канал с целыми значениями. Он объединяет входные каналы в один выходной канал. sync.WaitGroup
используется для ожидания обработки всех входных каналов, после чего выходной канал закрывается.
Это будет входящая часть: функция merger
объединяет результаты нескольких рабочих горутин, прослушивая их отдельные выходные каналы. Он использует sync.WaitGroup
, чтобы гарантировать, что он ожидает завершения всех рабочих горутин перед закрытием своего выходного канала.
6. В функции main
запустите генератор случайных чисел, определите часть URL-адресов и создайте поток загрузки, вызвав функцию downloader()
.
7. Определите количество рабочих процессов, создайте срез рабочих каналов и запустите рабочие горутины с потоком загрузки в качестве входных данных.
8. Объединить рабочие каналы с помощью функции merger()
.
9. Повторите объединенный канал, чтобы вычислить общее количество слов.
10. Распечатайте общее количество слов.
В этом примере показано, как использовать каналы и горутины для одновременной загрузки, обработки и объединения данных эффективным и организованным образом.
package main import ( "fmt" "math/rand" "strings" "sync" "time" ) // simulateDownload simulates downloading a file and returns its content. func simulateDownload(url string) string { time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) return fmt.Sprintf("Content of %s", url) } // downloader downloads a list of URLs and returns the content. func downloader(urls []string) <-chan string { out := make(chan string) go func() { defer close(out) for _, url := range urls { out <- simulateDownload(url) } }() return out } // worker processes the content and returns the number of words. func worker(in <-chan string) <-chan int { out := make(chan int) go func() { defer close(out) for content := range in { fmt.Println(time.Now().Format("2006-01-02 15:04:05.000000000 -0700 MST")) fmt.Printf("Processing content: %s\n\n", content) words := strings.Fields(content) out <- len(words) } }() return out } // merger merges the results from multiple workers. func merger(ins ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup wg.Add(len(ins)) for _, in := range ins { go func(in <-chan int) { defer wg.Done() for n := range in { fmt.Printf("Merging result: %d\n", n) out <- n } }(in) } go func() { wg.Wait() close(out) }() return out } func main() { rand.Seed(time.Now().UnixNano()) urls := []string{ "https://example.com/file1.txt", "https://example.com/file2.txt", "https://example.com/file3.txt", "https://example.com/file4.txt", "https://example.com/file5.txt", } downloadStream := downloader(urls) numWorkers := 3 workerChannels := make([]<-chan int, numWorkers) for i := 0; i < numWorkers; i++ { workerChannels[i] = worker(downloadStream) } merged := merger(workerChannels...) totalWordCount := 0 for count := range merged { totalWordCount += count } fmt.Printf("Total word count: %d\n", totalWordCount) }
Заключение
Мы продемонстрировали, как мощную модель параллелизма Go, включая горутины и каналы, можно использовать для создания эффективных и параллельных программ. В нашем примере мы загрузили несколько файлов из списка URL-адресов, обработали их содержимое, чтобы подсчитать количество слов, и объединили результаты, используя методы параллельного программирования. В этом примере показано, как можно использовать Go для эффективного управления несколькими задачами одновременно, что приводит к повышению производительности и использованию ресурсов в реальных приложениях.
Если вам нравится читать статьи на Medium и вы заинтересованы в том, чтобы стать участником, я буду рад поделиться с вами своей реферальной ссылкой!