From 99fb57518ec9e75f4fb2201c7640c5c55cbce16b Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Fri, 28 Feb 2025 15:49:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BE=9B=E4=BA=86=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E5=90=91leader=E5=8F=91=E9=80=81=E7=9A=84=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- README.md | 2 + cmd/main.go | 34 ++++++++++++--- internal/client/client_node.go | 94 ++++++++++++++++++++++++++++++++++++++++++ internal/nodes/init.go | 61 +++++++++++++-------------- internal/nodes/node.go | 23 ++++++----- internal/nodes/server_node.go | 37 +++++++++++++++++ scripts/build.sh | 2 +- scripts/run.sh | 8 ++-- test/server_client_test.go | 87 ++++++++++++++++++++++++++++++++++++++ 10 files changed, 298 insertions(+), 52 deletions(-) create mode 100644 internal/client/client_node.go create mode 100644 internal/nodes/server_node.go create mode 100644 test/server_client_test.go diff --git a/.gitignore b/.gitignore index 408abd6..3f067b7 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,4 @@ go.work .idea .idea/* -raftnode +main diff --git a/README.md b/README.md index 8f4d03d..6a667a3 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,8 @@ go mod download安装依赖 如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口 lsof -i :9091查看pid kill -9 杀死进程 +## 关于测试 +通过新开进程的方式创建节点,如果通过线程创建,会出现重复注册rpc问题 # todo list 消息通讯异常的处理 diff --git a/cmd/main.go b/cmd/main.go index 255daec..ee36f99 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,11 +2,16 @@ package main import ( "flag" + "fmt" + "os" + "os/signal" "simple-kv-store/internal/logprovider" "simple-kv-store/internal/nodes" - "go.uber.org/zap" "strconv" "strings" + "syscall" + + "go.uber.org/zap" ) var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) @@ -18,23 +23,42 @@ func main() { } }() + // 设置一个通道来捕获中断信号 + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + port := flag.String("port", ":9091", "rpc listen port") cluster := flag.String("cluster", "127.0.0.1:9092,127.0.0.1:9093", "comma sep") - id := flag.Int("id", 1, "node ID") + id := flag.String("id", "1", "node ID") pipe := flag.String("pipe", "", "input from scripts") isLeader := flag.Bool("isleader", false, "init node state") // 参数解析 flag.Parse() clusters := strings.Split(*cluster, ",") - node := nodes.Init(*id, clusters, *pipe) + idClusterPairs := make(map[string]string) + idCnt := 1 + selfi, err := strconv.Atoi(*id) + if err != nil { + log.Error("figure id only") + } + for _, addr := range clusters { + if idCnt == selfi { + idCnt++ // 命令行cluster按id排序传入,记录时跳过自己的id,先保证所有节点互相记录的id一致 + } + idClusterPairs[strconv.Itoa(idCnt)] = addr + idCnt++ + } + node := nodes.Init(*id, idClusterPairs, *pipe) - log.Info("id: " + strconv.Itoa(*id) + "节点开始监听: " + *port + "端口") + log.Info("id: " + *id + "节点开始监听: " + *port + "端口") // 监听rpc node.Rpc(*port) // 开启 raft nodes.Start(node, *isLeader) - select {} + sig := <-sigs + fmt.Println("接收到信号:", sig) + } diff --git a/internal/client/client_node.go b/internal/client/client_node.go new file mode 100644 index 0000000..345ed05 --- /dev/null +++ b/internal/client/client_node.go @@ -0,0 +1,94 @@ +package clientPkg + +import ( + "net/rpc" + "simple-kv-store/internal/logprovider" + "simple-kv-store/internal/nodes" + + "go.uber.org/zap" +) + +var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) + +type Client struct { + // 连接的server端节点(node1) + ServerId string + Address string +} + +type Status = uint8 + +const ( + Ok Status = iota + 1 + NotFound + Fail +) + +func (client *Client) Write(kv nodes.LogEntry) Status { + log.Info("client write request key :" + kv.Key) + c, err := rpc.DialHTTP("tcp", client.Address) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + return Fail + } + + defer func(server *rpc.Client) { + err := c.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } + }(c) + + var reply nodes.ServerReply + callErr := c.Call("Node.WriteKV", kv, &reply) // RPC + if callErr != nil { + log.Error("dialing: ", zap.Error(callErr)) + return Fail + } + + if reply.Isconnect { // 发送成功 + return Ok + } else { // 失败 + return Fail + } +} + +func (client *Client) Read(key string, value *string) Status { // 查不到value为空 + log.Info("client read request key :" + key) + if value == nil { + return Fail + } + + c, err := rpc.DialHTTP("tcp", client.Address) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + return Fail + } + + defer func(server *rpc.Client) { + err := c.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } + }(c) + + var reply nodes.ServerReply + callErr := c.Call("Node.ReadKey", key, &reply) // RPC + if callErr != nil { + log.Error("dialing: ", zap.Error(callErr)) + return Fail + } + + if reply.Isconnect { // 发送成功 + if reply.HaveValue { + *value = reply.Value + return Ok + } else { + return NotFound + } + } else { // 失败 + return Fail + } +} + + diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 9c2117e..900247f 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -21,15 +21,15 @@ func newNode(address string) *Public_node_info { } } -func Init(id int, nodeAddr []string, pipe string) *Node { - ns := make(map[int]*Public_node_info) - for k, v := range nodeAddr { - ns[k] = newNode(v) +func Init(id string, nodeAddr map[string]string, pipe string) *Node { + ns := make(map[string]*Public_node_info) + for id, addr := range nodeAddr { + ns[id] = newNode(addr) } // 创建节点 return &Node{ - self: id, + selfId: id, nodes: ns, pipeAddr: pipe, } @@ -51,31 +51,31 @@ func Start(node *Node, isLeader bool) { // candidate发布一个监听输入线程后,变成leader node.state = Leader go func() { - if node.pipeAddr == "" { - log.Error("暂不支持非管道读入") - } - - pipe, err := os.Open(node.pipeAddr) - if err != nil { - log.Error("Failed to open pipe") - } - defer pipe.Close() - - // 不断读取管道中的输入 - buffer := make([]byte, 256) - for { - n, err := pipe.Read(buffer) - if err != nil && err != io.EOF { - log.Error("Error reading from pipe") + if node.pipeAddr == "" { // 客户端远程调用server_node方法 + log.Info("请运行客户端进程进行读写") + } else { // 命令行提供了管道,支持管道(键盘)输入 + pipe, err := os.Open(node.pipeAddr) + if err != nil { + log.Error("Failed to open pipe") } - if n > 0 { - input := string(buffer[:n]) - log.Info("send : " + input) - // 将用户输入封装成一个 LogEntry - kv := LogEntry{input, ""} - node.log = append(node.log, kv) - // 广播给其它节点 - node.BroadCastKV(kv) + defer pipe.Close() + + // 不断读取管道中的输入 + buffer := make([]byte, 256) + for { + n, err := pipe.Read(buffer) + if err != nil && err != io.EOF { + log.Error("Error reading from pipe") + } + if n > 0 { + input := string(buffer[:n]) + log.Info("send : " + input) + // 将用户输入封装成一个 LogEntry + kv := LogEntry{input, ""} + node.log = append(node.log, kv) + // 广播给其它节点 + node.BroadCastKV(kv) + } } } }() @@ -92,9 +92,10 @@ func (node *Node) Rpc(port string) { log.Fatal("rpc register failed", zap.Error(err)) } rpc.HandleHTTP() + l, e := net.Listen("tcp", port) if e != nil { - log.Fatal("listen error:", zap.Error(err)) + log.Fatal("listen error:", zap.Error(e)) } go func() { err := http.Serve(l, nil) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 504b212..08ebb68 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -2,6 +2,7 @@ package nodes import ( "net/rpc" + "go.uber.org/zap" ) @@ -20,10 +21,10 @@ type Public_node_info struct { type Node struct { // 当前节点id - self int + selfId string // 除当前节点外其他节点信息 - nodes map[int]*Public_node_info + nodes map[string]*Public_node_info //管道名 pipeAddr string @@ -37,16 +38,16 @@ type Node struct { func (node *Node) BroadCastKV(kv LogEntry) { // 遍历所有节点 - for i := range node.nodes { - go func (index int, kv LogEntry) { + for id, _ := range node.nodes { + go func(id string, kv LogEntry) { var reply KVReply - node.sendKV(index, kv, &reply) - } (i, kv) + node.sendKV(id, kv, &reply) + }(id, kv) } } -func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) { - client, err := rpc.DialHTTP("tcp", node.nodes[index].address) +func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) { + client, err := rpc.DialHTTP("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) return @@ -73,10 +74,11 @@ func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) { // RPC call func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { - log.Info("receive: " + kv.Key) - reply.Reply = true; + log.Info("node_" + node.selfId + " receive: " + kv.Key) + reply.Reply = true return nil } + // func (node *Node) broadcastHeartbeat() { // // 遍历所有节点 // for i := range raft.nodes { @@ -104,4 +106,3 @@ func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { // }(i, hb) // } // } - diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go new file mode 100644 index 0000000..a8b1cc7 --- /dev/null +++ b/internal/nodes/server_node.go @@ -0,0 +1,37 @@ +package nodes + +import ( + +) +// leader node作为server为client注册的方法 +type ServerReply struct{ + Isconnect bool + HaveValue bool + Value string +} +// RPC call +func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { + log.Info("server write : " + kv.Key) + node.log = append(node.log, kv) + // 广播给其它节点 + node.BroadCastKV(kv) + reply.Isconnect = true + return nil +} +// RPC call +func (node *Node) ReadKey(key string, reply *ServerReply) error { + log.Info("server read : " + key) + // 先只读leader自己 + for _, kv := range node.log { + if kv.Key == key { + reply.Value = kv.Value + reply.HaveValue = true + reply.Isconnect = true + return nil + } + } + reply.HaveValue = false + reply.Isconnect = true + return nil +} + diff --git a/scripts/build.sh b/scripts/build.sh index f682f82..d7acf22 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -1 +1 @@ -go build -o raftnode ./cmd/main.go \ No newline at end of file +go build -o main ./cmd/main.go \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh index 4c250de..794c247 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -4,19 +4,19 @@ RUN_TIME=10 # 需要传递数据的管道 -PIPE_NAME="/tmp/raft_input_pipe" +PIPE_NAME="/tmp/input_pipe" # 启动节点1 echo "Starting Node 1..." -timeout $RUN_TIME ./raftnode -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true & +timeout $RUN_TIME ./main -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true & # 启动节点2 echo "Starting Node 2..." -timeout $RUN_TIME ./raftnode -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" & +timeout $RUN_TIME ./main -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" & # 启动节点3 echo "Starting Node 3..." -timeout $RUN_TIME ./raftnode -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"& +timeout $RUN_TIME ./main -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"& echo "All nodes started successfully!" # 创建一个管道用于进程间通信 diff --git a/test/server_client_test.go b/test/server_client_test.go new file mode 100644 index 0000000..0d0ef10 --- /dev/null +++ b/test/server_client_test.go @@ -0,0 +1,87 @@ +package test + +import ( + "fmt" + "os" + "os/exec" + "simple-kv-store/internal/client" + "simple-kv-store/internal/logprovider" + "simple-kv-store/internal/nodes" + "strconv" + "strings" + "syscall" + "testing" + "time" + + "go.uber.org/zap" +) + +var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) + +func TestServerClient(t *testing.T) { + n := 5 + var clusters []string + for i := 0; i < n; i++ { + port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) + addr := "127.0.0.1:" + port + clusters = append(clusters, addr) + } + + var cmds []*exec.Cmd + for i := 0; i < n; i++ { + tmpClusters := append(clusters[:i], clusters[i+1:]...) + port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) + + var isleader string + if i == 0 { + isleader = "true" + } else { + isleader = "false" + } + cmd := exec.Command( + "../main", + "-id", strconv.Itoa(i + 1), + "-port", port, + "-cluster", strings.Join(tmpClusters, ","), + "-isleader=" + isleader, + ) + cmds = append(cmds, cmd) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // 执行命令 + err := cmd.Start() + if err != nil { + fmt.Println("启动进程出错:", err) + return + } + } + + time.Sleep(time.Second) + // client启动 + c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"} + s := c.Write(nodes.LogEntry{Key: "1", Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + + var value string + s = c.Read("1", &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } + + s = c.Read("2", &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } + + for _, cmd := range cmds { + err := cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + } + +}