Browse Source

修正了节点重启逻辑中的一些理解错误

ld
augurier 6 months ago
parent
commit
28dc22fb16
6 changed files with 1512 additions and 20 deletions
  1. +9
    -11
      cmd/main.go
  2. +8
    -2
      internal/nodes/init.go
  3. +1
    -1
      internal/nodes/node_storage.go
  4. +2
    -0
      internal/nodes/vote.go
  5. +6
    -6
      test/common.go
  6. +1486
    -0
      test/test.log

+ 9
- 11
cmd/main.go View File

@ -31,7 +31,7 @@ func main() {
port := flag.String("port", ":9091", "rpc listen port") port := flag.String("port", ":9091", "rpc listen port")
cluster := flag.String("cluster", "127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093", "comma sep") cluster := flag.String("cluster", "127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093", "comma sep")
id := flag.String("id", "1", "node ID") id := flag.String("id", "1", "node ID")
isNewDb := flag.Bool("isNewDb", true, "new test or restart")
isRestart := flag.Bool("isRestart", true, "new test or restart")
// 参数解析 // 参数解析
flag.Parse() flag.Parse()
@ -51,11 +51,14 @@ func main() {
idCnt++ idCnt++
} }
if *isNewDb {
os.RemoveAll("leveldb/simple-kv-store" + *id)
if *isRestart {
os.RemoveAll("storage/node" + *id + ".json") os.RemoveAll("storage/node" + *id + ".json")
} }
// 打开或创建每个结点自己的数据库
// 创建每个结点自己的数据库。这里一开始理解上有些误区,状态机的状态恢复应该靠节点的持久化log,
// 而用leveldb模拟状态机,造成了状态机本身的持久化,因此暂时通过删去旧db避免这一矛盾
os.RemoveAll("leveldb/simple-kv-store" + *id)
db, err := leveldb.OpenFile("leveldb/simple-kv-store"+*id, nil) db, err := leveldb.OpenFile("leveldb/simple-kv-store"+*id, nil)
if err != nil { if err != nil {
log.Fatal("Failed to open database: ", zap.Error(err)) log.Fatal("Failed to open database: ", zap.Error(err))
@ -67,14 +70,9 @@ func main() {
// 打开或创建节点数据持久化文件 // 打开或创建节点数据持久化文件
storage := nodes.NewRaftStorage("storage/node" + *id + ".json") storage := nodes.NewRaftStorage("storage/node" + *id + ".json")
// 计数
count := 0
for iter.Next() {
count++
}
log.Sugar().Infof("[%s]目前有数据:%d", *id, count)
// 初始化
node := nodes.Init(*id, idClusterPairs, db, storage, !*isRestart)
node := nodes.Init(*id, idClusterPairs, db, storage)
log.Sugar().Infof("[%s]开始监听" + *port + "端口", *id) log.Sugar().Infof("[%s]开始监听" + *port + "端口", *id)
// 监听rpc // 监听rpc
node.Rpc(*port) node.Rpc(*port)

+ 8
- 2
internal/nodes/init.go View File

@ -21,7 +21,7 @@ func newNode(address string) *Public_node_info {
} }
} }
func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *RaftStorage) *Node {
func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *RaftStorage, isRestart bool) *Node {
ns := make(map[string]*Public_node_info) ns := make(map[string]*Public_node_info)
for id, addr := range nodeAddr { for id, addr := range nodeAddr {
ns[id] = newNode(addr) ns[id] = newNode(addr)
@ -43,6 +43,13 @@ func Init(selfId string, nodeAddr map[string]string, db *leveldb.DB, rstorage *R
storage: rstorage, storage: rstorage,
} }
node.initLeaderState() node.initLeaderState()
if isRestart {
node.currTerm = rstorage.GetCurrentTerm()
node.votedFor = rstorage.GetVotedFor()
node.log = rstorage.GetLogEntries()
log.Sugar().Infof("[%s]从重启中恢复log数量: %d", selfId, len(node.log))
}
return node return node
} }
@ -51,7 +58,6 @@ func (n *Node) initLeaderState() {
n.nextIndex[peerId] = len(n.log) // 发送日志的下一个索引 n.nextIndex[peerId] = len(n.log) // 发送日志的下一个索引
n.matchIndex[peerId] = 0 // 复制日志的最新匹配索引 n.matchIndex[peerId] = 0 // 复制日志的最新匹配索引
} }
n.storage.SetTermAndVote(n.currTerm, n.votedFor)
} }
func Start(node *Node) { func Start(node *Node) {

+ 1
- 1
internal/nodes/node_storage.go View File

@ -42,7 +42,7 @@ func (rs *RaftStorage) loadData() {
err = json.NewDecoder(file).Decode(rs) err = json.NewDecoder(file).Decode(rs)
if err != nil { if err != nil {
log.Error("读取文件失败:" + rs.filePath)
log.Error("读取文件失败:" + rs.filePath, zap.Error(err))
} }
} }

+ 2
- 0
internal/nodes/vote.go View File

@ -106,6 +106,8 @@ func (n *Node) startElection() {
select { select {
case <-timeout: case <-timeout:
log.Sugar().Infof("[%s] 选举超时,重新发起选举", n.selfId) log.Sugar().Infof("[%s] 选举超时,重新发起选举", n.selfId)
n.state = Follower
n.resetElectionTimer()
mu.Unlock() mu.Unlock()
return return
default: default:

+ 6
- 6
test/common.go View File

@ -8,21 +8,21 @@ import (
"strings" "strings"
) )
func ExecuteNodeI(i int, isNewDb bool, clusters []string) *exec.Cmd {
func ExecuteNodeI(i int, isRestart bool, clusters []string) *exec.Cmd {
port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) port := fmt.Sprintf(":%d", uint16(9090)+uint16(i))
var isnewdb string
if isNewDb {
isnewdb = "true"
var isRestartStr string
if isRestart {
isRestartStr = "true"
} else { } else {
isnewdb = "false"
isRestartStr = "false"
} }
cmd := exec.Command( cmd := exec.Command(
"../main", "../main",
"-id", strconv.Itoa(i + 1), "-id", strconv.Itoa(i + 1),
"-port", port, "-port", port,
"-cluster", strings.Join(clusters, ","), "-cluster", strings.Join(clusters, ","),
"-isNewDb=" + isnewdb,
"-isRestart=" + isRestartStr,
) )
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr

+ 1486
- 0
test/test.log
File diff suppressed because it is too large
View File


Loading…
Cancel
Save