Golang Concurrency:sync.errgroup


背景

在开发中,经常会遇到一个接口需要组合多种数据返回的情况,并且所有数据必须全部成功,否则就需要返回错误。以下图的微博举例:

golang-errgroup

一条微博由两部分数据组成:博主 + 微博。如果需要完整显示这条微博,两者缺一不可。微博的URL为 https://weibo.com/1227368500/H3GIgngon,构成为:协议://域名/用户ID/微博mid。由于提前知道 博主ID微博ID,因此在读取的时候可以并发两个请求分别获取 博主微博 数据。

并发控制

在并发获取数据的时候,会遇到两种异常情况:

  1. 请求出错
  2. 请求超时

由于两部分数据缺一不可,无论哪个请求出现异常,此时再继续处理该请求都毫无意义,此时最好的做法是:

  1. 停止仍在处理中的请求
  2. 将第一个异常返回给用户

快速失败(Fail-fast) 带来的好处显而易见:

  1. 停止无效的请求,减少资源资源
  2. 快速返回,方便用户再次重试,降低偶然出错的影响

sync.errgroup

那么,Go 语言中该怎么实现呢? golang 提供了一个很好用的小组件:errgroup。仍然以微博为例,来演示如何使用该函数:

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
	"os"
)

var (
	Blogger   = find("1227368500")
	Weibo = find("H3GIgngon")
)

type Result string
type Find func(ctx context.Context, query string) (Result, error)

func find(kind string) Find {
	return func(_ context.Context, query string) (Result, error) {
		return Result(fmt.Sprintf("%s result for %q", kind, query)), nil
	}
}

func main() {
	SinaWeibo := func(ctx context.Context, query string) ([]Result, error) {
		g, ctx := errgroup.WithContext(ctx)

		finds := []Find{Blogger, Weibo}
		results := make([]Result, len(finds))
		for i, find := range finds {
			i, find := i, find // https://golang.org/doc/faq#closures_and_goroutines
			g.Go(func() error {
				result, err := find(ctx, query)
				if err == nil {
					results[i] = result
				}
				return err
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return results, nil
	}

	results, err := SinaWeibo(context.Background(), "https://weibo.com/1227368500/H3GIgngon")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}

}

errgroup 使用 Context 来控制超时,使用 Go() 函数返回的第一个错误来停止所有协程。使用 errgroup 尤其需要小心踩坑 闭包 问题

源码解析

那么 errgroup 究竟是如何实现的呢?errgroup 简单的令人惊讶,却功能强大。代码+注释不过六十余行,值得仔细品味。源码如下:

// Package errgroup provides synchronization, error propagation, and Context
// cancelation for groups of goroutines working on subtasks of a common task.
package errgroup

import (
	"context"
	"sync"
)

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid and does not cancel on error.
type Group struct {
	cancel func()

	wg sync.WaitGroup

	errOnce sync.Once
	err     error
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
}

问题

  1. 除了官方实现,其实 bilbil 也有类似实现 bilbil/kratos。为什么官方库却没有选择实现以下两个功能?:
    • 最大并发限制
    • panic 恢复
  2. 同样以微博为例,用户访问微博的时候,同样需要获取“评论数”、“点赞数”、“是否已关注”。不同的是,这些数据并非关键数据,获取失败却不影响用户查看微博。因此,在官宣恋情婚讯 的高峰期,其实可以尽力提供服务。那么非关键数据,如何实现 尽力 并发获取?

本文作者:cyningsun
本文地址https://www.cyningsun.com/12-30-2020/golang-errgroup.html
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!

# Golang