Browse Source

大规模debug,通过随机消息的fuzz_test

ld
augurier 5 months ago
parent
commit
33a41f95c0
15 changed files with 498 additions and 122 deletions
  1. +1
    -0
      .gitignore
  2. +14
    -6
      internal/client/client_node.go
  3. +15
    -0
      internal/logprovider/traceback.go
  4. +74
    -56
      internal/nodes/init.go
  5. +3
    -0
      internal/nodes/node.go
  6. +20
    -0
      internal/nodes/node_storage.go
  7. +10
    -2
      internal/nodes/random_timetable.go
  8. +70
    -16
      internal/nodes/replica.go
  9. +7
    -2
      internal/nodes/server_node.go
  10. +1
    -1
      internal/nodes/thread_transport.go
  11. +36
    -7
      internal/nodes/vote.go
  12. +79
    -18
      threadTest/common.go
  13. +7
    -7
      threadTest/election_test.go
  14. +152
    -0
      threadTest/fuzz/fuzz_test.go
  15. +9
    -7
      threadTest/log_replication_test.go

+ 1
- 0
.gitignore View File

@ -28,3 +28,4 @@ main
leveldb
storage
*.log
testdata

+ 14
- 6
internal/client/client_node.go View File

@ -4,6 +4,7 @@ import (
"math/rand"
"simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes"
"time"
"go.uber.org/zap"
)
@ -64,12 +65,21 @@ func (client *Client) Write(kv nodes.LogEntry) Status {
Id: nodes.LogEntryCallId{ClientId: client.ClientId, LogId: client.NextLogId}}
client.NextLogId++
var reply nodes.ServerReply
reply.Isleader = false
c := client.FindActiveNode()
var err error
for !reply.Isleader { // 根据存活节点的反馈,直到找到leader
timeout := 5 * time.Second
deadline := time.Now().Add(timeout)
for { // 根据存活节点的反馈,直到找到leader
if time.Now().After(deadline) {
log.Error("系统繁忙,疑似出错")
return Fail
}
var reply nodes.ServerReply
reply.Isleader = false
callErr := client.Transport.CallWithTimeout(c, "Node.WriteKV", &kvCall, &reply) // RPC
if callErr != nil { // dial和call之间可能崩溃,重新找存活节点
log.Error("dialing: ", zap.Error(callErr))
@ -85,7 +95,7 @@ func (client *Client) Write(kv nodes.LogEntry) Status {
c = client.FindActiveNode()
} else { // dial leader
c, err = client.Transport.DialHTTPWithTimeout("tcp", "", leaderId)
for err != nil { // dial失败,重新找下一个存活节点
if err != nil { // dial失败,重新找下一个存活节点
c = client.FindActiveNode()
}
}
@ -94,8 +104,6 @@ func (client *Client) Write(kv nodes.LogEntry) Status {
return Ok
}
}
log.Fatal("客户端会一直找存活节点,不会运行到这里")
return Fail
}
func (client *Client) Read(key string, value *string) Status { // 查不到value为空

+ 15
- 0
internal/logprovider/traceback.go View File

@ -0,0 +1,15 @@
package logprovider
import (
"fmt"
"os"
"runtime/debug"
)
// DebugTraceback recovers from a panic in the current goroutine and dumps the
// panic value plus a full stack trace to "<errFuncName>.log" in the working
// directory. It must be invoked via defer, e.g.:
//
//	defer logprovider.DebugTraceback("listen")
//
// If no panic occurred it is a no-op.
func DebugTraceback(errFuncName string) {
	if r := recover(); r != nil {
		msg := fmt.Sprintf("panic in goroutine: %v\n%s", r, debug.Stack())
		// The original ignored the os.Create error, which could silently drop
		// the only record of the panic. Write atomically and fall back to
		// stderr on failure so the diagnostic is never lost.
		if err := os.WriteFile(errFuncName+".log", []byte(msg), 0o644); err != nil {
			fmt.Fprint(os.Stderr, msg)
		}
	}
}

+ 74
- 56
internal/nodes/init.go View File

@ -39,6 +39,7 @@ func InitRPCNode(SelfId string, port string, nodeAddr map[string]string, db *lev
Transport: &HTTPTransport{NodeMap: nodeAddr},
RTTable: NewRTTable(),
SeenRequests: make(map[LogEntryCallId]bool),
IsFinish: false,
}
node.initLeaderState()
if isRestart {
@ -94,6 +95,7 @@ func InitThreadNode(SelfId string, peerIds []string, db *leveldb.DB, rstorage *R
Transport: threadTransport,
RTTable: NewRTTable(),
SeenRequests: make(map[LogEntryCallId]bool),
IsFinish: false,
}
node.initLeaderState()
if isRestart {
@ -111,7 +113,7 @@ func InitThreadNode(SelfId string, peerIds []string, db *leveldb.DB, rstorage *R
}
func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{}) {
defer node.Db.Close()
defer logprovider.DebugTraceback("listen")
for {
select {
@ -126,63 +128,19 @@ func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{})
if !ok2 {
log.Fatal("没有设置对应的delay时间")
}
time.Sleep(duration)
go node.switchReq(req, duration)
case FailRpc:
continue
}
switch req.ServiceMethod {
case "Node.AppendEntries":
arg, ok := req.Args.(*AppendEntriesArg)
resp, ok2 := req.Reply.(*AppendEntriesReply)
if !ok || !ok2 {
req.Done <- errors.New("type assertion failed for AppendEntries")
} else {
req.Done <- node.AppendEntries(arg, resp)
}
case "Node.RequestVote":
arg, ok := req.Args.(*RequestVoteArgs)
resp, ok2 := req.Reply.(*RequestVoteReply)
if !ok || !ok2 {
req.Done <- errors.New("type assertion failed for RequestVote")
} else {
req.Done <- node.RequestVote(arg, resp)
}
case "Node.WriteKV":
arg, ok := req.Args.(*LogEntryCall)
resp, ok2 := req.Reply.(*ServerReply)
if !ok || !ok2 {
req.Done <- errors.New("type assertion failed for WriteKV")
} else {
req.Done <- node.WriteKV(arg, resp)
}
case "Node.ReadKey":
arg, ok := req.Args.(*string)
resp, ok2 := req.Reply.(*ServerReply)
if !ok || !ok2 {
req.Done <- errors.New("type assertion failed for ReadKey")
} else {
req.Done <- node.ReadKey(arg, resp)
}
case "Node.FindLeader":
arg, ok := req.Args.(struct{})
resp, ok2 := req.Reply.(*FindLeaderReply)
if !ok || !ok2 {
req.Done <- errors.New("type assertion failed for FindLeader")
} else {
req.Done <- node.FindLeader(arg, resp)
}
default:
req.Done <- fmt.Errorf("未知方法: %s", req.ServiceMethod)
}
go node.switchReq(req, 0)
case <-quitChan:
log.Sugar().Infof("[%s] 监听线程收到退出信号", node.SelfId)
node.Mu.Lock()
defer node.Mu.Unlock()
log.Sugar().Infof("[%s] 监听线程收到退出信号", node.SelfId)
node.IsFinish = true
node.Db.Close()
node.Storage.Close()
return
@ -190,6 +148,61 @@ func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{})
}
}
// switchReq dispatches one simulated RPC request to the matching Node handler,
// optionally after an artificial delay (used to inject DelayRpc behavior in
// tests). It is started in its own goroutine by listenForChan so a delayed
// request does not block the listener loop; the handler's result — or a
// type-assertion error for a malformed request — is sent back on req.Done.
func (node *Node) switchReq(req RPCRequest, delayTime time.Duration) {
	// Dump a stack trace to "switch.log" if this goroutine panics.
	defer logprovider.DebugTraceback("switch")
	// Simulated network delay; zero for normally-routed requests.
	time.Sleep(delayTime)
	switch req.ServiceMethod {
	case "Node.AppendEntries":
		arg, ok := req.Args.(*AppendEntriesArg)
		resp, ok2 := req.Reply.(*AppendEntriesReply)
		if !ok || !ok2 {
			req.Done <- errors.New("type assertion failed for AppendEntries")
		} else {
			req.Done <- node.AppendEntries(arg, resp)
		}
	case "Node.RequestVote":
		arg, ok := req.Args.(*RequestVoteArgs)
		resp, ok2 := req.Reply.(*RequestVoteReply)
		if !ok || !ok2 {
			req.Done <- errors.New("type assertion failed for RequestVote")
		} else {
			req.Done <- node.RequestVote(arg, resp)
		}
	case "Node.WriteKV":
		arg, ok := req.Args.(*LogEntryCall)
		resp, ok2 := req.Reply.(*ServerReply)
		if !ok || !ok2 {
			req.Done <- errors.New("type assertion failed for WriteKV")
		} else {
			req.Done <- node.WriteKV(arg, resp)
		}
	case "Node.ReadKey":
		arg, ok := req.Args.(*string)
		resp, ok2 := req.Reply.(*ServerReply)
		if !ok || !ok2 {
			req.Done <- errors.New("type assertion failed for ReadKey")
		} else {
			req.Done <- node.ReadKey(arg, resp)
		}
	case "Node.FindLeader":
		// FindLeader takes no arguments; the caller passes an empty struct.
		arg, ok := req.Args.(struct{})
		resp, ok2 := req.Reply.(*FindLeaderReply)
		if !ok || !ok2 {
			req.Done <- errors.New("type assertion failed for FindLeader")
		} else {
			req.Done <- node.FindLeader(arg, resp)
		}
	default:
		req.Done <- fmt.Errorf("未知方法: %s", req.ServiceMethod)
	}
}
// 共同部分和启动
func (n *Node) initLeaderState() {
for _, peerId := range n.Nodes {
@ -199,10 +212,13 @@ func (n *Node) initLeaderState() {
}
func Start(node *Node, quitChan chan struct{}) {
node.Mu.Lock()
node.State = Follower // 所有节点以 Follower 状态启动
node.Mu.Unlock()
node.ResetElectionTimer() // 启动选举超时定时器
go func() {
defer logprovider.DebugTraceback("start")
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
@ -213,14 +229,16 @@ func Start(node *Node, quitChan chan struct{}) {
return // 退出 goroutine
case <-ticker.C:
switch node.State {
node.Mu.Lock()
state := node.State
node.Mu.Unlock()
switch state {
case Follower:
// 监听心跳超时
// fmt.Printf("[%s] is a follower, 监听中...\n", node.SelfId)
case Leader:
// 发送心跳
// fmt.Printf("[%s] is the leader, 发送心跳...\n", node.SelfId)
node.ResetElectionTimer() // leader 不主动触发选举
node.BroadCastKV()
}

+ 3
- 0
internal/nodes/node.go View File

@ -17,6 +17,7 @@ const (
type Node struct {
Mu sync.Mutex
MuElection sync.Mutex
// 当前节点id
SelfId string
// 记录的leader(不能用votedfor:投票的leader可能没有收到多数票)
@ -65,5 +66,7 @@ type Node struct {
// 已经处理过的客户端请求
SeenRequests map[LogEntryCallId]bool
IsFinish bool
}

+ 20
- 0
internal/nodes/node_storage.go View File

@ -15,6 +15,7 @@ type RaftStorage struct {
mu sync.Mutex
db *leveldb.DB
filePath string
isfinish bool
}
// NewRaftStorage 创建 Raft 存储
@ -27,6 +28,7 @@ func NewRaftStorage(filePath string) *RaftStorage {
return &RaftStorage{
db: db,
filePath: filePath,
isfinish: false,
}
}
@ -34,6 +36,9 @@ func NewRaftStorage(filePath string) *RaftStorage {
func (rs *RaftStorage) SetCurrentTerm(term int) {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.isfinish {
return
}
err := rs.db.Put([]byte("current_term"), []byte(strconv.Itoa(term)), nil)
if err != nil {
log.Error("SetCurrentTerm 持久化失败:", zap.Error(err))
@ -56,6 +61,9 @@ func (rs *RaftStorage) GetCurrentTerm() int {
func (rs *RaftStorage) SetVotedFor(candidate string) {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.isfinish {
return
}
err := rs.db.Put([]byte("voted_for"), []byte(candidate), nil)
if err != nil {
log.Error("SetVotedFor 持久化失败:", zap.Error(err))
@ -77,6 +85,9 @@ func (rs *RaftStorage) GetVotedFor() string {
func (rs *RaftStorage) SetTermAndVote(term int, candidate string) {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.isfinish {
return
}
batch := new(leveldb.Batch)
batch.Put([]byte("current_term"), []byte(strconv.Itoa(term)))
@ -92,6 +103,9 @@ func (rs *RaftStorage) SetTermAndVote(term int, candidate string) {
func (rs *RaftStorage) AppendLog(entry RaftLogEntry) {
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.db == nil {
return
}
// 序列化日志
batch := new(leveldb.Batch)
@ -126,6 +140,9 @@ func (rs *RaftStorage) WriteLog(entries []RaftLogEntry) {
}
rs.mu.Lock()
defer rs.mu.Unlock()
if rs.isfinish {
return
}
batch := new(leveldb.Batch)
for _, entry := range entries {
@ -170,5 +187,8 @@ func (rs *RaftStorage) GetLogEntries() []RaftLogEntry {
// Close closes the underlying leveldb database and marks the storage as
// finished, so that subsequent Set*/WriteLog calls (which check isfinish under
// the same mutex) become no-ops instead of writing to a closed database.
func (rs *RaftStorage) Close() {
	rs.mu.Lock()
	defer rs.mu.Unlock()
	// NOTE(review): the db.Close() error is ignored here — presumably
	// acceptable for shutdown, but worth confirming.
	rs.db.Close()
	rs.isfinish = true
}

+ 10
- 2
internal/nodes/random_timetable.go View File

@ -2,14 +2,16 @@ package nodes
import (
"math/rand"
"sync"
"time"
)
type RandomTimeTable struct {
Mu sync.Mutex
electionTimeOut time.Duration
israndom bool
// heartbeat 50ms
// rpcTimeout 100ms
// rpcTimeout 50ms
// follower变candidate 500ms
// 等待选举成功时间 300ms
}
@ -21,18 +23,24 @@ func NewRTTable() *RandomTimeTable {
}
func (rttable *RandomTimeTable) GetElectionTimeout() time.Duration {
rttable.Mu.Lock()
defer rttable.Mu.Unlock()
if rttable.israndom {
return time.Duration(500+rand.Intn(500)) * time.Millisecond
} else {
return time.Duration(rttable.electionTimeOut)
return rttable.electionTimeOut
}
}
func (rttable *RandomTimeTable) SetElectionTimeout(t time.Duration) {
rttable.Mu.Lock()
defer rttable.Mu.Unlock()
rttable.israndom = false
rttable.electionTimeOut = t
}
func (rttable *RandomTimeTable) ResetElectionTimeout() {
rttable.Mu.Lock()
defer rttable.Mu.Unlock()
rttable.israndom = true
}

+ 70
- 16
internal/nodes/replica.go View File

@ -1,6 +1,7 @@
package nodes
import (
"simple-kv-store/internal/logprovider"
"sort"
"strconv"
"sync"
@ -18,6 +19,7 @@ type AppendEntriesArg struct {
}
type AppendEntriesReply struct {
Mu sync.Mutex
Term int
Success bool
}
@ -31,14 +33,20 @@ func (node *Node) BroadCastKV() {
// 遍历所有节点
for _, id := range node.Nodes {
go func(id string) {
defer logprovider.DebugTraceback("send")
node.sendKV(id, &failCount, &failMutex)
}(id)
}
}
func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) {
client, err := node.Transport.DialHTTPWithTimeout("tcp", node.SelfId, peerId)
node.Mu.Lock()
selfId := node.SelfId
node.Mu.Unlock()
client, err := node.Transport.DialHTTPWithTimeout("tcp", selfId, peerId)
if err != nil {
node.Mu.Lock()
log.Error("[" + node.SelfId + "]dialling [" + peerId + "] fail: ", zap.Error(err))
failMutex.Lock()
*failCount++
@ -48,6 +56,7 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) {
node.ResetElectionTimer()
}
failMutex.Unlock()
node.Mu.Unlock()
return
}
@ -59,16 +68,15 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) {
}(client)
node.Mu.Lock()
defer node.Mu.Unlock()
var appendReply AppendEntriesReply
appendReply.Success = false
NextIndex := node.NextIndex[peerId]
// log.Info("NextIndex " + strconv.Itoa(NextIndex))
for (!appendReply.Success) {
for {
if NextIndex < 0 {
log.Fatal("assert >= 0 here")
}
sendEntries := node.Log[NextIndex:]
arg := AppendEntriesArg{
Term: node.CurrTerm,
@ -80,30 +88,60 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) {
if arg.PrevLogIndex >= 0 {
arg.PrevLogTerm = node.Log[arg.PrevLogIndex].Term
}
// 记录关键数据后解锁
currTerm := node.CurrTerm
currState := node.State
MaxLogId := node.MaxLogId
var appendReply AppendEntriesReply
appendReply.Success = false
node.Mu.Unlock()
callErr := node.Transport.CallWithTimeout(client, "Node.AppendEntries", &arg, &appendReply) // RPC
node.Mu.Lock()
if node.CurrTerm != currTerm || node.MaxLogId != MaxLogId || node.State != currState {
node.Mu.Unlock()
return
}
if callErr != nil {
log.Error("[" + node.SelfId + "]calling [" + peerId + "] fail: ", zap.Error(callErr))
failMutex.Lock()
*failCount++
if *failCount == len(node.Nodes) / 2 + 1 { // 无法联系超过半数:自己有问题,降级
log.Info("term=" + strconv.Itoa(node.CurrTerm) + "的Leader[" + node.SelfId + "]无法联系到半数节点, 降级为 Follower")
node.LeaderId = ""
node.State = Follower
node.ResetElectionTimer()
}
failMutex.Unlock()
node.Mu.Unlock()
return
}
appendReply.Mu.Lock()
if appendReply.Term != node.CurrTerm {
log.Info("term=" + strconv.Itoa(node.CurrTerm) + "的Leader[" + node.SelfId + "]收到更高的 term=" + strconv.Itoa(appendReply.Term) + ",转换为 Follower")
log.Sugar().Infof("term=%s的leader[%s]因为[%s]收到更高的term=%s, 转换为follower",
strconv.Itoa(node.CurrTerm), node.SelfId, peerId, strconv.Itoa(appendReply.Term))
node.LeaderId = ""
node.CurrTerm = appendReply.Term
node.State = Follower
node.VotedFor = ""
node.Storage.SetTermAndVote(node.CurrTerm, node.VotedFor)
node.ResetElectionTimer()
appendReply.Mu.Unlock()
node.Mu.Unlock()
return
}
if appendReply.Success {
appendReply.Mu.Unlock()
break
}
appendReply.Mu.Unlock()
NextIndex-- // 失败往前传一格
}
@ -111,9 +149,17 @@ func (node *Node) sendKV(peerId string, failCount *int, failMutex *sync.Mutex) {
node.NextIndex[peerId] = node.MaxLogId + 1
node.MatchIndex[peerId] = node.MaxLogId
node.updateCommitIndex()
node.Mu.Unlock()
}
func (node *Node) updateCommitIndex() {
if node.Mu.TryLock() {
log.Fatal("这里要保证有锁")
}
if node.IsFinish {
return
}
totalNodes := len(node.Nodes)
// 收集所有 MatchIndex 并排序
@ -151,17 +197,19 @@ func (node *Node) applyCommittedLogs() {
// RPC call
func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply) error {
// start := time.Now()
// defer func() {
// log.Sugar().Infof("AppendEntries 处理时间: %v", time.Since(start))
// }()
log.Sugar().Infof("[%s]收到[%s]的AppendEntries", node.SelfId, arg.LeaderId)
node.Mu.Lock()
defer node.Mu.Unlock()
defer logprovider.DebugTraceback("append")
node.Mu.Lock()
defer node.Mu.Unlock()
log.Sugar().Infof("[%s]在term=%d收到[%s]的AppendEntries", node.SelfId, node.CurrTerm, arg.LeaderId)
// 如果 term 过期,拒绝接受日志
if node.CurrTerm > arg.Term {
*reply = AppendEntriesReply{node.CurrTerm, false}
reply.Mu.Lock()
reply.Term = node.CurrTerm
reply.Success = false
reply.Mu.Unlock()
return nil
}
@ -179,7 +227,10 @@ func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply
// 检查 prevLogIndex 是否有效
if arg.PrevLogIndex >= len(node.Log) || (arg.PrevLogIndex >= 0 && node.Log[arg.PrevLogIndex].Term != arg.PrevLogTerm) {
*reply = AppendEntriesReply{node.CurrTerm, false}
reply.Mu.Lock()
reply.Term = node.CurrTerm
reply.Success = false
reply.Mu.Unlock()
return nil
}
@ -222,6 +273,9 @@ func (node *Node) AppendEntries(arg *AppendEntriesArg, reply *AppendEntriesReply
// 在成功接受日志或心跳后,重置选举超时
node.ResetElectionTimer()
*reply = AppendEntriesReply{node.CurrTerm, true}
reply.Mu.Lock()
reply.Term = node.CurrTerm
reply.Success = true
reply.Mu.Unlock()
return nil
}

+ 7
- 2
internal/nodes/server_node.go View File

@ -1,6 +1,8 @@
package nodes
import (
"simple-kv-store/internal/logprovider"
"github.com/syndtr/goleveldb/leveldb"
)
@ -13,10 +15,11 @@ type ServerReply struct{
}
// RPC call
func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error {
log.Sugar().Infof("[%s]收到客户端write请求", node.SelfId)
defer logprovider.DebugTraceback("write")
node.Mu.Lock()
defer node.Mu.Unlock()
log.Sugar().Infof("[%s]收到客户端write请求", node.SelfId)
// 自己不是leader,转交leader地址回复
if node.State != Leader {
reply.Isleader = false
@ -47,9 +50,10 @@ func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error {
// RPC call
func (node *Node) ReadKey(key *string, reply *ServerReply) error {
log.Sugar().Infof("[%s]收到客户端read请求", node.SelfId)
defer logprovider.DebugTraceback("read")
node.Mu.Lock()
defer node.Mu.Unlock()
log.Sugar().Infof("[%s]收到客户端read请求", node.SelfId)
// 先只读自己(无论自己是不是leader),也方便测试
value, err := node.Db.Get([]byte(*key), nil)
@ -69,6 +73,7 @@ type FindLeaderReply struct{
LeaderId string
}
func (node *Node) FindLeader(_ struct{}, reply *FindLeaderReply) error {
defer logprovider.DebugTraceback("find")
node.Mu.Lock()
defer node.Mu.Unlock()

+ 1
- 1
internal/nodes/thread_transport.go View File

@ -184,7 +184,7 @@ func (t *ThreadTransport) CallWithTimeout(client ClientInterface, serviceMethod
return fmt.Errorf("network partition: %s cannot reach %s", threadClient.TargetId, threadClient.SourceId)
}
return err
case <-time.After(100 * time.Millisecond):
case <-time.After(250 * time.Millisecond):
return fmt.Errorf("RPC 调用超时: %s", serviceMethod)
}
}

+ 36
- 7
internal/nodes/vote.go View File

@ -2,6 +2,7 @@ package nodes
import (
"math/rand"
"simple-kv-store/internal/logprovider"
"strconv"
"sync"
"time"
@ -17,13 +18,18 @@ type RequestVoteArgs struct {
}
type RequestVoteReply struct {
Mu sync.Mutex
Term int // 当前节点的最新任期
VoteGranted bool // 是否同意投票
}
func (n *Node) StartElection() {
defer logprovider.DebugTraceback("startElection")
n.Mu.Lock()
defer n.Mu.Unlock()
if n.IsFinish {
n.Mu.Unlock()
return
}
// 增加当前任期,转换为 Candidate
n.CurrTerm++
n.State = Candidate
@ -58,12 +64,25 @@ func (n *Node) StartElection() {
totalNodes := len(n.Nodes)
grantedVotes := 1 // 自己的票
currTerm := n.CurrTerm
currState := n.State
n.Mu.Unlock()
for _, peerId := range n.Nodes {
go func(peerId string) {
reply := RequestVoteReply{}
defer logprovider.DebugTraceback("vote")
var reply RequestVoteReply
if n.sendRequestVote(peerId, &args, &reply) {
Mu.Lock()
defer Mu.Unlock()
n.Mu.Lock()
defer n.Mu.Unlock()
if currTerm != n.CurrTerm || currState != n.State {
return
}
reply.Mu.Lock()
if reply.Term > n.CurrTerm {
// 发现更高任期,回退为 Follower
log.Sugar().Infof("[%s] 发现更高的 Term (%d),回退为 Follower", n.SelfId, reply.Term)
@ -72,34 +91,37 @@ func (n *Node) StartElection() {
n.VotedFor = ""
n.Storage.SetTermAndVote(n.CurrTerm, n.VotedFor)
n.ResetElectionTimer()
Mu.Unlock()
reply.Mu.Unlock()
return
}
if reply.VoteGranted {
grantedVotes++
}
reply.Mu.Unlock()
if grantedVotes == totalNodes / 2 + 1 {
n.State = Leader
log.Sugar().Infof("[%s] 当选 Leader!", n.SelfId)
n.initLeaderState()
}
Mu.Unlock()
}
}(peerId)
}
// 等待选举结果
time.Sleep(300 * time.Millisecond)
Mu.Lock()
defer Mu.Unlock()
n.Mu.Lock()
defer n.Mu.Unlock()
if n.State == Candidate {
log.Sugar().Infof("[%s] 选举超时,等待后将重新发起选举", n.SelfId)
// n.State = Follower 这里不修改,如果appendentries收到term合理的心跳,再变回follower
n.ResetElectionTimer()
}
Mu.Unlock()
}
func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *RequestVoteReply) bool {
@ -125,8 +147,12 @@ func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *R
}
func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error {
defer logprovider.DebugTraceback("requestVote")
n.Mu.Lock()
defer n.Mu.Unlock()
reply.Mu.Lock()
defer reply.Mu.Unlock()
// 如果候选人的任期小于当前任期,则拒绝投票
if args.Term < n.CurrTerm {
reply.Term = n.CurrTerm
@ -175,11 +201,14 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error
return nil
}
// follower 150-300ms内没收到appendentries心跳,就变成candidate发起选举
// follower 一段时间内没收到appendentries心跳,就变成candidate发起选举
func (node *Node) ResetElectionTimer() {
node.MuElection.Lock()
defer node.MuElection.Unlock()
if node.ElectionTimer == nil {
node.ElectionTimer = time.NewTimer(node.RTTable.GetElectionTimeout())
go func() {
defer logprovider.DebugTraceback("reset")
for {
<-node.ElectionTimer.C
node.StartElection()

+ 79
- 18
threadTest/common.go View File

@ -17,15 +17,25 @@ func ExecuteNodeI(id string, isRestart bool, peerIds []string, threadTransport *
os.RemoveAll("storage/node" + id)
}
os.RemoveAll("leveldb/simple-kv-store" + id)
// 创建临时目录用于 leveldb
dbPath, err := os.MkdirTemp("", "simple-kv-store-"+id+"-")
if err != nil {
panic(fmt.Sprintf("无法创建临时数据库目录: %s", err))
}
db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil)
// 创建临时目录用于 storage
storagePath, err := os.MkdirTemp("", "raft-storage-"+id+"-")
if err != nil {
fmt.Println("Failed to open database: ", err)
panic(fmt.Sprintf("无法创建临时存储目录: %s", err))
}
// 打开或创建节点数据持久化文件
storage := nodes.NewRaftStorage("storage/node" + id)
db, err := leveldb.OpenFile(dbPath, nil)
if err != nil {
panic(fmt.Sprintf("Failed to open database: %s", err))
}
// 初始化 Raft 存储
storage := nodes.NewRaftStorage(storagePath)
var otherIds []string
for _, ids := range peerIds {
@ -70,24 +80,18 @@ func ExecuteStaticNodeI(id string, isRestart bool, peerIds []string, threadTrans
return node, quitChan
}
func StopElectionReset(nodeCollections [] *nodes.Node, quitCollections []chan struct{}) {
for i := 0; i < len(quitCollections); i++ {
func StopElectionReset(nodeCollections [] *nodes.Node) {
for i := 0; i < len(nodeCollections); i++ {
node := nodeCollections[i]
quitChan := quitCollections[i]
go func(node *nodes.Node, quitChan chan struct{}) {
go func(node *nodes.Node) {
ticker := time.NewTicker(400 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-quitChan:
return // 退出 goroutine
case <-ticker.C:
node.ResetElectionTimer() // 不主动触发选举
}
<-ticker.C
node.ResetElectionTimer() // 不主动触发选举
}
}(node, quitChan)
}(node)
}
}
@ -123,59 +127,116 @@ func FindLeader(t *testing.T, nodeCollections []* nodes.Node) (i int) {
}
}
t.Errorf("系统目前没有leader")
t.FailNow()
return 0
}
func CheckOneLeader(t *testing.T, nodeCollections []* nodes.Node) {
cnt := 0
for _, node := range nodeCollections {
node.Mu.Lock()
if node.State == nodes.Leader {
cnt++
}
node.Mu.Unlock()
}
if cnt != 1 {
t.Errorf("实际有%d个leader(!=1)", cnt)
t.FailNow()
}
}
func CheckNoLeader(t *testing.T, nodeCollections []* nodes.Node) {
cnt := 0
for _, node := range nodeCollections {
node.Mu.Lock()
if node.State == nodes.Leader {
cnt++
}
node.Mu.Unlock()
}
if cnt != 0 {
t.Errorf("实际有%d个leader(!=0)", cnt)
t.FailNow()
}
}
func CheckZeroOrOneLeader(t *testing.T, nodeCollections []* nodes.Node) {
cnt := 0
for _, node := range nodeCollections {
node.Mu.Lock()
if node.State == nodes.Leader {
cnt++
}
node.Mu.Unlock()
}
if cnt > 1 {
t.Errorf("实际有%d个leader(>1)", cnt)
errmsg := fmt.Sprintf("实际有%d个leader(>1)", cnt)
WriteFailLog(nodeCollections[0].SelfId, errmsg)
t.Error(errmsg)
t.FailNow()
}
}
func CheckIsLeader(t *testing.T, node *nodes.Node) {
node.Mu.Lock()
defer node.Mu.Unlock()
if node.State != nodes.Leader {
t.Errorf("[%s]不是leader", node.SelfId)
t.FailNow()
}
}
func CheckTerm(t *testing.T, node *nodes.Node, targetTerm int) {
node.Mu.Lock()
defer node.Mu.Unlock()
if node.CurrTerm != targetTerm {
t.Errorf("[%s]实际term=%d (!=%d)", node.SelfId, node.CurrTerm, targetTerm)
t.FailNow()
}
}
func CheckLogNum(t *testing.T, node *nodes.Node, targetnum int) {
node.Mu.Lock()
defer node.Mu.Unlock()
if len(node.Log) != targetnum {
t.Errorf("[%s]实际logNum=%d (!=%d)", node.SelfId, len(node.Log), targetnum)
t.FailNow()
}
}
func CheckSameLog(t *testing.T, nodeCollections []* nodes.Node) {
nodeCollections[0].Mu.Lock()
defer nodeCollections[0].Mu.Unlock()
standard_node := nodeCollections[0]
for i, node := range nodeCollections {
if i != 0 {
node.Mu.Lock()
if len(node.Log) != len(standard_node.Log) {
errmsg := fmt.Sprintf("[1]和[%s]日志数量不一致", node.SelfId)
WriteFailLog(node.SelfId, errmsg)
t.Error(errmsg)
t.FailNow()
}
for idx, log := range node.Log {
standard_log := standard_node.Log[idx]
if log.Term != standard_log.Term ||
log.LogE.Key != standard_log.LogE.Key ||
log.LogE.Value != standard_log.LogE.Value {
errmsg := fmt.Sprintf("[1]和[%s]日志id%d不一致", node.SelfId, idx)
WriteFailLog(node.SelfId, errmsg)
t.Error(errmsg)
t.FailNow()
}
}
node.Mu.Unlock()
}
}
}
// WriteFailLog persists a test-failure message to "<name>.log" so the failure
// context survives after the test process exits.
func WriteFailLog(name string, errmsg string) {
	// os.WriteFile replaces the Create/Fprint/Close sequence of the original
	// and surfaces the write error instead of silently dropping the diagnostic
	// (the original discarded the os.Create error).
	if err := os.WriteFile(name+".log", []byte(errmsg), 0o644); err != nil {
		fmt.Printf("WriteFailLog: %v\n", err)
	}
}

+ 7
- 7
threadTest/election_test.go View File

@ -23,7 +23,7 @@ func TestInitElection(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -60,7 +60,7 @@ func TestRepeatElection(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -98,7 +98,7 @@ func TestBelowHalfCandidateElection(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -137,7 +137,7 @@ func TestOverHalfCandidateElection(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -178,7 +178,7 @@ func TestRepeatVoteRpc(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -229,7 +229,7 @@ func TestFailVoteRpc(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -276,7 +276,7 @@ func TestDelayVoteRpc(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){

+ 152
- 0
threadTest/fuzz/fuzz_test.go View File

@ -0,0 +1,152 @@
package fuzz
import (
"fmt"
"math/rand"
"os"
"runtime/debug"
"sync"
"testing"
"time"
"simple-kv-store/internal/client"
"simple-kv-store/internal/nodes"
"simple-kv-store/threadTest"
"strconv"
)
// FuzzRaftBasic is a randomized end-to-end test of the Raft cluster. For each
// fuzz seed it spins up a random odd-sized cluster (3/5/7/9 nodes), injects
// random per-link RPC faults (fail/delay/retry), writes a random number of
// log entries through a client, and then asserts that (a) all non-faulty
// nodes hold identical logs and (b) at most one leader exists.
func FuzzRaftBasic(f *testing.F) {
	// Each seed drives one fully deterministic scenario; skip repeats.
	var seenSeeds sync.Map
	// Seed corpus entry.
	f.Add(int64(1))
	fmt.Println("Running")
	f.Fuzz(func(t *testing.T, seed int64) {
		if _, loaded := seenSeeds.LoadOrStore(seed, true); loaded {
			t.Skipf("Seed %d already tested, skipping...", seed)
			return
		}
		// Persist any panic in this test goroutine to a log file; note the
		// inner f shadows the *testing.F parameter here.
		defer func() {
			if r := recover(); r != nil {
				msg := fmt.Sprintf("goroutine panic: %v\n%s", r, debug.Stack())
				f, _ := os.Create("panic_goroutine.log")
				fmt.Fprint(f, msg)
				f.Close()
			}
		}()
		r := rand.New(rand.NewSource(seed)) // local RNG: deterministic per seed
		// 3, 5, 7 or 9 nodes — always odd, so majorities are well-defined.
		n := 3 + 2*(r.Intn(4))
		fmt.Printf("随机了%d个节点\n", n)
		logs := (r.Intn(10))
		fmt.Printf("随机了%d份日志\n", logs)
		// Node ids are "<seed>.<i>" so concurrent seeds do not collide on disk.
		var peerIds []string
		for i := 0; i < n; i++ {
			peerIds = append(peerIds, strconv.Itoa(int(seed))+"."+strconv.Itoa(i+1))
		}
		ctx := nodes.NewCtx()
		threadTransport := nodes.NewThreadTransport(ctx)
		var quitCollections []chan struct{}
		var nodeCollections []*nodes.Node
		for i := 0; i < n; i++ {
			node, quitChan := threadTest.ExecuteNodeI(strconv.Itoa(int(seed))+"."+strconv.Itoa(i+1), false, peerIds, threadTransport)
			nodeCollections = append(nodeCollections, node)
			node.RTTable.SetElectionTimeout(750 * time.Millisecond)
			quitCollections = append(quitCollections, quitChan)
		}
		// Inject random per-link communication behavior between node pairs.
		faultyNodes := injectRandomBehavior(ctx, r, peerIds)
		time.Sleep(time.Second)
		// NOTE(review): clientPkg presumably is an alias for the
		// internal/client import — confirm against the import list.
		clientObj := clientPkg.NewClient("0", peerIds, threadTransport)
		for i := 0; i < logs; i++ {
			key := fmt.Sprintf("k%d", i)
			log := nodes.LogEntry{Key: key, Value: "v"}
			clientObj.Write(log)
		}
		time.Sleep(time.Second)
		// Only non-faulty nodes are required to have converged logs.
		var rightNodeCollections []*nodes.Node
		for _, node := range nodeCollections {
			if !faultyNodes[node.SelfId] {
				rightNodeCollections = append(rightNodeCollections, node)
			}
		}
		threadTest.CheckSameLog(t, rightNodeCollections)
		threadTest.CheckZeroOrOneLeader(t, nodeCollections)
		for _, quitChan := range quitCollections {
			close(quitChan)
		}
		time.Sleep(time.Second)
		for i := 0; i < n; i++ {
			// Force IsFinish in case a node's listener did not process the
			// quit signal, then remove this seed's on-disk state.
			nodeCollections[i].Mu.Lock()
			if !nodeCollections[i].IsFinish {
				nodeCollections[i].IsFinish = true
			}
			nodeCollections[i].Mu.Unlock()
			os.RemoveAll("leveldb/simple-kv-store" + strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1))
			os.RemoveAll("storage/node" + strconv.Itoa(int(seed)) + "." + strconv.Itoa(i+1))
		}
	})
}
// injectRandomBehavior picks a random subset (0..n/2) of peers to be "faulty"
// and configures the transport's per-link behavior accordingly:
//   - between two faulty nodes: all RPCs fail (simulated partition);
//   - between a faulty node and a healthy node: a random behavior
//     (fail / delay / retry) with a random delay, in both directions.
//
// It returns the set of faulty node ids so callers can exclude them from
// log-consistency checks.
func injectRandomBehavior(ctx *nodes.Ctx, r *rand.Rand, peers []string) map[string]bool {
	behaviors := []nodes.CallBehavior{
		nodes.FailRpc,
		nodes.DelayRpc,
		nodes.RetryRpc,
	}
	n := len(peers)
	maxFaulty := r.Intn(n/2 + 1) // pick 0 .. n/2 faulty nodes (minority)
	// Shuffle a copy of peers and take the first maxFaulty as faulty.
	shuffled := append([]string(nil), peers...)
	r.Shuffle(n, func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
	faultyNodes := make(map[string]bool)
	for i := 0; i < maxFaulty; i++ {
		faultyNodes[shuffled[i]] = true
	}
	for _, one := range peers {
		if !faultyNodes[one] {
			continue
		}
		b := behaviors[r.Intn(len(behaviors))]
		delay := time.Duration(r.Intn(100)) * time.Millisecond
		switch b {
		case nodes.FailRpc:
			fmt.Printf("[%s]的异常行为是fail\n", one)
		case nodes.DelayRpc:
			fmt.Printf("[%s]的异常行为是delay\n", one)
		case nodes.RetryRpc:
			fmt.Printf("[%s]的异常行为是retry\n", one)
		}
		for _, two := range peers {
			if one == two {
				continue
			}
			// faultyNodes[one] is already known true here; only two matters.
			if faultyNodes[two] {
				// BUG FIX: the original set the (one, two) direction twice
				// and never the (two, one) direction; set both so the
				// faulty-faulty link fails symmetrically, matching the
				// healthy-link branch below.
				ctx.SetBehavior(one, two, nodes.FailRpc, 0, 0)
				ctx.SetBehavior(two, one, nodes.FailRpc, 0, 0)
			} else {
				ctx.SetBehavior(one, two, b, delay, 2)
				ctx.SetBehavior(two, one, b, delay, 2)
			}
		}
	}
	return faultyNodes
}

+ 9
- 7
threadTest/log_replication_test.go View File

@ -23,7 +23,7 @@ func TestNormalReplication(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -70,7 +70,7 @@ func TestParallelReplication(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -118,7 +118,7 @@ func TestFollowerLagging(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -137,6 +137,7 @@ func TestFollowerLagging(t *testing.T) {
CheckIsLeader(t, nodeCollections[0])
CheckTerm(t, nodeCollections[0], 2)
close(quitCollections[1])
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
@ -148,7 +149,8 @@ func TestFollowerLagging(t *testing.T) {
quitCollections[1] = q
nodeCollections[1] = node
nodeCollections[1].State = nodes.Follower
StopElectionReset(nodeCollections[1:2], quitCollections[1:2])
StopElectionReset(nodeCollections[1:2])
nodeCollections[0].BroadCastKV()
time.Sleep(time.Second)
for i := 0; i < n; i++ {
@ -173,7 +175,7 @@ func TestFailLogAppendRpc(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -225,7 +227,7 @@ func TestRepeatLogAppendRpc(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){
@ -277,7 +279,7 @@ func TestDelayLogAppendRpc(t *testing.T) {
quitCollections = append(quitCollections, quitChan)
nodeCollections = append(nodeCollections, n)
}
StopElectionReset(nodeCollections, quitCollections)
StopElectionReset(nodeCollections)
// 通知所有node结束
defer func(){

Loading…
Cancel
Save