
Adjust the project framework

augurier 6 months ago
parent
commit
15c7f2ad4f
13 changed files with 443 additions and 539 deletions
1. +19 -6 README.md
2. +5 -6 cmd/main.go
3. +63 -42 internal/client/client_node.go
4. +3 -79 internal/nodes/init.go
5. +0 -208 internal/nodes/node.go
6. +2 -1 internal/nodes/node_storage.go
7. +214 -0 internal/nodes/replica.go
8. +8 -4 internal/nodes/server_node.go
9. +30 -13 internal/nodes/vote.go
10. +0 -61 scripts/run.sh
11. +29 -44 test/restart_node_test.go
12. +2 -7 test/server_client_test.go

+19 -6 README.md

@@ -4,21 +4,34 @@
A distributed KV database implemented in Go
# Architecture
Each process running main.go acts as one node
```
---internal
---client the client side, using the read/write features the nodes provide
---logprovider wraps simple log printing for easier debugging
---nodes the core distributed code
init.go node initialization as called from main, and startup of the main loop
log.go data structures for the entries a node stores
node_storage.go abstracts the node data persistence methods, saved to a JSON file
node.go node-related data structures
replica.go log replication logic
server_node.go the features a node offers clients as a server (read/write)
vote.go leader election logic
```
# Environment and running
The environment used is WSL + Ubuntu
go mod download installs the dependencies
./scripts/build.sh compiles main into the root directory
./scripts/run.sh runs three nodes; you can currently type input in the terminal, the leader (n1) prints send log and the other nodes print receive log. After terminal input the script exits on timeout (the run time can be adjusted inside the script).
# Notes
The first run of a script needs execute permission: chmod +x <script>
If a tcp listen error appears, a previous process may not have exited cleanly and is still occupying the port
lsof -i :9091 to look up the pid
kill -9 <pid> to kill the process
## About testing
Nodes are created by spawning new processes; creating them as threads causes duplicate RPC registration problems
Nodes are created by spawning new processes (see the executeNodeI function in test/common.go);
creating them as threads would cause duplicate RPC registration problems (a minimal sketch of such a helper is shown after this diff)
# todo list
Handling communication failures
Local KV persistence
Crash and recovery (plus the corresponding tests)
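Below is a minimal sketch of what a process-per-node helper in the spirit of test/common.go's executeNodeI might look like. The flag names mirror cmd/main.go; the body itself is an assumption, not the repository's actual helper.

```go
package test

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
)

// Hypothetical sketch; the real helper lives in test/common.go.
func ExecuteNodeI(i int, isNewDb bool, clusters []string) *exec.Cmd {
	cmd := exec.Command("./main",
		"-id", strconv.Itoa(i+1), // node IDs are 1-based
		"-port", fmt.Sprintf(":%d", 9090+i), // one port per node
		"-cluster", strings.Join(clusters, ","), // the full member list
		"-isNewDb", strconv.FormatBool(isNewDb), // fresh start vs restart
	)
	// Each node runs as its own process, so the RPC service is
	// registered exactly once per process (threads would re-register).
	if err := cmd.Start(); err != nil {
		return nil
	}
	return cmd
}
```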

+5 -6 cmd/main.go

@@ -31,7 +31,6 @@ func main() {
port := flag.String("port", ":9091", "rpc listen port")
cluster := flag.String("cluster", "127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093", "comma sep")
id := flag.String("id", "1", "node ID")
pipe := flag.String("pipe", "", "input from scripts")
isNewDb := flag.Bool("isNewDb", true, "new test or restart")
// parse the command-line flags
@@ -41,7 +40,7 @@ func main() {
idCnt := 1
selfi, err := strconv.Atoi(*id)
if err != nil {
log.Error("figure id only")
log.Fatal("figure id only")
}
for _, addr := range clusters {
if idCnt == selfi {
@@ -73,16 +72,16 @@ func main() {
for iter.Next() {
count++
}
fmt.Printf(*id+" node currently has %d entries\n", count)
log.Sugar().Infof("[%s] currently has %d entries", *id, count)
node := nodes.Init(*id, idClusterPairs, *pipe, db, storage)
log.Info("id: " + *id + " node starts listening on port " + *port)
node := nodes.Init(*id, idClusterPairs, db, storage)
log.Sugar().Infof("[%s] starts listening on port "+*port, *id)
// listen for RPC
node.Rpc(*port)
// start raft
nodes.Start(node)
sig := <-sigs
fmt.Println("node_"+*id+" received signal:", sig)
fmt.Println("node_"+ *id +" received signal:", sig)
}

+63 -42 internal/client/client_node.go

@@ -11,9 +11,8 @@ import (
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
type Client struct {
// the server-side node to connect to (node1)
ServerId string
Address string
// the set of server-side nodes to connect to
Address [] string
}
type Status = uint8
@@ -24,35 +23,62 @@
Fail
)
func (client *Client) FindActiveNode() *rpc.Client {
var err error
var c *rpc.Client
i := 0
for { // until a connectable node is found (at least one node is assumed alive)
c, err = rpc.DialHTTP("tcp", client.Address[i])
if err != nil {
log.Error("dialing: ", zap.Error(err))
i++
} else {
break
}
if i == len(client.Address) {
log.Fatal("no live node found")
}
}
return c
}
func (client *Client) CloseRpcClient(c *rpc.Client) {
err := c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}
// Write sends a write request, retrying until it reaches the leader
func (client *Client) Write(kvCall nodes.LogEntryCall) Status {
log.Info("client write request key :" + kvCall.LogE.Key)
var reply nodes.ServerReply
reply.Isleader = false
addr := client.Address
for !reply.Isleader {
c, err := rpc.DialHTTP("tcp", addr)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return Fail
}
c := client.FindActiveNode()
var err error
for !reply.Isleader { // follow the live nodes' feedback until the leader is found
callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC
if callErr != nil {
if callErr != nil { // the node may crash between dial and call; find another live node
log.Error("dialing: ", zap.Error(callErr))
return Fail
}
err = c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
client.CloseRpcClient(c)
c = client.FindActiveNode()
continue
}
if !reply.Isleader { // the node we reached is not the leader
addr = reply.LeaderAddress
if !reply.Isleader { // the peer is not the leader; follow its hint to find it
addr := reply.LeaderAddress
client.CloseRpcClient(c)
c, err = rpc.DialHTTP("tcp", addr)
if err != nil { // the hint failed; fall back to the next live node
c = client.FindActiveNode()
}
} else { // success
client.CloseRpcClient(c)
return Ok
}
}
log.Fatal("the client keeps searching for a live node; this line is unreachable")
return Fail
}
@@ -61,34 +87,29 @@ func (client *Client) Read(key string, value *string) Status { // when the value is not found
if value == nil {
return Fail
}
var c *rpc.Client
for {
c = client.FindActiveNode()
c, err := rpc.DialHTTP("tcp", client.Address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return Fail
}
defer func(server *rpc.Client) {
err := c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
var reply nodes.ServerReply
callErr := c.Call("Node.ReadKey", key, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
client.CloseRpcClient(c)
continue
}
}(c)
var reply nodes.ServerReply
callErr := c.Call("Node.ReadKey", key, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
return Fail
}
// for now the send always succeeds at this point
if reply.HaveValue {
*value = reply.Value
return Ok
} else {
return NotFound
// for now the send always succeeds at this point
if reply.HaveValue {
*value = reply.Value
client.CloseRpcClient(c)
return Ok
} else {
client.CloseRpcClient(c)
return NotFound
}
}
}
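For orientation, a usage sketch of the reworked multi-address client, mirroring how the tests construct it; the module import path and the key/value literals here are assumptions:

```go
package main

import (
	clientPkg "raftkv/internal/client" // module path is an assumption
	"raftkv/internal/nodes"
)

func main() {
	// Hand the client the whole cluster: it locates a live node itself
	// and follows redirects until the write reaches the leader.
	c := clientPkg.Client{Address: []string{
		"127.0.0.1:9091", "127.0.0.1:9092", "127.0.0.1:9093",
	}}
	entry := nodes.LogEntry{Key: "k1", Value: "v1"}
	if s := c.Write(nodes.LogEntryCall{LogE: entry, CallState: nodes.Normal}); s != clientPkg.Ok {
		panic("write failed")
	}
	var value string
	if s := c.Read("k1", &value); s == clientPkg.Ok {
		println(value) // prints "v1"
	}
}
```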

+3 -79 internal/nodes/init.go

@@ -2,7 +2,6 @@ package nodes
import (
"fmt"
"math/rand"
"net"
"net/http"
"net/rpc"
@@ -22,7 +21,7 @@ func newNode(address string) *Public_node_info {
}
}
func Init(selfId string, nodeAddr map[string]string, pipe string, db *leveldb.DB, rstorage *RaftStorage) *Node {
func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *RaftStorage) *Node {
ns := make(map[string]*Public_node_info)
for id, addr := range nodeAddr {
ns[id] = newNode(addr)
@@ -33,7 +32,6 @@ func Init(selfId string, nodeAddr map[string]string, pipe string, db *leveldb.DB
selfId: selfId,
leaderId: "",
nodes: ns,
pipeAddr: pipe,
maxLogId: -1, // later I found the paper starts from 1 (initially 0), but did not bother to change it
currTerm: 1,
log: make([]RaftLogEntry, 0),
@@ -48,64 +46,6 @@ func Init(selfId string, nodeAddr map[string]string, pipe string, db *leveldb.DB
return node
}
// func Start(node *Node, isLeader bool) {
// if isLeader {
// node.state = Candidate // needs a state transition
// } else {
// node.state = Follower
// }
// go func() {
// for {
// switch node.state {
// case Follower:
// case Candidate:
// // todo: initialization on becoming leader
// // node.currTerm = 1
// // the candidate starts an input-listening thread, then becomes leader
// node.state = Leader
// go func() {
// if node.pipeAddr == "" { // clients invoke the server_node methods remotely
// log.Info("please run a client process for reads and writes")
// } else { // the command line supplied a pipe; piped (keyboard) input is supported
// pipe, err := os.Open(node.pipeAddr)
// if err != nil {
// log.Error("Failed to open pipe")
// }
// defer pipe.Close()
// // keep reading input from the pipe
// buffer := make([]byte, 256)
// for {
// n, err := pipe.Read(buffer)
// if err != nil && err != io.EOF {
// log.Error("Error reading from pipe")
// }
// if n > 0 {
// input := string(buffer[:n])
// // wrap the user input into a LogEntry
// kv := LogEntry{input, ""} // currently the keyboard gives the key, value stays empty
// logId := node.maxLogId
// node.maxLogId++
// node.log[logId] = RaftLogEntry{kv, logId, node.currTerm}
// log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input)
// // broadcast to the other nodes
// node.BroadCastKV(Normal)
// // persist
// node.db.Put([]byte(kv.Key), []byte(kv.Value), nil)
// }
// }
// }
// }()
// case Leader:
// time.Sleep(50 * time.Millisecond)
// }
// }
// }()
// }
func (n *Node) initLeaderState() {
for peerId := range n.nodes {
n.nextIndex[peerId] = len(n.log) // index of the next log entry to send
@@ -123,11 +63,11 @@ func Start(node *Node) {
switch node.state {
case Follower:
// watch for heartbeat timeouts
fmt.Printf("Node %s is a follower, listening...\n", node.selfId)
fmt.Printf("[%s] is a follower, listening...\n", node.selfId)
case Leader:
// send heartbeats
fmt.Printf("Node %s is the leader, sending heartbeats...\n", node.selfId)
fmt.Printf("[%s] is the leader, sending heartbeats...\n", node.selfId)
node.BroadCastKV(Normal)
}
time.Sleep(50 * time.Millisecond)
@@ -135,22 +75,6 @@
}()
}
// if a follower receives no AppendEntries heartbeat within 500-1000ms, it becomes a candidate and starts an election
func (node *Node) resetElectionTimer() {
if node.electionTimer == nil {
node.electionTimer = time.NewTimer(time.Duration(500+rand.Intn(500)) * time.Millisecond)
go func() {
for {
<-node.electionTimer.C
node.startElection()
}
}()
} else {
node.electionTimer.Stop()
node.electionTimer.Reset(time.Duration(500+rand.Intn(500)) * time.Millisecond)
}
}
func (node *Node) Rpc(port string) {
err := rpc.Register(node)
if err != nil {

+0 -208 internal/nodes/node.go

@@ -1,15 +1,10 @@
package nodes
import (
"math/rand"
"net/rpc"
"sort"
"strconv"
"sync"
"time"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap"
)
type State = uint8
@@ -35,9 +30,6 @@ type Node struct {
// info about the other nodes (excluding this one)
nodes map[string]*Public_node_info
// pipe name
pipeAddr string
// current node state
state State
@@ -71,203 +63,3 @@ type Node struct {
electionTimer *time.Timer
}
func (node *Node) BroadCastKV(callMode CallMode) {
// iterate over all nodes
for id := range node.nodes {
go func(id string, kv CallMode) {
node.sendKV(id, callMode)
}(id, callMode)
}
}
func (node *Node) sendKV(id string, callMode CallMode) {
switch callMode {
case Fail:
log.Info("simulating a failed send")
// written this way every send fails; a random number could decide failure instead
case Delay:
log.Info("simulating a delayed send")
// random 0-5ms delay
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)))
default:
}
client, err := rpc.DialHTTP("tcp", node.nodes[id].address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return
}
defer func(client *rpc.Client) {
err := client.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}(client)
node.mu.Lock()
defer node.mu.Unlock()
var appendReply AppendEntriesReply
appendReply.Success = false
nextIndex := node.nextIndex[id]
// log.Info("nextindex " + strconv.Itoa(nextIndex))
for !appendReply.Success {
if nextIndex < 0 {
log.Error("assert >= 0 here")
}
sendEntries := node.log[nextIndex:]
arg := AppendEntriesArg{
Term: node.currTerm,
PrevLogIndex: nextIndex - 1,
Entries: sendEntries,
LeaderCommit: node.commitIndex,
LeaderId: node.selfId,
}
if arg.PrevLogIndex >= 0 {
arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term
}
callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC
if callErr != nil {
log.Error("dialing node_"+id+" fail: ", zap.Error(callErr))
}
if appendReply.Term != node.currTerm {
log.Info("Leader " + node.selfId + " saw a higher term=" + strconv.Itoa(appendReply.Term) + ", stepping down to Follower")
node.currTerm = appendReply.Term
node.state = Follower
node.votedFor = ""
node.storage.SetTermAndVote(node.currTerm, node.votedFor)
node.resetElectionTimer()
return
}
nextIndex-- // on failure step back one entry
}
// if we did not step down to follower
node.nextIndex[id] = node.maxLogId + 1
node.matchIndex[id] = node.maxLogId
node.updateCommitIndex()
}
func (node *Node) updateCommitIndex() {
totalNodes := len(node.nodes)
// collect all matchIndex values and sort them
matchIndexes := make([]int, 0, totalNodes)
for _, index := range node.matchIndex {
matchIndexes = append(matchIndexes, index)
}
sort.Ints(matchIndexes) // sort ascending
// compute the majority commitIndex
majorityIndex := matchIndexes[totalNodes/2] // take the index at position N/2 (the majority)
// make sure the entry at this index belongs to the current term, so entries from an old term are never committed
if majorityIndex > node.commitIndex && majorityIndex < len(node.log) && node.log[majorityIndex].Term == node.currTerm {
node.commitIndex = majorityIndex
log.Info("Leader" + node.selfId + " updates commitIndex: " + strconv.Itoa(majorityIndex))
// apply the logs to the state machine
node.applyCommittedLogs()
}
}
// apply committed logs to the state machine
func (node *Node) applyCommittedLogs() {
for node.lastApplied < node.commitIndex {
node.lastApplied++
logEntry := node.log[node.lastApplied]
log.Info("node" + node.selfId + " applies log to state machine: " + logEntry.print())
err := node.db.Put([]byte(logEntry.LogE.Key), []byte(logEntry.LogE.Value), nil)
if err != nil {
log.Error("applying to state machine failed: ", zap.Error(err))
}
}
}
// RPC call
func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error {
node.mu.Lock()
defer node.mu.Unlock()
// 1. if the term is stale, refuse the entries
if node.currTerm > arg.Term {
*reply = AppendEntriesReply{node.currTerm, false}
return nil
}
node.leaderId = arg.LeaderId // record the leader
if node.currTerm < arg.Term {
log.Info("Node " + node.selfId + " saw a higher term=" + strconv.Itoa(arg.Term))
node.currTerm = arg.Term
node.state = Follower
node.votedFor = ""
node.storage.SetTermAndVote(node.currTerm, node.votedFor)
}
node.storage.SetTermAndVote(node.currTerm, node.votedFor)
// 2. check whether prevLogIndex is valid
if arg.PrevLogIndex >= len(node.log) || (arg.PrevLogIndex >= 0 && node.log[arg.PrevLogIndex].Term != arg.PrevLogTerm) {
*reply = AppendEntriesReply{node.currTerm, false}
return nil
}
// 3. handle log conflicts (truncate the log where terms differ)
idx := arg.PrevLogIndex + 1
for i := idx; i < len(node.log) && i-idx < len(arg.Entries); i++ {
if node.log[i].Term != arg.Entries[i-idx].Term {
node.log = node.log[:idx]
break
}
}
// log.Info(strconv.Itoa(idx) + strconv.Itoa(len(node.log)))
// 4. append the new log entries
for _, raftLogEntry := range arg.Entries {
log.Info(node.selfId + " node writes " + raftLogEntry.print())
if idx < len(node.log) {
node.log[idx] = raftLogEntry
} else {
node.log = append(node.log, raftLogEntry)
}
idx++
}
// brute-force persistence
node.storage.WriteLog(node.log)
// update maxLogId
node.maxLogId = len(node.log) - 1
// update commitIndex
if arg.LeaderCommit < node.maxLogId {
node.commitIndex = arg.LeaderCommit
} else {
node.commitIndex = node.maxLogId
}
// apply the committed logs
node.applyCommittedLogs()
// after successfully accepting entries or a heartbeat, reset the election timeout
node.resetElectionTimer()
*reply = AppendEntriesReply{node.currTerm, true}
return nil
}
type AppendEntriesArg struct {
Term int
LeaderId string
PrevLogIndex int
PrevLogTerm int
Entries []RaftLogEntry
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
Success bool
}

+2 -1 internal/nodes/node_storage.go

@@ -34,7 +34,7 @@ func (rs *RaftStorage) loadData() {
file, err := os.Open(rs.filePath)
if err != nil {
log.Info("file does not exist: " + rs.filePath)
log.Info("file not yet created: " + rs.filePath)
rs.saveData() // create default data when the file does not exist
return
}
@@ -103,6 +103,7 @@ func (rs *RaftStorage) GetVotedFor() string {
return rs.VotedFor
}
// set the term and the vote together
func (rs *RaftStorage) SetTermAndVote(term int, candidate string) {
rs.mu.Lock()
defer rs.mu.Unlock()
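Since node_storage.go persists the raft state to a JSON file, the saved document presumably carries the term, the vote, and the log. A hypothetical sketch of that shape, inferred from SetTermAndVote, GetVotedFor, WriteLog and AppendLog; the field and tag names are assumptions, not the file's actual schema:

```go
// Hypothetical persisted raft state; names are assumptions.
type persistedState struct {
	CurrentTerm int            `json:"currentTerm"` // highest term seen so far
	VotedFor    string         `json:"votedFor"`    // candidate voted for in that term
	Log         []RaftLogEntry `json:"log"`         // replicated log, rewritten by WriteLog
}
```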

+214 -0 internal/nodes/replica.go

@@ -0,0 +1,214 @@
package nodes
import (
"math/rand"
"net/rpc"
"sort"
"strconv"
"time"
"go.uber.org/zap"
)
type AppendEntriesArg struct {
Term int
LeaderId string
PrevLogIndex int
PrevLogTerm int
Entries []RaftLogEntry
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
Success bool
}
// the leader broadcasts new content, plus heartbeat broadcasts (syncing its log)
func (node *Node) BroadCastKV(callMode CallMode) {
// iterate over all nodes
for id := range node.nodes {
go func(id string, kv CallMode) {
node.sendKV(id, callMode)
}(id, callMode)
}
}
func (node *Node) sendKV(id string, callMode CallMode) {
switch callMode {
case Fail:
log.Info("simulating a failed send")
// written this way every send fails; a random number could decide failure instead
case Delay:
log.Info("simulating a delayed send")
// random 0-5ms delay
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)))
default:
}
client, err := rpc.DialHTTP("tcp", node.nodes[id].address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return
}
defer func(client *rpc.Client) {
err := client.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}(client)
node.mu.Lock()
defer node.mu.Unlock()
var appendReply AppendEntriesReply
appendReply.Success = false
nextIndex := node.nextIndex[id]
// log.Info("nextindex " + strconv.Itoa(nextIndex))
for !appendReply.Success {
if nextIndex < 0 {
log.Fatal("assert >= 0 here")
}
sendEntries := node.log[nextIndex:]
arg := AppendEntriesArg{
Term: node.currTerm,
PrevLogIndex: nextIndex - 1,
Entries: sendEntries,
LeaderCommit: node.commitIndex,
LeaderId: node.selfId,
}
if arg.PrevLogIndex >= 0 {
arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term
}
callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC
if callErr != nil {
log.Error("dialing node_"+ id +" fail: ", zap.Error(callErr))
}
if appendReply.Term != node.currTerm {
log.Info("Leader[" + node.selfId + "] saw a higher term=" + strconv.Itoa(appendReply.Term) + ", stepping down to Follower")
node.currTerm = appendReply.Term
node.state = Follower
node.votedFor = ""
node.storage.SetTermAndVote(node.currTerm, node.votedFor)
node.resetElectionTimer()
return
}
nextIndex-- // on failure step back one entry
}
// if we did not step down to follower
node.nextIndex[id] = node.maxLogId + 1
node.matchIndex[id] = node.maxLogId
node.updateCommitIndex()
}
func (node *Node) updateCommitIndex() {
totalNodes := len(node.nodes)
// collect all matchIndex values and sort them
matchIndexes := make([]int, 0, totalNodes)
for _, index := range node.matchIndex {
matchIndexes = append(matchIndexes, index)
}
sort.Ints(matchIndexes) // sort ascending
// compute the majority commitIndex
majorityIndex := matchIndexes[totalNodes/2] // take the index at position N/2 (the majority)
// make sure the entry at this index belongs to the current term, so entries from an old term are never committed
if majorityIndex > node.commitIndex && majorityIndex < len(node.log) && node.log[majorityIndex].Term == node.currTerm {
node.commitIndex = majorityIndex
log.Info("Leader[" + node.selfId + "] updates commitIndex: " + strconv.Itoa(majorityIndex))
// apply the logs to the state machine
node.applyCommittedLogs()
}
}
// apply committed logs to the state machine
func (node *Node) applyCommittedLogs() {
for node.lastApplied < node.commitIndex {
node.lastApplied++
logEntry := node.log[node.lastApplied]
log.Sugar().Infof("[%s] applies log to state machine: "+logEntry.print(), node.selfId)
err := node.db.Put([]byte(logEntry.LogE.Key), []byte(logEntry.LogE.Value), nil)
if err != nil {
log.Error(node.selfId + " applying to state machine failed: ", zap.Error(err))
}
}
}
// RPC call
func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error {
node.mu.Lock()
defer node.mu.Unlock()
// if the term is stale, refuse the entries
if node.currTerm > arg.Term {
*reply = AppendEntriesReply{node.currTerm, false}
return nil
}
node.leaderId = arg.LeaderId // record the leader
// if the term is higher than ours, or we are not a follower but got a heartbeat of the same term
if node.currTerm < arg.Term || node.state != Follower {
log.Sugar().Infof("[%s] saw a higher term (%s)", node.selfId, strconv.Itoa(arg.Term))
node.currTerm = arg.Term
node.state = Follower
node.votedFor = ""
node.storage.SetTermAndVote(node.currTerm, node.votedFor)
}
node.storage.SetTermAndVote(node.currTerm, node.votedFor)
// check whether prevLogIndex is valid
if arg.PrevLogIndex >= len(node.log) || (arg.PrevLogIndex >= 0 && node.log[arg.PrevLogIndex].Term != arg.PrevLogTerm) {
*reply = AppendEntriesReply{node.currTerm, false}
return nil
}
// handle log conflicts (truncate the log where terms differ)
idx := arg.PrevLogIndex + 1
for i := idx; i < len(node.log) && i-idx < len(arg.Entries); i++ {
if node.log[i].Term != arg.Entries[i-idx].Term {
node.log = node.log[:idx]
break
}
}
// log.Info(strconv.Itoa(idx) + strconv.Itoa(len(node.log)))
// append the new log entries
for _, raftLogEntry := range arg.Entries {
log.Sugar().Infof("[%s] writes: "+raftLogEntry.print(), node.selfId)
if idx < len(node.log) {
node.log[idx] = raftLogEntry
} else {
node.log = append(node.log, raftLogEntry)
}
idx++
}
// brute-force persistence
node.storage.WriteLog(node.log)
// update maxLogId
node.maxLogId = len(node.log) - 1
// update commitIndex
if arg.LeaderCommit < node.maxLogId {
node.commitIndex = arg.LeaderCommit
} else {
node.commitIndex = node.maxLogId
}
// apply the committed logs
node.applyCommittedLogs()
// after successfully accepting entries or a heartbeat, reset the election timeout
node.resetElectionTimer()
*reply = AppendEntriesReply{node.currTerm, true}
return nil
}
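As a sanity check on the majority pick in updateCommitIndex: matchIndex is tracked for the peers only, so after sorting, the value at position N/2 is held by enough peers that, counting the leader itself, a majority of the cluster stores the entry. A standalone illustration (the matchIndex values are invented):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// A 5-node cluster: the leader plus four peers ("2".."5").
	matchIndex := map[string]int{"2": 7, "3": 5, "4": 9, "5": 3}
	totalNodes := len(matchIndex)

	matchIndexes := make([]int, 0, totalNodes)
	for _, idx := range matchIndex {
		matchIndexes = append(matchIndexes, idx)
	}
	sort.Ints(matchIndexes) // [3 5 7 9]

	// Position N/2 is index 2, value 7: peers "2" and "4" hold entry 7,
	// and the leader always does, so 3 of 5 nodes have it, a majority.
	fmt.Println("majority commit index:", matchIndexes[totalNodes/2]) // 7
}
```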

+8 -4 internal/nodes/server_node.go

@@ -15,7 +15,9 @@ type ServerReply struct{
}
// RPC call
func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error {
log.Info(node.selfId + " received a client write request")
log.Sugar().Infof("[%s] received a client write request", node.selfId)
// not the leader: hand the leader's address back in the reply
if node.state != Leader {
reply.Isleader = false
if (node.leaderId == "") {
@@ -23,24 +25,26 @@ func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error {
return nil
}
reply.LeaderAddress = node.nodes[node.leaderId].address
log.Info(node.selfId + " forwards to " + node.leaderId)
log.Sugar().Infof("[%s] forwards to [%s]", node.selfId, node.leaderId)
return nil
}
// we are the leader: update our own record and broadcast
node.maxLogId++
logId := node.maxLogId
rLogE := RaftLogEntry{kvCall.LogE, logId, node.currTerm}
node.log = append(node.log, rLogE)
node.storage.AppendLog(rLogE)
log.Info("leader" + node.selfId + " handles request : " + kvCall.LogE.print() + ", simulation mode : " + strconv.Itoa(int(kvCall.CallState)))
log.Info("leader[" + node.selfId + "] handles request : " + kvCall.LogE.print() + ", simulation mode : " + strconv.Itoa(int(kvCall.CallState)))
// broadcast to the other nodes
node.BroadCastKV(kvCall.CallState)
reply.Isleader = true
return nil
}
// RPC call
func (node *Node) ReadKey(key string, reply *ServerReply) error {
log.Info("server read : " + key)
log.Sugar().Infof("[%s] received a client read request", node.selfId)
// for now read only locally (leader or not), which also eases testing
value, err := node.db.Get([]byte(key), nil)
if err == leveldb.ErrNotFound {

+30 -13 internal/nodes/vote.go

@@ -1,6 +1,7 @@ package nodes
package nodes
import (
"math/rand"
"net/rpc"
"strconv"
"sync"
@@ -24,7 +25,7 @@ type RequestVoteReply struct {
func (n *Node) startElection() {
n.mu.Lock()
defer n.mu.Unlock()
// 1. increment the current term and become a Candidate
// increment the current term and become a Candidate
n.currTerm++
n.state = Candidate
n.votedFor = n.selfId // vote for ourselves
@@ -32,10 +33,10 @@
log.Sugar().Infof("[%s] starts an election, current term: %d", n.selfId, n.currTerm)
// 2. reset the election timeout to prevent repeated elections
// reset the election timeout to prevent repeated elections
n.resetElectionTimer()
// 3. build the RequestVote request
// build the RequestVote request
var lastLogIndex int
var lastLogTerm int
@@ -53,7 +54,7 @@ func (n *Node) startElection() {
LastLogTerm: lastLogTerm,
}
// 4. send RequestVote to the other nodes in parallel
// send RequestVote to the other nodes in parallel
var mu sync.Mutex
cond := sync.NewCond(&mu)
totalNodes := len(n.nodes)
@@ -81,7 +82,7 @@ func (n *Node) startElection() {
grantedVotes++
}
if grantedVotes > totalNodes/2 {
if grantedVotes == totalNodes / 2 + 1 {
n.state = Leader
log.Sugar().Infof("[%s] elected Leader!", n.selfId)
n.initLeaderState()
@@ -93,7 +94,7 @@ func (n *Node) startElection() {
}(peerId)
}
// 5. wait for the election result
// wait for the election result
timeout := time.After(300 * time.Millisecond)
for {
@@ -115,7 +116,7 @@ func (n *Node) startElection() {
}
func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *RequestVoteReply) bool {
log.Sugar().Infof("Sending RequestVote to %s at %s", peerId, node.nodes[peerId].address)
log.Sugar().Infof("[%s] asks [%s] to vote for it", node.selfId, peerId)
client, err := rpc.DialHTTP("tcp", node.nodes[peerId].address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
@@ -139,14 +140,14 @@ func (node *Node) sendRequestVote(peerId string, args *RequestVoteArgs, reply *R
func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error {
n.mu.Lock()
defer n.mu.Unlock()
// 1. if the candidate's term is lower than the current term, refuse to vote
// if the candidate's term is lower than the current term, refuse to vote
if args.Term < n.currTerm {
reply.Term = n.currTerm
reply.VoteGranted = false
return nil
}
// 2. if the requested term is higher, update the current term and fall back to Follower
// if the requested term is higher, update the current term and fall back to Follower
if args.Term > n.currTerm {
n.currTerm = args.Term
n.state = Follower
@@ -154,9 +155,9 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error
n.resetElectionTimer() // reset the election timeout
}
// 3. check whether we already voted, and whether it was for this same candidate
// check whether we already voted, and whether it was for this same candidate
if n.votedFor == "" || n.votedFor == args.CandidateId {
// 4. check whether the candidate's log is up to date
// check whether the candidate's log is up to date
var lastLogIndex int
var lastLogTerm int
@@ -170,9 +171,9 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error
if args.LastLogTerm > lastLogTerm ||
(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) {
// 5. vote for the candidate
// if it is up to date, vote for the candidate
n.votedFor = args.CandidateId
log.Info("term" + strconv.Itoa(n.currTerm) + ", " + n.selfId + " votes for " + n.votedFor)
log.Sugar().Infof("in term(%s), [%s] votes for [%s]", strconv.Itoa(n.currTerm), n.selfId, n.votedFor)
reply.VoteGranted = true
n.resetElectionTimer()
} else {
@@ -185,4 +186,20 @@ func (n *Node) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) error
n.storage.SetTermAndVote(n.currTerm, n.votedFor)
reply.Term = n.currTerm
return nil
}
// if a follower receives no AppendEntries heartbeat within 500-1000ms, it becomes a candidate and starts an election
func (node *Node) resetElectionTimer() {
if node.electionTimer == nil {
node.electionTimer = time.NewTimer(time.Duration(500+rand.Intn(500)) * time.Millisecond)
go func() {
for {
<-node.electionTimer.C
node.startElection()
}
}()
} else {
node.electionTimer.Stop()
node.electionTimer.Reset(time.Duration(500+rand.Intn(500)) * time.Millisecond)
}
}

+0 -61 scripts/run.sh

@@ -1,61 +0,0 @@
#!/bin/bash
# set the run-time limit, in seconds
RUN_TIME=10
# the pipe used to pass data
PIPE_NAME="/tmp/input_pipe"
# start node 1
echo "Starting Node 1..."
timeout $RUN_TIME ./main -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true &
# start node 2
echo "Starting Node 2..."
timeout $RUN_TIME ./main -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" &
# start node 3
echo "Starting Node 3..."
timeout $RUN_TIME ./main -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"&
echo "All nodes started successfully!"
# create a pipe for inter-process communication
if [[ ! -p "$PIPE_NAME" ]]; then
mkfifo "$PIPE_NAME"
fi
# capture terminal input and pass it to the three nodes through the pipe
echo "Enter input to send to nodes:"
start_time=$(date +%s)
while true; do
# read user input from the terminal
read -r user_input
current_time=$(date +%s)
elapsed_time=$((current_time - start_time))
# exit once the run time exceeds the limit
if [ $elapsed_time -ge $RUN_TIME ]; then
echo 'Timeout reached, normal exit now'
break
fi
# skip empty input
if [[ -z "$user_input" ]]; then
continue
fi
# send the user input to the pipe
echo "$user_input" > "$PIPE_NAME"
# typing "exit" ends the script
if [[ "$user_input" == "exit" ]]; then
break
fi
done
# remove the pipe
rm "$PIPE_NAME"
# wait for all nodes to finish
wait

test/restart_follower_test.go → test/restart_node_test.go

@@ -11,9 +11,9 @@ import (
"time"
)
func TestFollowerRestart(t *testing.T) {
func TestNodeRestart(t *testing.T) {
// register the node info
n := 3
n := 5
var clusters []string
for i := 0; i < n; i++ {
port := fmt.Sprintf("%d", uint16(9090)+uint16(i))
@@ -24,23 +24,13 @@ func TestFollowerRestart(t *testing.T) {
// start the nodes
var cmds []*exec.Cmd
for i := 0; i < n; i++ {
var cmd *exec.Cmd
if i == 0 {
cmd = ExecuteNodeI(i, true, clusters)
} else {
cmd = ExecuteNodeI(i, true, clusters)
}
if cmd == nil {
return
} else {
cmds = append(cmds, cmd)
}
cmd := ExecuteNodeI(i, true, clusters)
cmds = append(cmds, cmd)
}
time.Sleep(time.Second) // wait for startup to finish
// start the client, connecting to the leader
cWrite := clientPkg.Client{Address: clusters[0], ServerId: "1"}
// start the client, connecting to any node
cWrite := clientPkg.Client{Address: clusters}
// writes
var s clientPkg.Status
@@ -53,34 +43,30 @@ func TestFollowerRestart(t *testing.T) {
}
}
time.Sleep(time.Second) // wait for the writes to finish
// simulate a crash of the last node
err := cmds[n - 1].Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
// keep writing
for i := 5; i < 10; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
}
// recover the node
cmd := ExecuteNodeI(n - 1, false, clusters)
if cmd == nil {
t.Errorf("recover test1 fail")
return
} else {
cmds[n - 1] = cmd
// simulate the nodes crashing in turn
for i := 0; i < n; i++ {
err := cmds[i].Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
time.Sleep(time.Second)
cmd := ExecuteNodeI(i, false, clusters)
if cmd == nil {
t.Errorf("recover test1 fail")
return
} else {
cmds[i] = cmd
}
time.Sleep(time.Second) // wait for startup to finish
}
time.Sleep(time.Second) // wait for startup to finish
// start a client connected to node n-1 (to read its data)
cRead := clientPkg.Client{Address: clusters[n - 1], ServerId: "n"}
// read the data written before the crash
// start the client
cRead := clientPkg.Client{Address: clusters}
// read back the written data
for i := 0; i < 5; i++ {
key := strconv.Itoa(i)
var value string
@@ -100,7 +86,7 @@ func TestFollowerRestart(t *testing.T) {
}
}
// tell the processes to finish
// tell all the processes to finish
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
@@ -108,5 +94,4 @@ func TestFollowerRestart(t *testing.T) {
return
}
}
}

+2 -7 test/server_client_test.go

@@ -25,17 +25,12 @@ func TestServerClient(t *testing.T) {
var cmds []*exec.Cmd
for i := 0; i < n; i++ {
cmd := ExecuteNodeI(i, true, clusters)
if cmd == nil {
return
} else {
cmds = append(cmds, cmd)
}
cmds = append(cmds, cmd)
}
time.Sleep(time.Second) // wait for startup to finish
// start the client
c := clientPkg.Client{Address: "127.0.0.1:9092", ServerId: "3"}
c := clientPkg.Client{Address: clusters}
// writes
var s clientPkg.Status
