From 1bfa17a7356e991f075e4252193543c7c1519674 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 12 Apr 2025 10:45:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=95=B4=E7=9A=84fuzz?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/client/client_node.go | 3 +- internal/nodes/init.go | 4 +- internal/nodes/replica.go | 1 + internal/nodes/thread_transport.go | 13 ++ threadTest/common.go | 97 ++++++++++-- threadTest/fuzz/fuzz_test.go | 311 +++++++++++++++++++++++++++++++++++-- 6 files changed, 400 insertions(+), 29 deletions(-) diff --git a/internal/client/client_node.go b/internal/client/client_node.go index 3474b6a..6e77a63 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -60,6 +60,7 @@ func (client *Client) CloseRpcClient(c nodes.ClientInterface) { } func (client *Client) Write(kv nodes.LogEntry) Status { + defer logprovider.DebugTraceback("client") log.Info("client write request key :" + kv.Key) kvCall := nodes.LogEntryCall{LogE: kv, Id: nodes.LogEntryCallId{ClientId: client.ClientId, LogId: client.NextLogId}} @@ -68,7 +69,7 @@ func (client *Client) Write(kv nodes.LogEntry) Status { c := client.FindActiveNode() var err error - timeout := 5 * time.Second + timeout := time.Second deadline := time.Now().Add(timeout) for { // 根据存活节点的反馈,直到找到leader diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 3e13373..27999df 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -132,9 +132,9 @@ func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{}) case FailRpc: continue + default: + go node.switchReq(req, 0) } - - go node.switchReq(req, 0) case <-quitChan: node.Mu.Lock() diff --git a/internal/nodes/replica.go b/internal/nodes/replica.go index 150433f..6807254 100644 --- a/internal/nodes/replica.go +++ b/internal/nodes/replica.go @@ -27,6 +27,7 @@ type AppendEntriesReply struct { // leader收到新内容要广播,以及心跳广播(同步自己的log) func (node *Node) BroadCastKV() { log.Sugar().Infof("leader[%s]广播消息", node.SelfId) + defer logprovider.DebugTraceback("broadcast") failCount := 0 // 这里增加一个锁,防止并发修改成功计数 var failMutex sync.Mutex diff --git a/internal/nodes/thread_transport.go b/internal/nodes/thread_transport.go index 819ed53..1b2e9d8 100644 --- a/internal/nodes/thread_transport.go +++ b/internal/nodes/thread_transport.go @@ -80,6 +80,19 @@ func (t *ThreadTransport) SetConnectivity(from, to string, isConnected bool) { } } +func (t *ThreadTransport) ResetConnectivity() { + t.mu.Lock() + defer t.mu.Unlock() + for firstId:= range t.nodeChans { + for peerId:= range t.nodeChans { + if firstId != peerId { + t.connectivityMap[firstId][peerId] = true + t.connectivityMap[peerId][firstId] = true + } + } + } +} + // 获取节点的 channel func (t *ThreadTransport) getNodeChan(nodeId string) (chan RPCRequest, bool) { t.mu.Lock() diff --git a/threadTest/common.go b/threadTest/common.go index 87be524..44af24c 100644 --- a/threadTest/common.go +++ b/threadTest/common.go @@ -3,7 +3,7 @@ package threadTest import ( "fmt" "os" - "simple-kv-store/internal/client" + clientPkg "simple-kv-store/internal/client" "simple-kv-store/internal/nodes" "strconv" "testing" @@ -58,7 +58,7 @@ func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTrans os.RemoveAll("leveldb/simple-kv-store" + id) - db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil) + db, err := leveldb.OpenFile("leveldb/simple-kv-store"+id, nil) if err != nil { fmt.Println("Failed to open database: ", err) } @@ -80,7 +80,7 @@ func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTrans return node, quitChan } -func StopElectionReset(nodeCollections [] *nodes.Node) { +func StopElectionReset(nodeCollections []*nodes.Node) { for i := 0; i < len(nodeCollections); i++ { node := nodeCollections[i] go func(node *nodes.Node) { @@ -91,7 +91,7 @@ func StopElectionReset(nodeCollections [] *nodes.Node) { <-ticker.C node.ResetElectionTimer() // 不主动触发选举 } - }(node) + }(node) } } @@ -101,10 +101,10 @@ func SendKvCall(kvCall *nodes.LogEntryCall, node *nodes.Node) { node.MaxLogId++ logId := node.MaxLogId - rLogE := nodes.RaftLogEntry{LogE: kvCall.LogE,LogId: logId, Term: node.CurrTerm} + rLogE := nodes.RaftLogEntry{LogE: kvCall.LogE, LogId: logId, Term: node.CurrTerm} node.Log = append(node.Log, rLogE) node.Storage.AppendLog(rLogE) - // 广播给其它节点 + // 广播给其它节点 node.BroadCastKV() } @@ -116,11 +116,11 @@ func ClientWriteLog(t *testing.T, startLogid int, endLogid int, cWrite *clientPk s = cWrite.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") - } + } } } -func FindLeader(t *testing.T, nodeCollections []* nodes.Node) (i int) { +func FindLeader(t *testing.T, nodeCollections []*nodes.Node) (i int) { for i, node := range nodeCollections { if node.State == nodes.Leader { return i @@ -131,7 +131,7 @@ func FindLeader(t *testing.T, nodeCollections []* nodes.Node) (i int) { return 0 } -func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { +func CheckOneLeader(t *testing.T, nodeCollections []*nodes.Node) { cnt := 0 for _, node := range nodeCollections { node.Mu.Lock() @@ -146,7 +146,7 @@ func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { } } -func CheckNoLeader(t *testing.T, nodeCollections []* nodes.Node) { +func CheckNoLeader(t *testing.T, nodeCollections []*nodes.Node) { cnt := 0 for _, node := range nodeCollections { node.Mu.Lock() @@ -161,7 +161,7 @@ func CheckNoLeader(t *testing.T, nodeCollections []* nodes.Node) { } } -func CheckZeroOrOneLeader(t *testing.T, nodeCollections []* nodes.Node) { +func CheckZeroOrOneLeader(t *testing.T, nodeCollections []*nodes.Node) { cnt := 0 for _, node := range nodeCollections { node.Mu.Lock() @@ -171,7 +171,7 @@ func CheckZeroOrOneLeader(t *testing.T, nodeCollections []* nodes.Node) { node.Mu.Unlock() } if cnt > 1 { - errmsg := fmt.Sprintf("实际有%d个leader(>1)", cnt) + errmsg := fmt.Sprintf("%d个节点中,实际有%d个leader(>1)", len(nodeCollections), cnt) WriteFailLog(nodeCollections[0].SelfId, errmsg) t.Error(errmsg) t.FailNow() @@ -205,7 +205,7 @@ func CheckLogNum(t *testing.T, node *nodes.Node, targetnum int) { } } -func CheckSameLog(t *testing.T, nodeCollections []* nodes.Node) { +func CheckSameLog(t *testing.T, nodeCollections []*nodes.Node) { nodeCollections[0].Mu.Lock() defer nodeCollections[0].Mu.Unlock() standard_node := nodeCollections[0] @@ -213,7 +213,7 @@ func CheckSameLog(t *testing.T, nodeCollections []* nodes.Node) { if i != 0 { node.Mu.Lock() if len(node.Log) != len(standard_node.Log) { - errmsg := fmt.Sprintf("[1]和[%s]日志数量不一致", node.SelfId) + errmsg := fmt.Sprintf("[%s]和[%s]日志数量不一致", nodeCollections[0].SelfId, node.SelfId) WriteFailLog(node.SelfId, errmsg) t.Error(errmsg) t.FailNow() @@ -221,13 +221,76 @@ func CheckSameLog(t *testing.T, nodeCollections []* nodes.Node) { for idx, log := range node.Log { standard_log := standard_node.Log[idx] - if log.Term != standard_log.Term || - log.LogE.Key != standard_log.LogE.Key || + if log.Term != standard_log.Term || + log.LogE.Key != standard_log.LogE.Key || log.LogE.Value != standard_log.LogE.Value { - errmsg := fmt.Sprintf("[1]和[%s]日志id%d不一致", node.SelfId, idx) + errmsg := fmt.Sprintf("[1]和[%s]日志id%d不一致", node.SelfId, idx) + WriteFailLog(node.SelfId, errmsg) + t.Error(errmsg) + t.FailNow() + } + } + node.Mu.Unlock() + } + } +} + +func CheckLeaderInvariant(t *testing.T, nodeCollections []*nodes.Node) { + leaderCnt := make(map[int]bool) + for _, node := range nodeCollections { + node.Mu.Lock() + if node.State == nodes.Leader { + if _, exist := leaderCnt[node.CurrTerm]; exist { + errmsg := fmt.Sprintf("在%d有多个leader(%s)", node.CurrTerm, node.SelfId) + WriteFailLog(node.SelfId, errmsg) + t.Error(errmsg) + } else { + leaderCnt[node.CurrTerm] = true + } + } + node.Mu.Unlock() + } +} + +func CheckLogInvariant(t *testing.T, nodeCollections []*nodes.Node) { + nodeCollections[0].Mu.Lock() + defer nodeCollections[0].Mu.Unlock() + standard_node := nodeCollections[0] + standard_len := len(standard_node.Log) + for i, node := range nodeCollections { + if i != 0 { + node.Mu.Lock() + len2 := len(node.Log) + var shorti int + if len2 < standard_len { + shorti = len2 + } else { + shorti = standard_len + } + if shorti == 0 { + node.Mu.Unlock() + continue + } + + alreadySame := false + for i := shorti - 1; i >= 0; i-- { + standard_log := standard_node.Log[i] + log := node.Log[i] + if alreadySame { + if log.Term != standard_log.Term || + log.LogE.Key != standard_log.LogE.Key || + log.LogE.Value != standard_log.LogE.Value { + errmsg := fmt.Sprintf("[%s]和[%s]日志id%d不一致", standard_node.SelfId, node.SelfId, i) WriteFailLog(node.SelfId, errmsg) t.Error(errmsg) t.FailNow() + } + } else { + if log.Term == standard_log.Term && + log.LogE.Key == standard_log.LogE.Key && + log.LogE.Value == standard_log.LogE.Value { + alreadySame = true + } } } node.Mu.Unlock() diff --git a/threadTest/fuzz/fuzz_test.go b/threadTest/fuzz/fuzz_test.go index bf899b0..f65c789 100644 --- a/threadTest/fuzz/fuzz_test.go +++ b/threadTest/fuzz/fuzz_test.go @@ -9,12 +9,13 @@ import ( "testing" "time" - "simple-kv-store/internal/client" + clientPkg "simple-kv-store/internal/client" "simple-kv-store/internal/nodes" "simple-kv-store/threadTest" "strconv" ) +// 1.针对随机配置随机消息状态 func FuzzRaftBasic(f *testing.F) { var seenSeeds sync.Map // 添加初始种子 @@ -43,7 +44,7 @@ func FuzzRaftBasic(f *testing.F) { fmt.Printf("随机了%d份日志\n", logs) var peerIds []string for i := 0; i < n; i++ { - peerIds = append(peerIds, strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1)) + peerIds = append(peerIds, strconv.Itoa(int(seed))+"."+strconv.Itoa(i+1)) } ctx := nodes.NewCtx() @@ -52,7 +53,7 @@ func FuzzRaftBasic(f *testing.F) { var nodeCollections []*nodes.Node for i := 0; i < n; i++ { - node, quitChan := threadTest.ExecuteNodeI(strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1), false, peerIds, threadTransport) + node, quitChan := threadTest.ExecuteNodeI(strconv.Itoa(int(seed))+"."+strconv.Itoa(i+1), false, peerIds, threadTransport) nodeCollections = append(nodeCollections, node) node.RTTable.SetElectionTimeout(750 * time.Millisecond) quitCollections = append(quitCollections, quitChan) @@ -80,7 +81,7 @@ func FuzzRaftBasic(f *testing.F) { } } threadTest.CheckSameLog(t, rightNodeCollections) - threadTest.CheckZeroOrOneLeader(t, nodeCollections) + threadTest.CheckLeaderInvariant(t, nodeCollections) for _, quitChan := range quitCollections { close(quitChan) @@ -92,7 +93,7 @@ func FuzzRaftBasic(f *testing.F) { if !nodeCollections[i].IsFinish { nodeCollections[i].IsFinish = true } - nodeCollections[i].Mu.Unlock() + nodeCollections[i].Mu.Unlock() os.RemoveAll("leveldb/simple-kv-store" + strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1)) os.RemoveAll("storage/node" + strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1)) @@ -101,7 +102,7 @@ func FuzzRaftBasic(f *testing.F) { } // 注入节点间行为 -func injectRandomBehavior(ctx *nodes.Ctx, r *rand.Rand, peers []string) (map[string]bool) { +func injectRandomBehavior(ctx *nodes.Ctx, r *rand.Rand, peers []string) map[string]bool /*id:Isfault*/ { behaviors := []nodes.CallBehavior{ nodes.FailRpc, nodes.DelayRpc, @@ -131,7 +132,7 @@ func injectRandomBehavior(ctx *nodes.Ctx, r *rand.Rand, peers []string) (map[str case nodes.RetryRpc: fmt.Printf("[%s]的异常行为是retry\n", one) } - + for _, two := range peers { if one == two { continue @@ -142,11 +143,303 @@ func injectRandomBehavior(ctx *nodes.Ctx, r *rand.Rand, peers []string) (map[str ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0) } else { ctx.SetBehavior(one, two, b, delay, 2) - ctx.SetBehavior(two, one, b, delay, 2) + ctx.SetBehavior(two, one, b, delay, 2) + } + } + } + + } + return faultyNodes +} + +// 2.对一个长时间运行的系统,注入随机行为 +func FuzzRaftRobust(f *testing.F) { + var seenSeeds sync.Map + var fuzzMu sync.Mutex + // 添加初始种子 + f.Add(int64(0)) + fmt.Println("Running") + n := 5 + + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i+1)) + } + + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + quitCollections := make(map[string]chan struct{}) + nodeCollections := make(map[string]*nodes.Node) + + for i := 0; i < n; i++ { + id := strconv.Itoa(i+1) + node, quitChan := threadTest.ExecuteNodeI(id, false, peerIds, threadTransport) + nodeCollections[id] = node + quitCollections[id] = quitChan + } + + f.Fuzz(func(t *testing.T, seed int64) { + fuzzMu.Lock() + defer fuzzMu.Unlock() + if _, loaded := seenSeeds.LoadOrStore(seed, true); loaded { + t.Skipf("Seed %d already tested, skipping...", seed) + return + } + defer func() { + if r := recover(); r != nil { + msg := fmt.Sprintf("goroutine panic: %v\n%s", r, debug.Stack()) + f, _ := os.Create("panic_goroutine.log") + fmt.Fprint(f, msg) + f.Close() + } + }() + + r := rand.New(rand.NewSource(seed)) // 使用局部 rand + + clientObj := clientPkg.NewClient("0", peerIds, threadTransport) + + faultyNodes := injectRandomBehavior2(ctx, r, peerIds, threadTransport, quitCollections) + + key := fmt.Sprintf("k%d", seed % 10) + log := nodes.LogEntry{Key: key, Value: "v"} + clientObj.Write(log) + + time.Sleep(time.Second) + + var rightNodeCollections []*nodes.Node + for _, node := range nodeCollections { + _, exist := faultyNodes[node.SelfId] + if !exist { + rightNodeCollections = append(rightNodeCollections, node) + } + } + threadTest.CheckLogInvariant(t, rightNodeCollections) + threadTest.CheckLeaderInvariant(t, rightNodeCollections) + + // ResetFaultyNodes + threadTransport.ResetConnectivity() + for id, isrestart := range faultyNodes { + if !isrestart { + for _, peerIds := range peerIds { + if id == peerIds { + continue + } + ctx.SetBehavior(id, peerIds, nodes.NormalRpc, 0, 0) + ctx.SetBehavior(peerIds, id, nodes.NormalRpc, 0, 0) } - } + } else { + newNode, quitChan := threadTest.ExecuteNodeI(id, true, peerIds, threadTransport) + quitCollections[id] = quitChan + nodeCollections[id] = newNode + } + fmt.Printf("[%s]恢复异常\n", id) } + }) + for _, quitChan := range quitCollections { + close(quitChan) + } + time.Sleep(time.Second) + for id, node := range nodeCollections { + // 确保完成退出 + node.Mu.Lock() + if !node.IsFinish { + node.IsFinish = true + } + node.Mu.Unlock() + os.RemoveAll("leveldb/simple-kv-store" + id) + os.RemoveAll("storage/node" + id) } +} + +// 3.综合 +func FuzzRaftPlus(f *testing.F) { + var seenSeeds sync.Map + // 添加初始种子 + f.Add(int64(0)) + fmt.Println("Running") + + f.Fuzz(func(t *testing.T, seed int64) { + if _, loaded := seenSeeds.LoadOrStore(seed, true); loaded { + t.Skipf("Seed %d already tested, skipping...", seed) + return + } + defer func() { + if r := recover(); r != nil { + msg := fmt.Sprintf("goroutine panic: %v\n%s", r, debug.Stack()) + f, _ := os.Create("panic_goroutine.log") + fmt.Fprint(f, msg) + f.Close() + } + }() + + r := rand.New(rand.NewSource(seed)) // 使用局部 rand + + n := 3 + 2*(r.Intn(4)) + fmt.Printf("随机了%d个节点\n", n) + ElectionTimeOut := 500 + r.Intn(500) + fmt.Printf("随机的投票超时时间:%d\n", ElectionTimeOut) + + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(int(seed))+"."+strconv.Itoa(i+1)) + } + + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + quitCollections := make(map[string]chan struct{}) + nodeCollections := make(map[string]*nodes.Node) + + for i := 0; i < n; i++ { + id := strconv.Itoa(int(seed))+"."+strconv.Itoa(i+1) + node, quitChan := threadTest.ExecuteNodeI(id, false, peerIds, threadTransport) + nodeCollections[id] = node + node.RTTable.SetElectionTimeout(time.Duration(ElectionTimeOut) * time.Millisecond) + quitCollections[id] = quitChan + } + + clientObj := clientPkg.NewClient("0", peerIds, threadTransport) + + for i := 0; i < 5; i++ { // 模拟10次异常 + fmt.Printf("第%d轮异常注入开始\n", i + 1) + faultyNodes := injectRandomBehavior2(ctx, r, peerIds, threadTransport, quitCollections) + + key := fmt.Sprintf("k%d", i) + log := nodes.LogEntry{Key: key, Value: "v"} + clientObj.Write(log) + + time.Sleep(time.Second) + + var rightNodeCollections []*nodes.Node + for _, node := range nodeCollections { + _, exist := faultyNodes[node.SelfId] + if !exist { + rightNodeCollections = append(rightNodeCollections, node) + } + } + threadTest.CheckLogInvariant(t, rightNodeCollections) + threadTest.CheckLeaderInvariant(t, rightNodeCollections) + + // ResetFaultyNodes + threadTransport.ResetConnectivity() + for id, isrestart := range faultyNodes { + if !isrestart { + for _, peerId := range peerIds { + if id == peerId { + continue + } + ctx.SetBehavior(id, peerId, nodes.NormalRpc, 0, 0) + ctx.SetBehavior(peerId, id, nodes.NormalRpc, 0, 0) + } + } else { + newNode, quitChan := threadTest.ExecuteNodeI(id, true, peerIds, threadTransport) + quitCollections[id] = quitChan + nodeCollections[id] = newNode + } + fmt.Printf("[%s]恢复异常\n", id) + } + + } + + for _, quitChan := range quitCollections { + close(quitChan) + } + time.Sleep(time.Second) + for id, node := range nodeCollections { + // 确保完成退出 + node.Mu.Lock() + if !node.IsFinish { + node.IsFinish = true + } + node.Mu.Unlock() + + os.RemoveAll("leveldb/simple-kv-store" + id) + os.RemoveAll("storage/node" + id) + } + }) +} + +func injectRandomBehavior2(ctx *nodes.Ctx, r *rand.Rand, peers []string, tran *nodes.ThreadTransport, quitCollections map[string]chan struct{}) map[string]bool /*id:needRestart*/ { + + n := len(peers) + maxFaulty := r.Intn(n/2 + 1) // 随机选择 0 ~ n/2 个出问题的节点 + + // 随机选择出问题的节点 + shuffled := append([]string(nil), peers...) + r.Shuffle(n, func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + faultyNodes := make(map[string]bool) + for i := 0; i < maxFaulty; i++ { + faultyNodes[shuffled[i]] = false + } + PartitionNodes := make(map[string]bool) + + for _, one := range peers { + _, exist := faultyNodes[one] + if exist { + b := r.Intn(5) + + switch b { + case 0: + fmt.Printf("[%s]的异常行为是fail\n", one) + for _, two := range peers { + if one == two { + continue + } + ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0) + ctx.SetBehavior(two, one, nodes.FailRpc, 0, 0) + } + case 1: + fmt.Printf("[%s]的异常行为是delay\n", one) + t := r.Intn(100) + fmt.Printf("[%s]的delay time = %d\n", one, t) + delay := time.Duration(t) * time.Millisecond + for _, two := range peers { + if one == two { + continue + } + _, exist2 := faultyNodes[two] + if exist2 { + ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0) + ctx.SetBehavior(two, one, nodes.FailRpc, 0, 0) + } else { + ctx.SetBehavior(one, two, nodes.DelayRpc, delay, 0) + ctx.SetBehavior(two, one, nodes.DelayRpc, delay, 0) + } + } + case 2: + fmt.Printf("[%s]的异常行为是retry\n", one) + for _, two := range peers { + if one == two { + continue + } + _, exist2 := faultyNodes[two] + if exist2 { + ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0) + ctx.SetBehavior(two, one, nodes.FailRpc, 0, 0) + } else { + ctx.SetBehavior(one, two, nodes.RetryRpc, 0, 2) + ctx.SetBehavior(two, one, nodes.RetryRpc, 0, 2) + } + } + case 3: + fmt.Printf("[%s]的异常行为是stop\n", one) + faultyNodes[one] = true + close(quitCollections[one]) + + case 4: + fmt.Printf("[%s]的异常行为是partition\n", one) + PartitionNodes[one] = true + } + } + } + for id, _ := range PartitionNodes { + for _, two := range peers { + if !PartitionNodes[two] { + tran.SetConnectivity(id, two, false) + tran.SetConnectivity(two, id, false) + } + } + } + return faultyNodes }