Golang 入门:等待 goroutine 完成任务
在 Go 语言编程中,goroutine 是其并发编程模型的核心。goroutine 是轻量级线程,由 Go 运行时管理,让我们能够以极低的资源开销创建成千上万的并发任务。然而,并发编程的一个关键挑战是如何有效地协调这些并发的执行流,特别是如何确保主程序能够等待所有 goroutine 完成任务后再继续或退出。本文将深入探讨在 Go 中等待 goroutine 完成的各种方法、最佳实践以及常见陷阱。
目录#
Goroutine 简介#
Goroutine 是 Go 语言并发设计的核心。它们比操作系统线程更轻量,启动更快,内存占用更少。创建一个 goroutine 非常简单,只需在函数调用前加上 go 关键字:
go func() {
// 并发执行的代码
}()然而,这种简单的启动方式背后隐藏着一个重要问题:主函数(main goroutine)不会自动等待其他 goroutine 完成。如果主 goroutine 先于其他 goroutine 结束,整个程序就会退出,可能导致一些 goroutine 没有机会完成它们的任务。
为什么需要等待 Goroutine 完成?#
考虑以下简单示例:
package main
import "fmt"
func main() {
for i := 0; i < 3; i++ {
go func(id int) {
fmt.Printf("Goroutine %d 开始工作\n", id)
// 模拟工作负载
fmt.Printf("Goroutine %d 完成工作\n", id)
}(i)
}
fmt.Println("主函数结束")
}运行这个程序,你可能会发现只有"主函数结束"被打印出来,而 goroutine 中的输出可能完全看不到,或者只能看到部分输出。这是因为主 goroutine 结束后,程序立即退出,没有给其他 goroutine 足够的时间运行。
等待 Goroutine 完成的方法#
1. 使用 sync.WaitGroup(推荐)#
sync.WaitGroup 是 Go 标准库中专门用于等待一组 goroutine 完成的工具,也是最常用和推荐的方法。
基本用法#
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出时通知 WaitGroup 当前任务完成
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second) // 模拟工作耗时
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1) // 每启动一个 goroutine 前增加计数器
go worker(i, &wg)
}
wg.Wait() // 阻塞直到所有 goroutine 完成(计数器为 0)
fmt.Println("所有工作完成,主函数结束")
}关键方法说明#
Add(delta int):增加 WaitGroup 的计数器,delta 通常为 1Done():减少计数器,通常使用defer wg.Done()确保函数退出时调用Wait():阻塞当前 goroutine,直到计数器变为 0
高级用法:批量添加任务#
func main() {
var wg sync.WaitGroup
tasks := []string{"task1", "task2", "task3", "task4"}
wg.Add(len(tasks)) // 一次性添加所有任务
for _, task := range tasks {
go func(t string) {
defer wg.Done()
processTask(t)
}(task)
}
wg.Wait()
fmt.Println("所有任务处理完成")
}
func processTask(task string) {
fmt.Printf("处理任务: %s\n", task)
time.Sleep(time.Second)
}2. 使用通道(Channel)#
通道是 Go 中 goroutine 之间通信的主要方式,也可以用于同步。
使用通道进行同步#
package main
import (
"fmt"
"time"
)
func worker(id int, done chan bool) {
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
done <- true // 发送完成信号
}
func main() {
done := make(chan bool, 3) // 缓冲通道,容量为 goroutine 数量
for i := 0; i < 3; i++ {
go worker(i, done)
}
// 等待所有 goroutine 完成
for i := 0; i < 3; i++ {
<-done // 接收完成信号,会阻塞直到有数据
}
fmt.Println("所有工作完成,主函数结束")
}使用 close() 和 range 的优雅方式#
func main() {
done := make(chan bool)
for i := 0; i < 3; i++ {
go func(id int) {
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
done <- true
}(i)
}
go func() {
for i := 0; i < 3; i++ {
<-done
}
close(done) // 关闭通道
}()
// 等待通道关闭
for range done {
// 通道关闭后循环会自动退出
}
fmt.Println("所有工作完成")
}3. 使用 sync.Mutex 和条件变量#
对于更复杂的同步场景,可以使用互斥锁和条件变量。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
completed := 0
total := 3
for i := 0; i < total; i++ {
go func(id int) {
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
mu.Lock()
completed++
if completed == total {
cond.Broadcast() // 通知所有等待的 goroutine
}
mu.Unlock()
}(i)
}
mu.Lock()
for completed < total {
cond.Wait() // 等待条件满足
}
mu.Unlock()
fmt.Println("所有工作完成")
}最佳实践与常见陷阱#
最佳实践#
-
优先使用
sync.WaitGroup:对于简单的等待场景,WaitGroup 是最清晰和高效的选择。 -
正确传递 WaitGroup:始终传递 WaitGroup 的指针,而不是值:
// 正确 func worker(wg *sync.WaitGroup) // 错误 - 传递值会导致副本,计数器不会正确递减 func worker(wg sync.WaitGroup) -
使用 defer 调用 Done():确保即使在函数发生 panic 时也能正确递减计数器:
func worker(wg *sync.WaitGroup) { defer wg.Done() // 确保一定会执行 // 工作代码 } -
在启动 goroutine 前调用 Add():
// 正确 wg.Add(1) go worker(&wg) // 有风险 - 如果 worker 很快完成,可能在 Add 调用前就调用了 Done go func() { wg.Add(1) defer wg.Done() // ... }()
常见陷阱#
-
忘记调用 Done():会导致 Wait() 永久阻塞。
-
在 goroutine 外部调用 Done():可能导致计数器过早归零。
-
竞争条件:如果多个 goroutine 同时修改共享数据,需要适当的同步机制。
-
goroutine 泄漏:忘记等待 goroutine 完成可能导致资源泄漏。
实际应用示例#
并行处理任务并收集结果#
package main
import (
"fmt"
"sync"
"time"
)
type Result struct {
TaskID int
Value int
Error error
}
func processTask(taskID int) Result {
// 模拟处理过程
time.Sleep(time.Millisecond * time.Duration(100+taskID*50))
if taskID == 4 { // 模拟错误
return Result{TaskID: taskID, Error: fmt.Errorf("task %d failed", taskID)}
}
return Result{TaskID: taskID, Value: taskID * 10}
}
func main() {
tasks := []int{1, 2, 3, 4, 5, 6, 7, 8}
results := make(chan Result, len(tasks))
var wg sync.WaitGroup
// 启动 worker goroutine
for _, taskID := range tasks {
wg.Add(1)
go func(id int) {
defer wg.Done()
result := processTask(id)
results <- result
}(taskID)
}
// 等待所有任务完成并关闭结果通道
go func() {
wg.Wait()
close(results)
}()
// 收集结果
var successful []Result
var failed []Result
for result := range results {
if result.Error != nil {
failed = append(failed, result)
fmt.Printf("任务 %d 失败: %v\n", result.TaskID, result.Error)
} else {
successful = append(successful, result)
fmt.Printf("任务 %d 成功完成,结果: %d\n", result.TaskID, result.Value)
}
}
fmt.Printf("\n总结: 成功 %d 个任务,失败 %d 个任务\n",
len(successful), len(failed))
}限制并发数量的 worker pool#
package main
import (
"fmt"
"sync"
"time"
)
func workerPool(tasks []string, maxWorkers int) {
taskChan := make(chan string, len(tasks))
var wg sync.WaitGroup
// 创建 worker pool
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range taskChan {
fmt.Printf("Worker %d 处理任务: %s\n", workerID, task)
time.Sleep(time.Second) // 模拟处理时间
fmt.Printf("Worker %d 完成任务: %s\n", workerID, task)
}
}(i)
}
// 发送任务到通道
for _, task := range tasks {
taskChan <- task
}
close(taskChan) // 关闭通道,worker 会在处理完所有任务后退出
wg.Wait()
fmt.Println("所有任务处理完成")
}
func main() {
tasks := []string{"task1", "task2", "task3", "task4", "task5", "task6"}
workerPool(tasks, 2) // 最多同时运行 2 个 worker
}总结#
在 Go 语言中等待 goroutine 完成是并发编程的基本技能。sync.WaitGroup 是最常用和推荐的方法,适合大多数场景。通道提供了更灵活的通信和同步机制,而互斥锁和条件变量适用于更复杂的同步需求。
关键要点:
- 理解每种方法的适用场景
- 遵循最佳实践避免常见陷阱
- 在真实项目中考虑错误处理和资源管理
- 根据具体需求选择合适的并发模式
掌握这些技术将帮助你编写出健壮、高效的并发 Go 程序。