Golang 入门:等待 goroutine 完成任务

在 Go 语言编程中,goroutine 是其并发编程模型的核心。goroutine 是轻量级线程,由 Go 运行时管理,让我们能够以极低的资源开销创建成千上万的并发任务。然而,并发编程的一个关键挑战是如何有效地协调这些并发的执行流,特别是如何确保主程序能够等待所有 goroutine 完成任务后再继续或退出。本文将深入探讨在 Go 中等待 goroutine 完成的各种方法、最佳实践以及常见陷阱。

目录#

Goroutine 简介#

Goroutine 是 Go 语言并发设计的核心。它们比操作系统线程更轻量,启动更快,内存占用更少。创建一个 goroutine 非常简单,只需在函数调用前加上 go 关键字:

go func() {
    // 并发执行的代码
}()

然而,这种简单的启动方式背后隐藏着一个重要问题:主函数(main goroutine)不会自动等待其他 goroutine 完成。如果主 goroutine 先于其他 goroutine 结束,整个程序就会退出,可能导致一些 goroutine 没有机会完成它们的任务。

为什么需要等待 Goroutine 完成?#

考虑以下简单示例:

package main
 
import "fmt"
 
func main() {
    for i := 0; i < 3; i++ {
        go func(id int) {
            fmt.Printf("Goroutine %d 开始工作\n", id)
            // 模拟工作负载
            fmt.Printf("Goroutine %d 完成工作\n", id)
        }(i)
    }
    
    fmt.Println("主函数结束")
}

运行这个程序,你可能会发现只有"主函数结束"被打印出来,而 goroutine 中的输出可能完全看不到,或者只能看到部分输出。这是因为主 goroutine 结束后,程序立即退出,没有给其他 goroutine 足够的时间运行。

等待 Goroutine 完成的方法#

1. 使用 sync.WaitGroup(推荐)#

sync.WaitGroup 是 Go 标准库中专门用于等待一组 goroutine 完成的工具,也是最常用和推荐的方法

基本用法#

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 在函数退出时通知 WaitGroup 当前任务完成
    
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Second) // 模拟工作耗时
    fmt.Printf("Worker %d 完成工作\n", id)
}
 
func main() {
    var wg sync.WaitGroup
    
    for i := 0; i < 3; i++ {
        wg.Add(1) // 每启动一个 goroutine 前增加计数器
        go worker(i, &wg)
    }
    
    wg.Wait() // 阻塞直到所有 goroutine 完成(计数器为 0)
    fmt.Println("所有工作完成,主函数结束")
}

关键方法说明#

  • Add(delta int):增加 WaitGroup 的计数器,delta 通常为 1
  • Done():减少计数器,通常使用 defer wg.Done() 确保函数退出时调用
  • Wait():阻塞当前 goroutine,直到计数器变为 0

高级用法:批量添加任务#

func main() {
    var wg sync.WaitGroup
    tasks := []string{"task1", "task2", "task3", "task4"}
    
    wg.Add(len(tasks)) // 一次性添加所有任务
    
    for _, task := range tasks {
        go func(t string) {
            defer wg.Done()
            processTask(t)
        }(task)
    }
    
    wg.Wait()
    fmt.Println("所有任务处理完成")
}
 
func processTask(task string) {
    fmt.Printf("处理任务: %s\n", task)
    time.Sleep(time.Second)
}

2. 使用通道(Channel)#

通道是 Go 中 goroutine 之间通信的主要方式,也可以用于同步。

使用通道进行同步#

package main
 
import (
    "fmt"
    "time"
)
 
func worker(id int, done chan bool) {
    fmt.Printf("Worker %d 开始工作\n", id)
    time.Sleep(time.Second)
    fmt.Printf("Worker %d 完成工作\n", id)
    done <- true // 发送完成信号
}
 
func main() {
    done := make(chan bool, 3) // 缓冲通道,容量为 goroutine 数量
    
    for i := 0; i < 3; i++ {
        go worker(i, done)
    }
    
    // 等待所有 goroutine 完成
    for i := 0; i < 3; i++ {
        <-done // 接收完成信号,会阻塞直到有数据
    }
    
    fmt.Println("所有工作完成,主函数结束")
}

使用 close()range 的优雅方式#

func main() {
    done := make(chan bool)
    
    for i := 0; i < 3; i++ {
        go func(id int) {
            fmt.Printf("Worker %d 开始工作\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d 完成工作\n", id)
            done <- true
        }(i)
    }
    
    go func() {
        for i := 0; i < 3; i++ {
            <-done
        }
        close(done) // 关闭通道
    }()
    
    // 等待通道关闭
    for range done {
        // 通道关闭后循环会自动退出
    }
    
    fmt.Println("所有工作完成")
}

3. 使用 sync.Mutex 和条件变量#

