Browse Source

replication测试补充

ld
augurier 5 months ago
parent
commit
6ce161187a
3 changed files with 227 additions and 55 deletions
  1. +70
    -0
      threadTest/common.go
  2. +0
    -55
      threadTest/election_test.go
  3. +157
    -0
      threadTest/log_replication_test.go

+ 70
- 0
threadTest/common.go View File

@ -5,6 +5,7 @@ import (
"os" "os"
"simple-kv-store/internal/nodes" "simple-kv-store/internal/nodes"
"testing" "testing"
"time"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
@ -38,6 +39,69 @@ func ExecuteNodeI(id string, isRestart bool, peerIds []string, threadTransport *
return node, quitChan return node, quitChan
} }
func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTransport *nodes.ThreadTransport) (*nodes.Node, chan struct{}) {
if !isRestart {
os.RemoveAll("storage/node" + id)
}
os.RemoveAll("leveldb/simple-kv-store" + id)
db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil)
if err != nil {
fmt.Println("Failed to open database: ", err)
}
// 打开或创建节点数据持久化文件
storage := nodes.NewRaftStorage("storage/node" + id)
var otherIds []string
for _, ids := range peerIds {
if ids != id {
otherIds = append(otherIds, ids) // 删除目标元素
}
}
// 初始化
node, quitChan := nodes.InitThreadNode(id, otherIds, db, storage, isRestart, threadTransport)
// 开启 raft
// go nodes.Start(node, quitChan)
return node, quitChan
}
func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) {
for i := 0; i < len(quitCollections); i++ {
node := nodeCollections[i]
quitChan := quitCollections[i]
go func(node *nodes.Node, quitChan chan struct{}) {
ticker := time.NewTicker(400 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-quitChan:
return // 退出 goroutine
case <-ticker.C:
node.ResetElectionTimer() // 不主动触发选举
}
}
}(node, quitChan)
}
}
func SendKvCall(kvCall *nodes.LogEntryCall, node *nodes.Node) {
node.Mu.Lock()
defer node.Mu.Unlock()
node.MaxLogId++
logId := node.MaxLogId
rLogE := nodes.RaftLogEntry{LogE: kvCall.LogE,LogId: logId, Term: node.CurrTerm}
node.Log = append(node.Log, rLogE)
node.Storage.AppendLog(rLogE)
// 广播给其它节点
node.BroadCastKV()
}
func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) { func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) {
cnt := 0 cnt := 0
for _, node := range nodeCollections { for _, node := range nodeCollections {
@ -73,3 +137,9 @@ func CheckTerm(t *testing.T, node *nodes.Node, targetTerm int) {
t.Errorf("[%s]实际term=%d (!=%d)", node.SelfId, node.CurrTerm, targetTerm) t.Errorf("[%s]实际term=%d (!=%d)", node.SelfId, node.CurrTerm, targetTerm)
} }
} }
func CheckLogNum(t *testing.T, node *nodes.Node, targetnum int) {
if len(node.Log) != targetnum {
t.Errorf("[%s]实际logNum=%d (!=%d)", node.SelfId, len(node.Log), targetnum)
}
}

+ 0
- 55
threadTest/election_test.go View File

