上一篇文章中我们已经完成了 channel 的设计,这里我们继续完成 topic 的设计工作。
topic
字段设计
topic 的作用是接收客户端的消息,然后同时发送给所有绑定的 channel 上,所以它的设计和 channel 很类似,包含的字段有:
- name:名称
- newChannelChan:新增 channel 的管道
- channelMap:维护的 channel 集合
- incomingMessageChan:接收消息的管道
- msgChan:有缓冲管道,相当于消息的内存队列
- readSyncChan:和 routerSyncChan 配合使用保证 channelMap 的并发安全
- routerSyncChan:见上
- exitChan:接收退出信号的管道
- channelWriteStarted:是否已向 channel 发送消息
topic 工厂
我们需要维护一个全局的 topic map,在消费者订阅时生成新的 topic,类似于一个工厂,逻辑与第一篇中生成 uuid 类似:
注:Router 是 topic 的事件处理方法,详情见后文。
var (
TopicMap = make(map[string]*Topic)
newTopicChan = make(chan util.ChanReq)
)
func NewTopic(name string, inMemSize int) *Topic {
topic := &Topic{
name: name,
newChannelChan: make(chan util.ChanReq),
channelMap: make(map[string]*Channel),
incomingMessageChan: make(chan *Message),
msgChan: make(chan *Message, inMemSize),
readSyncChan: make(chan struct{}),
routerSyncChan: make(chan struct{}),
exitChan: make(chan util.ChanReq),
}
go topic.Router(inMemSize)
return topic
}
func GetTopic(name string) *Topic {
topicChan := make(chan interface{})
newTopicChan
维护 channel
topic 维护 channel 的逻辑和 channel 维护消费者相似,也是“老熟人” chan + slice 的组合:
func (t *Topic) GetChannel(channelName string) *Channel {
channelRet := make(chan interface{})
t.newChannelChan
推送消息给 channel
此处的逻辑依然与 channel 中的设计类似,直接贴代码:
func (t *Topic) PutMessage(msg *Message) {
t.incomingMessageChan
我们还是从 incomingMessageChan 中读取消息,然后写入 msgChan,msgChan 缓冲区满了就丢弃(后续会加上持久化磁盘功能)。推送消息到 channel 的协程是在添加 channel 时开启的,因为没有 channel 的话 topic 并不会推送消息。
在向所有 channel 推送消息的前后,我们发现多了两个读管道的操作,这样做的目的是避免 map 的并发读写错误。在 Go 语言中 map 是不支持并发读写的,因此我们在遍历 channel 之前先读取 readSyncChan,确保我们在遍历的时候调度协程是阻塞在 这个 case 上,避免了对 map 的并发写操作。
关闭
关闭操作相信大家已经了如指掌了,无非就是监听推出信号的管道然后关闭 channel 和推送消息的协程,代码如下:
func (t *Topic) MessagePump(closeChan
topic 完整代码:topic.go
到这里我们的两个核心组件 topic 和 channel 就全部设计完成了,下一篇文章我们继续完成协议和后台队列的功能。
当前目录结构为:
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net