Go语言并发编程的几种主要操作介绍。
条件变量
在java中条件变量是与具体的锁想关联的,在go中也是这样的。
package main
import (
"fmt"
"sync"
"time"
)
var (
counter int //计数器
wg sync.WaitGroup //信号量
lock sync.Mutex //互斥锁
cond = sync.NewCond(&lock) //条件变量
)
func main() {
//1.获取锁
cond.L.Lock()
fmt.Println("main thread got lock ")
//2.开启线程执行一些事情
go doSomething()
//3.用与锁关联的条件变量的wait方法
fmt.Println("main thread begin wait ")
cond.Wait()
fmt.Println("main thread end wait ")
//4.释放锁
cond.L.Unlock()
fmt.Println("main thread release lock ")
}
func doSomething() {
//2.1激活阻塞到条件变量的wait方法的一个线程
time.Sleep(time.Second * 2)
//2.2获取锁
fmt.Println("sub thread begin get lock ")
//cond.L.Lock()
fmt.Println("sub thread got lock ")
time.Sleep(5 * time.Second)
cond.Signal()
//2.3释放锁
//cond.L.Unlock()
fmt.Println("sub thread release lock ")
}
- go中使用sync.NewCond(&lock)创建一个条件变量,其中lock可以是互斥锁或者读写锁
- 主线程线程先获取了lock锁(cond.L就是lock变量),然后开启了子线程,然后调用了条件变量的wait方法,main线程然后被阻塞,并且main线程会释放获取的lock锁。
- 子线程则首先尝试获取lock锁,如果是在main线程执行条件变量的wait前尝试获取锁,则子线程会被阻塞,等main线程执行到wait后,main函数释放锁后,子线程会获取锁成功。
- 子线程获取锁成功后,会先休眠5s然后释放锁,然后调用条件变量的signal方法,这时候main线程则会重新获取到锁,然后从wait返回。
需要注意的是调用条件变量的signal方法的线程在调用该方法前,获取关联的lock锁这个并不是必须的,读者可以注释获取和释放锁代码,也是OK的。
与Java中类似调用条件变量的signal会激活一个线程,调用Broadcast会激活所有阻塞到条件变量wait方法的线程。
另外需要注意,一般调用线程应该使用循环检查方式调用条件变量的wait方法,以避免虚假唤醒等问题。
go中条件变量与Java中条件变量类似,但是也有不同,相同在于条件变量都是与锁关联的,并且只有当线程获取到锁后才可以调用其关联的条件变量的wait方法,否则会抛出异常,另外当线程阻塞到wait方法后,当前线程会释放已经获取的锁。不同在于Java中只有当线程获取到锁后才可以调用其关联的条件变量的signal方法,否则会抛出异常,但是在go中调用线程调用signal前获取锁不是必须的。
CAS操作
go中的Cas操作与java中类似,都是借用了CPU提供的原子性指令来实现。CAS操作修改共享变量时候不需要对共享变量加锁,而是通过类似乐观锁的方式进行检查,本质还是不断的占用CPU 资源换取加锁带来的开销(比如上下文切换开销)。下面一个例子使用CAS来实现计数器
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var (
counter int32 //计数器
wg sync.WaitGroup //信号量
)
func main() {
threadNum := 5
//1. 五个信号量
wg.Add(threadNum)
//2.开启5个线程
for i := 0; i < threadNum; i++ {
go incCounter(i)
}
//3.等待子线程结束
wg.Wait()
fmt.Println(counter)
}
func incCounter(index int) {
defer wg.Done()
spinNum := 0
for {
//2.1原子操作
old := counter
ok := atomic.CompareAndSwapInt32(&counter, old, old+1)
if ok {
break
} else {
spinNum++
}
}
fmt.Printf("thread,%d,spinnum,%d\n",index,spinNum)
}
- 如上代码main线程首先创建了5个信号量,然后开启五个线程执行incCounter方法
- incCounter内部执行代码2.1 使用cas操作递增counter的值, atomic.CompareAndSwapInt32具有三个参数,第一个是变量的地址,第二个是变量当前值,第三个是要修改变量为多少,该函数如果发现传递的old值等于当前变量的值,则使用第三个变量替换变量的值并返回true,否则返回false。
- 这里之所以使用无限循环是因为在高并发下每个线程执行CAS并不是每次都成功,失败了的线程需要重写获取变量当前的值,然后重新执行CAS操作。读者可以把线程数改为10000或者更多会发现输出thread,5329,spinnum,1其中1说明该线程尝试了两个CAS操作,第二次才成功。
go中CAS操作具有原子性,在解决多线程操作共享变量安全上可以有效的减少使用锁所带来的开销,但是这是使用cpu资源做交换的。
Load/Store操作
go中的Load和Store提供了原子性的读取和修改变量的功能,那么什么是原子性那?比如int64 类型的变量M,在字节长为32的计算机中,读取该变量M需要两次读取(比如先读取高32位,然后在读取底32位),写入的时候也是一样。正常情况下读取变量M的时候并没有加什么额外的措施,多个线程可以同时读写该变量的高32和底32位。
比如线程A读取变量M的高32时候,线程B在修改变量M的低32位,那么就会造成线程A读取的变量的M的高32位是原来的值,低32位是线程B修改后的值,从而导致脏数据,程序出错。
go中的atomic.LoadInt64(&M)
操作可以保证在读取变量M的时候,其他线程不能修改变量M, atomic.StoreInt64(&M, val)
操作则可以保证在修改变量的高低位时候其他线程不能读取该变量。
其实前面<>一文中使用CAS实现的计数器本身还是有问题的:
其中2.1 old := counter
处可能获取脏数据,此处应该修改为
old := atomic.LoadInt32(&counter)
go中的Load和Store操作提供了对变量原子性的操作,可以避免当类型长度大于计算机字节长时候多线程读写变量造成数据混乱,在java中变量声明为volatile可以避免该问题。