Go语言并发map

Go 语言的map在高并发场景下会Panic,本文分析了三种在高并发场景下使用map的解决方案。

Map是各个语言中都有一种数据结构,在原生支持并发的Go这里也例外。但是在高并发场景下使用原生的Map恰恰是存在一些坑的。

原生map高并发问题

Go官方博客中对有一些说明:

Maps are not safe for concurrent use: it’s not defined what happens when you read and write to them simultaneously. If you need to read from and write to a map from concurrently executing goroutines, the accesses must be mediated by some kind of synchronization mechanism. One common way to protect maps is with sync.RWMutex.

意思是说,并发访问map是不安全的,并发读写时会出现未定义行为。如果希望多协程访问读写map,必须提供某种同步机制,最常用的是sync.RWMutex

直接加锁

使用sync.RWMutex加锁是最直接的一种方式,但是存在两个问题:

  1. 使用起来并不方便,需要我们自己封装一层读写逻辑
  2. 性能比较差,写一个key会锁住整个map

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
"fmt"
"sync"
"time"
)

const CycleNum = 100000

type Map interface {
Set(key, value string)
Get(key string) (string, bool)
}

type MutexMap struct {
m sync.RWMutex
c map[string]string
}

func NewMutexMap() *MutexMap {
return &MutexMap{
c: make(map[string]string),
}
}

func (m *MutexMap) Set(key, value string) {
m.m.Lock()
defer m.m.Unlock()
m.c[key] = value
}

func (m *MutexMap) Get(key string) (string, bool) {
m.m.RLock()
defer m.m.RUnlock()
res, ok := m.c[key]
return res, ok
}

func main() {
beginTime := time.Now()

mutexMap := NewMutexMap()
wg := sync.WaitGroup{}
wg.Add(CycleNum)
for i := 0; i < CycleNum; i++ {
s := fmt.Sprintf("%d", i)
go func() {
mutexMap.Set(s, s)
}()
go func() {
mutexMap.Get(s)
}()
wg.Done()
}
wg.Wait()

spanTime := time.Now().Sub(beginTime)
fmt.Printf("spend time: %v", spanTime)
}

sync.Map

sync.Map 是Go 1.9 以后标准库中提供的并发Map。其内部实现是引入两个map将读写进行分离。

其中read map只提供读,而dirty map则负责写。这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数,当未命中数到达dirty map的长度时,用dirty map替换read map。虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值

源码:

数据结构

1
2
3
4
5
6
7
8
9
type Map struct {
// 互斥锁
mu Mutex
// 读map
read atomic.Value // readOnly
// 写map
dirty map[interface{}]*entry
misses int
}

Load方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 避免在我们加锁的时候,key写入了读map中
// amended 标记dirty map中含有read map中不存在的数据
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 记录miss数,到达dirty长度的时候进行替换
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

Store方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// 没有在read map中读取到值,或者读取到值但是更新失败
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 读到的值为删除状态,写入dirty map中
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// dirty map中有这个值,进行更新
e.storeLocked(&value)
} else {
//
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

分段锁

各场景下Benchmark

总结