Go有两种并发编程的风格,简单归纳为共享内存多线程的传统模型和支持通信顺序进程(CSP,在不同的执行体之间传递值)的并发模式。
共享内存的几种并发实现:
- 互斥锁/读写互斥锁:sync.Mutex/sync.RWMutex
- 原子操作库:sync.atomic
- 延迟初始化:sync.Once
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"spades.png": loadIcon("spades.png"),
"hearts.png": loadIcon("hearts.png"),
...
}
}
// 并发不安全,loadIcons可能会被调用多次,且一个goroutine发现icons不是nil并不意味着变量的初始化肯定已经完成。
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
使用Mutex改造:
// goroutine 无法并发访问这个变量
func Icon(name string) image.Image {
mu.Lock()
defer mu.Unlock()
if icons == nil {
loadIcons()
}
return icons[name]
}
使用RWMutex进行改造:
func Icon(name string) image.Image {
mu.RLock()
if icon != nil {
icon := icons[name]
mu.RUnlock()
return icon
}
mu.RUnlock()
mu.Lock()
if icons == nil { //避免在锁升级的过程中其他goroutine已经初始化了icons
loadIcons()
}
icon := icons[name]
mu.UnLock()
return icon
}
使用sync.Once改造:
// Once 包含一个布尔变量和一个互斥量,布尔变量记录初始化是否已完成
var loadIconsOnce sync.Once
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
线程间通信:
sync.WaitGroup 信号量
wg := sync.WaitGroup{}
missionNum := 10
wg.Add(10)
for i := 1; i <= missionNum; i++ {
go func(count int) {
wg.Done()
}(i)
}
wg.Wait()
println("所有子任务执行完毕,接下来做主任务处理......")
无缓冲通道/同步通道
// main goroutine
done := make(chan struct{})
go func() {
// sub goroutine
done <- struct{}
}
<- done
select 多路复用
for {
select {
case <- ch1:
return // break仅跳出select语句
case x := <- ch2:
//...
case ch3 <- y:
//...
default:
//...
}
}
context.Context是Go 语言中用来设置截止日期、同步信号,传递请求相关值的结构体。 该接口定义了四个需要实现的方法,其中包括:
- Deadline — 返回 context.Context 被取消的时间,也就是完成工作的截止日期;
- Done — 返回一个 Channel,这个 Channel 会在当前工作完成或者上下文被取消后关闭,多次调用 Done 方法会返回同一个 Channel;
- Err — 返回 context.Context 结束的原因,它只会在 Done 方法对应的 Channel 关闭时返回非空的值; 如果 context.Context 被取消,会返回 Canceled 错误; 如果 context.Context 超时,会返回 DeadlineExceeded 错误;
- Value — 从 context.Context 中获取键对应的值,对于同一个上下文来说,多次调用 Value 并传入相同的 Key 会返回相同的结果,该方法可以用来传递请求特定的数据;
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
go handle(ctx, 500*time.Millisecond)
select {
case <-ctx.Done():
fmt.Println("main", ctx.Err())
}
}
func handle(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
fmt.Println("handle", ctx.Err())
case <-time.After(duration):
fmt.Println("process request with", duration)
}
}
运行结果,没有达到1s的超时时间:
process request with 500ms
main context deadline exceeded
从一些实践的例子来看context的使用:
stopChan := ctx.Done()
func (w *asyncWorker) Run(workerNumber int, stopChan <-chan struct{}) {
for i := 0; i < workerNumber; i++ {
go wait.Until(w.worker, 0, stopChan)
}
// Ensure all goroutines are cleaned up when the stop channel closes
go func() {
<-stopChan
w.queue.ShutDown()
}()
}
// Get the minimum value of MaxAvailableReplicas in terms of all estimators.
estimators := estimatorclient.GetReplicaEstimators()
ctx := context.WithValue(context.TODO(), util.ContextKeyObject,
fmt.Sprintf("kind=%s, name=%s/%s", spec.Resource.Kind, spec.Resource.Namespace, spec.Resource.Name))
for _, estimator := range estimators {
// context 带上资源的metadata信息
res, err := estimator.MaxAvailableReplicas(ctx, clusters, spec.ReplicaRequirements)
if err != nil {
klog.Errorf("Max cluster available replicas error: %v", err)
continue
}
for i := range res {
if res[i].Replicas == estimatorclient.UnauthenticReplica {
continue
}
if availableTargetClusters[i].Name == res[i].Name && availableTargetClusters[i].Replicas > res[i].Replicas {
availableTargetClusters[i].Replicas = res[i].Replicas
}
}
}
// add object information into gRPC metadata
if u, ok := parentCtx.Value(util.ContextKeyObject).(string); ok {
// AppendToOutgoingContext在context中加入一组新的kv键值对
parentCtx = metadata.AppendToOutgoingContext(parentCtx, string(util.ContextKeyObject), u)
}
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()
ctx, cancel := context.WithTimeout(context.TODO(), duration)
defer cancel() //在超时之前方法如果成功完成了需要手动cancel,ctx.Done会在超时后被关闭
wait.JitterUntil(func() {
cm, err = client.CoreV1().ConfigMaps(metav1.NamespacePublic).Get(context.TODO(), bootstrapapi.ConfigMapClusterInfo, metav1.GetOptions{})
if err != nil {
klog.V(1).Infof("[discovery] Failed to request cluster-info, will try again: %v", err)
return
}
// Even if the ConfigMap is available the JWS signature is patched-in a bit later.
// Make sure we retry util then.
if _, ok := cm.Data[bootstrapapi.JWSSignatureKeyPrefix+token.ID]; !ok {
klog.V(1).Infof("[discovery] The cluster-info ConfigMap does not yet contain a JWS signature for token ID %q, will try again", token.ID)
err = fmt.Errorf("could not find a JWS signature in the cluster-info ConfigMap for token ID %q", token.ID)
return
}
// Cancel the context on success
cancel()
}, interval, 0.3, true, ctx.Done())
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-stopChan
cancel()
}()
Note: 从源代码来看,context.Background 和 context.TODO 也只是互为别名,没有太大的差别,只是在使用和语义上稍有不同。context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生出来。context.TODO 应该仅在不确定应该使用哪种上下文时使用。 在多数情况下,如果当前函数没有上下文作为入参,我们都会使用 context.Background 作为起始的上下文向下传递。
Refer to https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-context/.