package main
type Node struct {
value int
next *Node
}
type Queue struct {
head *Node
tail *Node
}
func CAS(...interface{}) bool {
return true
}
func (q *Queue) InitQueue() {
node := &Node{next: nil}
q.head = node
q.tail = node
}
// 不考虑并发情况下加入队列的方式
func (q *Queue) Enqueuev0(data int) {
node := &Node{data, nil}
p := q.tail // 读取这个操作不需要考虑并发的情况
p.next = node // 写入的操作需要考虑并发的情况,隐藏了 p.next 是 nil 的情况下,才将 p.next 指定为 nil
q.tail = node // 写入的操作需要考虑并发的情况,隐藏了 p == q.tail 的情况,才将 q.tail 指定为 nil
}
// 根据 v0 的方式,我们应该对后面的两步操作进行加锁或者原子操作
func (q *Queue) Enqueuev1(data int) {
node := &Node{data, nil}
var p *Node
for {
p = q.tail
if CAS(p.next, nil, node) == true {
break
}
}
CAS(q.tail, p, node)
}
// 在上述 v1 的基础之上,考虑一个线程执行完第一个 CAS 之后就挂掉了的情况
func (q *Queue) Enqueuev2(data int) {
node := &Node{data, nil}
p := q.tail
oldP := p
for {
for p.next != nil {
p = p.next
}
if CAS(p.next, nil, node) == true {
break
}
}
CAS(q.tail, oldP, node)
// var (
// p *Node
// oldP *Node
// )
// for {
// p = q.tail
// oldP = p
// for p.next != nil {
// p = p.next
// }
// if CAS(p.next, nil, node) == true {
// break
// }
// }
// CAS(q.tail, oldP, node)
}
// v2 的方法其实相当于每个线程单独找到尾节点,而不同线程中找尾节点的过程其实可以同步:
// 比如线程 T1 往下移动了一个,那么线程 T2 可以不用从头开始找,而是基于线程 T1 往下
// 移动的基础之上继续往下移动一个,这样就可以将找尾节点的步骤同步起来了。
// 基于上述的想法,我们可以可以在移动了一个节点之后,就通过 q.tail 进行同步起来,从而
// 实现过程的优化。
func (q *Queue) Enqueuev3(data int) {
node := &Node{data, nil}
var tail *Node
for {
// 找尾节点的过程
tail = q.tail
next := tail.next
// 感觉这段话,其实不影响最终的结果.
// 因为 next != nil 这个判断其实包含了这两种情况:
// next != nil && q.tail != tail ==> continue
// next != nil && q.tail == tail ==> fetch && continue
// 那么 这个判断主要处理的情况是:
// next == nil && q.tail != tail
// 但是在 next == null 的情况下,q.tail 肯定 == tail,
// 可以这么理解 q.tail 和 tail 的主要作用是找到尾节点,但是 next == null
// 那么 q.tail 和 tail 其实已经无所谓了
// 但是放在这感觉到的一个好处是可以提前判断
if q.tail != tail {
continue
}
if next != nil {
CAS(q.tail, tail, next)
continue
}
// 个人觉得下面这个 CAS 等同于 CAS(tail.next, nil, node)
if CAS(tail.next, next, node) == true {
break
}
}
CAS(q.tail, tail, node)
}
// 不考虑并发的情况
func (q *Queue) Dequeuev0() *Node {
p := q.head
if p.next == nil {
return nil
}
q.head = p.next // 隐藏的条件是 q.head == p,即 q.head.next = p.next
// q.head.next = p.next.next,这种方式稍微好理解点
return p.next
}
// 考虑并发的情况
func (q *Queue) Dequeuev1() *Node {
var p *Node
for {
p = q.head
if p.next == nil {
return nil
}
// CAS(q.head.next, p.next, p.next.next)
if CAS(q.head, p, p.next) == true {
break
}
}
return p.next
}
// 优雅的处理并发的情况,考虑 Enqueue 的时候,只进行了一个CAS。
// 那么此时需要加一些判断条件,在都经过这些判断之后,也就是不是只
// 处理了一个 CAS 的情况时,那么才进行 Dequeue 操作。
// 而在判断条件的处理过程中,如果发现了只进行了一个 CAS,那么同样
// 可以使用原子操作的方式对 tail 进行处理。
// 综上来说,其实在判断特殊的情况的时候,遵守一个原则,那就是当只
// 涉及到写共享数据的时候才进行原子操作,其他情况就可以不用原子操作
func (q *Queue) Dequeuev2() *Node {
var res *Node
for {
head := q.head
tail := q.tail
headNext := head.next
if head != q.head {
continue
}
if head == tail && headNext == nil {
return nil
}
if head == tail && headNext != nil {
CAS(q.tail, tail, headNext)
continue
}
if CAS(q.head, head, headNext) == true {
res = headNext
break
}
}
return res
}
func main() {
}
// 小结:
// 对于如何使用锁和原子操作,个人总结的一个方法如下:
// 1. 先把不用锁和原子操作的方式写出来;
// 2. 然后对这种方式中的操作进行分析,比如读取操作往往是不需要锁的,而写入的方式往往需要锁的,
// 所以在写入的前后加锁,并且往往只需要在这些地方使用锁或者原子操作即可,其他地方就按照正
// 常逻辑判断就好。
// 除此之外,假如我们需要使用 CAS 这种原子操作的话,我们需要明白一个点,锁的实现其实
// 是基于 CAS+循环,所以当我们直接使用 CAS 进行原子操作的时候,往往需要额外加循环。
// 基于 CAS 来说,个人觉得其实在不用锁和原子方式操作时,这些操作其实是有一种隐含关系
// 的,也就是在这种关系成立时,才进行诸如赋值等操作。
参考链接: