分布式Raft算法原理及实现

秘密课程 2024-03-04 04:48:19
Raft算法的实现

Leader election (领导选举):原来的leader挂掉后,必须选出一个新的leader

Log replication (日志复制):leader从客户端接收日志,并复制到整个集群中

Safety (安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项

Leader election (领导选举)

Raft 使用一种心跳机制来触发领导人选举。当服务器程序启动时,他们都是 follower(跟随者) 身份。如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者。要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态。

然后他会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。候选人的状态维持直到发生以下任何一个条件发生的时候,

他自己赢得了这次的选举如果这个节点赢得了半数以上的vote就会成为leader,每个节点会按照first-come-first-served的原则进行投票,并且一个term中只能投给一个节点, 这样就保证了一个term最多有一个节点赢得半数以上的vote。当一个节点赢得选举, 他会成为leader, 并且给所有节点发送这个信息, 这样所有节点都会回退成follower。其他的服务器成为领导者

如果在等待选举期间,candidate接收到其他server要成为leader的RPC,分两种情况处理:

如果leader的term大于或等于自身的term,那么改candidate 会转成follower 状态如果leader的term小于自身的term,那么会拒绝该 leader,并继续保持candidate 状态一段时间之后没有任何一个获胜的人有可能,很多follower同时变成candidate,导致没有candidate能获得大多数的选举,从而导致无法选出主。当这个情况发生时,每个candidate会超时,然后重新发增加term,发起新一轮选举RPC。需要注意的是,如果没有特别处理,可能出导致无限地重复选主的情况。Raft采用随机定时器的方法来避免上述情况,每个candidate选择一个时间间隔内的随机值,例如150-300ms,采用这种机制,一般只有一个server会进入candidate状态,然后获得大多数server的选举,最后成为主。每个candidate在收到leader的心跳信息后会重启定时器,从而避免在leader正常工作时,会发生选举的情况。Log replication (日志复制)

当选出 leader 后,它会开始接受客户端请求,每个请求会带有一个指令,可以被回放到状态机中。leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行的发送给其他的server,当改entry被多数派server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端。

当 follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry。

raft的log replication保证以下性质(Log Matching Property):

如果两个log entry有相同的index和term,那么它们存储相同的指令如果两个log entry在两份不同的日志中,并且有相同的index和term,那么它们之前的log entry是完全相同的

其中特性一通过以下保证:

leader在一个特定的term和index下,只会创建一个log entrylog entry不会改变它们在日志中的位置

特性二通过以下保证:

AppendEntries会做log entry的一致性检查,当发送一个AppendEntriesRPC时,leader会带上需要复制的log entry前一个log entry的(index, iterm)

如果follower没有发现与它一样的log entry,那么它会拒绝接受新的log entry 这样就能保证特性二得以满足。

安全性选举限制

在一些一致性算法中,即使一台server没有包含所有之前已提交的log entry,也能被选为主,这些算法需要把leader上缺失的日志从其他的server拷贝到leader上,这种方法会导致额外的复杂度。相对而言,raft使用一种更简单的方法,即它保证所有已提交的log entry都会在当前选举的leader上,因此,在raft算法中,日志只会从leader流向follower。

为了实现上述目标,raft在选举中会保证,一个candidate只有得到大多数的server的选票之后,才能被选为主。得到大多数的选票表明,选举它的server中至少有一个server是拥有所有已经提交的log entry的,而leader的日志至少和follower的一样新,这样就保证了leader肯定有所有已提交的log entry。

提交之前任期内的日志条目

领导人知道一条当前任期内的日志记录是可以被提交的,只要它被存储到了大多数的服务器上。如果一个领导人在提交日志条目之前崩溃了,未来后续的领导人会继续尝试复制这条日志记录。然而,一个领导人不能断定一个之前任期里的日志条目被保存到大多数服务器上的时候就一定已经提交了。下图展示了一种情况,一条已经被存储到大多数节点上的老日志条目,也依然有可能会被未来的领导人覆盖掉。

安全性论证

以反证法来证明,假设任期 T 的领导人(领导人 T)在任期内提交了一条日志条目,但是这条日志条目没有被存储到未来某个任期的领导人的日志中。设大于 T 的最小任期 U 的领导人 U 没有这条日志条目。

如果 S1 (任期 T 的领导者)提交了一条新的日志在它的任期里,然后 S5 在之后的任期 U 里被选举为领导人,然后至少会有一个机器,如 S3,既拥有来自 S1 的日志,也给 S5 投票了。

在领导人 U 选举的时候一定没有那条被提交的日志条目(领导人从不会删除或者覆盖任何条目)。领导人 T 复制这条日志条目给集群中的大多数节点,同时,领导人U 从集群中的大多数节点赢得了选票。因此,至少有一个节点(投票者、选民)同时接受了来自领导人T 的日志条目,并且给领导人U 投票了,这个投票者是产生这个矛盾的关键。这个投票者必须在给领导人 U 投票之前先接受了从领导人 T 发来的已经被提交的日志条目;否则他就会拒绝来自领导人 T 的附加日志请求(因为此时他的任期号会比 T 大)。投票者在给领导人 U 投票时依然保有这条日志条目,因为任何中间的领导人都包含该日志条目(根据上述的假设),领导人从不会删除条目,并且跟随者只有和领导人冲突的时候才会删除条目。投票者把自己选票投给领导人 U 时,领导人 U 的日志必须和投票者自己一样新。这就导致了两者矛盾之一。首先,如果投票者和领导人 U 的最后一条日志的任期号相同,那么领导人 U 的日志至少和投票者一样长,所以领导人 U 的日志一定包含所有投票者的日志。这是另一处矛盾,因为投票者包含了那条已经被提交的日志条目,但是在上述的假设里,领导人 U 是不包含的。除此之外,领导人 U 的最后一条日志的任期号就必须比投票人大了。此外,他也比 T 大,因为投票人的最后一条日志的任期号至少和 T 一样大(他包含了来自任期 T 的已提交的日志)。创建了领导人 U 最后一条日志的之前领导人一定已经包含了那条被提交的日志(根据上述假设,领导人 U 是第一个不包含该日志条目的领导人)。所以,根据日志匹配特性,领导人 U 一定也包含那条被提交当然日志,这里产生矛盾。因此,假设不成立,所有比 T 大的领导人一定包含了所有来自 T 的已经被提交的日志。日志匹配原则保证了未来的领导人也同时会包含被间接提交的条目跟随者和候选人崩溃

跟随者或者候选人崩溃,会按如下处理:

领导者会不断给它发送选举和追加日志的RPC,直到成功跟随者会忽略它已经处理过的追加日志的RPC时间和可用性

领导人选举是 Raft 中对时间要求最为关键的方面。Raft 可以选举并维持一个稳定的领导人,只要系统满足下面的时间要求:

广播时间(broadcastTime) << 选举超时时间(electionTimeout) << 平均故障间隔时间(MTBF)广播时间指的是从一个服务器并行的发送 RPCs 给集群中的其他服务器并接收响应的平均时间;选举超时时间就是选举的超时时间限制平均故障间隔时间就是对于一台服务器而言,两次故障之间的平均时间。

选举超时时间要大于广播时间的原因是,防止跟随者因为还没收到领导者的心跳,而重新选主。

选举超时时间要小于MTBF的原因是,防止选举时,能正常工作的server没有达到大多数。

对于广播时间,一般在[0.5ms,20ms]之间,而平均故障间隔时间一般非常大,至少是按照月为单位。因此,一般选举超时时间一般选择范围为[10ms,500ms]。因此,当领导者挂掉后,能在较短时间内重新选主。

代码实现package mainimport ( "flag" "fmt" "math/rand" "net" "net/http" "strconv" "strings" "time")const ( LEADER = iota CANDIDATE FOLLOWER LEDER)// 地址信息type Addr struct { Host string //ip Port int Addr string}type RaftServer struct { Votes int Role int Nodes []Addr isElecting bool Timeout int ElecChan chan bool HeartBeatChan chan bool Port int CusMsg chan string}func (rs *RaftServer) changeRole(role int) { switch role { case LEDER: fmt.Println("leader") case CANDIDATE: fmt.Println("candidate") case FOLLOWER: fmt.Println("follower") } rs.Role = role}func (rs *RaftServer) resetTimeout() { rs.Timeout = 2000}func (rs *RaftServer) Run() { listen, _ := net.Listen("tcp", ":"+strconv.Itoa(rs.Port)) defer listen.Close() go rs.elect() go rs.electTimeDuration() go rs.sendDataToOtherNodes() go rs.setHttpServer() for { conn, _ := listen.Accept() go func() { for { by := make([]byte, 1024) n,_:= conn.Read(by) fmt.Println("收到消息", string(by[:n])) value := string(by[:n]) v,_ := strconv.Atoi(value) if v == rs.Port { rs.Votes++ fmt.Println("当前票数:", rs.Votes) // leader 选举成功 if VoteSuccess(rs.Votes, 5) == true { fmt.Printf("我是 %v, 我被选举成leader", rs.Port) //通知其他节点。停止选举 //重置其他节点状态和票数 rs.VoteToOther("stopVote") rs.isElecting = false //改变当前节点状态 rs.changeRole(LEADER) break } } //收到leader发来的消息 if strings.HasPrefix(string(by[:n]), "stopVote") { //停止给别人投票 rs.isElecting = false //回退自己的状态 rs.changeRole(FOLLOWER) break } } }() }}func VoteSuccess(vote int,target int) bool { if vote >= target { return true } return false}//发送数据)func (rs *RaftServer)VoteToOther(data string) { for _,k := range rs.Nodes { if k.Port != rs.Port { if data == "1234" { fmt.Println("-------------", k.Port) } label: conn, err := net.Dial("tcp", ":"+strconv.Itoa(k.Port)) for { if err != nil { time.Sleep(1 * time.Second) goto label } break } conn.Write([]byte(data)) } }}//给别人投票func (rs *RaftServer)elect() { for { //通过通道确定现在可以给别人投票 <- rs.ElecChan //给其他节点投票,不能投给自己 vote := getVoteNum() rs.VoteToOther(strconv.Itoa(vote)) // 设置选举状态 if rs.Role != LEADER { rs.changeRole(CANDIDATE) } else { //是leader的情况 return } }}func getVoteNum() int { rand.Seed(time.Now().UnixNano()) return rand.Intn(4) + 5000}func (rs *RaftServer)electTimeDuration() { // fmt.Println("+++", rs.isElecting) for { if rs.isElecting { rs.ElecChan <- true time.Sleep(time.Duration(rs.Timeout) * time.Millisecond) } }}//打印当前对象的角色func (rs *RaftServer)printRole() { for { time.Sleep(1 * time.Second) fmt.Println(rs.Port, "状态为", rs.Role, rs.isElecting) }}func main() { //获取参数 //运行 go run main.go -p 5000 (p 后面就是要启动的端口) port := flag.Int("p",1234,"port") flag.Parse() fmt.Println(*port) rs := RaftServer{} rs.isElecting = true rs.Votes = 0 rs.Role = FOLLOWER //控制是否开始投票 rs.ElecChan = make(chan bool) rs.HeartBeatChan = make(chan bool) rs.CusMsg = make(chan string) rs.resetTimeout() rs.Nodes = []Addr{ {"127.0.0.1",5000,"5000"}, {"127.0.0.1",5001,"5001"}, {"127.0.0.1",5002,"5002"}, {"127.0.0.1",5003,"5003"}, } rs.Port = *port rs.Run()}//主节点发送心跳信号给其他节点func (rs *RaftServer)sendHeartBeat() { // 每隔1s 发送一次心跳 for { time.Sleep(1 * time.Second) if rs.Role == LEADER { //发送消息 rs.VoteToOther("heat beating") } }}//通过leader 给其他所有子节点发送数据func (rs *RaftServer)sendDataToOtherNodes() { for { msg :=<-rs.CusMsg if rs.Role == LEADER { //发送消息 rs.VoteToOther(msg) } }}//开启http服务器func (rs *RaftServer)setHttpServer() { http.HandleFunc("/req", rs.request) httpPort := rs.Port + 10 if err:=http.ListenAndServe(":"+strconv.Itoa(httpPort), nil); err == nil { fmt.Println(err) }}//leader向其他子节点发送数据func (rs *RaftServer)request(writer http.ResponseWriter, request *http.Request){ request.ParseForm() if len(request.Form["data"][0]) > 0 { writer.Write([]byte("ok")) fmt.Println(request.Form["data"][0]) rs.CusMsg <- request.Form["data"][0] }}

0 阅读:0