From aa54845d65595829bff90ad1f4730e52397852d4 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 1 Mar 2025 10:43:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0logId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- cmd/main.go | 2 +- internal/nodes/init.go | 17 ++++++++++++----- internal/nodes/log.go | 5 +++++ internal/nodes/node.go | 24 +++++++++++++++++------- internal/nodes/server_node.go | 12 +++++++----- test/server_client_test.go | 40 +++++++++++++++++++++++++++++----------- 7 files changed, 72 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 6a667a3..9a0791c 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ # 环境与运行 使用环境是wsl+ubuntu go mod download安装依赖 -./scripts/build.sh 会在根目录下编译出raftnode +./scripts/build.sh 会在根目录下编译出main ./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。 # 注意 diff --git a/cmd/main.go b/cmd/main.go index ee36f99..03e763d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -59,6 +59,6 @@ func main() { nodes.Start(node, *isLeader) sig := <-sigs - fmt.Println("接收到信号:", sig) + fmt.Println("node_" + *id + "接收到信号:", sig) } diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 900247f..0a860de 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -7,6 +7,7 @@ import ( "net/rpc" "os" "simple-kv-store/internal/logprovider" + "strconv" "time" "go.uber.org/zap" @@ -32,6 +33,8 @@ func Init(id string, nodeAddr map[string]string, pipe string) *Node { selfId: id, nodes: ns, pipeAddr: pipe, + maxLogId: 0, + log: make(map[int]LogEntry), } } @@ -68,13 +71,17 @@ func Start(node *Node, isLeader bool) { log.Error("Error reading from pipe") } if n > 0 { - input := string(buffer[:n]) - log.Info("send : " + input) + input := string(buffer[:n]) // 将用户输入封装成一个 LogEntry - kv := LogEntry{input, ""} - node.log = append(node.log, kv) + kv := LogEntry{input, ""} // 目前键盘输入key,value 0 + logId := node.maxLogId + node.maxLogId++ + node.log[logId] = kv + + log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input) // 广播给其它节点 - node.BroadCastKV(kv) + node.BroadCastKV(logId, kv) + // 持久化 } } } diff --git a/internal/nodes/log.go b/internal/nodes/log.go index 3880995..95c67d0 100644 --- a/internal/nodes/log.go +++ b/internal/nodes/log.go @@ -7,4 +7,9 @@ type LogEntry struct { type KVReply struct { Reply bool +} + +type LogIdAndEntry struct { + LogId int + Entry LogEntry } \ No newline at end of file diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 08ebb68..fc465d3 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -2,6 +2,7 @@ package nodes import ( "net/rpc" + "strconv" "go.uber.org/zap" ) @@ -33,20 +34,23 @@ type Node struct { state State // 简单的kv存储 - log []LogEntry + log map[int]LogEntry + + // leader用来标记新log + maxLogId int } -func (node *Node) BroadCastKV(kv LogEntry) { +func (node *Node) BroadCastKV(logId int, kv LogEntry) { // 遍历所有节点 for id, _ := range node.nodes { go func(id string, kv LogEntry) { var reply KVReply - node.sendKV(id, kv, &reply) + node.sendKV(id, logId, kv, &reply) }(id, kv) } } -func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { +func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { client, err := rpc.DialHTTP("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -60,7 +64,8 @@ func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { } }(client) - callErr := client.Call("Node.ReceiveKV", kv, reply) // RPC + arg := LogIdAndEntry{logId, kv} + callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC if callErr != nil { log.Error("dialing: ", zap.Error(callErr)) } @@ -73,8 +78,13 @@ func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { } // RPC call -func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { - log.Info("node_" + node.selfId + " receive: " + kv.Key) +func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { + log.Info("node_" + node.selfId + " receive: logId = "+ strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) + entry, ok := node.log[arg.LogId] + if !ok { + node.log[arg.LogId] = entry + } + // 持久化 reply.Reply = true return nil } diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index a8b1cc7..568e472 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -1,8 +1,7 @@ package nodes -import ( +import "strconv" -) // leader node作为server为client注册的方法 type ServerReply struct{ Isconnect bool @@ -11,10 +10,13 @@ type ServerReply struct{ } // RPC call func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { - log.Info("server write : " + kv.Key) - node.log = append(node.log, kv) + + logId := node.maxLogId + node.maxLogId++ + node.log[logId] = kv // 广播给其它节点 - node.BroadCastKV(kv) + log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key) + node.BroadCastKV(logId, kv) reply.Isconnect = true return nil } diff --git a/test/server_client_test.go b/test/server_client_test.go index 0d0ef10..7e6613c 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -19,6 +19,7 @@ import ( var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) func TestServerClient(t *testing.T) { + // 登记结点信息 n := 5 var clusters []string for i := 0; i < n; i++ { @@ -27,6 +28,7 @@ func TestServerClient(t *testing.T) { clusters = append(clusters, addr) } + // 结点启动 var cmds []*exec.Cmd for i := 0; i < n; i++ { tmpClusters := append(clusters[:i], clusters[i+1:]...) @@ -57,25 +59,41 @@ func TestServerClient(t *testing.T) { } } - time.Sleep(time.Second) + time.Sleep(time.Second) // 等待启动完毕 // client启动 c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"} - s := c.Write(nodes.LogEntry{Key: "1", Value: "hello"}) - if s != clientPkg.Ok { - t.Errorf("write test fail") + + // 写入 + var s clientPkg.Status + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + s := c.Write(nodes.LogEntry{Key: key, Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } } - var value string - s = c.Read("1", &value) - if s != clientPkg.Ok { - t.Errorf("Read test1 fail") + // 读写入数据 + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + var value string + s = c.Read(key, &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } } - s = c.Read("2", &value) - if s != clientPkg.NotFound { - t.Errorf("Read test2 fail") + // 读未写入数据 + for i := 10; i < 15; i++ { + key := strconv.Itoa(i) + var value string + s = c.Read(key, &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } } + // 通知进程结束 for _, cmd := range cmds { err := cmd.Process.Signal(syscall.SIGTERM) if err != nil {