简单的订阅发布机制实现(Golang)

Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,,做个实际例子,对两边的异同和这种机制会更有印象。

练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。

Server.go

Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的Client.

这个例子与Redis中不太一样,Redis中的Dict用Channel的名字作为map的key,value则是其对应的Client列表。而Client中则保

存了其所有订阅的Channel信息。

package pubsubimport ("errors""sync")type Client struct {Id intIp string}type Server struct {Dict map[string]*Channel //map[Channel.Name]*Channelsync.RWMutex}func NewServer() *Server {s := &Server{}s.Dict = make(map[string]*Channel) //所有channelreturn s}//订阅func (srv *Server) Subscribe(client *Client, channelName string) {// 客户是否在Channel的客户列表中srv.RLock()ch, found := srv.Dict[channelName]srv.RUnlock()if !found {ch = NewChannel(channelName)ch.AddClient(client)srv.Lock()srv.Dict[channelName] = chsrv.Unlock()} else {ch.AddClient(client)}}//取消订阅func (srv *Server) Unsubscribe(client *Client, channelName string) {srv.RLock()ch, found := srv.Dict[channelName]srv.RUnlock()if found {if ch.DeleteClient(client) == 0 {ch.Exit()srv.Lock()delete(srv.Dict, channelName)srv.Unlock()}}}//发布消息func (srv *Server) PublishMessage(channelName, message string) (bool, error) {srv.RLock()ch, found := srv.Dict[channelName]if !found {srv.RUnlock()return false, errors.New("channelName不存在!")}srv.RUnlock()ch.Notify(message)ch.Wait()return true, nil}

Channel.go

每个Channel 负责将信息放入WaitGroup,发送到Client或队列,例子中是打印一条信息。 当clients为空时,则exit().

import ("fmt""sync""sync/atomic")type Channel struct {Name stringclients map[int]*Client// exitChan chan intsync.RWMutexwaitGroup WaitGroupWrappermessageCount uint64exitFlagint32}func NewChannel(channelName string) *Channel {return &Channel{Name: channelName,// exitChan:make(chan int),clients: make(map[int]*Client),}}func (ch *Channel) AddClient(client *Client) bool {ch.RLock()_, found := ch.clients[client.Id]ch.RUnlock()ch.Lock()if !found {ch.clients[client.Id] = client}ch.Unlock()return found}func (ch *Channel) DeleteClient(client *Client) int {var ret intch.ReplyMsg(fmt.Sprintf("从channel:%s 中删除client:%d ", ch.Name, client.Id))ch.Lock()delete(ch.clients, client.Id)ch.Unlock()ch.RLock()ret = len(ch.clients)ch.RUnlock()return ret}func (ch *Channel) Notify(message string) bool {ch.RLock()defer ch.RUnlock()for cid, _ := range ch.clients {ch.ReplyMsg(fmt.Sprintf("channel:%s client:%d message:%s", ch.Name, cid, message))}return true}func (ch *Channel) ReplyMsg(message string) {ch.waitGroup.Wrap(func() { fmt.Println(message) })}func (ch *Channel) Wait() {ch.waitGroup.Wait()}func (ch *Channel) Exiting() bool {return atomic.LoadInt32(&ch.exitFlag) == 1}func (ch *Channel) Exit() {if !atomic.CompareAndSwapInt32(&ch.exitFlag, 0, 1) {return}//close(ch.exitChan)ch.Wait()}func (ch *Channel) PutMessage(clientID int, message string) {ch.RLock()defer ch.RUnlock()if ch.Exiting() {return}//select {// case <-t.exitChan:// return//}fmt.Println(ch.Name, ":", message)atomic.AddUint64(&ch.messageCount, 1)return}

主程序:

//订阅/发布 练习//author: Xiong Chuan Liang //date: 2015-3-17package mainimport ( . "pubsub")func main(){ c1 := &Client{Id:100,Ip:"172.18.1.1"} c3:= &Client{Id:300,Ip:"172.18.1.3"}srv := NewServer() srv.Subscribe(c1,"Topic") srv.Subscribe(c3,"Topic")srv.PublishMessage("Topic","测试信息1")srv.Unsubscribe(c3,"Topic") srv.PublishMessage("Topic","测试信息2222")srv.Subscribe(c1,"Topic2")srv.Subscribe(c3,"Topic2")srv.PublishMessage("Topic2"," Topic2的测试信息") }/*运行结果:channel:Topic client:100 message:测试信息1channel:Topic client:300 message:测试信息1从channel:Topic 中删除client:300channel:Topic client:100 message:测试信息2222channel:Topic2 client:100 message: Topic2的测试信息channel:Topic2 client:300 message: Topic2的测试信息*/

没做太复杂的测试,粗略看好像没有问题。

MAIL: xcl_168@aliyun.com

BLOG:

往往教导我们大家要好好学习天天向上,要永不言弃坚持到底百折不挠宁死不屈,

简单的订阅发布机制实现(Golang)

相关文章:

你感兴趣的文章:

标签云: