From 33a41f95c080bb2bf8b83c641df88735be5f5f77 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Mon, 7 Apr 2025 21:08:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A7=E8=A7=84=E6=A8=A1debug=EF=BC=8C?= =?UTF-8?q?=E9=80=9A=E8=BF=87=E9=9A=8F=E6=9C=BA=E6=B6=88=E6=81=AF=E7=9A=84?= =?UTF-8?q?fuzz=5Ftest?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + internal/client/client_node.go | 20 +++-- internal/logprovider/traceback.go | 15 ++++ internal/nodes/init.go | 130 +++++++++++++++++-------------- internal/nodes/node.go | 3 + internal/nodes/node_storage.go | 20 +++++ internal/nodes/random_timetable.go | 12 ++- internal/nodes/replica.go | 86 +++++++++++++++++---- internal/nodes/server_node.go | 9 ++- internal/nodes/thread_transport.go | 2 +- internal/nodes/vote.go | 43 +++++++++-- threadTest/common.go | 97 ++++++++++++++++++----- threadTest/election_test.go | 14 ++-- threadTest/fuzz/fuzz_test.go | 152 +++++++++++++++++++++++++++++++++++++ threadTest/log_replication_test.go | 16 ++-- 15 files changed, 498 insertions(+), 122 deletions(-) create mode 100644 internal/logprovider/traceback.go create mode 100644 threadTest/fuzz/fuzz_test.go diff --git a/.gitignore b/.gitignore index c982650..afb9a4e 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ main leveldb storage *.log +testdata diff --git a/internal/client/client_node.go b/internal/client/client_node.go index 4d2d4e8..3474b6a 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -4,6 +4,7 @@ import ( "math/rand" "simple-kv-store/internal/logprovider" "simple-kv-store/internal/nodes" + "time" "go.uber.org/zap" ) @@ -64,12 +65,21 @@ func (client *Client) Write(kv nodes.LogEntry) Status { Id: nodes.LogEntryCallId{ClientId: client.ClientId, LogId: client.NextLogId}} client.NextLogId++ - var reply nodes.ServerReply - reply.Isleader = false c := client.FindActiveNode() var err error - for !reply.Isleader { // 根据存活节点的反馈,直到找到leader + timeout := 5 * time.Second + deadline := time.Now().Add(timeout) + + for { // 根据存活节点的反馈,直到找到leader + if time.Now().After(deadline) { + log.Error("系统繁忙,疑似出错") + return Fail + } + + var reply nodes.ServerReply + reply.Isleader = false + callErr := client.Transport.CallWithTimeout(c, "Node.WriteKV", &kvCall, &reply) // RPC if callErr != nil { // dial和call之间可能崩溃,重新找存活节点 log.Error("dialing: ", zap.Error(callErr)) @@ -85,7 +95,7 @@ func (client *Client) Write(kv nodes.LogEntry) Status { c = client.FindActiveNode() } else { // dial leader c, err = client.Transport.DialHTTPWithTimeout("tcp", "", leaderId) - for err != nil { // dial失败,重新找下一个存活节点 + if err != nil { // dial失败,重新找下一个存活节点 c = client.FindActiveNode() } } @@ -94,8 +104,6 @@ func (client *Client) Write(kv nodes.LogEntry) Status { return Ok } } - log.Fatal("客户端会一直找存活节点,不会运行到这里") - return Fail } func (client *Client) Read(key string, value *string) Status { // 查不到value为空 diff --git a/internal/logprovider/traceback.go b/internal/logprovider/traceback.go new file mode 100644 index 0000000..98cfcc7 --- /dev/null +++ b/internal/logprovider/traceback.go @@ -0,0 +1,15 @@ +package logprovider + +import ( + "fmt" + "os" + "runtime/debug" +) +func DebugTraceback(errFuncName string) { + if r := recover(); r != nil { + msg := fmt.Sprintf("panic in goroutine: %v\n%s", r, debug.Stack()) + f, _ := os.Create(errFuncName + ".log") + fmt.Fprint(f, msg) + f.Close() + } +} \ No newline at end of file diff --git a/internal/nodes/init.go b/internal/nodes/init.go index bacf26a..3e13373 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -39,6 +39,7 @@ func InitRPCNode(SelfId string, port string, nodeAddr map[string]string, db *lev Transport: &HTTPTransport{NodeMap: nodeAddr}, RTTable: NewRTTable(), SeenRequests: make(map[LogEntryCallId]bool), + IsFinish: false, } node.initLeaderState() if isRestart { @@ -94,6 +95,7 @@ func InitThreadNode(SelfId string, peerIds []string, db *leveldb.DB, rstorage *R Transport: threadTransport, RTTable: NewRTTable(), SeenRequests: make(map[LogEntryCallId]bool), + IsFinish: false, } node.initLeaderState() if isRestart { @@ -111,7 +113,7 @@ func InitThreadNode(SelfId string, peerIds []string, db *leveldb.DB, rstorage *R } func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{}) { - defer node.Db.Close() + defer logprovider.DebugTraceback("listen") for { select { @@ -126,63 +128,19 @@ func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{}) if !ok2 { log.Fatal("没有设置对应的delay时间") } - time.Sleep(duration) + go node.switchReq(req, duration) case FailRpc: continue } - switch req.ServiceMethod { - case "Node.AppendEntries": - arg, ok := req.Args.(*AppendEntriesArg) - resp, ok2 := req.Reply.(*AppendEntriesReply) - if !ok || !ok2 { - req.Done <- errors.New("type assertion failed for AppendEntries") - } else { - req.Done <- node.AppendEntries(arg, resp) - } - - case "Node.RequestVote": - arg, ok := req.Args.(*RequestVoteArgs) - resp, ok2 := req.Reply.(*RequestVoteReply) - if !ok || !ok2 { - req.Done <- errors.New("type assertion failed for RequestVote") - } else { - req.Done <- node.RequestVote(arg, resp) - } - - case "Node.WriteKV": - arg, ok := req.Args.(*LogEntryCall) - resp, ok2 := req.Reply.(*ServerReply) - if !ok || !ok2 { - req.Done <- errors.New("type assertion failed for WriteKV") - } else { - req.Done <- node.WriteKV(arg, resp) - } - - case "Node.ReadKey": - arg, ok := req.Args.(*string) - resp, ok2 := req.Reply.(*ServerReply) - if !ok || !ok2 { - req.Done <- errors.New("type assertion failed for ReadKey") - } else { - req.Done <- node.ReadKey(arg, resp) - } - - case "Node.FindLeader": - arg, ok := req.Args.(struct{}) - resp, ok2 := req.Reply.(*FindLeaderReply) - if !ok || !ok2 { - req.Done <- errors.New("type assertion failed for FindLeader") - } else { - req.Done <- node.FindLeader(arg, resp) - } - - default: - req.Done <- fmt.Errorf("未知方法: %s", req.ServiceMethod) - } + go node.switchReq(req, 0) + case <-quitChan: - log.Sugar().Infof("[%s] 监听线程收到退出信号", node.SelfId) + node.Mu.Lock() + defer node.Mu.Unlock() + log.Sugar().Infof("[%s] 监听线程收到退出信号", node.SelfId) + node.IsFinish = true node.Db.Close() node.Storage.Close() return @@ -190,6 +148,61 @@ func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{}) } } +func (node *Node) switchReq(req RPCRequest, delayTime time.Duration) { + defer logprovider.DebugTraceback("switch") + time.Sleep(delayTime) + + switch req.ServiceMethod { + case "Node.AppendEntries": + arg, ok := req.Args.(*AppendEntriesArg) + resp, ok2 := req.Reply.(*AppendEntriesReply) + if !ok || !ok2 { + req.Done <- errors.New("type assertion failed for AppendEntries") + } else { + req.Done <- node.AppendEntries(arg, resp) + } + + case "Node.RequestVote": + arg, ok := req.Args.(*RequestVoteArgs) + resp, ok2 := req.Reply.(*RequestVoteReply) + if !ok || !ok2 { + req.Done <- errors.New("type assertion failed for RequestVote") + } else { + req.Done <- node.RequestVote(arg, resp) + } + + case "Node.WriteKV": + arg, ok := req.Args.(*LogEntryCall) + resp, ok2 := req.Reply.(*ServerReply) + if !ok || !ok2 { + req.Done <- errors.New("type assertion failed for WriteKV") + } else { + req.Done <- node.WriteKV(arg, resp) + } + + case "Node.ReadKey": + arg, ok := req.Args.(*string) + resp, ok2 := req.Reply.(*ServerReply) + if !ok || !ok2 { + req.Done <- errors.New("type assertion failed for ReadKey") + } else { + req.Done <- node.ReadKey(arg, resp) + } + + case "Node.FindLeader": + arg, ok := req.Args.(struct{}) + resp, ok2 := req.Reply.(*FindLeaderReply) + if !ok || !ok2 { + req.Done <- errors.New("type assertion failed for FindLeader") + } else { + req.Done <- node.FindLeader(arg, resp) + } + + default: + req.Done <- fmt.Errorf("未知方法: %s", req.ServiceMethod) + } +} + // 共同部分和启动 func (n *Node) initLeaderState() { for _, peerId := range n.Nodes { @@ -199,10 +212,13 @@ func (n *Node) initLeaderState() { } func Start(node *Node, quitChan chan struct{}) { + node.Mu.Lock() node.State = Follower // 所有节点以 Follower 状态启动 + node.Mu.Unlock() node.ResetElectionTimer() // 启动选举超时定时器 go func() { + defer logprovider.DebugTraceback("start") ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() @@ -213,14 +229,16 @@ func Start(node *Node, quitChan chan struct{}) { return // 退出 goroutine case <-ticker.C: - switch node.State { + node.Mu.Lock() + state := node.State + node.Mu.Unlock() + + switch state { case Follower: // 监听心跳超时 - // fmt.Printf("[%s] is a follower, 监听中...\n", node.SelfId) - + case Leader: // 发送心跳 - // fmt.Printf("[%s] is the leader, 发送心跳...\n", node.SelfId) node.ResetElectionTimer() // leader 不主动触发选举 node.BroadCastKV() } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 82c9b1f..7bacadb 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -17,6 +17,7 @@ const ( type Node struct { Mu sync.Mutex + MuElection sync.Mutex // 当前节点id SelfId string // 记录的leader(不能用votedfor:投票的leader可能没有收到多数票) @@ -65,5 +66,7 @@ type Node struct { // 已经处理过的客户端请求 SeenRequests map[LogEntryCallId]bool + + IsFinish bool } diff --git a/internal/nodes/node_storage.go b/internal/nodes/node_storage.go index 353223f..b6434f4 100644 --- a/internal/nodes/node_storage.go +++ b/internal/nodes/node_storage.go @@ -15,6 +15,7 @@ type RaftStorage struct { mu sync.Mutex db *leveldb.DB filePath string + isfinish bool } // NewRaftStorage 创建 Raft 存储 @@ -27,6 +28,7 @@ func NewRaftStorage(filePath string) *RaftStorage { return &RaftStorage{ db: db, filePath: filePath, + isfinish: false, } } @@ -34,6 +36,9 @@ func NewRaftStorage(filePath string) *RaftStorage { func (rs *RaftStorage) SetCurrentTerm(term int) { rs.mu.Lock() defer rs.mu.Unlock() + if rs.isfinish { + return + } err := rs.db.Put([]byte("current_term"), []byte(strconv.Itoa(term)), nil) if err != nil { log.Error("SetCurrentTerm 持久化失败:", zap.Error(err)) @@ -56,6 +61,9 @@ func (rs *RaftStorage) GetCurrentTerm() int { func (rs *RaftStorage) SetVotedFor(candidate string) { rs.mu.Lock() defer rs.mu.Unlock() + if rs.isfinish { + return + } err := rs.db.Put([]byte("voted_for"), []byte(candidate), nil) if err != nil { log.Error("SetVotedFor 持久化失败:", zap.Error(err)) @@ -77,6 +85,9 @@ func (rs *RaftStorage) GetVotedFor() string { func (rs *RaftStorage) SetTermAndVote(term int, candidate string) { rs.mu.Lock() defer rs.mu.Unlock() + if rs.isfinish { + return + } batch := new(leveldb.Batch) batch.Put([]byte("current_term"), []byte(strconv.Itoa(term))) @@ -92,6 +103,9 @@ func (rs *RaftStorage) SetTermAndVote(term int, candidate string) { func (rs *RaftStorage) AppendLog(entry RaftLogEntry) { rs.mu.Lock() defer rs.mu.Unlock() + if rs.db == nil { + return + } // 序列化日志 batch := new(leveldb.Batch) @@ -126,6 +140,9 @@ func (rs *RaftStorage) WriteLog(entries []RaftLogEntry) { } rs.mu.Lock() defer rs.mu.Unlock() + if rs.isfinish { + return + } batch := new(leveldb.Batch) for _, entry := range entries { @@ -170,5 +187,8 @@ func (rs *RaftStorage) GetLogEntries() []RaftLogEntry { // Close 关闭数据库 func (rs *RaftStorage) Close() { + rs.mu.Lock() + defer rs.mu.Unlock() rs.db.Close() + rs.isfinish = true } diff --git a/internal/nodes/random_timetable.go b/internal/nodes/random_timetable.go index 0c132d2..5496cf1 100644 --- a/internal/nodes/random_timetable.go +++ b/internal/nodes/random_timetable.go @@ -2,14 +2,16 @@ package nodes import ( "math/rand" + "sync" "time" ) type RandomTimeTable struct { + Mu sync.Mutex electionTimeOut time.Duration israndom bool // heartbeat 50ms - // rpcTimeout 100ms + // rpcTimeout 50ms // follower变candidate 500ms // 等待选举成功时间 300ms } @@ -21,18 +23,24 @@ func NewRTTable() *RandomTimeTable { } func (rttable *RandomTimeTable) GetElectionTimeout() time.Duration { + rttable.Mu.Lock() + defer rttable.Mu.Unlock() if rttable.israndom { return time.Duration(500+rand.Intn(500)) * time.Millisecond } else { - return time.Duration(rttable.electionTimeOut) + return rttable.electionTimeOut } } func (rttable *RandomTimeTable) SetElectionTimeout(t time.Duration) { + rttable.Mu.Lock() + defer rttable.Mu.Unlock() rttable.israndom = false rttable.electionTimeOut = t } func (rttable *RandomTimeTable) ResetElectionTimeout() { + rttable.Mu.Lock() + defer rttable.Mu.Unlock() rttable.israndom = true } \ No newline at end of file diff --git a/internal/nodes/replica.go b/internal/nodes/replica.go index f4511a4..150433f 100644 --- a/internal/nodes/replica.go +++ b/internal/nodes/replica.go @@ -1,6 +1,7 @@ package nodes import ( + "simple-kv-store/internal/logprovider" "sort" "strconv" "sync" @@ -18,6 +19,7 @@ type AppendEntriesArg struct { } type AppendEntriesReply struct { + Mu sync.Mutex Term int Success bool } @@ -31,14 +33,20 @@ func (node *Node) BroadCastKV() { // 遍历所有节点 for _, id := range node.Nodes { go func(id string) { + defer logprovider.DebugTraceback("send") node.sendKV(id, &failCount, &failMutex) }(id) } } func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { - client, err := node.Transport.DialHTTPWithTimeout("tcp", node.SelfId, peerId) + node.Mu.Lock() + selfId := node.SelfId + node.Mu.Unlock() + + client, err := node.Transport.DialHTTPWithTimeout("tcp", selfId, peerId) if err != nil { + node.Mu.Lock() log.Error("[" + node.SelfId + "]dialling [" + peerId + "] fail: ", zap.Error(err)) failMutex.Lock() *failCount++ @@ -48,6 +56,7 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { node.ResetElectionTimer() } failMutex.Unlock() + node.Mu.Unlock() return } @@ -59,16 +68,15 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { }(client) node.Mu.Lock() - defer node.Mu.Unlock() - var appendReply AppendEntriesReply - appendReply.Success = false NextIndex := node.NextIndex[peerId] // log.Info("NextIndex " + strconv.Itoa(NextIndex)) - for (!appendReply.Success) { + for { if NextIndex < 0 { log.Fatal("assert >= 0 here") } + + sendEntries := node.Log[NextIndex:] arg := AppendEntriesArg{ Term: node.CurrTerm, @@ -80,30 +88,60 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { if arg.PrevLogIndex >= 0 { arg.PrevLogTerm = node.Log[arg.PrevLogIndex].Term } + // 记录关键数据后解锁 + currTerm := node.CurrTerm + currState := node.State + MaxLogId := node.MaxLogId + + var appendReply AppendEntriesReply + appendReply.Success = false + node.Mu.Unlock() + callErr := node.Transport.CallWithTimeout(client, "Node.AppendEntries", &arg, &appendReply) // RPC + + node.Mu.Lock() + if node.CurrTerm != currTerm || node.MaxLogId != MaxLogId || node.State != currState { + node.Mu.Unlock() + return + } + if callErr != nil { log.Error("[" + node.SelfId + "]calling [" + peerId + "] fail: ", zap.Error(callErr)) failMutex.Lock() *failCount++ if *failCount == len(node.Nodes) / 2 + 1 { // 无法联系超过半数:自己有问题,降级 + log.Info("term=" + strconv.Itoa(node.CurrTerm) + "的Leader[" + node.SelfId + "]无法联系到半数节点, 降级为 Follower") node.LeaderId = "" node.State = Follower node.ResetElectionTimer() } failMutex.Unlock() + node.Mu.Unlock() return } + appendReply.Mu.Lock() if appendReply.Term != node.CurrTerm { - log.Info("term=" + strconv.Itoa(node.CurrTerm) + "的Leader[" + node.SelfId + "]收到更高的 term=" + strconv.Itoa(appendReply.Term) + ",转换为 Follower") + log.Sugar().Infof("term=%s的leader[%s]因为[%s]收到更高的term=%s, 转换为follower", + strconv.Itoa(node.CurrTerm), node.SelfId, peerId, strconv.Itoa(appendReply.Term)) + node.LeaderId = "" node.CurrTerm = appendReply.Term node.State = Follower node.VotedFor = "" node.Storage.SetTermAndVote(node.CurrTerm, node.VotedFor) node.ResetElectionTimer() + appendReply.Mu.Unlock() + node.Mu.Unlock() return } + + if appendReply.Success { + appendReply.Mu.Unlock() + break + } + + appendReply.Mu.Unlock() NextIndex-- // 失败往前传一格 } @@ -111,9 +149,17 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) { node.NextIndex[peerId] = node.MaxLogId + 1 node.MatchIndex[peerId] = node.MaxLogId node.updateCommitIndex() + node.Mu.Unlock() } func (node *Node) updateCommitIndex() { + if node.Mu.TryLock() { + log.Fatal("这里要保证有锁") + } + if node.IsFinish { + return + } + totalNodes := len(node.Nodes) // 收集所有 MatchIndex 并排序 @@ -151,17 +197,19 @@ func (node *Node) applyCommittedLogs() { // RPC call func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply) error { - // start := time.Now() - // defer func() { - // log.Sugar().Infof("AppendEntries 处理时间: %v", time.Since(start)) - // }() - log.Sugar().Infof("[%s]收到[%s]的AppendEntries", node.SelfId, arg.LeaderId) - node.Mu.Lock() - defer node.Mu.Unlock() + defer logprovider.DebugTraceback("append") + + node.Mu.Lock() + defer node.Mu.Unlock() + log.Sugar().Infof("[%s]在term=%d收到[%s]的AppendEntries", node.SelfId, node.CurrTerm, arg.LeaderId) + // 如果 term 过期,拒绝接受日志 if node.CurrTerm > arg.Term { - *reply = AppendEntriesReply{node.CurrTerm, false} + reply.Mu.Lock() + reply.Term = node.CurrTerm + reply.Success = false + reply.Mu.Unlock() return nil } @@ -179,7 +227,10 @@ func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply // 检查 prevLogIndex 是否有效 if arg.PrevLogIndex >= len(node.Log) || (arg.PrevLogIndex >= 0 && node.Log[arg.PrevLogIndex].Term != arg.PrevLogTerm) { - *reply = AppendEntriesReply{node.CurrTerm, false} + reply.Mu.Lock() + reply.Term = node.CurrTerm + reply.Success = false + reply.Mu.Unlock() return nil } @@ -222,6 +273,9 @@ func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply // 在成功接受日志或心跳后,重置选举超时 node.ResetElectionTimer() - *reply = AppendEntriesReply{node.CurrTerm, true} + reply.Mu.Lock() + reply.Term = node.CurrTerm + reply.Success = true + reply.Mu.Unlock() return nil } \ No newline at end of file diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index 0d1ca23..e07a33e 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -1,6 +1,8 @@ package nodes import ( + "simple-kv-store/internal/logprovider" + "github.com/syndtr/goleveldb/leveldb" ) @@ -13,10 +15,11 @@ type ServerReply struct{ } // RPC call func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error { - log.Sugar().Infof("[%s]收到客户端write请求", node.SelfId) + defer logprovider.DebugTraceback("write") node.Mu.Lock() defer node.Mu.Unlock() + log.Sugar().Infof("[%s]收到客户端write请求", node.SelfId) // 自己不是leader,转交leader地址回复 if node.State != Leader { reply.Isleader = false @@ -47,9 +50,10 @@ func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error { // RPC call func (node *Node) ReadKey(key *string, reply *ServerReply) error { - log.Sugar().Infof("[%s]收到客户端read请求", node.SelfId) + defer logprovider.DebugTraceback("read") node.Mu.Lock() defer node.Mu.Unlock() + log.Sugar().Infof("[%s]收到客户端read请求", node.SelfId) // 先只读自己(无论自己是不是leader),也方便测试 value, err := node.Db.Get([]byte(*key), nil) @@ -69,6 +73,7 @@ type FindLeaderReply struct{ LeaderId string } func (node *Node) FindLeader(_ struct{}, reply *FindLeaderReply) error { + defer logprovider.DebugTraceback("find") node.Mu.Lock() defer node.Mu.Unlock() diff --git a/internal/nodes/thread_transport.go b/internal/nodes/thread_transport.go index eaee05a..819ed53 100644 --- a/internal/nodes/thread_transport.go +++ b/internal/nodes/thread_transport.go @@ -184,7 +184,7 @@ func (t *ThreadTransport) CallWithTimeout(client ClientInterface, serviceMethod return fmt.Errorf("network partition: %s cannot reach %s", threadClient.TargetId, threadClient.SourceId) } return err - case <-time.After(100 * time.Millisecond): + case <-time.After(250 * time.Millisecond): return fmt.Errorf("RPC 调用超时: %s", serviceMethod) } } diff --git a/internal/nodes/vote.go b/internal/nodes/vote.go index 8f3a365..f1eb7ae 100644 --- a/internal/nodes/vote.go +++ b/internal/nodes/vote.go @@ -2,6 +2,7 @@ package nodes import ( "math/rand" + "simple-kv-store/internal/logprovider" "strconv" "sync" "time" @@ -17,13 +18,18 @@ type RequestVoteArgs struct { } type RequestVoteReply struct { + Mu sync.Mutex Term int // 当前节点的最新任期 VoteGranted bool // 是否同意投票 } func (n *Node) StartElection() { + defer logprovider.DebugTraceback("startElection") n.Mu.Lock() - defer n.Mu.Unlock() + if n.IsFinish { + n.Mu.Unlock() + return + } // 增加当前任期,转换为 Candidate n.CurrTerm++ n.State = Candidate @@ -58,12 +64,25 @@ func (n *Node) StartElection() { totalNodes := len(n.Nodes) grantedVotes := 1 // 自己的票 + currTerm := n.CurrTerm + currState := n.State + n.Mu.Unlock() + for _, peerId := range n.Nodes { go func(peerId string) { - reply := RequestVoteReply{} + defer logprovider.DebugTraceback("vote") + var reply RequestVoteReply if n.sendRequestVote(peerId, &args, &reply) { Mu.Lock() + defer Mu.Unlock() + n.Mu.Lock() + defer n.Mu.Unlock() + + if currTerm != n.CurrTerm || currState != n.State { + return + } + reply.Mu.Lock() if reply.Term > n.CurrTerm { // 发现更高任期,回退为 Follower log.Sugar().Infof("[%s] 发现更高的 Term (%d),回退为 Follower", n.SelfId, reply.Term) @@ -72,34 +91,37 @@ func (n *Node) StartElection() { n.VotedFor = "" n.Storage.SetTermAndVote(n.CurrTerm, n.VotedFor) n.ResetElectionTimer() - Mu.Unlock() + reply.Mu.Unlock() return } if reply.VoteGranted { grantedVotes++ } + reply.Mu.Unlock() if grantedVotes == totalNodes / 2 + 1 { n.State = Leader log.Sugar().Infof("[%s] 当选 Leader!", n.SelfId) n.initLeaderState() } - - Mu.Unlock() } }(peerId) } // 等待选举结果 time.Sleep(300 * time.Millisecond) + Mu.Lock() + defer Mu.Unlock() + n.Mu.Lock() + defer n.Mu.Unlock() + if n.State == Candidate { log.Sugar().Infof("[%s] 选举超时,等待后将重新发起选举", n.SelfId) // n.State = Follower 这里不修改,如果appendentries收到term合理的心跳,再变回follower n.ResetElectionTimer() } - Mu.Unlock() } func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *RequestVoteReply) bool { @@ -125,8 +147,12 @@ func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *R } func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error { + defer logprovider.DebugTraceback("requestVote") n.Mu.Lock() defer n.Mu.Unlock() + + reply.Mu.Lock() + defer reply.Mu.Unlock() // 如果候选人的任期小于当前任期,则拒绝投票 if args.Term < n.CurrTerm { reply.Term = n.CurrTerm @@ -175,11 +201,14 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error return nil } -// follower 150-300ms内没收到appendentries心跳,就变成candidate发起选举 +// follower 一段时间内没收到appendentries心跳,就变成candidate发起选举 func (node *Node) ResetElectionTimer() { + node.MuElection.Lock() + defer node.MuElection.Unlock() if node.ElectionTimer == nil { node.ElectionTimer = time.NewTimer(node.RTTable.GetElectionTimeout()) go func() { + defer logprovider.DebugTraceback("reset") for { <-node.ElectionTimer.C node.StartElection() diff --git a/threadTest/common.go b/threadTest/common.go index c6a7d40..87be524 100644 --- a/threadTest/common.go +++ b/threadTest/common.go @@ -17,15 +17,25 @@ func ExecuteNodeI(id string, isRestart bool, peerIds []string, threadTransport * os.RemoveAll("storage/node" + id) } - os.RemoveAll("leveldb/simple-kv-store" + id) + // 创建临时目录用于 leveldb + dbPath, err := os.MkdirTemp("", "simple-kv-store-"+id+"-") + if err != nil { + panic(fmt.Sprintf("无法创建临时数据库目录: %s", err)) + } - db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil) + // 创建临时目录用于 storage + storagePath, err := os.MkdirTemp("", "raft-storage-"+id+"-") if err != nil { - fmt.Println("Failed to open database: ", err) + panic(fmt.Sprintf("无法创建临时存储目录: %s", err)) } - // 打开或创建节点数据持久化文件 - storage := nodes.NewRaftStorage("storage/node" + id) + db, err := leveldb.OpenFile(dbPath, nil) + if err != nil { + panic(fmt.Sprintf("Failed to open database: %s", err)) + } + + // 初始化 Raft 存储 + storage := nodes.NewRaftStorage(storagePath) var otherIds []string for _, ids := range peerIds { @@ -70,24 +80,18 @@ func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTrans return node, quitChan } -func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) { - for i := 0; i < len(quitCollections); i++ { +func StopElectionReset(nodeCollections [] *nodes.Node) { + for i := 0; i < len(nodeCollections); i++ { node := nodeCollections[i] - quitChan := quitCollections[i] - go func(node *nodes.Node, quitChan chan struct{}) { + go func(node *nodes.Node) { ticker := time.NewTicker(400 * time.Millisecond) defer ticker.Stop() for { - select { - case <-quitChan: - return // 退出 goroutine - - case <-ticker.C: - node.ResetElectionTimer() // 不主动触发选举 - } + <-ticker.C + node.ResetElectionTimer() // 不主动触发选举 } - }(node, quitChan) + }(node) } } @@ -123,59 +127,116 @@ func FindLeader(t *testing.T, nodeCollections []* nodes.Node) (i int) { } } t.Errorf("系统目前没有leader") + t.FailNow() return 0 } func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { cnt := 0 for _, node := range nodeCollections { + node.Mu.Lock() if node.State == nodes.Leader { cnt++ } + node.Mu.Unlock() } if cnt != 1 { t.Errorf("实际有%d个leader(!=1)", cnt) + t.FailNow() } } func CheckNoLeader(t *testing.T, nodeCollections []* nodes.Node) { cnt := 0 for _, node := range nodeCollections { + node.Mu.Lock() if node.State == nodes.Leader { cnt++ } + node.Mu.Unlock() } if cnt != 0 { t.Errorf("实际有%d个leader(!=0)", cnt) + t.FailNow() } } func CheckZeroOrOneLeader(t *testing.T, nodeCollections []* nodes.Node) { cnt := 0 for _, node := range nodeCollections { + node.Mu.Lock() if node.State == nodes.Leader { cnt++ } + node.Mu.Unlock() } if cnt > 1 { - t.Errorf("实际有%d个leader(>1)", cnt) + errmsg := fmt.Sprintf("实际有%d个leader(>1)", cnt) + WriteFailLog(nodeCollections[0].SelfId, errmsg) + t.Error(errmsg) + t.FailNow() } } func CheckIsLeader(t *testing.T, node *nodes.Node) { + node.Mu.Lock() + defer node.Mu.Unlock() if node.State != nodes.Leader { t.Errorf("[%s]不是leader", node.SelfId) + t.FailNow() } } func CheckTerm(t *testing.T, node *nodes.Node, targetTerm int) { + node.Mu.Lock() + defer node.Mu.Unlock() if node.CurrTerm != targetTerm { t.Errorf("[%s]实际term=%d (!=%d)", node.SelfId, node.CurrTerm, targetTerm) + t.FailNow() } } func CheckLogNum(t *testing.T, node *nodes.Node, targetnum int) { + node.Mu.Lock() + defer node.Mu.Unlock() if len(node.Log) != targetnum { t.Errorf("[%s]实际logNum=%d (!=%d)", node.SelfId, len(node.Log), targetnum) + t.FailNow() + } +} + +func CheckSameLog(t *testing.T, nodeCollections []* nodes.Node) { + nodeCollections[0].Mu.Lock() + defer nodeCollections[0].Mu.Unlock() + standard_node := nodeCollections[0] + for i, node := range nodeCollections { + if i != 0 { + node.Mu.Lock() + if len(node.Log) != len(standard_node.Log) { + errmsg := fmt.Sprintf("[1]和[%s]日志数量不一致", node.SelfId) + WriteFailLog(node.SelfId, errmsg) + t.Error(errmsg) + t.FailNow() + } + + for idx, log := range node.Log { + standard_log := standard_node.Log[idx] + if log.Term != standard_log.Term || + log.LogE.Key != standard_log.LogE.Key || + log.LogE.Value != standard_log.LogE.Value { + errmsg := fmt.Sprintf("[1]和[%s]日志id%d不一致", node.SelfId, idx) + WriteFailLog(node.SelfId, errmsg) + t.Error(errmsg) + t.FailNow() + } + } + node.Mu.Unlock() + } } } + +func WriteFailLog(name string, errmsg string) { + f, _ := os.Create(name + ".log") + fmt.Fprint(f, errmsg) + f.Close() +} diff --git a/threadTest/election_test.go b/threadTest/election_test.go index 43b988b..f9078ee 100644 --- a/threadTest/election_test.go +++ b/threadTest/election_test.go @@ -23,7 +23,7 @@ func TestInitElection(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -60,7 +60,7 @@ func TestRepeatElection(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -98,7 +98,7 @@ func TestBelowHalfCandidateElection(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -137,7 +137,7 @@ func TestOverHalfCandidateElection(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -178,7 +178,7 @@ func TestRepeatVoteRpc(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -229,7 +229,7 @@ func TestFailVoteRpc(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -276,7 +276,7 @@ func TestDelayVoteRpc(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ diff --git a/threadTest/fuzz/fuzz_test.go b/threadTest/fuzz/fuzz_test.go new file mode 100644 index 0000000..bf899b0 --- /dev/null +++ b/threadTest/fuzz/fuzz_test.go @@ -0,0 +1,152 @@ +package fuzz + +import ( + "fmt" + "math/rand" + "os" + "runtime/debug" + "sync" + "testing" + "time" + + "simple-kv-store/internal/client" + "simple-kv-store/internal/nodes" + "simple-kv-store/threadTest" + "strconv" +) + +func FuzzRaftBasic(f *testing.F) { + var seenSeeds sync.Map + // 添加初始种子 + f.Add(int64(1)) + fmt.Println("Running") + + f.Fuzz(func(t *testing.T, seed int64) { + if _, loaded := seenSeeds.LoadOrStore(seed, true); loaded { + t.Skipf("Seed %d already tested, skipping...", seed) + return + } + defer func() { + if r := recover(); r != nil { + msg := fmt.Sprintf("goroutine panic: %v\n%s", r, debug.Stack()) + f, _ := os.Create("panic_goroutine.log") + fmt.Fprint(f, msg) + f.Close() + } + }() + + r := rand.New(rand.NewSource(seed)) // 使用局部 rand + + n := 3 + 2*(r.Intn(4)) + fmt.Printf("随机了%d个节点\n", n) + logs := (r.Intn(10)) + fmt.Printf("随机了%d份日志\n", logs) + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1)) + } + + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + + for i := 0; i < n; i++ { + node, quitChan := threadTest.ExecuteNodeI(strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1), false, peerIds, threadTransport) + nodeCollections = append(nodeCollections, node) + node.RTTable.SetElectionTimeout(750 * time.Millisecond) + quitCollections = append(quitCollections, quitChan) + } + + // 模拟 a-b 通讯行为 + faultyNodes := injectRandomBehavior(ctx, r, peerIds) + + time.Sleep(time.Second) + + clientObj := clientPkg.NewClient("0", peerIds, threadTransport) + + for i := 0; i < logs; i++ { + key := fmt.Sprintf("k%d", i) + log := nodes.LogEntry{Key: key, Value: "v"} + clientObj.Write(log) + } + + time.Sleep(time.Second) + + var rightNodeCollections []*nodes.Node + for _, node := range nodeCollections { + if !faultyNodes[node.SelfId] { + rightNodeCollections = append(rightNodeCollections, node) + } + } + threadTest.CheckSameLog(t, rightNodeCollections) + threadTest.CheckZeroOrOneLeader(t, nodeCollections) + + for _, quitChan := range quitCollections { + close(quitChan) + } + time.Sleep(time.Second) + for i := 0; i < n; i++ { + // 确保完成退出 + nodeCollections[i].Mu.Lock() + if !nodeCollections[i].IsFinish { + nodeCollections[i].IsFinish = true + } + nodeCollections[i].Mu.Unlock() + + os.RemoveAll("leveldb/simple-kv-store" + strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1)) + os.RemoveAll("storage/node" + strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1)) + } + }) +} + +// 注入节点间行为 +func injectRandomBehavior(ctx *nodes.Ctx, r *rand.Rand, peers []string) (map[string]bool) { + behaviors := []nodes.CallBehavior{ + nodes.FailRpc, + nodes.DelayRpc, + nodes.RetryRpc, + } + n := len(peers) + maxFaulty := r.Intn(n/2 + 1) // 随机选择 0 ~ n/2 个出问题的节点 + + // 随机选择出问题的节点 + shuffled := append([]string(nil), peers...) + r.Shuffle(n, func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + faultyNodes := make(map[string]bool) + for i := 0; i < maxFaulty; i++ { + faultyNodes[shuffled[i]] = true + } + + for _, one := range peers { + if faultyNodes[one] { + b := behaviors[r.Intn(len(behaviors))] + delay := time.Duration(r.Intn(100)) * time.Millisecond + + switch b { + case nodes.FailRpc: + fmt.Printf("[%s]的异常行为是fail\n", one) + case nodes.DelayRpc: + fmt.Printf("[%s]的异常行为是delay\n", one) + case nodes.RetryRpc: + fmt.Printf("[%s]的异常行为是retry\n", one) + } + + for _, two := range peers { + if one == two { + continue + } + + if faultyNodes[one] && faultyNodes[two] { + ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0) + ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0) + } else { + ctx.SetBehavior(one, two, b, delay, 2) + ctx.SetBehavior(two, one, b, delay, 2) + } + } + } + + } + return faultyNodes +} diff --git a/threadTest/log_replication_test.go b/threadTest/log_replication_test.go index 93b6371..c9b7c2c 100644 --- a/threadTest/log_replication_test.go +++ b/threadTest/log_replication_test.go @@ -23,7 +23,7 @@ func TestNormalReplication(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -70,7 +70,7 @@ func TestParallelReplication(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -118,7 +118,7 @@ func TestFollowerLagging(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -137,6 +137,7 @@ func TestFollowerLagging(t *testing.T) { CheckIsLeader(t, nodeCollections[0]) CheckTerm(t, nodeCollections[0], 2) close(quitCollections[1]) + time.Sleep(time.Second) for i := 0; i < 10; i++ { key := strconv.Itoa(i) @@ -148,7 +149,8 @@ func TestFollowerLagging(t *testing.T) { quitCollections[1] = q nodeCollections[1] = node nodeCollections[1].State = nodes.Follower - StopElectionReset(nodeCollections[1:2], quitCollections[1:2]) + StopElectionReset(nodeCollections[1:2]) + nodeCollections[0].BroadCastKV() time.Sleep(time.Second) for i := 0; i < n; i++ { @@ -173,7 +175,7 @@ func TestFailLogAppendRpc(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -225,7 +227,7 @@ func TestRepeatLogAppendRpc(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){ @@ -277,7 +279,7 @@ func TestDelayLogAppendRpc(t *testing.T) { quitCollections = append(quitCollections, quitChan) nodeCollections = append(nodeCollections, n) } - StopElectionReset(nodeCollections, quitCollections) + StopElectionReset(nodeCollections) // 通知所有node结束 defer func(){