Go 语言的map在高并发场景下会Panic,本文分析了三种在高并发场景下使用map的解决方案。
Map是各个语言中都有一种数据结构,在原生支持并发的Go这里也例外。但是在高并发场景下使用原生的Map恰恰是存在一些坑的。
原生map高并发问题
Go官方博客中对有一些说明:
Maps are not safe for concurrent use: it’s not defined what happens when you read and write to them simultaneously. If you need to read from and write to a map from concurrently executing goroutines, the accesses must be mediated by some kind of synchronization mechanism. One common way to protect maps is with sync.RWMutex.
意思是说,并发访问map是不安全的,并发读写时会出现未定义行为。如果希望多协程访问读写map,必须提供某种同步机制,最常用的是sync.RWMutex
。
直接加锁
使用sync.RWMutex
加锁是最直接的一种方式,但是存在两个问题:
- 使用起来并不方便,需要我们自己封装一层读写逻辑
- 性能比较差,写一个key会锁住整个map
示例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60package main
import (
"fmt"
"sync"
"time"
)
const CycleNum = 100000
type Map interface {
Set(key, value string)
Get(key string) (string, bool)
}
type MutexMap struct {
m sync.RWMutex
c map[string]string
}
func NewMutexMap() *MutexMap {
return &MutexMap{
c: make(map[string]string),
}
}
func (m *MutexMap) Set(key, value string) {
m.m.Lock()
defer m.m.Unlock()
m.c[key] = value
}
func (m *MutexMap) Get(key string) (string, bool) {
m.m.RLock()
defer m.m.RUnlock()
res, ok := m.c[key]
return res, ok
}
func main() {
beginTime := time.Now()
mutexMap := NewMutexMap()
wg := sync.WaitGroup{}
wg.Add(CycleNum)
for i := 0; i < CycleNum; i++ {
s := fmt.Sprintf("%d", i)
go func() {
mutexMap.Set(s, s)
}()
go func() {
mutexMap.Get(s)
}()
wg.Done()
}
wg.Wait()
spanTime := time.Now().Sub(beginTime)
fmt.Printf("spend time: %v", spanTime)
}
sync.Map
sync.Map
是Go 1.9 以后标准库中提供的并发Map。其内部实现是引入两个map将读写进行分离。
其中read map只提供读,而dirty map则负责写。这样read map就可以在不加锁的情况下进行并发读取,当read map中没有读取到值时,再加锁进行后续读取,并累加未命中数,当未命中数到达dirty map的长度时,用dirty map替换read map。虽然引入了两个map,但是底层数据存储的是指针,指向的是同一份值
源码:
数据结构1
2
3
4
5
6
7
8
9type Map struct {
// 互斥锁
mu Mutex
// 读map
read atomic.Value // readOnly
// 写map
dirty map[interface{}]*entry
misses int
}
Load方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// 避免在我们加锁的时候,key写入了读map中
// amended 标记dirty map中含有read map中不存在的数据
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 记录miss数,到达dirty长度的时候进行替换
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
Store方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
// 没有在read map中读取到值,或者读取到值但是更新失败
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 读到的值为删除状态,写入dirty map中
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// dirty map中有这个值,进行更新
e.storeLocked(&value)
} else {
//
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}