goim解析-数据结构

goim中的数据结构主要存在于需要保存状态的节点,除了logic是无状态外,其他的节点comet、router、job都会有数据结构来保持信息。

comet

comet服务和客户端保持长连接,存在最多最复杂的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// DefaultServer是全局变量,每启动一个comet服务就会有一个Server对象
// Server主要保存了多个bucket和
var DefaultServer *Server
type Server struct {
Buckets []*Bucket // 一个server会有多个bucket
bucketIdx uint32 // 其实是保存server对象中bucket的数量,方便进行hash计算索引
round *Round // 一个server会有一个round
operator Operator
Options ServerOptions // options保存一些数组初始化时的数量,暂时忽略
}

// --- Round ---

// round用来保存tcp相关的读写缓冲池,定时器对象等
// readers,writers,timers数组的个数是在服务器启动时指定的,之后无法更改。
// 每次获取时会计算索引,轮询获取Pool或Timer对象,而Pool或Timer里会维护空闲链表
//
type Round struct {
readers []bytes.Pool // 读写缓冲区都使用Pool结构体,初始化时创建固定数量的Pool
writers []bytes.Pool
timers []time.Timer // 定时器使用Timer作为结构体,初始化时创建固定数量的Timer
options RoundOptions
readerIdx int // 下面3个都是代表数组的长度,也是轮询方式计算索引获取读写缓冲池或定时器对象
writerIdx int
timerIdx int
}

// tcp内存缓冲池,tcp读写操作都会用到这个自定义缓冲区
// 具体tcp连接分配到哪个Pool,是每次递增索引,轮询方式获取
type Pool struct {
lock sync.Mutex
free *Buffer // 这里放的都是空闲的Buffer,tcp连接使用时会从free链表取出Buffer,绑定成该tcp连接的用户态缓冲。tcp连接关闭,则会把Buffer放回到free链表。如果free链表没有可使用的缓冲区,就会grow,创建num个新的Buffer。
max int // 每次Pool grow时,会先申请max大小字节数组,然后分配给这个num个Buffer
num int // 每次Pool grow时,创建num个Buffer
size int // 每个Buffer的大小(字节个数)
}

// 定时器对象,TimerData是具体定时器信息,signal是系统定时器
// 看实现是一个时间轮,心跳时间是 infiniteDuration = itime.Duration(1<<63 - 1)
type Timer struct {
lock sync.Mutex
free *TimerData // 空闲的TimerData对象链表,操作和Pool的free链表类似
timers []*TimerData // 在时间轮中等待的TimerData,每次心跳都会检查,然后调用回调
signal *itime.Timer // 系统Timer对象,会对signal设置固定的定时时间,每次超时就会操作一次timers数组
num int
}

// 定时数据
type TimerData struct {
Key string
expire itime.Time // 超时时间
fn func() // 超时回调函数
index int
next *TimerData
}

// --- Bucket ---

// 一个Bucket对象保存多个Channel,每个tcp连接对应一个Channel,然后根据hash计算分配到某个bucket上,然后这个tcp连接的Channelbucket绑定。
// 这里涉及到sub key概念,一个tcp连接对应一个sub key,一个用户可以有多个sub key
type Bucket struct {
cLock sync.RWMutex // protect the channels for chs
chs map[string]*Channel // map sub key to a channel
boptions BucketOptions
rooms map[int32]*Room // map roomid to room
routines []chan *proto.BoardcastRoomArg // room广播,包含多个chan的数组,在服务启动时就会开启同等数量的goroutine,监听chan,每次有广播请求时,会轮询计算出一个goroutine对应的chan,触发信号。
routinesNum uint64 // 用来轮询计算索引
}

// 房间,一个房间可以有多个Channel
type Room struct {
id int32
rLock sync.RWMutex
next *Channel // 通过链表形式保存Channel地址
drop bool // 表示该room是否所有Channel都退出了
Online int // dirty read is ok
}

