From a460aba9853e88b0abb5ce39eb174d9dad87a250 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 5 Apr 2025 15:45:30 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/nodes/init.go | 2 + internal/nodes/node.go | 3 + internal/nodes/random_timetable.go | 38 +++++++++ internal/nodes/vote.go | 4 +- threadTest/common.go | 36 ++++++++ threadTest/election_test.go | 75 ++++++++++++++++- threadTest/log_replication_test.go | 167 +++++++++++++++++++++++++++++++++++++ threadTest/restart_node_test.go | 70 +++++++++++++--- threadTest/server_client_test.go | 53 ++++++++++++ 9 files changed, 434 insertions(+), 14 deletions(-) create mode 100644 internal/nodes/random_timetable.go diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 77afdeb..bacf26a 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -37,6 +37,7 @@ func InitRPCNode(SelfId string, port string, nodeAddr map[string]string, db *lev Db: db, Storage: rstorage, Transport: &HTTPTransport{NodeMap: nodeAddr}, + RTTable: NewRTTable(), SeenRequests: make(map[LogEntryCallId]bool), } node.initLeaderState() @@ -91,6 +92,7 @@ func InitThreadNode(SelfId string, peerIds []string, db *leveldb.DB, rstorage *R Db: db, Storage: rstorage, Transport: threadTransport, + RTTable: NewRTTable(), SeenRequests: make(map[LogEntryCallId]bool), } node.initLeaderState() diff --git a/internal/nodes/node.go b/internal/nodes/node.go index bd12ff6..82c9b1f 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -60,6 +60,9 @@ type Node struct { // 通信方式 Transport Transport + // 系统的随机时间 + RTTable *RandomTimeTable + // 已经处理过的客户端请求 SeenRequests map[LogEntryCallId]bool } diff --git a/internal/nodes/random_timetable.go b/internal/nodes/random_timetable.go new file mode 100644 index 0000000..0c132d2 --- /dev/null +++ b/internal/nodes/random_timetable.go @@ -0,0 +1,38 @@ +package nodes + +import ( + "math/rand" + "time" +) + +type RandomTimeTable struct { + electionTimeOut time.Duration + israndom bool + // heartbeat 50ms + // rpcTimeout 100ms + // follower变candidate 500ms + // 等待选举成功时间 300ms +} + +func NewRTTable() *RandomTimeTable { + return &RandomTimeTable{ + israndom: true, + } +} + +func (rttable *RandomTimeTable) GetElectionTimeout() time.Duration { + if rttable.israndom { + return time.Duration(500+rand.Intn(500)) * time.Millisecond + } else { + return time.Duration(rttable.electionTimeOut) + } +} + +func (rttable *RandomTimeTable) SetElectionTimeout(t time.Duration) { + rttable.israndom = false + rttable.electionTimeOut = t +} + +func (rttable *RandomTimeTable) ResetElectionTimeout() { + rttable.israndom = true +} \ No newline at end of file diff --git a/internal/nodes/vote.go b/internal/nodes/vote.go index 3568610..8f3a365 100644 --- a/internal/nodes/vote.go +++ b/internal/nodes/vote.go @@ -175,10 +175,10 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error return nil } -// follower 500-1000ms内没收到appendentries心跳,就变成candidate发起选举 +// follower 150-300ms内没收到appendentries心跳,就变成candidate发起选举 func (node *Node) ResetElectionTimer() { if node.ElectionTimer == nil { - node.ElectionTimer = time.NewTimer(time.Duration(500+rand.Intn(500)) * time.Millisecond) + node.ElectionTimer = time.NewTimer(node.RTTable.GetElectionTimeout()) go func() { for { <-node.ElectionTimer.C diff --git a/threadTest/common.go b/threadTest/common.go index c5188fc..c6a7d40 100644 --- a/threadTest/common.go +++ b/threadTest/common.go @@ -3,7 +3,9 @@ package threadTest import ( "fmt" "os" + "simple-kv-store/internal/client" "simple-kv-store/internal/nodes" + "strconv" "testing" "time" @@ -102,6 +104,28 @@ func SendKvCall(kvCall *nodes.LogEntryCall, node *nodes.Node) { node.BroadCastKV() } +func ClientWriteLog(t *testing.T, startLogid int, endLogid int, cWrite *clientPkg.Client) { + var s clientPkg.Status + for i := startLogid; i < endLogid; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s = cWrite.Write(newlog) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + } +} + +func FindLeader(t *testing.T, nodeCollections []* nodes.Node) (i int) { + for i, node := range nodeCollections { + if node.State == nodes.Leader { + return i + } + } + t.Errorf("系统目前没有leader") + return 0 +} + func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { cnt := 0 for _, node := range nodeCollections { @@ -114,6 +138,18 @@ func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { } } +func CheckNoLeader(t *testing.T, nodeCollections []* nodes.Node) { + cnt := 0 + for _, node := range nodeCollections { + if node.State == nodes.Leader { + cnt++ + } + } + if cnt != 0 { + t.Errorf("实际有%d个leader(!=0)", cnt) + } +} + func CheckZeroOrOneLeader(t *testing.T, nodeCollections []* nodes.Node) { cnt := 0 for _, node := range nodeCollections { diff --git a/threadTest/election_test.go b/threadTest/election_test.go index 335c729..43b988b 100644 --- a/threadTest/election_test.go +++ b/threadTest/election_test.go @@ -198,6 +198,18 @@ func TestRepeatVoteRpc(t *testing.T) { CheckOneLeader(t, nodeCollections) CheckIsLeader(t, nodeCollections[0]) CheckTerm(t, nodeCollections[0], 2) + + for i := 0; i < n; i++ { + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.RetryRpc, 0, 2) + ctx.SetBehavior("2", nodeCollections[i].SelfId, nodes.RetryRpc, 0, 2) + } + + go nodeCollections[0].StartElection() + go nodeCollections[1].StartElection() + time.Sleep(time.Second) + + CheckOneLeader(t, nodeCollections) + CheckTerm(t, nodeCollections[0], 3) } func TestFailVoteRpc(t *testing.T) { @@ -237,4 +249,65 @@ func TestFailVoteRpc(t *testing.T) { CheckOneLeader(t, nodeCollections) CheckIsLeader(t, nodeCollections[0]) CheckTerm(t, nodeCollections[0], 2) -} \ No newline at end of file + + ctx.SetBehavior("1", "3", nodes.FailRpc, 0, 0) + ctx.SetBehavior("1", "4", nodes.FailRpc, 0, 0) + nodeCollections[0].StartElection() + time.Sleep(time.Second) + + CheckNoLeader(t, nodeCollections) + CheckTerm(t, nodeCollections[0], 3) +} + +func TestDelayVoteRpc(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.DelayRpc, time.Second, 0) + } + + nodeCollections[0].StartElection() + time.Sleep(2 * time.Second) + + CheckNoLeader(t, nodeCollections) + for i := 0; i < n; i++ { + CheckTerm(t, nodeCollections[i], 2) + } + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.DelayRpc, 50 * time.Millisecond, 0) + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + + CheckOneLeader(t, nodeCollections) + for i := 0; i < n; i++ { + CheckTerm(t, nodeCollections[i], 3) + } +} diff --git a/threadTest/log_replication_test.go b/threadTest/log_replication_test.go index 7cc0955..93b6371 100644 --- a/threadTest/log_replication_test.go +++ b/threadTest/log_replication_test.go @@ -155,3 +155,170 @@ func TestFollowerLagging(t *testing.T) { CheckLogNum(t, nodeCollections[i], 10) } } + +func TestFailLogAppendRpc(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) + + for i := 0; i < n; i++ { + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.FailRpc, 0, 0) + } + + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + } + + time.Sleep(time.Second) + for i := 1; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 0) + } +} + +func TestRepeatLogAppendRpc(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) + + for i := 0; i < n; i++ { + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.RetryRpc, 0, 2) + } + + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + } + + time.Sleep(time.Second) + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 10) + } +} + +func TestDelayLogAppendRpc(t *testing.T) { + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + for i := 0; i < n; i++ { + n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + StopElectionReset(nodeCollections, quitCollections) + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + for i := 0; i < n; i++ { + nodeCollections[i].State = nodes.Follower + } + + nodeCollections[0].StartElection() + time.Sleep(time.Second) + CheckOneLeader(t, nodeCollections) + CheckIsLeader(t, nodeCollections[0]) + CheckTerm(t, nodeCollections[0], 2) + + for i := 0; i < n; i++ { + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.DelayRpc, time.Second, 0) + } + + for i := 0; i < 5; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + } + + time.Sleep(time.Millisecond * 100) + + for i := 0; i < n; i++ { + ctx.SetBehavior("1", nodeCollections[i].SelfId, nodes.NormalRpc, 0, 0) + } + for i := 5; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0]) + } + + time.Sleep(time.Second * 2) + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 10) + } +} diff --git a/threadTest/restart_node_test.go b/threadTest/restart_node_test.go index 6c96ead..71bde83 100644 --- a/threadTest/restart_node_test.go +++ b/threadTest/restart_node_test.go @@ -35,15 +35,7 @@ func TestNodeRestart(t *testing.T) { // client启动, 连接任意节点 cWrite := clientPkg.NewClient("0", peerIds, threadTransport) // 写入 - var s clientPkg.Status - for i := 0; i < 5; i++ { - key := strconv.Itoa(i) - newlog := nodes.LogEntry{Key: key, Value: "hello"} - s = cWrite.Write(newlog) - if s != clientPkg.Ok { - t.Errorf("write test fail") - } - } + ClientWriteLog(t, 0, 5, cWrite) time.Sleep(time.Second) // 等待写入完毕 // 模拟结点轮流崩溃 @@ -63,7 +55,7 @@ func TestNodeRestart(t *testing.T) { for i := 0; i < 5; i++ { key := strconv.Itoa(i) var value string - s = cRead.Read(key, &value) + s := cRead.Read(key, &value) if s != clientPkg.Ok { t.Errorf("Read test1 fail") } @@ -73,9 +65,65 @@ func TestNodeRestart(t *testing.T) { for i := 5; i < 15; i++ { key := strconv.Itoa(i) var value string - s = cRead.Read(key, &value) + s := cRead.Read(key, &value) if s != clientPkg.NotFound { t.Errorf("Read test2 fail") } } } + + +func TestRestartWhileWriting(t *testing.T) { + // 登记结点信息 + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + threadTransport := nodes.NewThreadTransport(nodes.NewCtx()) + for i := 0; i < n; i++ { + n, quitChan := ExecuteNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + time.Sleep(time.Second) // 等待启动完毕 + leaderIdx := FindLeader(t, nodeCollections) + // client启动, 连接任意节点 + cWrite := clientPkg.NewClient("0", peerIds, threadTransport) + // 写入 + go ClientWriteLog(t, 0, 5, cWrite) + + go func() { + close(quitCollections[leaderIdx]) + + n, quitChan := ExecuteNodeI(strconv.Itoa(leaderIdx + 1), true, peerIds, threadTransport) + quitCollections[leaderIdx] = quitChan + nodeCollections[leaderIdx] = n + }() + + time.Sleep(time.Second) // 等待启动完毕 + // client启动 + cRead := clientPkg.NewClient("0", peerIds, threadTransport) + // 读写入数据 + for i := 0; i < 5; i++ { + key := strconv.Itoa(i) + var value string + s := cRead.Read(key, &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } + } + CheckLogNum(t, nodeCollections[leaderIdx], 5) +} diff --git a/threadTest/server_client_test.go b/threadTest/server_client_test.go index c95c60a..8b80ac1 100644 --- a/threadTest/server_client_test.go +++ b/threadTest/server_client_test.go @@ -137,3 +137,56 @@ func TestRepeatClientReq(t *testing.T) { CheckLogNum(t, nodeCollections[i], 10) } } + +func TestParallelClientReq(t *testing.T) { + // 登记结点信息 + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + for i := 0; i < n; i++ { + n, quitChan := ExecuteNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + time.Sleep(time.Second) // 等待启动完毕 + // client启动 + c1 := clientPkg.NewClient("0", peerIds, threadTransport) + c2 := clientPkg.NewClient("1", peerIds, threadTransport) + + + + // 写入 + go ClientWriteLog(t, 0, 10, c1) + go ClientWriteLog(t, 0, 10, c2) + + time.Sleep(time.Second) // 等待写入完毕 + // 读写入数据 + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + var value string + s := c1.Read(key, &value) + if s != clientPkg.Ok || value != "hello" { + t.Errorf("Read test1 fail") + } + } + + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 20) + } +}