Go

Go 并发编程

Posted by Poor12 Blog on August 6, 2023

Go有两种并发编程的风格,简单归纳为共享内存多线程的传统模型和支持通信顺序进程(CSP,在不同的执行体之间传递值)的并发模式。

共享内存的几种并发实现

  • 互斥锁/读写互斥锁:sync.Mutex/sync.RWMutex
  • 原子操作库:sync.atomic
  • 延迟初始化:sync.Once
var icons map[string]image.Image
func loadIcons() {
    icons = map[string]image.Image{
        "spades.png": loadIcon("spades.png"),
        "hearts.png": loadIcon("hearts.png"),
        ...
    }
}

// 并发不安全,loadIcons可能会被调用多次,且一个goroutine发现icons不是nil并不意味着变量的初始化肯定已经完成。
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons()	
    }
    return icons[name]
}

使用Mutex改造:

// goroutine 无法并发访问这个变量
func Icon(name string) image.Image {
    mu.Lock()
    defer mu.Unlock()
    if icons == nil {
        loadIcons()	
    }
    return icons[name]
}

使用RWMutex进行改造:

func Icon(name string) image.Image {
    mu.RLock()
    if icon != nil {
        icon := icons[name]
        mu.RUnlock()
        return icon
    }
    mu.RUnlock()
	
    mu.Lock()
    if icons == nil {   //避免在锁升级的过程中其他goroutine已经初始化了icons
        loadIcons()
    }
    icon := icons[name]
    mu.UnLock()
    return icon
}

使用sync.Once改造:

// Once 包含一个布尔变量和一个互斥量,布尔变量记录初始化是否已完成
var loadIconsOnce sync.Once
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

线程间通信

sync.WaitGroup 信号量

wg := sync.WaitGroup{}
missionNum := 10
wg.Add(10)
for i := 1; i <= missionNum; i++ {
    go func(count int) {
        wg.Done()
    }(i)
}
wg.Wait()
println("所有子任务执行完毕,接下来做主任务处理......")

无缓冲通道/同步通道

// main goroutine
done := make(chan struct{})
go func() {
    // sub goroutine
    done <- struct{}
}
<- done

select 多路复用

for {
    select {
        case <- ch1:
            return  // break仅跳出select语句
        case x := <- ch2:
        //...
        case ch3 <- y:
        //...
        default:
        //...
    }
}	

context.Context是Go 语言中用来设置截止日期、同步信号,传递请求相关值的结构体。 该接口定义了四个需要实现的方法,其中包括:

  • Deadline — 返回 context.Context 被取消的时间,也就是完成工作的截止日期;
  • Done — 返回一个 Channel,这个 Channel 会在当前工作完成或者上下文被取消后关闭,多次调用 Done 方法会返回同一个 Channel;
  • Err — 返回 context.Context 结束的原因,它只会在 Done 方法对应的 Channel 关闭时返回非空的值; 如果 context.Context 被取消,会返回 Canceled 错误; 如果 context.Context 超时,会返回 DeadlineExceeded 错误;
  • Value — 从 context.Context 中获取键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法可以用来传递请求特定的数据;
type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	go handle(ctx, 500*time.Millisecond)
	select {
	case <-ctx.Done():
		fmt.Println("main", ctx.Err())
	}
}

func handle(ctx context.Context, duration time.Duration) {
	select {
	case <-ctx.Done():
		fmt.Println("handle", ctx.Err())
	case <-time.After(duration):
		fmt.Println("process request with", duration)
	}
}

运行结果,没有达到1s的超时时间:

process request with 500ms
main context deadline exceeded

从一些实践的例子来看context的使用:

stopChan := ctx.Done()
	
func (w *asyncWorker) Run(workerNumber int, stopChan <-chan struct{}) {
  for i := 0; i < workerNumber; i++ {
      go wait.Until(w.worker, 0, stopChan)
  }
  // Ensure all goroutines are cleaned up when the stop channel closes
  go func() {
      <-stopChan
      w.queue.ShutDown()
  }()
}
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
estimators := estimatorclient.GetReplicaEstimators()
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
    fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
for _, estimator := range estimators {
	// context 带上资源的metadata信息
    res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
    if err != nil {
        klog.Errorf("Max cluster available replicas error: %v", err)
        continue
    }
    for i := range res {
        if res[i].Replicas == estimatorclient.UnauthenticReplica {
            continue
        }
        if availableTargetClusters[i].Name == res[i].Name && availableTargetClusters[i].Replicas > res[i].Replicas {
            availableTargetClusters[i].Replicas = res[i].Replicas
        }
    }
}

// add object information into gRPC metadata
if u, ok := parentCtx.Value(util.ContextKeyObject).(string); ok {
	// AppendToOutgoingContext在context中加入一组新的kv键值对
    parentCtx = metadata.AppendToOutgoingContext(parentCtx, string(util.ContextKeyObject), u)
}
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()
ctx, cancel := context.WithTimeout(context.TODO(), duration)
defer cancel()  //在超时之前方法如果成功完成了需要手动cancel,ctx.Done会在超时后被关闭

wait.JitterUntil(func() {
    cm, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{})
    if err != nil {
        klog.V(1).Infof("[discovery] Failed to request cluster-info, will try again: %v", err)
        return
    }
    // Even if the ConfigMap is available the JWS signature is patched-in a bit later.
    // Make sure we retry util then.
    if _, ok := cm.Data[bootstrapapi.JWSSignatureKeyPrefix+token.ID]; !ok {
        klog.V(1).Infof("[discovery] The cluster-info ConfigMap does not yet contain a JWS signature for token ID %q, will try again", token.ID)
        err = fmt.Errorf("could not find a JWS signature in the cluster-info ConfigMap for token ID %q", token.ID)
        return
    }
    // Cancel the context on success
    cancel()
}, interval, 0.3, true, ctx.Done())
ctx, cancel := context.WithCancel(context.Background())
go func() {
    <-stopChan
    cancel()
}()

Note: 从源代码来看,context.Background 和 context.TODO 也只是互为别名,没有太大的差别,只是在使用和语义上稍有不同。context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生出来。context.TODO 应该仅在不确定应该使用哪种上下文时使用。 在多数情况下,如果当前函数没有上下文作为入参,我们都会使用 context.Background 作为起始的上下文向下传递。

Refer to https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-context/.