diff --git a/README.md b/README.md index 9a0791c..1cad832 100644 --- a/README.md +++ b/README.md @@ -4,21 +4,34 @@ 基于go语言实现分布式kv数据库 +# 框架 +每个运行main.go进程作为一个节点 +``` +---internal + ---client 客户端使用节点提供的读写功能 + ---logprovider 封装了简单的日志打印,方便调试 + ---nodes 分布式核心代码 + init.go 节点在main中的调用初始化,和大循环启动 + log.go 节点存储的entry相关数据结构 + node_storage.go 抽象了节点数据持久化方法,存到json文件里 + node.go 节点的相关数据结构 + replica.go 日志复制相关逻辑 + server_node.go 节点作为server为 client提供的功能(读写) + vote.go 选主相关逻辑 +``` + # 环境与运行 使用环境是wsl+ubuntu go mod download安装依赖 ./scripts/build.sh 会在根目录下编译出main -./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。 # 注意 脚本第一次运行需要权限获取 chmod +x <脚本> 如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口 lsof -i :9091查看pid kill -9 杀死进程 + ## 关于测试 -通过新开进程的方式创建节点,如果通过线程创建,会出现重复注册rpc问题 +通过新开进程的方式创建节点(参考test/common.go中ExecuteNodeI函数) +如果通过线程创建,会出现重复注册rpc问题 -# todo list -消息通讯异常的处理 -kv本地持久化 -崩溃与恢复(以及对应的测试) \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 4170119..db9d16b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -31,7 +31,6 @@ func main() { port := flag.String("port", ":9091", "rpc listen port") cluster := flag.String("cluster", "127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093", "comma sep") id := flag.String("id", "1", "node ID") - pipe := flag.String("pipe", "", "input from scripts") isNewDb := flag.Bool("isNewDb", true, "new test or restart") // 参数解析 @@ -41,7 +40,7 @@ func main() { idCnt := 1 selfi, err := strconv.Atoi(*id) if err != nil { - log.Error("figure id only") + log.Fatal("figure id only") } for _, addr := range clusters { if idCnt == selfi { @@ -73,16 +72,16 @@ func main() { for iter.Next() { count++ } - fmt.Printf(*id+"结点目前有数据:%d\n", count) + log.Sugar().Infof("[%s]目前有数据:%d", *id, count) - node := nodes.Init(*id, idClusterPairs, *pipe, db, storage) - log.Info("id: " + *id + "节点开始监听: " + *port + "端口") + node := nodes.Init(*id, idClusterPairs, db, storage) + log.Sugar().Infof("[%s]开始监听" + *port + "端口", *id) // 监听rpc 
node.Rpc(*port) // 开启 raft nodes.Start(node) sig := <-sigs - fmt.Println("node_"+*id+"接收到信号:", sig) + fmt.Println("node_"+ *id +"接收到信号:", sig) } diff --git a/internal/client/client_node.go b/internal/client/client_node.go index a144e10..7203518 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -11,9 +11,8 @@ import ( var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) type Client struct { - // 连接的server端节点(node1) - ServerId string - Address string + // 连接的server端节点群 + Address [] string } type Status = uint8 @@ -24,35 +23,62 @@ const ( Fail ) +func (client *Client) FindActiveNode() *rpc.Client { + var err error + var c *rpc.Client + i := 0 + for { // 直到找到一个可连接的节点(保证至少一个节点活着) + c, err = rpc.DialHTTP("tcp", client.Address[i]) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + i++ + } else { + break + } + if i == len(client.Address) { + log.Fatal("没找到存活节点") + } + } + return c +} + +func (client *Client) CloseRpcClient(c *rpc.Client) { + err := c.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } +} +// func (client *Client) Write(kvCall nodes.LogEntryCall) Status { log.Info("client write request key :" + kvCall.LogE.Key) var reply nodes.ServerReply reply.Isleader = false - addr := client.Address - for !reply.Isleader { - c, err := rpc.DialHTTP("tcp", addr) - if err != nil { - log.Error("dialing: ", zap.Error(err)) - return Fail - } + c := client.FindActiveNode() + var err error + for !reply.Isleader { // 根据存活节点的反馈,直到找到leader callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC - if callErr != nil { + if callErr != nil { // dial和call之间可能崩溃,重新找存活节点 log.Error("dialing: ", zap.Error(callErr)) - return Fail - } - err = c.Close() - if err != nil { - log.Error("client close err: ", zap.Error(err)) + client.CloseRpcClient(c) + c = client.FindActiveNode() + continue } - if !reply.Isleader { // 发过去的不是leader - addr = reply.LeaderAddress + if !reply.Isleader { // 对方不是leader,根据反馈找leader + addr 
:= reply.LeaderAddress + client.CloseRpcClient(c) + c, err = rpc.DialHTTP("tcp", addr) + for err != nil { // 重新找下一个存活节点 + c = client.FindActiveNode() + } } else { // 成功 + client.CloseRpcClient(c) return Ok } } + log.Fatal("客户端会一直找存活节点,不会运行到这里") return Fail } @@ -61,34 +87,29 @@ func (client *Client) Read(key string, value *string) Status { // 查不到value if value == nil { return Fail } + var c *rpc.Client + for { + c = client.FindActiveNode() - c, err := rpc.DialHTTP("tcp", client.Address) - if err != nil { - log.Error("dialing: ", zap.Error(err)) - return Fail - } - - defer func(server *rpc.Client) { - err := c.Close() - if err != nil { - log.Error("client close err: ", zap.Error(err)) + var reply nodes.ServerReply + callErr := c.Call("Node.ReadKey", key, &reply) // RPC + if callErr != nil { + log.Error("dialing: ", zap.Error(callErr)) + client.CloseRpcClient(c) + continue } - }(c) - - var reply nodes.ServerReply - callErr := c.Call("Node.ReadKey", key, &reply) // RPC - if callErr != nil { - log.Error("dialing: ", zap.Error(callErr)) - return Fail - } - // 目前一定发送成功 - if reply.HaveValue { - *value = reply.Value - return Ok - } else { - return NotFound + // 目前一定发送成功 + if reply.HaveValue { + *value = reply.Value + client.CloseRpcClient(c) + return Ok + } else { + client.CloseRpcClient(c) + return NotFound + } } + } diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 2cdfeff..4142038 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -2,7 +2,6 @@ package nodes import ( "fmt" - "math/rand" "net" "net/http" "net/rpc" @@ -22,7 +21,7 @@ func newNode(address string) *Public_node_info { } } -func Init(selfId string, nodeAddr map[string]string, pipe string, db *leveldb.DB, rstorage *RaftStorage) *Node { +func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *RaftStorage) *Node { ns := make(map[string]*Public_node_info) for id, addr := range nodeAddr { ns[id] = newNode(addr) @@ -33,7 +32,6 @@ func Init(selfId string, 
nodeAddr map[string]string, pipe string, db *leveldb.DB selfId: selfId, leaderId: "", nodes: ns, - pipeAddr: pipe, maxLogId: -1, // 后来发现论文中是从1开始的(初始0),但不想改了 currTerm: 1, log: make([]RaftLogEntry, 0), @@ -48,64 +46,6 @@ func Init(selfId string, nodeAddr map[string]string, pipe string, db *leveldb.DB return node } -// func Start(node *Node, isLeader bool) { -// if isLeader { -// node.state = Candidate // 需要身份转变 -// } else { -// node.state = Follower -// } - -// go func() { -// for { -// switch node.state { -// case Follower: - -// case Candidate: -// // todo 成为leader的初始化 -// // node.currTerm = 1 - -// // candidate发布一个监听输入线程后,变成leader -// node.state = Leader -// go func() { -// if node.pipeAddr == "" { // 客户端远程调用server_node方法 -// log.Info("请运行客户端进程进行读写") -// } else { // 命令行提供了管道,支持管道(键盘)输入 -// pipe, err := os.Open(node.pipeAddr) -// if err != nil { -// log.Error("Failed to open pipe") -// } -// defer pipe.Close() - -// // 不断读取管道中的输入 -// buffer := make([]byte, 256) -// for { -// n, err := pipe.Read(buffer) -// if err != nil && err != io.EOF { -// log.Error("Error reading from pipe") -// } -// if n > 0 { -// input := string(buffer[:n]) -// // 将用户输入封装成一个 LogEntry -// kv := LogEntry{input, ""} // 目前键盘输入key,value 0 -// logId := node.maxLogId -// node.maxLogId++ -// node.log[logId] = RaftLogEntry{kv, logId, node.currTerm} - -// log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input) -// // 广播给其它节点 -// node.BroadCastKV(Normal) -// // 持久化 -// node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) -// } -// } -// } -// }() -// case Leader: -// time.Sleep(50 * time.Millisecond) -// } -// } -// }() -// } func (n *Node) initLeaderState() { for peerId := range n.nodes { n.nextIndex[peerId] = len(n.log) // 发送日志的下一个索引 @@ -123,11 +63,11 @@ func Start(node *Node) { switch node.state { case Follower: // 监听心跳超时 - fmt.Printf("Node %s is a follower, 监听中...\n", node.selfId) + fmt.Printf("[%s] is a follower, 监听中...\n", node.selfId) case Leader: // 发送心跳 - fmt.Printf("Node %s is 
the leader, 发送心跳...\n", node.selfId) + fmt.Printf("[%s] is the leader, 发送心跳...\n", node.selfId) node.BroadCastKV(Normal) } time.Sleep(50 * time.Millisecond) @@ -135,22 +75,6 @@ func Start(node *Node) { }() } -// follower 500-1000ms内没收到appendentries心跳,就变成candidate发起选举 -func (node *Node) resetElectionTimer() { - if node.electionTimer == nil { - node.electionTimer = time.NewTimer(time.Duration(500+rand.Intn(500)) * time.Millisecond) - go func() { - for { - <-node.electionTimer.C - node.startElection() - } - }() - } else { - node.electionTimer.Stop() - node.electionTimer.Reset(time.Duration(500+rand.Intn(500)) * time.Millisecond) - } -} - func (node *Node) Rpc(port string) { err := rpc.Register(node) if err != nil { diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 74da648..568dc4d 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -1,15 +1,10 @@ package nodes import ( - "math/rand" - "net/rpc" - "sort" - "strconv" "sync" "time" "github.com/syndtr/goleveldb/leveldb" - "go.uber.org/zap" ) type State = uint8 @@ -35,9 +30,6 @@ type Node struct { // 除当前节点外其他节点信息 nodes map[string]*Public_node_info - // 管道名 - pipeAddr string - // 当前节点状态 state State @@ -71,203 +63,3 @@ type Node struct { electionTimer *time.Timer } -func (node *Node) BroadCastKV(callMode CallMode) { - // 遍历所有节点 - for id := range node.nodes { - go func(id string, kv CallMode) { - node.sendKV(id, callMode) - }(id, callMode) - } -} - -func (node *Node) sendKV(id string, callMode CallMode) { - - switch callMode { - case Fail: - log.Info("模拟发送失败") - // 这么写向所有的node发送都失败,也可以随机数确定是否失败 - case Delay: - log.Info("模拟发送延迟") - // 随机延迟0-5ms - time.Sleep(time.Millisecond * time.Duration(rand.Intn(5))) - default: - } - - client, err := rpc.DialHTTP("tcp", node.nodes[id].address) - if err != nil { - log.Error("dialing: ", zap.Error(err)) - return - } - - defer func(client *rpc.Client) { - err := client.Close() - if err != nil { - log.Error("client close err: ", zap.Error(err)) - } - }(client) 
- - node.mu.Lock() - defer node.mu.Unlock() - - var appendReply AppendEntriesReply - appendReply.Success = false - nextIndex := node.nextIndex[id] - // log.Info("nextindex " + strconv.Itoa(nextIndex)) - for (!appendReply.Success) { - if nextIndex < 0 { - log.Error("assert >= 0 here") - } - sendEntries := node.log[nextIndex:] - arg := AppendEntriesArg{ - Term: node.currTerm, - PrevLogIndex: nextIndex - 1, - Entries: sendEntries, - LeaderCommit: node.commitIndex, - LeaderId: node.selfId, - } - if arg.PrevLogIndex >= 0 { - arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term - } - callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC - if callErr != nil { - log.Error("dialing node_"+id+"fail: ", zap.Error(callErr)) - } - - if appendReply.Term != node.currTerm { - log.Info("Leader " + node.selfId + " 收到更高的 term=" + strconv.Itoa(appendReply.Term) + ",转换为 Follower") - node.currTerm = appendReply.Term - node.state = Follower - node.votedFor = "" - node.storage.SetTermAndVote(node.currTerm, node.votedFor) - node.resetElectionTimer() - return - } - nextIndex-- // 失败往前传一格 - } - - // 不变成follower情况下 - node.nextIndex[id] = node.maxLogId + 1 - node.matchIndex[id] = node.maxLogId - node.updateCommitIndex() -} - -func (node *Node) updateCommitIndex() { - totalNodes := len(node.nodes) - - // 收集所有 matchIndex 并排序 - matchIndexes := make([]int, 0, totalNodes) - for _, index := range node.matchIndex { - matchIndexes = append(matchIndexes, index) - } - sort.Ints(matchIndexes) // 排序 - - // 计算多数派 commitIndex - majorityIndex := matchIndexes[totalNodes/2] // 取 N/2 位置上的索引(多数派) - - // 确保这个索引的日志条目属于当前 term,防止提交旧 term 的日志 - if majorityIndex > node.commitIndex && majorityIndex < len(node.log) && node.log[majorityIndex].Term == node.currTerm { - node.commitIndex = majorityIndex - log.Info("Leader" + node.selfId + "更新 commitIndex: " + strconv.Itoa(majorityIndex)) - - // 应用日志到状态机 - node.applyCommittedLogs() - } -} - -// 应用日志到状态机 -func (node *Node) applyCommittedLogs() { - for 
node.lastApplied < node.commitIndex { - node.lastApplied++ - logEntry := node.log[node.lastApplied] - log.Info("node" + node.selfId + "应用日志到状态机: " + logEntry.print()) - err := node.db.Put([]byte(logEntry.LogE.Key), []byte(logEntry.LogE.Value), nil) - if err != nil { - log.Error("应用状态机失败: ", zap.Error(err)) - } - } -} - -// RPC call -func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error { - node.mu.Lock() - defer node.mu.Unlock() - - // 1. 如果 term 过期,拒绝接受日志 - if node.currTerm > arg.Term { - *reply = AppendEntriesReply{node.currTerm, false} - return nil - } - - node.leaderId = arg.LeaderId // 记录Leader - - if node.currTerm < arg.Term { - log.Info("Node " + node.selfId + " 发现更高 term=" + strconv.Itoa(arg.Term)) - node.currTerm = arg.Term - node.state = Follower - node.votedFor = "" - node.storage.SetTermAndVote(node.currTerm, node.votedFor) - } - node.storage.SetTermAndVote(node.currTerm, node.votedFor) - - // 2. 检查 prevLogIndex 是否有效 - if arg.PrevLogIndex >= len(node.log) || (arg.PrevLogIndex >= 0 && node.log[arg.PrevLogIndex].Term != arg.PrevLogTerm) { - *reply = AppendEntriesReply{node.currTerm, false} - return nil - } - - // 3. 处理日志冲突(如果存在不同 term,则截断日志) - idx := arg.PrevLogIndex + 1 - for i := idx; i < len(node.log) && i-idx < len(arg.Entries); i++ { - if node.log[i].Term != arg.Entries[i-idx].Term { - node.log = node.log[:idx] - break - } - } - // log.Info(strconv.Itoa(idx) + strconv.Itoa(len(node.log))) - - // 4. 
追加新的日志条目 - for _, raftLogEntry := range arg.Entries { - log.Info(node.selfId + "结点写入" + raftLogEntry.print()) - if idx < len(node.log) { - node.log[idx] = raftLogEntry - } else { - node.log = append(node.log, raftLogEntry) - } - idx++ - } - - // 暴力持久化 - node.storage.WriteLog(node.log) - - // 更新 maxLogId - node.maxLogId = len(node.log) - 1 - - // 更新 commitIndex - if arg.LeaderCommit < node.maxLogId { - node.commitIndex = arg.LeaderCommit - } else { - node.commitIndex = node.maxLogId - } - - // 提交已提交的日志 - node.applyCommittedLogs() - - // 在成功接受日志或心跳后,重置选举超时 - node.resetElectionTimer() - *reply = AppendEntriesReply{node.currTerm, true} - return nil -} - -type AppendEntriesArg struct { - Term int - LeaderId string - PrevLogIndex int - PrevLogTerm int - Entries []RaftLogEntry - LeaderCommit int -} - -type AppendEntriesReply struct { - Term int - Success bool -} diff --git a/internal/nodes/node_storage.go b/internal/nodes/node_storage.go index 192c222..92b5add 100644 --- a/internal/nodes/node_storage.go +++ b/internal/nodes/node_storage.go @@ -34,7 +34,7 @@ func (rs *RaftStorage) loadData() { file, err := os.Open(rs.filePath) if err != nil { - log.Info("文件不存在:" + rs.filePath) + log.Info("文件未创建:" + rs.filePath) rs.saveData() // 文件不存在时创建默认数据 return } @@ -103,6 +103,7 @@ func (rs *RaftStorage) GetVotedFor() string { return rs.VotedFor } +// 同时设置 func (rs *RaftStorage) SetTermAndVote(term int, candidate string) { rs.mu.Lock() defer rs.mu.Unlock() diff --git a/internal/nodes/replica.go b/internal/nodes/replica.go new file mode 100644 index 0000000..93477f4 --- /dev/null +++ b/internal/nodes/replica.go @@ -0,0 +1,214 @@ +package nodes + +import ( + "math/rand" + "net/rpc" + "sort" + "strconv" + "time" + + "go.uber.org/zap" +) + +type AppendEntriesArg struct { + Term int + LeaderId string + PrevLogIndex int + PrevLogTerm int + Entries []RaftLogEntry + LeaderCommit int +} + +type AppendEntriesReply struct { + Term int + Success bool +} + +// leader收到新内容要广播,以及心跳广播(同步自己的log) +func 
(node *Node) BroadCastKV(callMode CallMode) { + // 遍历所有节点 + for id := range node.nodes { + go func(id string, kv CallMode) { + node.sendKV(id, callMode) + }(id, callMode) + } +} + +func (node *Node) sendKV(id string, callMode CallMode) { + + switch callMode { + case Fail: + log.Info("模拟发送失败") + // 这么写向所有的node发送都失败,也可以随机数确定是否失败 + case Delay: + log.Info("模拟发送延迟") + // 随机延迟0-5ms + time.Sleep(time.Millisecond * time.Duration(rand.Intn(5))) + default: + } + + client, err := rpc.DialHTTP("tcp", node.nodes[id].address) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + return + } + + defer func(client *rpc.Client) { + err := client.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } + }(client) + + node.mu.Lock() + defer node.mu.Unlock() + + var appendReply AppendEntriesReply + appendReply.Success = false + nextIndex := node.nextIndex[id] + // log.Info("nextindex " + strconv.Itoa(nextIndex)) + for (!appendReply.Success) { + if nextIndex < 0 { + log.Fatal("assert >= 0 here") + } + sendEntries := node.log[nextIndex:] + arg := AppendEntriesArg{ + Term: node.currTerm, + PrevLogIndex: nextIndex - 1, + Entries: sendEntries, + LeaderCommit: node.commitIndex, + LeaderId: node.selfId, + } + if arg.PrevLogIndex >= 0 { + arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term + } + callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC + if callErr != nil { + log.Error("dialing node_"+ id +"fail: ", zap.Error(callErr)) + } + + if appendReply.Term != node.currTerm { + log.Info("Leader[" + node.selfId + "]收到更高的 term=" + strconv.Itoa(appendReply.Term) + ",转换为 Follower") + node.currTerm = appendReply.Term + node.state = Follower + node.votedFor = "" + node.storage.SetTermAndVote(node.currTerm, node.votedFor) + node.resetElectionTimer() + return + } + nextIndex-- // 失败往前传一格 + } + + // 不变成follower情况下 + node.nextIndex[id] = node.maxLogId + 1 + node.matchIndex[id] = node.maxLogId + node.updateCommitIndex() +} + +func (node *Node) 
updateCommitIndex() { + totalNodes := len(node.nodes) + + // 收集所有 matchIndex 并排序 + matchIndexes := make([]int, 0, totalNodes) + for _, index := range node.matchIndex { + matchIndexes = append(matchIndexes, index) + } + sort.Ints(matchIndexes) // 排序 + + // 计算多数派 commitIndex + majorityIndex := matchIndexes[totalNodes/2] // 取 N/2 位置上的索引(多数派) + + // 确保这个索引的日志条目属于当前 term,防止提交旧 term 的日志 + if majorityIndex > node.commitIndex && majorityIndex < len(node.log) && node.log[majorityIndex].Term == node.currTerm { + node.commitIndex = majorityIndex + log.Info("Leader[" + node.selfId + "]更新 commitIndex: " + strconv.Itoa(majorityIndex)) + + // 应用日志到状态机 + node.applyCommittedLogs() + } +} + +// 应用日志到状态机 +func (node *Node) applyCommittedLogs() { + for node.lastApplied < node.commitIndex { + node.lastApplied++ + logEntry := node.log[node.lastApplied] + log.Sugar().Infof("[%s]应用日志到状态机: " + logEntry.print(), node.selfId) + err := node.db.Put([]byte(logEntry.LogE.Key), []byte(logEntry.LogE.Value), nil) + if err != nil { + log.Error(node.selfId + "应用状态机失败: ", zap.Error(err)) + } + } +} + +// RPC call +func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error { + node.mu.Lock() + defer node.mu.Unlock() + + // 如果 term 过期,拒绝接受日志 + if node.currTerm > arg.Term { + *reply = AppendEntriesReply{node.currTerm, false} + return nil + } + + node.leaderId = arg.LeaderId // 记录Leader + + // 如果term比自己高,或自己不是follower但收到相同term的心跳 + if node.currTerm < arg.Term || node.state != Follower { + log.Sugar().Infof("[%s]发现更高 term(%s)", node.selfId, strconv.Itoa(arg.Term)) + node.currTerm = arg.Term + node.state = Follower + node.votedFor = "" + node.storage.SetTermAndVote(node.currTerm, node.votedFor) + } + node.storage.SetTermAndVote(node.currTerm, node.votedFor) + + // 检查 prevLogIndex 是否有效 + if arg.PrevLogIndex >= len(node.log) || (arg.PrevLogIndex >= 0 && node.log[arg.PrevLogIndex].Term != arg.PrevLogTerm) { + *reply = AppendEntriesReply{node.currTerm, false} + return nil + } + + // 
处理日志冲突(如果存在不同 term,则截断日志) + idx := arg.PrevLogIndex + 1 + for i := idx; i < len(node.log) && i-idx < len(arg.Entries); i++ { + if node.log[i].Term != arg.Entries[i-idx].Term { + node.log = node.log[:idx] + break + } + } + // log.Info(strconv.Itoa(idx) + strconv.Itoa(len(node.log))) + + // 追加新的日志条目 + for _, raftLogEntry := range arg.Entries { + log.Sugar().Infof("[%s]写入:" + raftLogEntry.print(), node.selfId) + if idx < len(node.log) { + node.log[idx] = raftLogEntry + } else { + node.log = append(node.log, raftLogEntry) + } + idx++ + } + + // 暴力持久化 + node.storage.WriteLog(node.log) + + // 更新 maxLogId + node.maxLogId = len(node.log) - 1 + + // 更新 commitIndex + if arg.LeaderCommit < node.maxLogId { + node.commitIndex = arg.LeaderCommit + } else { + node.commitIndex = node.maxLogId + } + + // 提交已提交的日志 + node.applyCommittedLogs() + + // 在成功接受日志或心跳后,重置选举超时 + node.resetElectionTimer() + *reply = AppendEntriesReply{node.currTerm, true} + return nil +} \ No newline at end of file diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index da41d24..7ccfc7a 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -15,7 +15,9 @@ type ServerReply struct{ } // RPC call func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { - log.Info(node.selfId + "收到客户端write请求") + log.Sugar().Infof("[%s]收到客户端write请求", node.selfId) + + // 自己不是leader,转交leader地址回复 if node.state != Leader { reply.Isleader = false if (node.leaderId == "") { @@ -23,24 +25,26 @@ func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { return nil } reply.LeaderAddress = node.nodes[node.leaderId].address - log.Info(node.selfId + "转交给" + node.leaderId) + log.Sugar().Infof("[%s]转交给[%s]", node.selfId, node.leaderId) return nil } + // 自己是leader,修改自己的记录并广播 node.maxLogId++ logId := node.maxLogId rLogE := RaftLogEntry{kvCall.LogE, logId, node.currTerm} node.log = append(node.log, rLogE) node.storage.AppendLog(rLogE) - log.Info("leader" + 
node.selfId + "处理请求 : " + kvCall.LogE.print() + ", 模拟方式 : " + strconv.Itoa(int(kvCall.CallState))) + log.Info("leader[" + node.selfId + "]处理请求 : " + kvCall.LogE.print() + ", 模拟方式 : " + strconv.Itoa(int(kvCall.CallState))) // 广播给其它节点 node.BroadCastKV(kvCall.CallState) reply.Isleader = true return nil } + // RPC call func (node *Node) ReadKey(key string, reply *ServerReply) error { - log.Info("server read : " + key) + log.Sugar().Infof("[%s]收到客户端read请求", node.selfId) // 先只读自己(无论自己是不是leader),也方便测试 value, err := node.db.Get([]byte(key), nil) if err == leveldb.ErrNotFound { diff --git a/internal/nodes/vote.go b/internal/nodes/vote.go index cf3799b..5b1e12a 100644 --- a/internal/nodes/vote.go +++ b/internal/nodes/vote.go @@ -1,6 +1,7 @@ package nodes import ( + "math/rand" "net/rpc" "strconv" "sync" @@ -24,7 +25,7 @@ type RequestVoteReply struct { func (n *Node) startElection() { n.mu.Lock() defer n.mu.Unlock() - // 1. 增加当前任期,转换为 Candidate + // 增加当前任期,转换为 Candidate n.currTerm++ n.state = Candidate n.votedFor = n.selfId // 自己投自己 @@ -32,10 +33,10 @@ func (n *Node) startElection() { log.Sugar().Infof("[%s] 开始选举,当前任期: %d", n.selfId, n.currTerm) - // 2. 重新设置选举超时,防止重复选举 + // 重新设置选举超时,防止重复选举 n.resetElectionTimer() - // 3. 构造 RequestVote 请求 + // 构造 RequestVote 请求 var lastLogIndex int var lastLogTerm int @@ -53,7 +54,7 @@ func (n *Node) startElection() { LastLogTerm: lastLogTerm, } - // 4. 并行向其他节点发送请求投票 + // 并行向其他节点发送请求投票 var mu sync.Mutex cond := sync.NewCond(&mu) totalNodes := len(n.nodes) @@ -81,7 +82,7 @@ func (n *Node) startElection() { grantedVotes++ } - if grantedVotes > totalNodes/2 { + if grantedVotes == totalNodes / 2 + 1 { n.state = Leader log.Sugar().Infof("[%s] 当选 Leader!", n.selfId) n.initLeaderState() @@ -93,7 +94,7 @@ func (n *Node) startElection() { }(peerId) } - // 5. 
等待选举结果 + // 等待选举结果 timeout := time.After(300 * time.Millisecond) for { @@ -115,7 +116,7 @@ func (n *Node) startElection() { } func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *RequestVoteReply) bool { - log.Sugar().Infof("Sending RequestVote to %s at %s", peerId, node.nodes[peerId].address) + log.Sugar().Infof("[%s] 请求 [%s] 投票给自己", node.selfId, peerId) client, err := rpc.DialHTTP("tcp", node.nodes[peerId].address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -139,14 +140,14 @@ func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *R func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error { n.mu.Lock() defer n.mu.Unlock() - // 1. 如果候选人的任期小于当前任期,则拒绝投票 + // 如果候选人的任期小于当前任期,则拒绝投票 if args.Term < n.currTerm { reply.Term = n.currTerm reply.VoteGranted = false return nil } - // 2. 如果请求的 Term 更高,则更新当前 Term 并回退为 Follower + // 如果请求的 Term 更高,则更新当前 Term 并回退为 Follower if args.Term > n.currTerm { n.currTerm = args.Term n.state = Follower @@ -154,9 +155,9 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error n.resetElectionTimer() // 重新设置选举超时 } - // 3. 检查是否已经投过票,且是否投给了同一个候选人 + // 检查是否已经投过票,且是否投给了同一个候选人 if n.votedFor == "" || n.votedFor == args.CandidateId { - // 4. 检查日志是否足够新 + // 检查日志是否足够新 var lastLogIndex int var lastLogTerm int @@ -170,9 +171,9 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error if args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) { - // 5. 
投票给候选人 + // 够新就投票给候选人 n.votedFor = args.CandidateId - log.Info("term" + strconv.Itoa(n.currTerm) + ", " + n.selfId + "投票给" + n.votedFor) + log.Sugar().Infof("在term(%s), [%s]投票给[%s]", strconv.Itoa(n.currTerm), n.selfId, n.votedFor) reply.VoteGranted = true n.resetElectionTimer() } else { @@ -185,4 +186,20 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error n.storage.SetTermAndVote(n.currTerm, n.votedFor) reply.Term = n.currTerm return nil +} + +// follower 500-1000ms内没收到appendentries心跳,就变成candidate发起选举 +func (node *Node) resetElectionTimer() { + if node.electionTimer == nil { + node.electionTimer = time.NewTimer(time.Duration(500+rand.Intn(500)) * time.Millisecond) + go func() { + for { + <-node.electionTimer.C + node.startElection() + } + }() + } else { + node.electionTimer.Stop() + node.electionTimer.Reset(time.Duration(500+rand.Intn(500)) * time.Millisecond) + } } \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh deleted file mode 100755 index 794c247..0000000 --- a/scripts/run.sh +++ /dev/null @@ -1,61 +0,0 @@ -#!/bin/bash - -# 设置运行时间限制:s -RUN_TIME=10 - -# 需要传递数据的管道 -PIPE_NAME="/tmp/input_pipe" - -# 启动节点1 -echo "Starting Node 1..." -timeout $RUN_TIME ./main -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true & - -# 启动节点2 -echo "Starting Node 2..." -timeout $RUN_TIME ./main -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" & - -# 启动节点3 -echo "Starting Node 3..." -timeout $RUN_TIME ./main -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"& - -echo "All nodes started successfully!" -# 创建一个管道用于进程间通信 -if [[ ! 
-p "$PIPE_NAME" ]]; then - mkfifo "$PIPE_NAME" -fi - -# 捕获终端输入并通过管道传递给三个节点 -echo "Enter input to send to nodes:" -start_time=$(date +%s) -while true; do - # 从终端读取用户输入 - read -r user_input - - current_time=$(date +%s) - elapsed_time=$((current_time - start_time)) - - # 如果运行时间大于限制时间,就退出 - if [ $elapsed_time -ge $RUN_TIME ]; then - echo 'Timeout reached, normal exit now' - break - fi - - # 如果输入为空,跳过 - if [[ -z "$user_input" ]]; then - continue - fi - - # 将用户输入发送到管道 - echo "$user_input" > "$PIPE_NAME" - - # 如果输入 "exit",结束脚本 - if [[ "$user_input" == "exit" ]]; then - break - fi -done - -# 删除管道 -rm "$PIPE_NAME" - -# 等待所有节点完成启动 -wait diff --git a/test/restart_follower_test.go b/test/restart_node_test.go similarity index 55% rename from test/restart_follower_test.go rename to test/restart_node_test.go index 3624ad2..db0c600 100644 --- a/test/restart_follower_test.go +++ b/test/restart_node_test.go @@ -11,9 +11,9 @@ import ( "time" ) -func TestFollowerRestart(t *testing.T) { +func TestNodeRestart(t *testing.T) { // 登记结点信息 - n := 3 + n := 5 var clusters []string for i := 0; i < n; i++ { port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) @@ -24,23 +24,13 @@ func TestFollowerRestart(t *testing.T) { // 结点启动 var cmds []*exec.Cmd for i := 0; i < n; i++ { - var cmd *exec.Cmd - if i == 0 { - cmd = ExecuteNodeI(i, true, clusters) - } else { - cmd = ExecuteNodeI(i, true, clusters) - } - - if cmd == nil { - return - } else { - cmds = append(cmds, cmd) - } + cmd := ExecuteNodeI(i, true, clusters) + cmds = append(cmds, cmd) } time.Sleep(time.Second) // 等待启动完毕 - // client启动, 连接leader - cWrite := clientPkg.Client{Address: clusters[0], ServerId: "1"} + // client启动, 连接任意节点 + cWrite := clientPkg.Client{Address: clusters} // 写入 var s clientPkg.Status @@ -53,34 +43,30 @@ func TestFollowerRestart(t *testing.T) { } } time.Sleep(time.Second) // 等待写入完毕 - // 模拟最后一个结点崩溃 - err := cmds[n - 1].Process.Signal(syscall.SIGTERM) - if err != nil { - fmt.Println("Error sending signal:", err) - return - } - // 
继续写入 - for i := 5; i < 10; i++ { - key := strconv.Itoa(i) - newlog := nodes.LogEntry{Key: key, Value: "hello"} - s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) - if s != clientPkg.Ok { - t.Errorf("write test fail") - } - } - // 恢复结点 - cmd := ExecuteNodeI(n - 1, false, clusters) - if cmd == nil { - t.Errorf("recover test1 fail") - return - } else { - cmds[n - 1] = cmd + + // 模拟结点轮流崩溃 + for i := 0; i < n; i++ { + err := cmds[i].Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + + time.Sleep(time.Second) + cmd := ExecuteNodeI(i, false, clusters) + if cmd == nil { + t.Errorf("recover test1 fail") + return + } else { + cmds[i] = cmd + } + time.Sleep(time.Second) // 等待启动完毕 } - time.Sleep(time.Second) // 等待启动完毕 - // client启动, 连接节点n-1(去读它的数据) - cRead := clientPkg.Client{Address: clusters[n - 1], ServerId: "n"} - // 读崩溃前写入数据 + + // client启动 + cRead := clientPkg.Client{Address: clusters} + // 读写入数据 for i := 0; i < 5; i++ { key := strconv.Itoa(i) var value string @@ -100,7 +86,7 @@ func TestFollowerRestart(t *testing.T) { } } - // 通知进程结束 + // 通知所有进程结束 for _, cmd := range cmds { err := cmd.Process.Signal(syscall.SIGTERM) if err != nil { @@ -108,5 +94,4 @@ func TestFollowerRestart(t *testing.T) { return } } - } diff --git a/test/server_client_test.go b/test/server_client_test.go index 72ad6d1..ff6835b 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -25,17 +25,12 @@ func TestServerClient(t *testing.T) { var cmds []*exec.Cmd for i := 0; i < n; i++ { cmd := ExecuteNodeI(i, true, clusters) - - if cmd == nil { - return - } else { - cmds = append(cmds, cmd) - } + cmds = append(cmds, cmd) } time.Sleep(time.Second) // 等待启动完毕 // client启动 - c := clientPkg.Client{Address: "127.0.0.1:9092", ServerId: "3"} + c := clientPkg.Client{Address: clusters} // 写入 var s clientPkg.Status