From 6b29fe96b16ea3564bcde9288d481b55c980bcca Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Mon, 31 Mar 2025 11:48:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E4=BB=8Ejson=E8=BF=81=E7=A7=BB=E4=B8=BAleveldb=EF=BC=8C?= =?UTF-8?q?=E4=BF=9D=E8=AF=81=E5=86=99=E5=85=A5=E5=8E=9F=E5=AD=90=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 7 +- cmd/main.go | 9 +- internal/nodes/init.go | 2 + internal/nodes/node_storage.go | 188 +++++++++++++++++++++++------------------ threadTest/common.go | 6 +- 5 files changed, 122 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index f1da0e8..8675376 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ ---nodes 分布式核心代码 init.go 节点在main中的调用初始化,和大循环启动 log.go 节点存储的entry相关数据结构 - node_storage.go 抽象了节点数据持久化方法,存到json文件里 + node_storage.go 抽象了节点数据持久化方法,序列化后存到leveldb里 node.go 节点的相关数据结构 replica.go 日志复制相关逻辑 server_node.go 节点作为server为 client提供的功能(读写) @@ -32,8 +32,9 @@ lsof -i :9091查看pid kill -9 杀死进程 ## 关于测试 -通过新开进程的方式创建节点(参考test/common.go中executeNodeI函数 -如果通过线程创建,会出现重复注册rpc问题 +test/ 测试真实rpc,通过新开进程(main)的方式创建节点(参考test/common.go中executeNodeI函数) +threadTest/ 测试线程模拟,参考threadTest/common.go中executeNodeI函数启动node + ## 客户端工作原理 客户端每次会随机连上集群中一个节点,此时有四种情况: diff --git a/cmd/main.go b/cmd/main.go index 60842a6..bd36b09 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -51,12 +51,14 @@ func main() { idCnt++ } + // storage/文件夹下为node重要数据持久化数据库,节点一旦创建成功就不能被删除 if !*isRestart { - os.RemoveAll("storage/node" + *id + ".json") + os.RemoveAll("storage/node" + *id) } // 创建每个结点自己的数据库。这里一开始理解上有些误区,状态机的状态恢复应该靠节点的持久化log, - // 而用leveldb模拟状态机,造成了状态机本身的持久化,因此暂时通过删去旧db避免这一矛盾 + // 而用leveldb模拟状态机,造成了状态机本身的持久化,因此通过删去旧db避免这一矛盾 + // 因此leveldb/文件夹下为状态机模拟数据库,每次节点启动都需要删除该数据库 os.RemoveAll("leveldb/simple-kv-store" + *id) db, err := leveldb.OpenFile("leveldb/simple-kv-store" + *id, nil) @@ -66,7 +68,8 @@ func main() { defer db.Close() // 确保数据库在使用完毕后关闭 // 打开或创建节点数据持久化文件 - storage := nodes.NewRaftStorage("storage/node" + *id + ".json") + storage := nodes.NewRaftStorage("storage/node" + *id) + defer storage.Close() // 初始化 node := nodes.InitRPCNode(*id, *port, idClusterPairs, db, storage, !*isRestart) diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 2c3b51e..dc9de96 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -163,6 +163,8 @@ func (node *Node) listenForChan(rpcChan chan RPCRequest, quitChan chan struct{}) } case <-quitChan: log.Sugar().Infof("[%s] 监听线程收到退出信号", node.SelfId) + node.Db.Close() + node.Storage.Close() return } } diff --git a/internal/nodes/node_storage.go b/internal/nodes/node_storage.go index 526607d..353223f 100644 --- a/internal/nodes/node_storage.go +++ b/internal/nodes/node_storage.go @@ -2,82 +2,41 @@ package nodes import ( "encoding/json" - "os" - "path/filepath" + "strconv" + "strings" "sync" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) // RaftStorage 结构,持久化 currentTerm、votedFor 和 logEntries type RaftStorage struct { - mu sync.Mutex - filePath string - CurrentTerm int `json:"current_term"` - VotedFor string `json:"voted_for"` - LogEntries []RaftLogEntry `json:"log_entries"` + mu sync.Mutex + db *leveldb.DB + filePath string } // NewRaftStorage 创建 Raft 存储 func NewRaftStorage(filePath string) *RaftStorage { - storage := &RaftStorage{ - filePath: filePath, - } - storage.loadData() // 载入已有数据 - return storage -} - -// loadData 读取 JSON 文件数据 -func (rs *RaftStorage) loadData() { - rs.mu.Lock() - defer rs.mu.Unlock() - - file, err := os.Open(rs.filePath) + db, err := leveldb.OpenFile(filePath, nil) if err != nil { - log.Info("文件未创建:" + rs.filePath) - rs.saveData() // 文件不存在时创建默认数据 - return + log.Fatal("无法打开 LevelDB:", zap.Error(err)) } - defer file.Close() - err = json.NewDecoder(file).Decode(rs) - if err != nil { - log.Error("读取文件失败:" + rs.filePath, zap.Error(err)) - } -} - -// 持久化数据到 JSON(必须持有锁,不能直接外部调用) -func (rs *RaftStorage) saveData() { - // 获取文件所在的目录 - dir := filepath.Dir(rs.filePath) - - // 确保目录存在 - if err := os.MkdirAll(dir, 0755); err != nil { - log.Error("创建存储目录失败", zap.Error(err)) - return - } - - file, err := os.Create(rs.filePath) - if err != nil { - log.Error("持久化节点出错", zap.Error(err)) - return - } - defer file.Close() - - err = json.NewEncoder(file).Encode(rs) - if err != nil { - log.Error("持久化写入失败") + return &RaftStorage{ + db: db, + filePath: filePath, } } -// SetCurrentTerm 设置当前 term,并清空 votedFor(符合 Raft 规范) +// SetCurrentTerm 设置当前 term func (rs *RaftStorage) SetCurrentTerm(term int) { rs.mu.Lock() defer rs.mu.Unlock() - if term > rs.CurrentTerm { - rs.CurrentTerm = term - rs.VotedFor = "" // 新任期清空投票 - rs.saveData() + err := rs.db.Put([]byte("current_term"), []byte(strconv.Itoa(term)), nil) + if err != nil { + log.Error("SetCurrentTerm 持久化失败:", zap.Error(err)) } } @@ -85,64 +44,131 @@ func (rs *RaftStorage) SetCurrentTerm(term int) { func (rs *RaftStorage) GetCurrentTerm() int { rs.mu.Lock() defer rs.mu.Unlock() - return rs.CurrentTerm + data, err := rs.db.Get([]byte("current_term"), nil) + if err != nil { + return 0 // 默认 term = 0 + } + term, _ := strconv.Atoi(string(data)) + return term } // SetVotedFor 记录投票给谁 func (rs *RaftStorage) SetVotedFor(candidate string) { rs.mu.Lock() defer rs.mu.Unlock() - rs.VotedFor = candidate - rs.saveData() + err := rs.db.Put([]byte("voted_for"), []byte(candidate), nil) + if err != nil { + log.Error("SetVotedFor 持久化失败:", zap.Error(err)) + } } // GetVotedFor 获取投票对象 func (rs *RaftStorage) GetVotedFor() string { rs.mu.Lock() defer rs.mu.Unlock() - return rs.VotedFor + data, err := rs.db.Get([]byte("voted_for"), nil) + if err != nil { + return "" + } + return string(data) } -// 同时设置 +// SetTermAndVote 原子更新 term 和 vote func (rs *RaftStorage) SetTermAndVote(term int, candidate string) { rs.mu.Lock() defer rs.mu.Unlock() - rs.VotedFor = candidate - rs.CurrentTerm = term - rs.saveData() + + batch := new(leveldb.Batch) + batch.Put([]byte("current_term"), []byte(strconv.Itoa(term))) + batch.Put([]byte("voted_for"), []byte(candidate)) + + err := rs.db.Write(batch, nil) // 原子提交 + if err != nil { + log.Error("SetTermAndVote 持久化失败:", zap.Error(err)) + } } -// append日志 -func (rs *RaftStorage) AppendLog(rlogE RaftLogEntry) { +// AppendLog 追加日志 +func (rs *RaftStorage) AppendLog(entry RaftLogEntry) { rs.mu.Lock() defer rs.mu.Unlock() - rs.LogEntries = append(rs.LogEntries, rlogE) - rs.saveData() + // 序列化日志 + batch := new(leveldb.Batch) + data, _ := json.Marshal(entry) + key := "log_" + strconv.Itoa(entry.LogId) + batch.Put([]byte(key), data) + + lastIndex := strconv.Itoa(entry.LogId) + batch.Put([]byte("last_log_index"), []byte(lastIndex)) + err := rs.db.Write(batch, nil) + if err != nil { + log.Error("AppendLog 持久化失败:", zap.Error(err)) + } } -// 更改日志 -func (rs *RaftStorage) WriteLog(rlogEs []RaftLogEntry) { +// GetLastLogIndex 获取最新日志的 index +func (rs *RaftStorage) GetLastLogIndex() int { rs.mu.Lock() defer rs.mu.Unlock() - - rs.LogEntries = rlogEs - rs.saveData() + data, err := rs.db.Get([]byte("last_log_index"), nil) + if err != nil { + return -1 + } + index, _ := strconv.Atoi(string(data)) + return index } -// 获取所有日志 -func (rs *RaftStorage) GetLogEntries() []RaftLogEntry { +// WriteLog 批量写入日志(保证原子性) +func (rs *RaftStorage) WriteLog(entries []RaftLogEntry) { + if len(entries) == 0 { + return + } rs.mu.Lock() defer rs.mu.Unlock() - return rs.LogEntries + + batch := new(leveldb.Batch) + for _, entry := range entries { + data, _ := json.Marshal(entry) + key := "log_" + strconv.Itoa(entry.LogId) + batch.Put([]byte(key), data) + } + + // 更新最新日志索引 + lastIndex := strconv.Itoa(entries[len(entries)-1].LogId) + batch.Put([]byte("last_log_index"), []byte(lastIndex)) + + err := rs.db.Write(batch, nil) + if err != nil { + log.Error("WriteLog 持久化失败:", zap.Error(err)) + } } -// GetLastLogIndex 获取最新日志的 index -func (rs *RaftStorage) GetLastLogIndex() int { +// GetLogEntries 获取所有日志 +func (rs *RaftStorage) GetLogEntries() []RaftLogEntry { rs.mu.Lock() defer rs.mu.Unlock() - if len(rs.LogEntries) == 0 { - return 0 + + var logs []RaftLogEntry + iter := rs.db.NewIterator(nil, nil) // 遍历所有键值 + defer iter.Release() + + for iter.Next() { + key := string(iter.Key()) + if strings.HasPrefix(key, "log_") { // 过滤日志 key + var entry RaftLogEntry + if err := json.Unmarshal(iter.Value(), &entry); err == nil { + logs = append(logs, entry) + } else { + log.Error("解析日志失败:", zap.Error(err)) + } + } } - return len(rs.LogEntries)-1 + + return logs +} + +// Close 关闭数据库 +func (rs *RaftStorage) Close() { + rs.db.Close() } diff --git a/threadTest/common.go b/threadTest/common.go index b276305..0d87af1 100644 --- a/threadTest/common.go +++ b/threadTest/common.go @@ -10,18 +10,18 @@ import ( func ExecuteNodeI(id string, isRestart bool, peerIds []string, threadTransport *nodes.ThreadTransport) (*nodes.Node, chan struct{}) { if !isRestart { - os.RemoveAll("storage/node" + id + ".json") + os.RemoveAll("storage/node" + id) } os.RemoveAll("leveldb/simple-kv-store" + id) - db, err := leveldb.OpenFile("leveldb/simple-kv-store"+id, nil) + db, err := leveldb.OpenFile("leveldb/simple-kv-store" + id, nil) if err != nil { fmt.Println("Failed to open database: ", err) } // 打开或创建节点数据持久化文件 - storage := nodes.NewRaftStorage("storage/node" + id + ".json") + storage := nodes.NewRaftStorage("storage/node" + id) var otherIds []string for _, ids := range peerIds {