// Channel对应一个TCP连接,关联着读写缓冲区,以及一个tcp连接的数据队列Ring
// 用户态缓冲区主要用来处理封包粘包问题
// Channel used by message pusher send msg to write goroutine.
type Channel struct {
RoomId int32
CliProto Ring // 和Ring名称一样,是一个环形数据结构,保存着一组客户端tcp发送来的数据。
signal chan *proto.Proto // 通过signal这个chan来向tcp连接写数据
Writer bufio.Writer // 缓冲区,当TCP连接到来,Channel创建时,从Round中获取用户态缓冲区
Reader bufio.Reader // 缓冲区,同上
Next *Channel // 双向链表
Prev *Channel
}

// 环形数据结构,保存着一组proto.Proto,也就是按照二进制协议解析完成后的对象,这个数组的数量是固定的。
// Ring.data的数组元素个数固定,如果某次写入不进去,就会阻塞,不会增加Ring.data的大小
type Ring struct {
// read
rp uint64
num uint64
mask uint64
// TODO split cacheline, many cpu cache line size is 64
// pad [40]byte
// write
wp uint64
data []proto.Proto
}

router

router是goim里专门负责保存数据的服务,对其他节点提供了rpc调用,主要被logic和job调用。这个服务也可以被redis等替代,因为基本就是在读写一些内存数据,没有额外逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// router也有一个bucket结构体,不过和comet的bucket保存的信息不同,这里主要保存用户、tcp连接、房间等对象的关联信息。
type Bucket struct {
bLock sync.RWMutex
server int // session server map init num
session int // bucket session init num
sessions map[int64]*Session // userid->sessions,一个用户一个session
roomCounter map[int32]int32 // roomid->count,某个room里的channel总数量
serverCounter map[int32]int32 // server->count,某个server里的channel总数量,这个server指的是某个具体的comet节点的server id
userServerCounter map[int32]map[int64]int32 // serverid->userid->count,某个server里的某个用户的channel总数
cleaner *Cleaner // bucket map cleaner
}

// 一个用户对应一个session
type Session struct {
seq int32 // seq代表一个用户的新的连接的序列号,从1开始,不断递增,userid+seq就是subkey
servers map[int32]int32 // seq:server,保存某个连接在哪个comet节点上这样的关联关系,因为需要知道这个关联关系,才可以发送通知时,找到具体的comet节点,然后进行tcp写操作
rooms map[int32]map[int32]int32 // roomid:seq:server with specified room id,一个用户所在的房间信息
}

job

job服务主要工作就是连接上kafka,接收kafka的消息,然后job调用comet的rpc服务,comet则把请求消息写入到tcp连接,客户端则会接收到通知消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// job服务主要工作就是从router上同步channel和commet节点的关联关系,然后当从kafka收到发送通知的请求时,就要调用指定comet节点的rpc服务,进行对应tcp写操作。
// 下面的多个routines都是数组,数组个数相同,job服务启动时会启动这个个数的goroutine,用来接收到kafka消息时,通过chan触发job调用comet的rpc服务的操作
type Comet struct {
serverId int32
rpcClient *xrpc.Clients
pushRoutines []chan *proto.MPushMsgArg // 数组,每个元素又是一个chan链表
broadcastRoutines []chan *proto.BoardcastArg // 同上
roomRoutines []chan *proto.BoardcastRoomArg // 同上
pushRoutinesNum int64
roomRoutinesNum int64
broadcastRoutinesNum int64
options CometOptions
}

// 这个结构体用来定时从router获取信息,更新当前goim系统里的客户端连接服务端的状态
type RoomBucket struct {
roomNum int
rooms map[int32]*Room
rwLock sync.RWMutex
options RoomOptions
round *Round
}

// 定时器,不再赘述
// Ronnd userd for connection round-robin get a timer for split big lock.
type Round struct {
timers []time.Timer
options RoundOptions
timerIdx int
}