From 1181f6d79645250eb285a8712746b1bc339b1bab Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 15 Mar 2025 10:53:56 +0800 Subject: [PATCH] =?UTF-8?q?raft=E6=97=A5=E5=BF=97=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/init.go | 26 ++++-- internal/nodes/log.go | 19 ++++- internal/nodes/node.go | 184 ++++++++++++++++++++++++++++++++++++------ internal/nodes/server_node.go | 13 ++- test/server_client_test.go | 2 +- 5 files changed, 204 insertions(+), 40 deletions(-) diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 0a38451..1a68bdd 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -30,14 +30,26 @@ func Init(id string, nodeAddr map[string]string, pipe string, db *leveldb.DB) *N } // 创建节点 - return &Node{ + node := &Node{ selfId: id, nodes: ns, pipeAddr: pipe, - maxLogId: 0, - log: make(map[int]LogEntry), + maxLogId: -1, + currTerm: 1, + log: make([]RaftLogEntry, 0), + commitIndex: -1, + lastApplied: -1, + nextIndex: make(map[string]int), + matchIndex: make(map[string]int), db: db, } + for nodeId := range nodeAddr { + if nodeId != id { // 不初始化自身 + node.nextIndex[nodeId] = node.maxLogId + 1 + node.matchIndex[nodeId] = 0 + } + } + return node } func Start(node *Node, isLeader bool) { @@ -53,6 +65,9 @@ func Start(node *Node, isLeader bool) { case Follower: case Candidate: + // todo 成为leader的初始化 + // node.currTerm = 1 + // candidate发布一个监听输入线程后,变成leader node.state = Leader go func() { @@ -78,12 +93,11 @@ func Start(node *Node, isLeader bool) { kv := LogEntry{input, ""} // 目前键盘输入key,value 0 logId := node.maxLogId node.maxLogId++ - node.log[logId] = kv + node.log[logId] = RaftLogEntry{kv, logId, node.currTerm} log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input) // 广播给其它节点 - kvCall := LogEntryCall{kv, Normal} - node.BroadCastKV(logId, kvCall) + node.BroadCastKV(Normal) // 持久化 node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) } diff --git a/internal/nodes/log.go b/internal/nodes/log.go index 3d13831..b2f1537 100644 --- a/internal/nodes/log.go +++ b/internal/nodes/log.go @@ -1,7 +1,10 @@ package nodes +import "strconv" + +type CallMode = uint8 const ( - Normal State = iota + 1 + Normal CallMode = iota + 1 Delay Fail ) @@ -10,10 +13,22 @@ type LogEntry struct { Key string Value string } +func (LogE *LogEntry) print() string { + return "key: " + LogE.Key + ", value: " + LogE.Value +} + +type RaftLogEntry struct { + LogE LogEntry + LogId int + Term int +} +func (RLogE *RaftLogEntry) print() string { + return "logid: " + strconv.Itoa(RLogE.LogId) + ", term: " + strconv.Itoa(RLogE.Term) + ", " + RLogE.LogE.print() +} type LogEntryCall struct { LogE LogEntry - CallState State + CallState CallMode } type KVReply struct { diff --git a/internal/nodes/node.go b/internal/nodes/node.go index f1a17c8..9379e10 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -3,6 +3,7 @@ package nodes import ( "math/rand" "net/rpc" + "sort" "strconv" "time" @@ -30,33 +31,48 @@ type Node struct { // 除当前节点外其他节点信息 nodes map[string]*Public_node_info - //管道名 + // 管道名 pipeAddr string // 当前节点状态 state State + // 任期 + currTerm int + // 简单的kv存储 - log map[int]LogEntry + log []RaftLogEntry - // leader用来标记新log + // leader用来标记新log, = log.len maxLogId int + // 已提交的index + commitIndex int + + // 最后应用(写到db)的index + lastApplied int + + // 需要发送给每个节点的下一个索引 + nextIndex map[string]int + + // 已经发送给每个节点的最大索引 + matchIndex map[string]int + db *leveldb.DB + } -func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) { +func (node *Node) BroadCastKV(callMode CallMode) { // 遍历所有节点 - for id, _ := range node.nodes { - go func(id string, kv LogEntryCall) { - var reply KVReply - node.sendKV(id, logId, kvCall, &reply) - }(id, kvCall) + for id := range node.nodes { + go func(id string, kv CallMode) { + node.sendKV(id, callMode) + }(id, callMode) } } -func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) { - switch kvCall.CallState { +func (node *Node) sendKV(id string, callMode CallMode) { + switch callMode { case Fail: log.Info("模拟发送失败") // 这么写向所有的node发送都失败,也可以随机数确定是否失败 @@ -80,22 +96,142 @@ func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVRep } }(client) - arg := LogIdAndEntry{logId, kvCall.LogE} - callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC - if callErr != nil { - log.Error("dialing node_"+id+"fail: ", zap.Error(callErr)) + var appendReply AppendEntriesReply + appendReply.Success = false + nextIndex := node.nextIndex[id] + // log.Info("nextindex " + strconv.Itoa(nextIndex)) + for (!appendReply.Success) { + if nextIndex < 0 { + log.Error("assert >= 0 here") + } + sendEntries := node.log[nextIndex:] + arg := AppendEntriesArg{ + Term: node.currTerm, + PrevLogIndex: nextIndex - 1, + Entries: sendEntries, + LeaderCommit: node.commitIndex, + } + if arg.PrevLogIndex >= 0 { + arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term + } + callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC + if callErr != nil { + log.Error("dialing node_"+id+"fail: ", zap.Error(callErr)) + } + + if appendReply.Term != node.currTerm { + // 转变成follower? + break + } + nextIndex-- // 失败往前传一格 } + + // 不变成follower情况下 + node.nextIndex[id] = node.maxLogId + 1 + node.matchIndex[id] = node.maxLogId + node.updateCommitIndex() +} + +func (node *Node) updateCommitIndex() { + // node.mu.Lock() + // defer node.mu.Unlock() + + totalNodes := len(node.nodes) + + // 收集所有 matchIndex 并排序 + matchIndexes := make([]int, 0, totalNodes) + for _, index := range node.matchIndex { + matchIndexes = append(matchIndexes, index) + } + sort.Ints(matchIndexes) // 排序 + + // 计算多数派 commitIndex + majorityIndex := matchIndexes[totalNodes/2] // 取 N/2 位置上的索引(多数派) + + // 确保这个索引的日志条目属于当前 term,防止提交旧 term 的日志 + if majorityIndex > node.commitIndex && node.log[majorityIndex].Term == node.currTerm { + node.commitIndex = majorityIndex + log.Info("Leader 更新 commitIndex: " + strconv.Itoa(majorityIndex)) + + // 应用日志到状态机 + node.applyCommittedLogs() + } +} + +// 应用日志到状态机 +func (node *Node) applyCommittedLogs() { + for node.lastApplied < node.commitIndex { + node.lastApplied++ + logEntry := node.log[node.lastApplied] + log.Info("node" + node.selfId + "应用日志到状态机: " + logEntry.print()) + err := node.db.Put([]byte(logEntry.LogE.Key), []byte(logEntry.LogE.Value), nil) + if err != nil { + log.Error("应用状态机失败: ", zap.Error(err)) + } + } } // RPC call -func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { - log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) - entry, ok := node.log[arg.LogId] - if !ok { - node.log[arg.LogId] = entry +func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error { + // node.mu.Lock() + // defer node.mu.Unlock() + + // 1. 如果 term 过期,拒绝接受日志 + if node.currTerm > arg.Term { + *reply = AppendEntriesReply{node.currTerm, false} + return nil + } + + // 2. 检查 prevLogIndex 是否有效 + if arg.PrevLogIndex >= len(node.log) || (arg.PrevLogIndex >= 0 && node.log[arg.PrevLogIndex].Term != arg.PrevLogTerm) { + *reply = AppendEntriesReply{node.currTerm, false} + return nil + } + + // 3. 处理日志冲突(如果存在不同 term,则截断日志) + idx := arg.PrevLogIndex + 1 + if idx < len(node.log) && node.log[idx].Term != arg.Entries[0].Term { + node.log = node.log[:idx] // 截断冲突日志 + } + // log.Info(strconv.Itoa(idx) + strconv.Itoa(len(node.log))) + + // 4. 追加新的日志条目 + for _, raftLogEntry := range arg.Entries { + if idx < len(node.log) { + node.log[idx] = raftLogEntry + } else { + node.log = append(node.log, raftLogEntry) + } + idx++ + } + + // 5. 更新 maxLogId + node.maxLogId = len(node.log) - 1 + + // 6. 更新 commitIndex + if arg.LeaderCommit < node.maxLogId { + node.commitIndex = arg.LeaderCommit + } else { + node.commitIndex = node.maxLogId } - // 持久化 - node.db.Put([]byte(arg.Entry.Key), []byte(arg.Entry.Value), nil) - reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定 - return nil + + // 7. 提交已提交的日志 + node.applyCommittedLogs() + + *reply = AppendEntriesReply{node.currTerm, true} + return nil +} + +type AppendEntriesArg struct { + Term int + // leaderId string + PrevLogIndex int + PrevLogTerm int + Entries []RaftLogEntry + LeaderCommit int +} + +type AppendEntriesReply struct { + Term int + Success bool } diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index 58eed40..7c30121 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -13,15 +13,14 @@ type ServerReply struct{ Value string } // RPC call -func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { - - logId := node.maxLogId +func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { node.maxLogId++ - node.log[logId] = kvCall.LogE - node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil) - log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kvCall.LogE.Key) + logId := node.maxLogId + node.log = append(node.log, RaftLogEntry{kvCall.LogE, logId, node.currTerm}) + // node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil) + log.Info("server write request : " + kvCall.LogE.print() + ", 模拟方式 : " + strconv.Itoa(int(kvCall.CallState))) // 广播给其它节点 - node.BroadCastKV(logId, kvCall) + node.BroadCastKV(kvCall.CallState) reply.Isconnect = true return nil } diff --git a/test/server_client_test.go b/test/server_client_test.go index ab106c3..4a6e489 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -58,7 +58,7 @@ func TestServerClient(t *testing.T) { key := strconv.Itoa(i) var value string s = c.Read(key, &value) - if s != clientPkg.Ok && value != "hello" + key { + if s != clientPkg.Ok || value != "hello" { t.Errorf("Read test1 fail") } }