@ -1,67 +1,12 @@
package threadTest package threadTest
import ( import (
"fmt"
"os"
"simple-kv-store/internal/nodes" "simple-kv-store/internal/nodes"
"strconv" "strconv"
"testing" "testing"
"time" "time"
"github.com/syndtr/goleveldb/leveldb"
) )
func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTransport *nodes.ThreadTransport) (*nodes.Node, chan struct{}) {
if !isRestart {
os.RemoveAll("storage/node" + id)
}
os.RemoveAll("leveldb/simple-kv-store" + id)
db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil)
if err != nil {
fmt.Println("Failed to open database: ", err)
}
// 打开或创建节点数据持久化文件
storage := nodes.NewRaftStorage("storage/node" + id)
var otherIds []string
for _, ids := range peerIds {
if ids != id {
otherIds = append(otherIds, ids) // 删除目标元素
}
}
// 初始化
node, quitChan := nodes.InitThreadNode(id, otherIds, db, storage, isRestart, threadTransport)
// 开启 raft
// go nodes.Start(node, quitChan)
return node, quitChan
}
func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) {
for i := 0; i < len(quitCollections); i++ {
node := nodeCollections[i]
quitChan := quitCollections[i]
go func(node *nodes.Node, quitChan chan struct{}) {
ticker := time.NewTicker(400 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-quitChan:
return // 退出 goroutine
case <-ticker.C:
node.ResetElectionTimer() // 不主动触发选举
}
}
}(node, quitChan)
}
}
func TestInitElection(t *testing.T) { func TestInitElection(t *testing.T) {
n := 5 n := 5
var peerIds []string var peerIds []string

+ 157
- 0
threadTest/log_replication_test.go View File

@ -0,0 +1,157 @@
package threadTest
import (
"simple-kv-store/internal/nodes"
"strconv"
"testing"
"time"
)
func TestNormalReplication(t *testing.T) {
n := 5
var peerIds []string
for i := 0; i < n; i++ {
peerIds = append(peerIds, strconv.Itoa(i + 1))
}
// 结点启动
var quitCollections []chan struct{}
var nodeCollections []*nodes.Node
threadTransport := nodes.NewThreadTransport()
for i := 0; i < n; i++ {
n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport)
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
// 通知所有node结束
defer func(){
for _, quitChan := range quitCollections {
close(quitChan)
}
}()
for i := 0; i < n; i++ {
nodeCollections[i].State = nodes.Follower
}
nodeCollections[0].StartElection()
time.Sleep(time.Second)
CheckOneLeader(t, nodeCollections)
CheckIsLeader(t, nodeCollections[0])
CheckTerm(t, nodeCollections[0], 2)
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0])
}
time.Sleep(time.Second)
for i := 0; i < n; i++ {
CheckLogNum(t, nodeCollections[i], 10)
}
}
func TestParallelReplication(t *testing.T) {
n := 5
var peerIds []string
for i := 0; i < n; i++ {
peerIds = append(peerIds, strconv.Itoa(i + 1))
}
// 结点启动
var quitCollections []chan struct{}
var nodeCollections []*nodes.Node
threadTransport := nodes.NewThreadTransport()
for i := 0; i < n; i++ {
n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport)
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
// 通知所有node结束
defer func(){
for _, quitChan := range quitCollections {
close(quitChan)
}
}()
for i := 0; i < n; i++ {
nodeCollections[i].State = nodes.Follower
}
nodeCollections[0].StartElection()
time.Sleep(time.Second)
CheckOneLeader(t, nodeCollections)
CheckIsLeader(t, nodeCollections[0])
CheckTerm(t, nodeCollections[0], 2)
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0])
go nodeCollections[0].BroadCastKV()
}
time.Sleep(time.Second)
for i := 0; i < n; i++ {
CheckLogNum(t, nodeCollections[i], 10)
}
}
func TestFollowerLagging(t *testing.T) {
n := 5
var peerIds []string
for i := 0; i < n; i++ {
peerIds = append(peerIds, strconv.Itoa(i + 1))
}
// 结点启动
var quitCollections []chan struct{}
var nodeCollections []*nodes.Node
threadTransport := nodes.NewThreadTransport()
for i := 0; i < n; i++ {
n, quitChan := ExecuteStaticNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport)
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
// 通知所有node结束
defer func(){
for _, quitChan := range quitCollections {
close(quitChan)
}
}()
for i := 0; i < n; i++ {
nodeCollections[i].State = nodes.Follower
}
nodeCollections[0].StartElection()
time.Sleep(time.Second)
CheckOneLeader(t, nodeCollections)
CheckIsLeader(t, nodeCollections[0])
CheckTerm(t, nodeCollections[0], 2)
close(quitCollections[1])
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
go SendKvCall(&nodes.LogEntryCall{LogE: newlog}, nodeCollections[0])
}
node, q := ExecuteStaticNodeI("2", true, peerIds, threadTransport)
quitCollections[1] = q
nodeCollections[1] = node
nodeCollections[1].State = nodes.Follower
StopElectionReset(nodeCollections[1:2], quitCollections[1:2])
time.Sleep(time.Second)
for i := 0; i < n; i++ {
CheckLogNum(t, nodeCollections[i], 10)
}
}

Loading…
Cancel
Save