Browse Source

raft日志同步部分

ld
augurier 6 months ago
parent
commit
1181f6d796
5 changed files with 204 additions and 40 deletions
  1. +20
    -6
      internal/nodes/init.go
  2. +17
    -2
      internal/nodes/log.go
  3. +160
    -24
      internal/nodes/node.go
  4. +6
    -7
      internal/nodes/server_node.go
  5. +1
    -1
      test/server_client_test.go

+ 20
- 6
internal/nodes/init.go View File

@ -30,14 +30,26 @@ func Init(id string, nodeAddr map[string]string, pipe string, db *leveldb.DB) *N
}
// 创建节点
return &Node{
node := &Node{
selfId: id,
nodes: ns,
pipeAddr: pipe,
maxLogId: 0,
log: make(map[int]LogEntry),
maxLogId: -1,
currTerm: 1,
log: make([]RaftLogEntry, 0),
commitIndex: -1,
lastApplied: -1,
nextIndex: make(map[string]int),
matchIndex: make(map[string]int),
db: db,
}
for nodeId := range nodeAddr {
if nodeId != id { // 不初始化自身
node.nextIndex[nodeId] = node.maxLogId + 1
node.matchIndex[nodeId] = 0
}
}
return node
}
func Start(node *Node, isLeader bool) {
@ -53,6 +65,9 @@ func Start(node *Node, isLeader bool) {
case Follower:
case Candidate:
// todo 成为leader的初始化
// node.currTerm = 1
// candidate发布一个监听输入线程后,变成leader
node.state = Leader
go func() {
@ -78,12 +93,11 @@ func Start(node *Node, isLeader bool) {
kv := LogEntry{input, ""} // 目前键盘输入key,value 0
logId := node.maxLogId
node.maxLogId++
node.log[logId] = kv
node.log[logId] = RaftLogEntry{kv, logId, node.currTerm}
log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input)
// 广播给其它节点
kvCall := LogEntryCall{kv, Normal}
node.BroadCastKV(logId, kvCall)
node.BroadCastKV(Normal)
// 持久化
node.db.Put([]byte(kv.Key), []byte(kv.Value), nil)
}

+ 17
- 2
internal/nodes/log.go View File

@ -1,7 +1,10 @@
package nodes
import "strconv"
type CallMode = uint8
const (
Normal State = iota + 1
Normal CallMode = iota + 1
Delay
Fail
)
@ -10,10 +13,22 @@ type LogEntry struct {
Key string
Value string
}
func (LogE *LogEntry) print() string {
return "key: " + LogE.Key + ", value: " + LogE.Value
}
type RaftLogEntry struct {
LogE LogEntry
LogId int
Term int
}
func (RLogE *RaftLogEntry) print() string {
return "logid: " + strconv.Itoa(RLogE.LogId) + ", term: " + strconv.Itoa(RLogE.Term) + ", " + RLogE.LogE.print()
}
type LogEntryCall struct {
LogE LogEntry
CallState State
CallState CallMode
}
type KVReply struct {

+ 160
- 24
internal/nodes/node.go View File

@ -3,6 +3,7 @@ package nodes
import (
"math/rand"
"net/rpc"
"sort"
"strconv"
"time"
@ -30,33 +31,48 @@ type Node struct {
// 除当前节点外其他节点信息
nodes map[string]*Public_node_info
//管道名
// 管道名
pipeAddr string
// 当前节点状态
state State
// 任期
currTerm int
// 简单的kv存储
log map[int]LogEntry
log []RaftLogEntry
// leader用来标记新log
// leader用来标记新log, = log.len
maxLogId int
// 已提交的index
commitIndex int
// 最后应用(写到db)的index
lastApplied int
// 需要发送给每个节点的下一个索引
nextIndex map[string]int
// 已经发送给每个节点的最大索引
matchIndex map[string]int
db *leveldb.DB
}
func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) {
func (node *Node) BroadCastKV(callMode CallMode) {
// 遍历所有节点
for id, _ := range node.nodes {
go func(id string, kv LogEntryCall) {
var reply KVReply
node.sendKV(id, logId, kvCall, &reply)
}(id, kvCall)
for id := range node.nodes {
go func(id string, kv CallMode) {
node.sendKV(id, callMode)
}(id, callMode)
}
}
func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) {
switch kvCall.CallState {
func (node *Node) sendKV(id string, callMode CallMode) {
switch callMode {
case Fail:
log.Info("模拟发送失败")
// 这么写向所有的node发送都失败,也可以随机数确定是否失败
@ -80,22 +96,142 @@ func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVRep
}
}(client)
arg := LogIdAndEntry{logId, kvCall.LogE}
callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC
if callErr != nil {
log.Error("dialing node_"+id+"fail: ", zap.Error(callErr))
var appendReply AppendEntriesReply
appendReply.Success = false
nextIndex := node.nextIndex[id]
// log.Info("nextindex " + strconv.Itoa(nextIndex))
for (!appendReply.Success) {
if nextIndex < 0 {
log.Error("assert >= 0 here")
}
sendEntries := node.log[nextIndex:]
arg := AppendEntriesArg{
Term: node.currTerm,
PrevLogIndex: nextIndex - 1,
Entries: sendEntries,
LeaderCommit: node.commitIndex,
}
if arg.PrevLogIndex >= 0 {
arg.PrevLogTerm = node.log[arg.PrevLogIndex].Term
}
callErr := client.Call("Node.AppendEntries", arg, &appendReply) // RPC
if callErr != nil {
log.Error("dialing node_"+id+"fail: ", zap.Error(callErr))
}
if appendReply.Term != node.currTerm {
// 转变成follower?
break
}
nextIndex-- // 失败往前传一格
}
// 不变成follower情况下
node.nextIndex[id] = node.maxLogId + 1
node.matchIndex[id] = node.maxLogId
node.updateCommitIndex()
}
func (node *Node) updateCommitIndex() {
// node.mu.Lock()
// defer node.mu.Unlock()
totalNodes := len(node.nodes)
// 收集所有 matchIndex 并排序
matchIndexes := make([]int, 0, totalNodes)
for _, index := range node.matchIndex {
matchIndexes = append(matchIndexes, index)
}
sort.Ints(matchIndexes) // 排序
// 计算多数派 commitIndex
majorityIndex := matchIndexes[totalNodes/2] // 取 N/2 位置上的索引(多数派)
// 确保这个索引的日志条目属于当前 term,防止提交旧 term 的日志
if majorityIndex > node.commitIndex && node.log[majorityIndex].Term == node.currTerm {
node.commitIndex = majorityIndex
log.Info("Leader 更新 commitIndex: " + strconv.Itoa(majorityIndex))
// 应用日志到状态机
node.applyCommittedLogs()
}
}
// 应用日志到状态机
func (node *Node) applyCommittedLogs() {
for node.lastApplied < node.commitIndex {
node.lastApplied++
logEntry := node.log[node.lastApplied]
log.Info("node" + node.selfId + "应用日志到状态机: " + logEntry.print())
err := node.db.Put([]byte(logEntry.LogE.Key), []byte(logEntry.LogE.Value), nil)
if err != nil {
log.Error("应用状态机失败: ", zap.Error(err))
}
}
}
// RPC call
func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error {
log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key)
entry, ok := node.log[arg.LogId]
if !ok {
node.log[arg.LogId] = entry
func (node *Node) AppendEntries(arg AppendEntriesArg, reply *AppendEntriesReply) error {
// node.mu.Lock()
// defer node.mu.Unlock()
// 1. 如果 term 过期,拒绝接受日志
if node.currTerm > arg.Term {
*reply = AppendEntriesReply{node.currTerm, false}
return nil
}
// 2. 检查 prevLogIndex 是否有效
if arg.PrevLogIndex >= len(node.log) || (arg.PrevLogIndex >= 0 && node.log[arg.PrevLogIndex].Term != arg.PrevLogTerm) {
*reply = AppendEntriesReply{node.currTerm, false}
return nil
}
// 3. 处理日志冲突(如果存在不同 term,则截断日志)
idx := arg.PrevLogIndex + 1
if idx < len(node.log) && node.log[idx].Term != arg.Entries[0].Term {
node.log = node.log[:idx] // 截断冲突日志
}
// log.Info(strconv.Itoa(idx) + strconv.Itoa(len(node.log)))
// 4. 追加新的日志条目
for _, raftLogEntry := range arg.Entries {
if idx < len(node.log) {
node.log[idx] = raftLogEntry
} else {
node.log = append(node.log, raftLogEntry)
}
idx++
}
// 5. 更新 maxLogId
node.maxLogId = len(node.log) - 1
// 6. 更新 commitIndex
if arg.LeaderCommit < node.maxLogId {
node.commitIndex = arg.LeaderCommit
} else {
node.commitIndex = node.maxLogId
}
// 持久化
node.db.Put([]byte(arg.Entry.Key), []byte(arg.Entry.Value), nil)
reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定
return nil
// 7. 提交已提交的日志
node.applyCommittedLogs()
*reply = AppendEntriesReply{node.currTerm, true}
return nil
}
type AppendEntriesArg struct {
Term int
// leaderId string
PrevLogIndex int
PrevLogTerm int
Entries []RaftLogEntry
LeaderCommit int
}
type AppendEntriesReply struct {
Term int
Success bool
}

+ 6
- 7
internal/nodes/server_node.go View File

@ -13,15 +13,14 @@ type ServerReply struct{
Value string
}
// RPC call
func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error {
logId := node.maxLogId
func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error {
node.maxLogId++
node.log[logId] = kvCall.LogE
node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil)
log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kvCall.LogE.Key)
logId := node.maxLogId
node.log = append(node.log, RaftLogEntry{kvCall.LogE, logId, node.currTerm})
// node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil)
log.Info("server write request : " + kvCall.LogE.print() + ", 模拟方式 : " + strconv.Itoa(int(kvCall.CallState)))
// 广播给其它节点
node.BroadCastKV(logId, kvCall)
node.BroadCastKV(kvCall.CallState)
reply.Isconnect = true
return nil
}

+ 1
- 1
test/server_client_test.go View File

@ -58,7 +58,7 @@ func TestServerClient(t *testing.T) {
key := strconv.Itoa(i)
var value string
s = c.Read(key, &value)
if s != clientPkg.Ok && value != "hello" + key {
if s != clientPkg.Ok || value != "hello" {
t.Errorf("Read test1 fail")
}
}

Loading…
Cancel
Save