diff --git a/internal/nodes/init.go b/internal/nodes/init.go index dc9de96..a393a5e 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -180,7 +180,7 @@ func (n *Node) initLeaderState() { func Start(node *Node, quitChan chan struct{}) { node.State = Follower // 所有节点以 Follower 状态启动 - node.resetElectionTimer() // 启动选举超时定时器 + node.ResetElectionTimer() // 启动选举超时定时器 go func() { ticker := time.NewTicker(50 * time.Millisecond) @@ -201,7 +201,7 @@ func Start(node *Node, quitChan chan struct{}) { case Leader: // 发送心跳 // fmt.Printf("[%s] is the leader, 发送心跳...\n", node.SelfId) - node.resetElectionTimer() // leader 不主动触发选举 + node.ResetElectionTimer() // leader 不主动触发选举 node.BroadCastKV() } } diff --git a/internal/nodes/replica.go b/internal/nodes/replica.go index be380f7..f4511a4 100644 --- a/internal/nodes/replica.go +++ b/internal/nodes/replica.go @@ -45,7 +45,7 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { if *failCount == len(node.Nodes) / 2 + 1 { // 无法联系超过半数:自己有问题,降级 node.LeaderId = "" node.State = Follower - node.resetElectionTimer() + node.ResetElectionTimer() } failMutex.Unlock() return @@ -88,7 +88,7 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { if *failCount == len(node.Nodes) / 2 + 1 { // 无法联系超过半数:自己有问题,降级 node.LeaderId = "" node.State = Follower - node.resetElectionTimer() + node.ResetElectionTimer() } failMutex.Unlock() return @@ -101,7 +101,7 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { node.State = Follower node.VotedFor = "" node.Storage.SetTermAndVote(node.CurrTerm, node.VotedFor) - node.resetElectionTimer() + node.ResetElectionTimer() return } NextIndex-- // 失败往前传一格 @@ -221,7 +221,7 @@ func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply node.applyCommittedLogs() // 在成功接受日志或心跳后,重置选举超时 - node.resetElectionTimer() + node.ResetElectionTimer() *reply = AppendEntriesReply{node.CurrTerm, true} return nil } \ No newline at end of file diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index 09b3223..03a79c9 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -14,6 +14,8 @@ type ServerReply struct{ // RPC call func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error { log.Sugar().Infof("[%s]收到客户端write请求", node.SelfId) + node.Mu.Lock() + defer node.Mu.Unlock() // 自己不是leader,转交leader地址回复 if node.State != Leader { @@ -39,6 +41,9 @@ func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error { // RPC call func (node *Node) ReadKey(key *string, reply *ServerReply) error { log.Sugar().Infof("[%s]收到客户端read请求", node.SelfId) + node.Mu.Lock() + defer node.Mu.Unlock() + // 先只读自己(无论自己是不是leader),也方便测试 value, err := node.Db.Get([]byte(*key), nil) if err == leveldb.ErrNotFound { @@ -57,6 +62,9 @@ type FindLeaderReply struct{ LeaderId string } func (node *Node) FindLeader(_ struct{}, reply *FindLeaderReply) error { + node.Mu.Lock() + defer node.Mu.Unlock() + // 自己不是leader,转交leader地址回复 if node.State != Leader { reply.Isleader = false diff --git a/internal/nodes/vote.go b/internal/nodes/vote.go index 2f6fefb..3568610 100644 --- a/internal/nodes/vote.go +++ b/internal/nodes/vote.go @@ -21,7 +21,7 @@ type RequestVoteReply struct { VoteGranted bool // 是否同意投票 } -func (n *Node) startElection() { +func (n *Node) StartElection() { n.Mu.Lock() defer n.Mu.Unlock() // 增加当前任期,转换为 Candidate @@ -33,7 +33,7 @@ func (n *Node) startElection() { log.Sugar().Infof("[%s] 开始选举,当前任期: %d", n.SelfId, n.CurrTerm) // 重新设置选举超时,防止重复选举 - n.resetElectionTimer() + n.ResetElectionTimer() // 构造 RequestVote 请求 var lastLogIndex int @@ -71,7 +71,7 @@ func (n *Node) startElection() { n.State = Follower n.VotedFor = "" n.Storage.SetTermAndVote(n.CurrTerm, n.VotedFor) - n.resetElectionTimer() + n.ResetElectionTimer() Mu.Unlock() return } @@ -95,9 +95,9 @@ func (n *Node) startElection() { time.Sleep(300 * time.Millisecond) Mu.Lock() if n.State == Candidate { - log.Sugar().Infof("[%s] 选举超时,重新发起选举", n.SelfId) + log.Sugar().Infof("[%s] 选举超时,等待后将重新发起选举", n.SelfId) // n.State = Follower 这里不修改,如果appendentries收到term合理的心跳,再变回follower - n.resetElectionTimer() + n.ResetElectionTimer() } Mu.Unlock() } @@ -139,7 +139,7 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error n.CurrTerm = args.Term n.State = Follower n.VotedFor = "" - n.resetElectionTimer() // 重新设置选举超时 + n.ResetElectionTimer() // 重新设置选举超时 } // 检查是否已经投过票,且是否投给了同一个候选人 @@ -162,7 +162,7 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error n.VotedFor = args.CandidateId log.Sugar().Infof("在term(%s), [%s]投票给[%s]", strconv.Itoa(n.CurrTerm), n.SelfId, n.VotedFor) reply.VoteGranted = true - n.resetElectionTimer() + n.ResetElectionTimer() } else { reply.VoteGranted = false } @@ -176,13 +176,13 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error } // follower 500-1000ms内没收到appendentries心跳,就变成candidate发起选举 -func (node *Node) resetElectionTimer() { +func (node *Node) ResetElectionTimer() { if node.ElectionTimer == nil { node.ElectionTimer = time.NewTimer(time.Duration(500+rand.Intn(500)) * time.Millisecond) go func() { for { <-node.ElectionTimer.C - node.startElection() + node.StartElection() } }() } else { diff --git a/threadTest/common.go b/threadTest/common.go index 0d87af1..18c9598 100644 --- a/threadTest/common.go +++ b/threadTest/common.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "simple-kv-store/internal/nodes" + "testing" "github.com/syndtr/goleveldb/leveldb" ) @@ -36,3 +37,39 @@ func ExecuteNodeI(id string, isRestart bool, peerIds []string, threadTransport * go nodes.Start(node, quitChan) return node, quitChan } + +func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { + cnt := 0 + for _, node := range nodeCollections { + if node.State == nodes.Leader { + cnt++ + } + } + if cnt != 1 { + t.Errorf("实际有%d个leader(!=1)", cnt) + } +} + +func CheckZeroOrOneLeader(t *testing.T, nodeCollections []* nodes.Node) { + cnt := 0 + for _, node := range nodeCollections { + if node.State == nodes.Leader { + cnt++ + } + } + if cnt > 1 { + t.Errorf("实际有%d个leader(>1)", cnt) + } +} + +func CheckIsLeader(t *testing.T, node *nodes.Node) { + if node.State != nodes.Leader { + t.Errorf("[%s]不是leader", node.SelfId) + } +} + +func CheckTerm(t *testing.T, node *nodes.Node, targetTerm int) { + if node.CurrTerm != targetTerm { + t.Errorf("[%s]实际term=%d (!=%d)", node.SelfId, node.CurrTerm, targetTerm) + } +} diff --git a/threadTest/election_test.go b/threadTest/election_test.go new file mode 100644 index 0000000..4b762b4 --- /dev/null +++ b/threadTest/election_test.go @@ -0,0 +1,217 @@ +package threadTest + +import ( + "fmt" + "os" + "simple-kv-store/internal/nodes" + "strconv" + "testing" + "time" + + "github.com/syndtr/goleveldb/leveldb" +) + +func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTransport *nodes.ThreadTransport) (*nodes.Node, chan struct{}) { + if !isRestart { + os.RemoveAll("storage/node" + id) + } + + os.RemoveAll("leveldb/simple-kv-store" + id) + + db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil) + if err != nil { + fmt.Println("Failed to open database: ", err) + } + + // 打开或创建节点数据持久化文件 + storage := nodes.NewRaftStorage("storage/node" + id) + + var otherIds []string + for _, ids := range peerIds { + if ids != id { + otherIds = append(otherIds, ids) // 删除目标元素 + } + } + // 初始化 + node, quitChan := nodes.InitThreadNode(id, otherIds, db, storage, isRestart, threadTransport) + + // 开启 raft + // go nodes.Start(node, quitChan) + return node, quitChan +} + +func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) { + for i := 0; i < len(quitCollections); i++ { + node := nodeCollections[i] + quitChan := quitCollections[i] + go func(node *nodes.Node, quitChan chan struct{}) { + ticker := time.NewTicker(400 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-quitChan: + return // 退出 goroutine + + case <-ticker.C: + node.ResetElectionTimer() // 不主动触发选举 + } + } + }(node, quitChan) + } + +} + +func TestInitElection(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) +} + +func TestRepeatElection(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + go nodeCollections[0].StartElection() + go nodeCollections[0].StartElection() + time.Sleep(time.Second) + + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 3) +} + +func TestBelowHalfCandidateElection(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + go nodeCollections[0].StartElection() + go nodeCollections[1].StartElection() + time.Sleep(time.Second) + + CheckOneLeader(t, nodeCollections) + for i := 0; i < n; i++ { + CheckTerm(t, nodeCollections[i], 2) + } +} + +func TestOverHalfCandidateElection(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + go nodeCollections[0].StartElection() + go nodeCollections[1].StartElection() + go nodeCollections[2].StartElection() + time.Sleep(time.Second) + + CheckZeroOrOneLeader(t, nodeCollections) + for i := 0; i < n; i++ { + CheckTerm(t, nodeCollections[i], 2) + } +} \ No newline at end of file