From 6ce161187ae70029f9217e00437f7928829f3807 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Wed, 2 Apr 2025 19:06:29 +0800 Subject: [PATCH] =?UTF-8?q?replication=E6=B5=8B=E8=AF=95=E8=A1=A5=E5=85=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- threadTest/common.go | 70 +++++++++++++++++ threadTest/election_test.go | 55 ------------- threadTest/log_replication_test.go | 157 +++++++++++++++++++++++++++++++++++++ 3 files changed, 227 insertions(+), 55 deletions(-) create mode 100644 threadTest/log_replication_test.go diff --git a/threadTest/common.go b/threadTest/common.go index 18c9598..c5188fc 100644 --- a/threadTest/common.go +++ b/threadTest/common.go @@ -5,6 +5,7 @@ import ( "os" "simple-kv-store/internal/nodes" "testing" + "time" "github.com/syndtr/goleveldb/leveldb" ) @@ -38,6 +39,69 @@ func ExecuteNodeI(id string, isRestart bool, peerIds []string, threadTransport * return node, quitChan } +func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTransport *nodes.ThreadTransport) (*nodes.Node, chan struct{}) { + if !isRestart { + os.RemoveAll("storage/node" + id) + } + + os.RemoveAll("leveldb/simple-kv-store" + id) + + db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil) + if err != nil { + fmt.Println("Failed to open database: ", err) + } + + // 打开或创建节点数据持久化文件 + storage := nodes.NewRaftStorage("storage/node" + id) + + var otherIds []string + for _, ids := range peerIds { + if ids != id { + otherIds = append(otherIds, ids) // 删除目标元素 + } + } + // 初始化 + node, quitChan := nodes.InitThreadNode(id, otherIds, db, storage, isRestart, threadTransport) + + // 开启 raft + // go nodes.Start(node, quitChan) + return node, quitChan +} + +func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) { + for i := 0; i < len(quitCollections); i++ { + node := nodeCollections[i] + quitChan := quitCollections[i] + go func(node *nodes.Node, quitChan chan struct{}) { + ticker := time.NewTicker(400 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-quitChan: + return // 退出 goroutine + + case <-ticker.C: + node.ResetElectionTimer() // 不主动触发选举 + } + } + }(node, quitChan) + } +} + +func SendKvCall(kvCall *nodes.LogEntryCall, node *nodes.Node) { + node.Mu.Lock() + defer node.Mu.Unlock() + + node.MaxLogId++ + logId := node.MaxLogId + rLogE := nodes.RaftLogEntry{LogE: kvCall.LogE,LogId: logId, Term: node.CurrTerm} + node.Log = append(node.Log, rLogE) + node.Storage.AppendLog(rLogE) + // 广播给其它节点 + node.BroadCastKV() +} + func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { cnt := 0 for _, node := range nodeCollections { @@ -73,3 +137,9 @@ func CheckTerm(t *testing.T, node *nodes.Node, targetTerm int) { t.Errorf("[%s]实际term=%d (!=%d)", node.SelfId, node.CurrTerm, targetTerm) } } + +func CheckLogNum(t *testing.T, node *nodes.Node, targetnum int) { + if len(node.Log) != targetnum { + t.Errorf("[%s]实际logNum=%d (!=%d)", node.SelfId, len(node.Log), targetnum) + } +} diff --git a/threadTest/election_test.go b/threadTest/election_test.go index 4b762b4..a69d7ab 100644 --- a/threadTest/election_test.go +++ b/threadTest/election_test.go @@ -1,67 +1,12 @@ package threadTest import ( - "fmt" - "os" "simple-kv-store/internal/nodes" "strconv" "testing" "time" - - "github.com/syndtr/goleveldb/leveldb" ) -func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTransport *nodes.ThreadTransport) (*nodes.Node, chan struct{}) { - if !isRestart { - os.RemoveAll("storage/node" + id) - } - - os.RemoveAll("leveldb/simple-kv-store" + id) - - db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil) - if err != nil { - fmt.Println("Failed to open database: ", err) - } - - // 打开或创建节点数据持久化文件 - storage := nodes.NewRaftStorage("storage/node" + id) - - var otherIds []string - for _, ids := range peerIds { - if ids != id { - otherIds = append(otherIds, ids) // 删除目标元素 - } - } - // 初始化 - node, quitChan := nodes.InitThreadNode(id, otherIds, db, storage, isRestart, threadTransport) - - // 开启 raft - // go nodes.Start(node, quitChan) - return node, quitChan -} - -func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) { - for i := 0; i < len(quitCollections); i++ { - node := nodeCollections[i] - quitChan := quitCollections[i] - go func(node *nodes.Node, quitChan chan struct{}) { - ticker := time.NewTicker(400 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-quitChan: - return // 退出 goroutine - - case <-ticker.C: - node.ResetElectionTimer() // 不主动触发选举 - } - } - }(node, quitChan) - } - -} - func TestInitElection(t *testing.T) { n := 5 var peerIds []string diff --git a/threadTest/log_replication_test.go b/threadTest/log_replication_test.go new file mode 100644 index 0000000..15a897f --- /dev/null +++ b/threadTest/log_replication_test.go @@ -0,0 +1,157 @@ +package threadTest + +import ( + "simple-kv-store/internal/nodes" + "strconv" + "testing" + "time" +) + +func TestNormalReplication(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) + + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + } + + time.Sleep(time.Second) + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 10) + } +} + +func TestParallelReplication(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) + + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + go nodeCollections[0].BroadCastKV() + } + + time.Sleep(time.Second) + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 10) + } +} + +func TestFollowerLagging(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport() + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) + close(quitCollections[1]) + + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + } + + node, q := ExecuteStaticNodeI("2", true, peerIds, threadTransport) + quitCollections[1] = q + nodeCollections[1] = node + nodeCollections[1].State = nodes.Follower + StopElectionReset(nodeCollections[1:2], quitCollections[1:2]) + + time.Sleep(time.Second) + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 10) + } +}