对于更复杂的同步场景,可以使用互斥锁和条件变量。

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    completed := 0
    total := 3
    
    for i := 0; i < total; i++ {
        go func(id int) {
            fmt.Printf("Worker %d 开始工作\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d 完成工作\n", id)
            
            mu.Lock()
            completed++
            if completed == total {
                cond.Broadcast() // 通知所有等待的 goroutine
            }
            mu.Unlock()
        }(i)
    }
    
    mu.Lock()
    for completed < total {
        cond.Wait() // 等待条件满足
    }
    mu.Unlock()
    
    fmt.Println("所有工作完成")
}

最佳实践与常见陷阱#

最佳实践#

  1. 优先使用 sync.WaitGroup:对于简单的等待场景,WaitGroup 是最清晰和高效的选择。

  2. 正确传递 WaitGroup:始终传递 WaitGroup 的指针,而不是值:

    // 正确
    func worker(wg *sync.WaitGroup)
     
    // 错误 - 传递值会导致副本,计数器不会正确递减
    func worker(wg sync.WaitGroup)
  3. 使用 defer 调用 Done():确保即使在函数发生 panic 时也能正确递减计数器:

    func worker(wg *sync.WaitGroup) {
        defer wg.Done() // 确保一定会执行
        // 工作代码
    }
  4. 在启动 goroutine 前调用 Add()

    // 正确
    wg.Add(1)
    go worker(&wg)
     
    // 有风险 - 如果 worker 很快完成,可能在 Add 调用前就调用了 Done
    go func() {
        wg.Add(1)
        defer wg.Done()
        // ...
    }()

常见陷阱#

  1. 忘记调用 Done():会导致 Wait() 永久阻塞。

  2. 在 goroutine 外部调用 Done():可能导致计数器过早归零。

  3. 竞争条件:如果多个 goroutine 同时修改共享数据,需要适当的同步机制。

  4. goroutine 泄漏:忘记等待 goroutine 完成可能导致资源泄漏。

实际应用示例#

并行处理任务并收集结果#

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
type Result struct {
    TaskID int
    Value  int
    Error  error
}
 
func processTask(taskID int) Result {
    // 模拟处理过程
    time.Sleep(time.Millisecond * time.Duration(100+taskID*50))
    
    if taskID == 4 { // 模拟错误
        return Result{TaskID: taskID, Error: fmt.Errorf("task %d failed", taskID)}
    }
    
    return Result{TaskID: taskID, Value: taskID * 10}
}
 
func main() {
    tasks := []int{1, 2, 3, 4, 5, 6, 7, 8}
    results := make(chan Result, len(tasks))
    var wg sync.WaitGroup
    
    // 启动 worker goroutine
    for _, taskID := range tasks {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processTask(id)
            results <- result
        }(taskID)
    }
    
    // 等待所有任务完成并关闭结果通道
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    var successful []Result
    var failed []Result
    
    for result := range results {
        if result.Error != nil {
            failed = append(failed, result)
            fmt.Printf("任务 %d 失败: %v\n", result.TaskID, result.Error)
        } else {
            successful = append(successful, result)
            fmt.Printf("任务 %d 成功完成,结果: %d\n", result.TaskID, result.Value)
        }
    }
    
    fmt.Printf("\n总结: 成功 %d 个任务,失败 %d 个任务\n", 
        len(successful), len(failed))
}

限制并发数量的 worker pool#

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
func workerPool(tasks []string, maxWorkers int) {
    taskChan := make(chan string, len(tasks))
    var wg sync.WaitGroup
    
    // 创建 worker pool
    for i := 0; i < maxWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for task := range taskChan {
                fmt.Printf("Worker %d 处理任务: %s\n", workerID, task)
                time.Sleep(time.Second) // 模拟处理时间
                fmt.Printf("Worker %d 完成任务: %s\n", workerID, task)
            }
        }(i)
    }
    
    // 发送任务到通道
    for _, task := range tasks {
        taskChan <- task
    }
    close(taskChan) // 关闭通道,worker 会在处理完所有任务后退出
    
    wg.Wait()
    fmt.Println("所有任务处理完成")
}
 
func main() {
    tasks := []string{"task1", "task2", "task3", "task4", "task5", "task6"}
    workerPool(tasks, 2) // 最多同时运行 2 个 worker
}

总结#

在 Go 语言中等待 goroutine 完成是并发编程的基本技能。sync.WaitGroup 是最常用和推荐的方法,适合大多数场景。通道提供了更灵活的通信和同步机制,而互斥锁和条件变量适用于更复杂的同步需求。

关键要点:

  • 理解每种方法的适用场景
  • 遵循最佳实践避免常见陷阱
  • 在真实项目中考虑错误处理和资源管理
  • 根据具体需求选择合适的并发模式

掌握这些技术将帮助你编写出健壮、高效的并发 Go 程序。

参考资料#

  1. Go 官方文档 - sync.WaitGroup
  2. Go 官方博客 - 并发不是并行
  3. Go 并发模式
  4. Go 语言圣经 - Goroutines 和 Channels