From 053dbe107fe2e7fbb4b69e1134033772c8d5e2b1 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 22 Mar 2025 17:25:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=81=E8=A3=85=E4=BA=86rpc=E8=B6=85?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E4=BF=AE=E5=A4=8D=E5=B9=B6=E5=8F=91candidate?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/client/client_node.go | 13 +++++++------ internal/nodes/init.go | 43 ++++++++++++++++++++++++++++++++++++++++-- internal/nodes/replica.go | 10 +++++++--- internal/nodes/server_node.go | 6 +++--- internal/nodes/vote.go | 32 +++++++++---------------------- 5 files changed, 67 insertions(+), 37 deletions(-) diff --git a/internal/client/client_node.go b/internal/client/client_node.go index 929c601..6669c6f 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -42,10 +42,11 @@ func (client *Client) FindActiveNode() *rpc.Client { var c *rpc.Client for { // 直到找到一个可连接的节点(保证至少一个节点活着) addr := getRandomAddress(client.Address) - c, err = rpc.DialHTTP("tcp", addr) + c, err = nodes.DialHTTPWithTimeout("tcp", addr) if err != nil { log.Error("dialing: ", zap.Error(err)) } else { + log.Sugar().Info("client发现活跃节点地址[%s]", addr) return c } } @@ -67,7 +68,7 @@ func (client *Client) Write(kvCall nodes.LogEntryCall) Status { var err error for !reply.Isleader { // 根据存活节点的反馈,直到找到leader - callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC + callErr := nodes.CallWithTimeout(c, "Node.WriteKV", &kvCall, &reply) // RPC if callErr != nil { // dial和call之间可能崩溃,重新找存活节点 log.Error("dialing: ", zap.Error(callErr)) client.CloseRpcClient(c) @@ -78,7 +79,7 @@ func (client *Client) Write(kvCall nodes.LogEntryCall) Status { if !reply.Isleader { // 对方不是leader,根据反馈找leader addr := reply.LeaderAddress client.CloseRpcClient(c) - c, err = rpc.DialHTTP("tcp", addr) + c, err = nodes.DialHTTPWithTimeout("tcp", addr) for err != nil { // 重新找下一个存活节点 c = client.FindActiveNode() } @@ -101,7 +102,7 @@ func (client *Client) Read(key string, value *string) Status { // 查不到value c = client.FindActiveNode() var reply nodes.ServerReply - callErr := c.Call("Node.ReadKey", key, &reply) // RPC + callErr := nodes.CallWithTimeout(c, "Node.ReadKey", &key, &reply) // RPC if callErr != nil { log.Error("dialing: ", zap.Error(callErr)) client.CloseRpcClient(c) @@ -128,7 +129,7 @@ func (client *Client) FindLeader() string { var err error for !reply.Isleader { // 根据存活节点的反馈,直到找到leader - callErr := c.Call("Node.FindLeader", &arg, &reply) // RPC + callErr := nodes.CallWithTimeout(c, "Node.FindLeader", &arg, &reply) // RPC if callErr != nil { // dial和call之间可能崩溃,重新找存活节点 log.Error("dialing: ", zap.Error(callErr)) client.CloseRpcClient(c) @@ -139,7 +140,7 @@ func (client *Client) FindLeader() string { if !reply.Isleader { // 对方不是leader,根据反馈找leader addr := client.Address[reply.LeaderId] client.CloseRpcClient(c) - c, err = rpc.DialHTTP("tcp", addr) + c, err = nodes.DialHTTPWithTimeout("tcp", addr) for err != nil { // 重新找下一个存活节点 c = client.FindActiveNode() } diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 170cfe0..b0f4aa8 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -1,6 +1,7 @@ package nodes import ( + // "context" "fmt" "net" "net/http" @@ -30,7 +31,7 @@ func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *R // 创建节点 node := &Node{ selfId: selfId, - leaderId: "", + leaderId: "", nodes: ns, maxLogId: -1, // 后来发现论文中是从1开始的(初始0),但不想改了 currTerm: 1, @@ -40,7 +41,7 @@ func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *R nextIndex: make(map[string]int), matchIndex: make(map[string]int), db: db, - storage: rstorage, + storage: rstorage, } node.initLeaderState() if isRestart { @@ -74,6 +75,7 @@ func Start(node *Node) { case Leader: // 发送心跳 fmt.Printf("[%s] is the leader, 发送心跳...\n", node.selfId) + node.resetElectionTimer() // leader不主动触发选举 node.BroadCastKV(Normal) } time.Sleep(50 * time.Millisecond) @@ -81,7 +83,9 @@ func Start(node *Node) { }() } +// 初始时注册rpc方法 func (node *Node) Rpc(port string) { + err := rpc.Register(node) if err != nil { log.Fatal("rpc register failed", zap.Error(err)) @@ -99,3 +103,38 @@ func (node *Node) Rpc(port string) { } }() } + +// 封装有超时的dial +func DialHTTPWithTimeout(network, address string) (*rpc.Client, error) { + done := make(chan struct{}) + var client *rpc.Client + var err error + + go func() { + client, err = rpc.DialHTTP(network, address) + close(done) + }() + + select { + case <-done: + return client, err + case <-time.After(50 * time.Millisecond): + return nil, fmt.Errorf("dial timeout: %s", address) + } +} + +// 封装有超时的call +func CallWithTimeout[T1 any, T2 any](client *rpc.Client, serviceMethod string, args *T1, reply *T2) error { + done := make(chan error, 1) + + go func() { + done <- client.Call(serviceMethod, args, reply) + }() + + select { + case err := <-done: + return err + case <-time.After(50 * time.Millisecond): + return fmt.Errorf("call timeout: %s", serviceMethod) + } +} diff --git a/internal/nodes/replica.go b/internal/nodes/replica.go index 93477f4..41be21f 100644 --- a/internal/nodes/replica.go +++ b/internal/nodes/replica.go @@ -47,7 +47,7 @@ func (node *Node) sendKV(id string, callMode CallMode) { default: } - client, err := rpc.DialHTTP("tcp", node.nodes[id].address) + client, err := DialHTTPWithTimeout("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) return @@ -82,7 +82,7 @@ func (node *Node) sendKV(id string, callMode CallMode) { if arg.PrevLogIndex >= 0 { arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term } - callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC + callErr := CallWithTimeout(client, "Node.AppendEntries", &arg, &appendReply) // RPC if callErr != nil { log.Error("dialing node_"+ id +"fail: ", zap.Error(callErr)) } @@ -142,7 +142,11 @@ func (node *Node) applyCommittedLogs() { } // RPC call -func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error { +func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply) error { + // start := time.Now() + // defer func() { + // log.Sugar().Infof("AppendEntries 处理时间: %v", time.Since(start)) + // }() node.mu.Lock() defer node.mu.Unlock() diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index a666ca9..1e7d90f 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -14,7 +14,7 @@ type ServerReply struct{ Value string } // RPC call -func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { +func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error { log.Sugar().Infof("[%s]收到客户端write请求", node.selfId) // 自己不是leader,转交leader地址回复 @@ -43,10 +43,10 @@ func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { } // RPC call -func (node *Node) ReadKey(key string, reply *ServerReply) error { +func (node *Node) ReadKey(key *string, reply *ServerReply) error { log.Sugar().Infof("[%s]收到客户端read请求", node.selfId) // 先只读自己(无论自己是不是leader),也方便测试 - value, err := node.db.Get([]byte(key), nil) + value, err := node.db.Get([]byte(*key), nil) if err == leveldb.ErrNotFound { reply.HaveValue = false } else { diff --git a/internal/nodes/vote.go b/internal/nodes/vote.go index 9ad9e9d..cbe5654 100644 --- a/internal/nodes/vote.go +++ b/internal/nodes/vote.go @@ -56,7 +56,6 @@ func (n *Node) startElection() { // 并行向其他节点发送请求投票 var mu sync.Mutex - cond := sync.NewCond(&mu) totalNodes := len(n.nodes) grantedVotes := 1 // 自己的票 @@ -86,7 +85,6 @@ func (n *Node) startElection() { n.state = Leader log.Sugar().Infof("[%s] 当选 Leader!", n.selfId) n.initLeaderState() - cond.Broadcast() } mu.Unlock() @@ -95,31 +93,19 @@ func (n *Node) startElection() { } // 等待选举结果 - timeout := time.After(300 * time.Millisecond) - - for { - mu.Lock() - if n.state != Candidate { // 选举成功或回退,不再等待 - mu.Unlock() - return - } - select { - case <-timeout: - log.Sugar().Infof("[%s] 选举超时,重新发起选举", n.selfId) - n.state = Follower - n.resetElectionTimer() - mu.Unlock() - return - default: - cond.Wait() - } - mu.Unlock() + time.Sleep(300 * time.Millisecond) + mu.Lock() + if n.state == Candidate { + log.Sugar().Infof("[%s] 选举超时,重新发起选举", n.selfId) + // n.state = Follower 这里不修改,如果appendentries收到term合理的心跳,再变回follower + n.resetElectionTimer() } + mu.Unlock() } func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *RequestVoteReply) bool { log.Sugar().Infof("[%s] 请求 [%s] 投票给自己", node.selfId, peerId) - client, err := rpc.DialHTTP("tcp", node.nodes[peerId].address) + client, err := DialHTTPWithTimeout("tcp", node.nodes[peerId].address) if err != nil { log.Error("dialing: ", zap.Error(err)) return false @@ -132,7 +118,7 @@ func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *R } }(client) - callErr := client.Call("Node.RequestVote", args, reply) // RPC + callErr := CallWithTimeout(client, "Node.RequestVote", args, reply) // RPC if callErr != nil { log.Error("dialing node_"+peerId+"fail: ", zap.Error(callErr)) }