概述
并发编程是利用多核心能力,提升程序性能,而多线程之间需要相互协作、共享资源、线程安全等。任何并发模型都要解决线程间通讯问题,毫不夸张的说线程通讯是并发编程的主要问题。go
使用著名的CSP(Communicating Sequential Process,通讯顺序进程)并发模型,从设计之初 Go 语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让程序员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些繁琐的操作分散精力。channel是线程简通讯的具体实现之一,本质就是一个线程安全的 FIFO
阻塞队列(先进先出),向队列中写入数据,在另一个线程从队列读取数据。很多语言都有类似实现,比如 Java 的线程池任务队列。
基本使用
通道是引用类型,需要使用 make 创建,格式如下
通道实例 := make(chan 数据类型, 通道长度)
- 数据类型:通道内传输的元素类型,可以基本数据类型,也可以使自定义数据类型。
- 通道实例:通过make创建的通道句柄,与函数名称一样,指向通道的内存首地址。
- 通道长度:通道本质是队列,创建时候可指定长度,默认为0
创建通道
ch1 := make(chan int) // 创建一个整型类型的通道
ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式
ch3 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
ch4 := make(chan *Equip, 10) // 创建Equip指针类型的通道, 并指定队列长度
通道本质就是线程安全的队列,创建时候可以指定队列长度,默认为0。
向通道写入数据,使用语法非常形象,写入channel ,读取
ch2 := make(chan interface{}, 10)
ch2
箭头语法虽然很形象,但是有些奇怪,也不利于扩展。使用函数方式感觉更好,也更主流,如func (p *chan) get() any
,func (p *chan) put(any) err
,扩展性也更强,通过参数可增加超时、同步、异步等技能。
箭头符号并没有规定位置,与C
指针一样,如下两个语句等效
ch1 := make(chan int)
i :=
箭头语法的读写有相对性,可读性一般,有时候无法分辨是读或写,看起来很奇怪,如下伪代码
func main() {
input := make(chan int, 2)
output := make(chan int, 2)
go func() {
input
管道是用于协程之间通讯,主流使用方式如下
ch2 := make(chan interface{}, 10)
go func() {
data :=
管道也支持遍历,与箭头符号一样,无数据时候循环将被阻塞,循环永远不会结束,除非关闭管道
chanInt := make(chan int, 10)
for chanInt, ok := range chanInts {
fmt.Println(chanInt)
}
管道也支持关闭,关闭后的管道不允许写入,panic 异常
chanInts := make(chan int, 10)
close(chanInts)
chanInts
读取则不同,已有数据可继续读取,无数据时返回false
,不阻塞
if value, ok :=
单向管道
管道也支持单向模式,仅允许读取、或者写入
var queue
函数形参也可以定义定向管道
func customer(channel
管道阻塞
Go
管道的读写都是同步模式,当管道容量还有空间,则写入成功,否则将阻塞直到写入成功。从管道读取也一样,有数据直接读取,否则将阻塞直到读取成功。
var done = make(chan bool)
func aGoroutine() {
fmt.Println("hello")
done
主协程从管道读取数据时将被阻塞,直到用户协程写入数据。管道非常适合用于生产者消费者模式,需要平滑两者的性能差异,可通过管道容量实现缓冲,所以除非特定场景,都建议管道容量大于零。
有些场景可以使用管道控制线程并发数
// 待补充
阻塞特性也带来了些问题,程序无法控制超时(箭头函数语法的后遗症),go 也提供了解决方案, 使用select
关键,与网络编程的select
函数类似,监测多个通道是否可读状态,都可读随机选择一个,都不可读进入Default
分支,否则阻塞
select {
case n :=
当然也可以使用select
向管道写入数据,只要不关闭管道总是可写入,此时加入default
分支永远不会被执行到,如下随机石头剪刀布
ch := make(chan string)
go func() {
for {
select {
case ch
模拟线程池
由于go的管道非常轻量且简服务器托管网洁,大部分直接使用,封装线程池模式并不常见。案例仅作为功能演示,非常简单几十行代码即可实现线程池的基本功能,体现了go并发模型的简洁、高效。
type Runnable interface {
Start()
}
// 线程池对象
type ThreadPool struct {
queueSize int
workSize int
channel chan Runnable
wait sync.WaitGroup
}
// 工作线程, 执行异步任务
func (pool *ThreadPool) doWorker(name string) {
log.Printf("%s 启动工作协程", name)
for true {
if runnable, ok :=
使用线程池
type person struct {
name string
}
func (p *person) Start() {
fmt.Println(p.name)
}
func main() {
threadPool := executor.NewThreadPool(10, 2) // 创建线程池, 队列长度10, 工作线程2
for i := 0; i
模拟管道
任何线程之间的通讯都依赖底层锁机制,channel是对锁机制封装后的实现对象,与Java中线程池任务队列机制几乎一样,但要简洁很多。使用切片简单模拟
接口声明
type Queue interface {
// Put 向队列添加任务, 添加成功返回true, 添加失败返回false, 队列满了则阻塞直到添加成功
Put(task interface{}) bool
// Get 从队列获取任务, 一直阻塞直到获取任务, 队列关闭且没有任务则返回false
Get() (interface{}, bool)
// Size 查看队列中的任务数
Size() int
// Close 关闭队列, 关闭后将无法添加任务, 已有的任务可以继续获取
Close()
}
基于切片实现
// SliceQueue 使用切片实现, 自动扩容属性队列永远都不会满, 扩容时候会触发数据复制, 性能一般
type SliceQueue struct {
sync.Mutex
cond *sync.Cond
queue []interface{}
close atomic.Bool
}
func (q *SliceQueue) Get() (data interface{}, ok bool) {
q.Lock()
defer q.Unlock()
for true {
if len(q.queue) == 0 {
if q.close.Load() == true {
return nil, false
}
q.cond.Wait()
}
if data := q.doGet(); data != nil {
return data, true
}
}
return
}
func (q *SliceQueue) doGet() interface{} {
if len(q.queue) >= 1 {
data := q.queue[0]
q.queue = q.queue[1:]
return data
}
return nil
}
func (q *SliceQueue) Put(task interface{}) bool {
q.Lock()
defer func() {
q.cond.Signal()
q.Unlock()
}()
if q.close.Load() == true {
return false
}
q.queue = append(q.queue, task)
return true
}
func (q *SliceQueue) Size() int {
return len(q.queue)
}
func (q *SliceQueue) Close() {
if q.close.Load() == true {
return
}
q.Lock()
defer q.Unlock()
q.close.Store(true)
q.cond.Broadcast()
}
func NewSliceQueue() Queue {
sliceQueue := &SliceQueue{queue: make([]interface{}, 0, 2)}
sliceQueue.cond = sync.NewCond(sliceQueue)
return sliceQueue
}
基于环行数组实现
type ArrayQueue struct {
sync.Mutex
readCond *sync.Cond
writeCond *sync.Cond
readIndex int
writeIndex int
queueMaxSize int
close atomic.Bool
queue []interface{}
}
func (q *ArrayQueue) Put(task interface{}) bool {
q.Lock()
defer q.Unlock()
for true {
if q.close.Load() == true {
return false
}
if q.Is服务器托管网Full() {
q.writeCond.Wait()
if q.IsFull() {
continue
}
}
q.queue[q.writeIndex] = task
q.writeIndex = (q.writeIndex + 1) % q.queueMaxSize
q.readCond.Signal()
return true
}
return true
}
func (q *ArrayQueue) Get() (interface{}, bool) {
q.Lock()
defer q.Unlock()
for true {
if q.IsEmpty() {
if q.close.Load() == true {
return nil, false
}
q.readCond.Wait()
if q.IsEmpty() {
continue
}
}
task := q.queue[q.readIndex]
q.readIndex = (q.readIndex + 1) % q.queueMaxSize
q.writeCond.Signal()
return task, true
}
return nil, true
}
func (q *ArrayQueue) Size() int {
return q.queueMaxSize
}
func (q *ArrayQueue) Close() {
if q.close.Load() == true {
return
}
q.Lock()
q.Unlock()
q.close.Store(true)
q.readCond.Broadcast()
}
func (q *ArrayQueue) IsFull() bool {
return (q.writeIndex+1)%q.queueMaxSize == q.readIndex
}
func (q *ArrayQueue) IsEmpty() bool {
return q.readIndex == q.writeIndex
}
func NewArrayQueue(size int) Queue {
queue := &ArrayQueue{queue: make([]interface{}, size), readIndex: 0, writeIndex: 0, queueMaxSize: size}
queue.readCond = sync.NewCond(queue)
queue.writeCond = sync.NewCond(queue)
return queue
}
测试用例
func TestWith(t *testing.T) {
q := NewSliceQueue()
go func() {
time.Sleep(time.Second * 2)
q.Put(true) // 向队列写入数据, 与 chan
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
ES6 或 ECMAScript 2015 是 ECMAScript 语言规范标准的第六版,也是主要版本。它定义了 JavaScript 的实现标准,并且比之前的 ES5 版本更加流行。 ES6 对 JavaScript 语言进行了重大更改。它带来了一些新功能…