From 99fb57518ec9e75f4fb2201c7640c5c55cbce16b Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Fri, 28 Feb 2025 15:49:34 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=8F=90=E4=BE=9B=E4=BA=86=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E5=90=91leader=E5=8F=91=E9=80=81=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- README.md | 2 + cmd/main.go | 34 ++++++++++++--- internal/client/client_node.go | 94 ++++++++++++++++++++++++++++++++++++++++++ internal/nodes/init.go | 61 +++++++++++++-------------- internal/nodes/node.go | 23 ++++++----- internal/nodes/server_node.go | 37 +++++++++++++++++ scripts/build.sh | 2 +- scripts/run.sh | 8 ++-- test/server_client_test.go | 87 ++++++++++++++++++++++++++++++++++++++ 10 files changed, 298 insertions(+), 52 deletions(-) create mode 100644 internal/client/client_node.go create mode 100644 internal/nodes/server_node.go create mode 100644 test/server_client_test.go diff --git a/.gitignore b/.gitignore index 408abd6..3f067b7 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,4 @@ go.work .idea .idea/* -raftnode +main diff --git a/README.md b/README.md index 8f4d03d..6a667a3 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,8 @@ go mod download安装依赖 如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口 lsof -i :9091查看pid kill -9 杀死进程 +## 关于测试 +通过新开进程的方式创建节点,如果通过线程创建,会出现重复注册rpc问题 # todo list 消息通讯异常的处理 diff --git a/cmd/main.go b/cmd/main.go index 255daec..ee36f99 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,11 +2,16 @@ package main import ( "flag" + "fmt" + "os" + "os/signal" "simple-kv-store/internal/logprovider" "simple-kv-store/internal/nodes" - "go.uber.org/zap" "strconv" "strings" + "syscall" + + "go.uber.org/zap" ) var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) @@ -18,23 +23,42 @@ func main() { } }() + // 设置一个通道来捕获中断信号 + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + port := flag.String("port", ":9091", "rpc listen port") cluster := flag.String("cluster", "127.0.0.1:9092,127.0.0.1:9093", "comma sep") - id := flag.Int("id", 1, "node ID") + id := flag.String("id", "1", "node ID") pipe := flag.String("pipe", "", "input from scripts") isLeader := flag.Bool("isleader", false, "init node state") // 参数解析 flag.Parse() clusters := strings.Split(*cluster, ",") - node := nodes.Init(*id, clusters, *pipe) + idClusterPairs := make(map[string]string) + idCnt := 1 + selfi, err := strconv.Atoi(*id) + if err != nil { + log.Error("figure id only") + } + for _, addr := range clusters { + if idCnt == selfi { + idCnt++ // 命令行cluster按id排序传入,记录时跳过自己的id,先保证所有节点互相记录的id一致 + } + idClusterPairs[strconv.Itoa(idCnt)] = addr + idCnt++ + } + node := nodes.Init(*id, idClusterPairs, *pipe) - log.Info("id: " + strconv.Itoa(*id) + "节点开始监听: " + *port + "端口") + log.Info("id: " + *id + "节点开始监听: " + *port + "端口") // 监听rpc node.Rpc(*port) // 开启 raft nodes.Start(node, *isLeader) - select {} + sig := <-sigs + fmt.Println("接收到信号:", sig) + } diff --git a/internal/client/client_node.go b/internal/client/client_node.go new file mode 100644 index 0000000..345ed05 --- /dev/null +++ b/internal/client/client_node.go @@ -0,0 +1,94 @@ +package clientPkg + +import ( + "net/rpc" + "simple-kv-store/internal/logprovider" + "simple-kv-store/internal/nodes" + + "go.uber.org/zap" +) + +var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) + +type Client struct { + // 连接的server端节点(node1) + ServerId string + Address string +} + +type Status = uint8 + +const ( + Ok Status = iota + 1 + NotFound + Fail +) + +func (client *Client) Write(kv nodes.LogEntry) Status { + log.Info("client write request key :" + kv.Key) + c, err := rpc.DialHTTP("tcp", client.Address) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + return Fail + } + + defer func(server *rpc.Client) { + err := c.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } + }(c) + + var reply nodes.ServerReply + callErr := c.Call("Node.WriteKV", kv, &reply) // RPC + if callErr != nil { + log.Error("dialing: ", zap.Error(callErr)) + return Fail + } + + if reply.Isconnect { // 发送成功 + return Ok + } else { // 失败 + return Fail + } +} + +func (client *Client) Read(key string, value *string) Status { // 查不到value为空 + log.Info("client read request key :" + key) + if value == nil { + return Fail + } + + c, err := rpc.DialHTTP("tcp", client.Address) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + return Fail + } + + defer func(server *rpc.Client) { + err := c.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } + }(c) + + var reply nodes.ServerReply + callErr := c.Call("Node.ReadKey", key, &reply) // RPC + if callErr != nil { + log.Error("dialing: ", zap.Error(callErr)) + return Fail + } + + if reply.Isconnect { // 发送成功 + if reply.HaveValue { + *value = reply.Value + return Ok + } else { + return NotFound + } + } else { // 失败 + return Fail + } +} + + diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 9c2117e..900247f 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -21,15 +21,15 @@ func newNode(address string) *Public_node_info { } } -func Init(id int, nodeAddr []string, pipe string) *Node { - ns := make(map[int]*Public_node_info) - for k, v := range nodeAddr { - ns[k] = newNode(v) +func Init(id string, nodeAddr map[string]string, pipe string) *Node { + ns := make(map[string]*Public_node_info) + for id, addr := range nodeAddr { + ns[id] = newNode(addr) } // 创建节点 return &Node{ - self: id, + selfId: id, nodes: ns, pipeAddr: pipe, } @@ -51,31 +51,31 @@ func Start(node *Node, isLeader bool) { // candidate发布一个监听输入线程后,变成leader node.state = Leader go func() { - if node.pipeAddr == "" { - log.Error("暂不支持非管道读入") - } - - pipe, err := os.Open(node.pipeAddr) - if err != nil { - log.Error("Failed to open pipe") - } - defer pipe.Close() - - // 不断读取管道中的输入 - buffer := make([]byte, 256) - for { - n, err := pipe.Read(buffer) - if err != nil && err != io.EOF { - log.Error("Error reading from pipe") + if node.pipeAddr == "" { // 客户端远程调用server_node方法 + log.Info("请运行客户端进程进行读写") + } else { // 命令行提供了管道,支持管道(键盘)输入 + pipe, err := os.Open(node.pipeAddr) + if err != nil { + log.Error("Failed to open pipe") } - if n > 0 { - input := string(buffer[:n]) - log.Info("send : " + input) - // 将用户输入封装成一个 LogEntry - kv := LogEntry{input, ""} - node.log = append(node.log, kv) - // 广播给其它节点 - node.BroadCastKV(kv) + defer pipe.Close() + + // 不断读取管道中的输入 + buffer := make([]byte, 256) + for { + n, err := pipe.Read(buffer) + if err != nil && err != io.EOF { + log.Error("Error reading from pipe") + } + if n > 0 { + input := string(buffer[:n]) + log.Info("send : " + input) + // 将用户输入封装成一个 LogEntry + kv := LogEntry{input, ""} + node.log = append(node.log, kv) + // 广播给其它节点 + node.BroadCastKV(kv) + } } } }() @@ -92,9 +92,10 @@ func (node *Node) Rpc(port string) { log.Fatal("rpc register failed", zap.Error(err)) } rpc.HandleHTTP() + l, e := net.Listen("tcp", port) if e != nil { - log.Fatal("listen error:", zap.Error(err)) + log.Fatal("listen error:", zap.Error(e)) } go func() { err := http.Serve(l, nil) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 504b212..08ebb68 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -2,6 +2,7 @@ package nodes import ( "net/rpc" + "go.uber.org/zap" ) @@ -20,10 +21,10 @@ type Public_node_info struct { type Node struct { // 当前节点id - self int + selfId string // 除当前节点外其他节点信息 - nodes map[int]*Public_node_info + nodes map[string]*Public_node_info //管道名 pipeAddr string @@ -37,16 +38,16 @@ type Node struct { func (node *Node) BroadCastKV(kv LogEntry) { // 遍历所有节点 - for i := range node.nodes { - go func (index int, kv LogEntry) { + for id, _ := range node.nodes { + go func(id string, kv LogEntry) { var reply KVReply - node.sendKV(index, kv, &reply) - } (i, kv) + node.sendKV(id, kv, &reply) + }(id, kv) } } -func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) { - client, err := rpc.DialHTTP("tcp", node.nodes[index].address) +func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { + client, err := rpc.DialHTTP("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) return @@ -73,10 +74,11 @@ func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) { // RPC call func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { - log.Info("receive: " + kv.Key) - reply.Reply = true; + log.Info("node_" + node.selfId + " receive: " + kv.Key) + reply.Reply = true return nil } + // func (node *Node) broadcastHeartbeat() { // // 遍历所有节点 // for i := range raft.nodes { @@ -104,4 +106,3 @@ func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { // }(i, hb) // } // } - diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go new file mode 100644 index 0000000..a8b1cc7 --- /dev/null +++ b/internal/nodes/server_node.go @@ -0,0 +1,37 @@ +package nodes + +import ( + +) +// leader node作为server为client注册的方法 +type ServerReply struct{ + Isconnect bool + HaveValue bool + Value string +} +// RPC call +func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { + log.Info("server write : " + kv.Key) + node.log = append(node.log, kv) + // 广播给其它节点 + node.BroadCastKV(kv) + reply.Isconnect = true + return nil +} +// RPC call +func (node *Node) ReadKey(key string, reply *ServerReply) error { + log.Info("server read : " + key) + // 先只读leader自己 + for _, kv := range node.log { + if kv.Key == key { + reply.Value = kv.Value + reply.HaveValue = true + reply.Isconnect = true + return nil + } + } + reply.HaveValue = false + reply.Isconnect = true + return nil +} + diff --git a/scripts/build.sh b/scripts/build.sh index f682f82..d7acf22 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -1 +1 @@ -go build -o raftnode ./cmd/main.go \ No newline at end of file +go build -o main ./cmd/main.go \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh index 4c250de..794c247 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -4,19 +4,19 @@ RUN_TIME=10 # 需要传递数据的管道 -PIPE_NAME="/tmp/raft_input_pipe" +PIPE_NAME="/tmp/input_pipe" # 启动节点1 echo "Starting Node 1..." -timeout $RUN_TIME ./raftnode -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true & +timeout $RUN_TIME ./main -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true & # 启动节点2 echo "Starting Node 2..." -timeout $RUN_TIME ./raftnode -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" & +timeout $RUN_TIME ./main -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" & # 启动节点3 echo "Starting Node 3..." -timeout $RUN_TIME ./raftnode -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"& +timeout $RUN_TIME ./main -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"& echo "All nodes started successfully!" # 创建一个管道用于进程间通信 diff --git a/test/server_client_test.go b/test/server_client_test.go new file mode 100644 index 0000000..0d0ef10 --- /dev/null +++ b/test/server_client_test.go @@ -0,0 +1,87 @@ +package test + +import ( + "fmt" + "os" + "os/exec" + "simple-kv-store/internal/client" + "simple-kv-store/internal/logprovider" + "simple-kv-store/internal/nodes" + "strconv" + "strings" + "syscall" + "testing" + "time" + + "go.uber.org/zap" +) + +var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) + +func TestServerClient(t *testing.T) { + n := 5 + var clusters []string + for i := 0; i < n; i++ { + port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) + addr := "127.0.0.1:" + port + clusters = append(clusters, addr) + } + + var cmds []*exec.Cmd + for i := 0; i < n; i++ { + tmpClusters := append(clusters[:i], clusters[i+1:]...) + port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) + + var isleader string + if i == 0 { + isleader = "true" + } else { + isleader = "false" + } + cmd := exec.Command( + "../main", + "-id", strconv.Itoa(i + 1), + "-port", port, + "-cluster", strings.Join(tmpClusters, ","), + "-isleader=" + isleader, + ) + cmds = append(cmds, cmd) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // 执行命令 + err := cmd.Start() + if err != nil { + fmt.Println("启动进程出错:", err) + return + } + } + + time.Sleep(time.Second) + // client启动 + c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"} + s := c.Write(nodes.LogEntry{Key: "1", Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + + var value string + s = c.Read("1", &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } + + s = c.Read("2", &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } + + for _, cmd := range cmds { + err := cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + } + +} From aa54845d65595829bff90ad1f4730e52397852d4 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 1 Mar 2025 10:43:30 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0logId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- cmd/main.go | 2 +- internal/nodes/init.go | 17 ++++++++++++----- internal/nodes/log.go | 5 +++++ internal/nodes/node.go | 24 +++++++++++++++++------- internal/nodes/server_node.go | 12 +++++++----- test/server_client_test.go | 40 +++++++++++++++++++++++++++++----------- 7 files changed, 72 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 6a667a3..9a0791c 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ # 环境与运行 使用环境是wsl+ubuntu go mod download安装依赖 -./scripts/build.sh 会在根目录下编译出raftnode +./scripts/build.sh 会在根目录下编译出main ./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。 # 注意 diff --git a/cmd/main.go b/cmd/main.go index ee36f99..03e763d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -59,6 +59,6 @@ func main() { nodes.Start(node, *isLeader) sig := <-sigs - fmt.Println("接收到信号:", sig) + fmt.Println("node_" + *id + "接收到信号:", sig) } diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 900247f..0a860de 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -7,6 +7,7 @@ import ( "net/rpc" "os" "simple-kv-store/internal/logprovider" + "strconv" "time" "go.uber.org/zap" @@ -32,6 +33,8 @@ func Init(id string, nodeAddr map[string]string, pipe string) *Node { selfId: id, nodes: ns, pipeAddr: pipe, + maxLogId: 0, + log: make(map[int]LogEntry), } } @@ -68,13 +71,17 @@ func Start(node *Node, isLeader bool) { log.Error("Error reading from pipe") } if n > 0 { - input := string(buffer[:n]) - log.Info("send : " + input) + input := string(buffer[:n]) // 将用户输入封装成一个 LogEntry - kv := LogEntry{input, ""} - node.log = append(node.log, kv) + kv := LogEntry{input, ""} // 目前键盘输入key,value 0 + logId := node.maxLogId + node.maxLogId++ + node.log[logId] = kv + + log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input) // 广播给其它节点 - node.BroadCastKV(kv) + node.BroadCastKV(logId, kv) + // 持久化 } } } diff --git a/internal/nodes/log.go b/internal/nodes/log.go index 3880995..95c67d0 100644 --- a/internal/nodes/log.go +++ b/internal/nodes/log.go @@ -7,4 +7,9 @@ type LogEntry struct { type KVReply struct { Reply bool +} + +type LogIdAndEntry struct { + LogId int + Entry LogEntry } \ No newline at end of file diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 08ebb68..fc465d3 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -2,6 +2,7 @@ package nodes import ( "net/rpc" + "strconv" "go.uber.org/zap" ) @@ -33,20 +34,23 @@ type Node struct { state State // 简单的kv存储 - log []LogEntry + log map[int]LogEntry + + // leader用来标记新log + maxLogId int } -func (node *Node) BroadCastKV(kv LogEntry) { +func (node *Node) BroadCastKV(logId int, kv LogEntry) { // 遍历所有节点 for id, _ := range node.nodes { go func(id string, kv LogEntry) { var reply KVReply - node.sendKV(id, kv, &reply) + node.sendKV(id, logId, kv, &reply) }(id, kv) } } -func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { +func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { client, err := rpc.DialHTTP("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -60,7 +64,8 @@ func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { } }(client) - callErr := client.Call("Node.ReceiveKV", kv, reply) // RPC + arg := LogIdAndEntry{logId, kv} + callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC if callErr != nil { log.Error("dialing: ", zap.Error(callErr)) } @@ -73,8 +78,13 @@ func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { } // RPC call -func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { - log.Info("node_" + node.selfId + " receive: " + kv.Key) +func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { + log.Info("node_" + node.selfId + " receive: logId = "+ strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) + entry, ok := node.log[arg.LogId] + if !ok { + node.log[arg.LogId] = entry + } + // 持久化 reply.Reply = true return nil } diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index a8b1cc7..568e472 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -1,8 +1,7 @@ package nodes -import ( +import "strconv" -) // leader node作为server为client注册的方法 type ServerReply struct{ Isconnect bool @@ -11,10 +10,13 @@ type ServerReply struct{ } // RPC call func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { - log.Info("server write : " + kv.Key) - node.log = append(node.log, kv) + + logId := node.maxLogId + node.maxLogId++ + node.log[logId] = kv // 广播给其它节点 - node.BroadCastKV(kv) + log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key) + node.BroadCastKV(logId, kv) reply.Isconnect = true return nil } diff --git a/test/server_client_test.go b/test/server_client_test.go index 0d0ef10..7e6613c 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -19,6 +19,7 @@ import ( var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) func TestServerClient(t *testing.T) { + // 登记结点信息 n := 5 var clusters []string for i := 0; i < n; i++ { @@ -27,6 +28,7 @@ func TestServerClient(t *testing.T) { clusters = append(clusters, addr) } + // 结点启动 var cmds []*exec.Cmd for i := 0; i < n; i++ { tmpClusters := append(clusters[:i], clusters[i+1:]...) @@ -57,25 +59,41 @@ func TestServerClient(t *testing.T) { } } - time.Sleep(time.Second) + time.Sleep(time.Second) // 等待启动完毕 // client启动 c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"} - s := c.Write(nodes.LogEntry{Key: "1", Value: "hello"}) - if s != clientPkg.Ok { - t.Errorf("write test fail") + + // 写入 + var s clientPkg.Status + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + s := c.Write(nodes.LogEntry{Key: key, Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } } - var value string - s = c.Read("1", &value) - if s != clientPkg.Ok { - t.Errorf("Read test1 fail") + // 读写入数据 + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + var value string + s = c.Read(key, &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } } - s = c.Read("2", &value) - if s != clientPkg.NotFound { - t.Errorf("Read test2 fail") + // 读未写入数据 + for i := 10; i < 15; i++ { + key := strconv.Itoa(i) + var value string + s = c.Read(key, &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } } + // 通知进程结束 for _, cmd := range cmds { err := cmd.Process.Signal(syscall.SIGTERM) if err != nil { From 67e7706f61c3789835ef1924d84bb78632b66ab4 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 1 Mar 2025 16:06:01 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E4=B8=8E?= =?UTF-8?q?=E6=81=A2=E5=A4=8D=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + cmd/main.go | 24 ++++++++- go.mod | 2 + go.sum | 18 +++++++ internal/nodes/init.go | 15 +++--- internal/nodes/node.go | 41 +++------------- internal/nodes/server_node.go | 21 ++++---- test/common.go | 45 +++++++++++++++++ test/restart_follower_test.go | 110 ++++++++++++++++++++++++++++++++++++++++++ test/server_client_test.go | 43 +++++------------ 10 files changed, 236 insertions(+), 84 deletions(-) create mode 100644 test/common.go create mode 100644 test/restart_follower_test.go diff --git a/.gitignore b/.gitignore index 3f067b7..b6d444b 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ go.work .idea/* main +*/leveldb diff --git a/cmd/main.go b/cmd/main.go index 03e763d..b0b8bd8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "syscall" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) @@ -32,6 +33,7 @@ func main() { id := flag.String("id", "1", "node ID") pipe := flag.String("pipe", "", "input from scripts") isLeader := flag.Bool("isleader", false, "init node state") + isNewDb := flag.Bool("isNewDb", true, "new test or restart") // 参数解析 flag.Parse() @@ -49,10 +51,28 @@ func main() { idClusterPairs[strconv.Itoa(idCnt)] = addr idCnt++ } - node := nodes.Init(*id, idClusterPairs, *pipe) - log.Info("id: " + *id + "节点开始监听: " + *port + "端口") + if *isNewDb { + os.RemoveAll("leveldb/simple-kv-store" + *id) + } + // 打开或创建每个结点自己的数据库 + db, err := leveldb.OpenFile("leveldb/simple-kv-store" + *id, nil) + if err != nil { + log.Fatal("Failed to open database: ", zap.Error(err)) + } + defer db.Close() // 确保数据库在使用完毕后关闭 + iter := db.NewIterator(nil, nil) + defer iter.Release() + // 计数 + count := 0 + for iter.Next() { + count++ + } + fmt.Printf(*id + "结点目前有数据:%d\n", count) + + node := nodes.Init(*id, idClusterPairs, *pipe, db) + log.Info("id: " + *id + "节点开始监听: " + *port + "端口") // 监听rpc node.Rpc(*port) // 开启 raft diff --git a/go.mod b/go.mod index d9c592b..0981ec0 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.20 require go.uber.org/zap v1.24.0 require ( + github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 081dfe1..9b63927 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,18 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= @@ -10,4 +20,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 0a860de..4ef68fe 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -10,6 +10,7 @@ import ( "strconv" "time" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) @@ -22,7 +23,7 @@ func newNode(address string) *Public_node_info { } } -func Init(id string, nodeAddr map[string]string, pipe string) *Node { +func Init(id string, nodeAddr map[string]string, pipe string, db *leveldb.DB) *Node { ns := make(map[string]*Public_node_info) for id, addr := range nodeAddr { ns[id] = newNode(addr) @@ -30,11 +31,12 @@ func Init(id string, nodeAddr map[string]string, pipe string) *Node { // 创建节点 return &Node{ - selfId: id, - nodes: ns, + selfId: id, + nodes: ns, pipeAddr: pipe, maxLogId: 0, - log: make(map[int]LogEntry), + log: make(map[int]LogEntry), + db: db, } } @@ -71,7 +73,7 @@ func Start(node *Node, isLeader bool) { log.Error("Error reading from pipe") } if n > 0 { - input := string(buffer[:n]) + input := string(buffer[:n]) // 将用户输入封装成一个 LogEntry kv := LogEntry{input, ""} // 目前键盘输入key,value 0 logId := node.maxLogId @@ -82,6 +84,7 @@ func Start(node *Node, isLeader bool) { // 广播给其它节点 node.BroadCastKV(logId, kv) // 持久化 + node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) } } } @@ -99,7 +102,7 @@ func (node *Node) Rpc(port string) { log.Fatal("rpc register failed", zap.Error(err)) } rpc.HandleHTTP() - + l, e := net.Listen("tcp", port) if e != nil { log.Fatal("listen error:", zap.Error(e)) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index fc465d3..1c99767 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -4,6 +4,7 @@ import ( "net/rpc" "strconv" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) @@ -38,6 +39,8 @@ type Node struct { // leader用来标记新log maxLogId int + + db *leveldb.DB } func (node *Node) BroadCastKV(logId int, kv LogEntry) { @@ -67,13 +70,7 @@ func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { arg := LogIdAndEntry{logId, kv} callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC if callErr != nil { - log.Error("dialing: ", zap.Error(callErr)) - } - - if reply.Reply { // 发送成功 - - } else { // 失败 - + log.Error("dialing node_" + id + "fail: ", zap.Error(callErr)) } } @@ -85,34 +82,8 @@ func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { node.log[arg.LogId] = entry } // 持久化 - reply.Reply = true + node.db.Put([]byte(arg.Entry.Key), []byte(arg.Entry.Value), nil) + reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定 return nil } -// func (node *Node) broadcastHeartbeat() { -// // 遍历所有节点 -// for i := range raft.nodes { -// // request 参数 -// hb := Heartbeat{ -// Term: raft.currTerm, -// LeaderId: raft.self, -// CommitIndex: raft.commitIndex, -// } - -// prevLogIndex := raft.nextIndex[i] - 1 - -// // 如果有日志未同步则发送 -// if raft.getLastIndex() > prevLogIndex { -// hb.PrevLogIndex = prevLogIndex -// hb.PrevLogTerm = raft.log[prevLogIndex].CurrTerm -// hb.Entries = raft.log[prevLogIndex:] -// // log.Info("will send log entries", zap.Any("logEntries", hb.Entries)) -// } - -// go func(index int, hb Heartbeat) { -// var reply HeartbeatReply -// // 向某一个节点发送 heartbeat -// raft.sendHeartbeat(index, hb, &reply) -// }(i, hb) -// } -// } diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index 568e472..a7681a7 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -1,6 +1,10 @@ package nodes -import "strconv" +import ( + "strconv" + + "github.com/syndtr/goleveldb/leveldb" +) // leader node作为server为client注册的方法 type ServerReply struct{ @@ -15,6 +19,7 @@ func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { node.maxLogId++ node.log[logId] = kv // 广播给其它节点 + node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key) node.BroadCastKV(logId, kv) reply.Isconnect = true @@ -24,15 +29,13 @@ func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { func (node *Node) ReadKey(key string, reply *ServerReply) error { log.Info("server read : " + key) // 先只读leader自己 - for _, kv := range node.log { - if kv.Key == key { - reply.Value = kv.Value - reply.HaveValue = true - reply.Isconnect = true - return nil - } + value, err := node.db.Get([]byte(key), nil) + if err == leveldb.ErrNotFound { + reply.HaveValue = false + } else { + reply.HaveValue = true + reply.Value = string(value) } - reply.HaveValue = false reply.Isconnect = true return nil } diff --git a/test/common.go b/test/common.go new file mode 100644 index 0000000..554927d --- /dev/null +++ b/test/common.go @@ -0,0 +1,45 @@ +package test + +import ( + "fmt" + "os" + "os/exec" + "strconv" + "strings" +) + +func ExecuteNodeI(i int, isLeader bool, isNewDb bool, clusters []string) *exec.Cmd { + tmpClusters := append(clusters[:i], clusters[i+1:]...) + port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) + + var isleader string + if isLeader { + isleader = "true" + } else { + isleader = "false" + } + var isnewdb string + if isNewDb { + isnewdb = "true" + } else { + isnewdb = "false" + } + cmd := exec.Command( + "../main", + "-id", strconv.Itoa(i + 1), + "-port", port, + "-cluster", strings.Join(tmpClusters, ","), + "-isleader=" + isleader, + "-isNewDb=" + isnewdb, + ) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // 执行命令 + err := cmd.Start() + if err != nil { + fmt.Println("启动进程出错:", err) + return nil + } + return cmd +} \ No newline at end of file diff --git a/test/restart_follower_test.go b/test/restart_follower_test.go new file mode 100644 index 0000000..4bbce54 --- /dev/null +++ b/test/restart_follower_test.go @@ -0,0 +1,110 @@ +package test + +import ( + "fmt" + "os/exec" + "simple-kv-store/internal/client" + "simple-kv-store/internal/nodes" + "strconv" + "syscall" + "testing" + "time" +) + +func TestFollowerRestart(t *testing.T) { + // 登记结点信息 + n := 3 + var clusters []string + for i := 0; i < n; i++ { + port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) + addr := "127.0.0.1:" + port + clusters = append(clusters, addr) + } + + // 结点启动 + var cmds []*exec.Cmd + for i := 0; i < n; i++ { + var cmd *exec.Cmd + if i == 0 { + cmd = ExecuteNodeI(i, true, true, clusters) + } else { + cmd = ExecuteNodeI(i, false, true, clusters) + } + + if cmd == nil { + return + } else { + cmds = append(cmds, cmd) + } + } + + time.Sleep(time.Second) // 等待启动完毕 + // client启动, 连接leader + cWrite := clientPkg.Client{Address: clusters[0], ServerId: "1"} + + // 写入 + var s clientPkg.Status + for i := 0; i < 5; i++ { + key := strconv.Itoa(i) + s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + } + time.Sleep(time.Second) // 等待写入完毕 + // 模拟最后一个结点崩溃 + err := cmds[n - 1].Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + // 继续写入 + for i := 5; i < 10; i++ { + key := strconv.Itoa(i) + s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + } + // 恢复结点 + cmd := ExecuteNodeI(n - 1, false, false, clusters) + if cmd == nil { + t.Errorf("recover test1 fail") + return + } else { + cmds[n - 1] = cmd + } + time.Sleep(time.Second) // 等待启动完毕 + + // client启动, 连接节点n-1(去读它的数据) + cRead := clientPkg.Client{Address: clusters[n - 1], ServerId: "n"} + // 读崩溃前写入数据 + for i := 0; i < 5; i++ { + key := strconv.Itoa(i) + var value string + s = cRead.Read(key, &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } + } + + // 读未写入数据 + for i := 5; i < 15; i++ { + key := strconv.Itoa(i) + var value string + s = cRead.Read(key, &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } + } + + // 通知进程结束 + for _, cmd := range cmds { + err := cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + } + +} diff --git a/test/server_client_test.go b/test/server_client_test.go index 7e6613c..74599a5 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -2,22 +2,15 @@ package test import ( "fmt" - "os" "os/exec" "simple-kv-store/internal/client" - "simple-kv-store/internal/logprovider" "simple-kv-store/internal/nodes" "strconv" - "strings" "syscall" "testing" "time" - - "go.uber.org/zap" ) -var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) - func TestServerClient(t *testing.T) { // 登记结点信息 n := 5 @@ -30,33 +23,19 @@ func TestServerClient(t *testing.T) { // 结点启动 var cmds []*exec.Cmd - for i := 0; i < n; i++ { - tmpClusters := append(clusters[:i], clusters[i+1:]...) - port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) - - var isleader string + for i := 0; i < n; i++ { + var cmd *exec.Cmd if i == 0 { - isleader = "true" + cmd = ExecuteNodeI(i, true, true, clusters) } else { - isleader = "false" + cmd = ExecuteNodeI(i, false, true, clusters) } - cmd := exec.Command( - "../main", - "-id", strconv.Itoa(i + 1), - "-port", port, - "-cluster", strings.Join(tmpClusters, ","), - "-isleader=" + isleader, - ) - cmds = append(cmds, cmd) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - // 执行命令 - err := cmd.Start() - if err != nil { - fmt.Println("启动进程出错:", err) + + if cmd == nil { return - } + } else { + cmds = append(cmds, cmd) + } } time.Sleep(time.Second) // 等待启动完毕 @@ -67,7 +46,7 @@ func TestServerClient(t *testing.T) { var s clientPkg.Status for i := 0; i < 10; i++ { key := strconv.Itoa(i) - s := c.Write(nodes.LogEntry{Key: key, Value: "hello"}) + s := c.Write(nodes.LogEntry{Key: key, Value: "hello" + key}) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -78,7 +57,7 @@ func TestServerClient(t *testing.T) { key := strconv.Itoa(i) var value string s = c.Read(key, &value) - if s != clientPkg.Ok { + if s != clientPkg.Ok && value != "hello" + key { t.Errorf("Read test1 fail") } } From 5ff390ae99fb337c81b96e9b3635a65c4e1031a8 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Fri, 7 Mar 2025 13:59:22 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=B0=81=E8=A3=85=E4=BA=86=E5=8F=91?= =?UTF-8?q?=E9=80=81=E7=8A=B6=E6=80=81=E7=9A=84=E6=A8=A1=E6=8B=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- internal/client/client_node.go | 6 +++--- internal/nodes/init.go | 3 ++- internal/nodes/log.go | 11 +++++++++++ internal/nodes/node.go | 32 ++++++++++++++++++++++---------- internal/nodes/server_node.go | 12 ++++++------ test/restart_follower_test.go | 6 ++++-- test/server_client_test.go | 3 ++- 8 files changed, 51 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index b6d444b..7091e8e 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,4 @@ go.work .idea/* main -*/leveldb +leveldb diff --git a/internal/client/client_node.go b/internal/client/client_node.go index 345ed05..7eaf99d 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -24,8 +24,8 @@ const ( Fail ) -func (client *Client) Write(kv nodes.LogEntry) Status { - log.Info("client write request key :" + kv.Key) +func (client *Client) Write(kvCall nodes.LogEntryCall) Status { + log.Info("client write request key :" + kvCall.LogE.Key) c, err := rpc.DialHTTP("tcp", client.Address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -40,7 +40,7 @@ func (client *Client) Write(kv nodes.LogEntry) Status { }(c) var reply nodes.ServerReply - callErr := c.Call("Node.WriteKV", kv, &reply) // RPC + callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC if callErr != nil { log.Error("dialing: ", zap.Error(callErr)) return Fail diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 4ef68fe..0a38451 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -82,7 +82,8 @@ func Start(node *Node, isLeader bool) { log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input) // 广播给其它节点 - node.BroadCastKV(logId, kv) + kvCall := LogEntryCall{kv, Normal} + node.BroadCastKV(logId, kvCall) // 持久化 node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) } diff --git a/internal/nodes/log.go b/internal/nodes/log.go index 95c67d0..3d13831 100644 --- a/internal/nodes/log.go +++ b/internal/nodes/log.go @@ -1,10 +1,21 @@ package nodes +const ( + Normal State = iota + 1 + Delay + Fail +) + type LogEntry struct { Key string Value string } +type LogEntryCall struct { + LogE LogEntry + CallState State +} + type KVReply struct { Reply bool } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 1c99767..f1a17c8 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -1,8 +1,10 @@ package nodes import ( + "math/rand" "net/rpc" "strconv" + "time" "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" @@ -38,22 +40,33 @@ type Node struct { log map[int]LogEntry // leader用来标记新log - maxLogId int + maxLogId int db *leveldb.DB } -func (node *Node) BroadCastKV(logId int, kv LogEntry) { +func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) { // 遍历所有节点 for id, _ := range node.nodes { - go func(id string, kv LogEntry) { + go func(id string, kv LogEntryCall) { var reply KVReply - node.sendKV(id, logId, kv, &reply) - }(id, kv) + node.sendKV(id, logId, kvCall, &reply) + }(id, kvCall) } } -func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { +func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) { + switch kvCall.CallState { + case Fail: + log.Info("模拟发送失败") + // 这么写向所有的node发送都失败,也可以随机数确定是否失败 + case Delay: + log.Info("模拟发送延迟") + // 随机延迟0-5ms + time.Sleep(time.Millisecond * time.Duration(rand.Intn(5))) + default: + } + client, err := rpc.DialHTTP("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -67,16 +80,16 @@ func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { } }(client) - arg := LogIdAndEntry{logId, kv} + arg := LogIdAndEntry{logId, kvCall.LogE} callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC if callErr != nil { - log.Error("dialing node_" + id + "fail: ", zap.Error(callErr)) + log.Error("dialing node_"+id+"fail: ", zap.Error(callErr)) } } // RPC call func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { - log.Info("node_" + node.selfId + " receive: logId = "+ strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) + log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) entry, ok := node.log[arg.LogId] if !ok { node.log[arg.LogId] = entry @@ -86,4 +99,3 @@ func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定 return nil } - diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index a7681a7..58eed40 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -13,15 +13,15 @@ type ServerReply struct{ Value string } // RPC call -func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { +func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { logId := node.maxLogId node.maxLogId++ - node.log[logId] = kv - // 广播给其它节点 - node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) - log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key) - node.BroadCastKV(logId, kv) + node.log[logId] = kvCall.LogE + node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil) + log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kvCall.LogE.Key) + // 广播给其它节点 + node.BroadCastKV(logId, kvCall) reply.Isconnect = true return nil } diff --git a/test/restart_follower_test.go b/test/restart_follower_test.go index 4bbce54..323507d 100644 --- a/test/restart_follower_test.go +++ b/test/restart_follower_test.go @@ -46,7 +46,8 @@ func TestFollowerRestart(t *testing.T) { var s clientPkg.Status for i := 0; i < 5; i++ { key := strconv.Itoa(i) - s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -61,7 +62,8 @@ func TestFollowerRestart(t *testing.T) { // 继续写入 for i := 5; i < 10; i++ { key := strconv.Itoa(i) - s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) if s != clientPkg.Ok { t.Errorf("write test fail") } diff --git a/test/server_client_test.go b/test/server_client_test.go index 74599a5..ab106c3 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -46,7 +46,8 @@ func TestServerClient(t *testing.T) { var s clientPkg.Status for i := 0; i < 10; i++ { key := strconv.Itoa(i) - s := c.Write(nodes.LogEntry{Key: key, Value: "hello" + key}) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s := c.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) if s != clientPkg.Ok { t.Errorf("write test fail") }