Browse Source

提供了客户端向leader发送的功能

pull/1/head
augurier 6 months ago
parent
commit
99fb57518e
10 changed files with 298 additions and 52 deletions
  1. +1
    -1
      .gitignore
  2. +2
    -0
      README.md
  3. +29
    -5
      cmd/main.go
  4. +94
    -0
      internal/client/client_node.go
  5. +31
    -30
      internal/nodes/init.go
  6. +12
    -11
      internal/nodes/node.go
  7. +37
    -0
      internal/nodes/server_node.go
  8. +1
    -1
      scripts/build.sh
  9. +4
    -4
      scripts/run.sh
  10. +87
    -0
      test/server_client_test.go

+ 1
- 1
.gitignore View File

@ -24,4 +24,4 @@ go.work
.idea
.idea/*
raftnode
main

+ 2
- 0
README.md View File

@ -15,6 +15,8 @@ go mod download安装依赖
如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口
lsof -i :9091查看pid
kill -9 <pid>杀死进程
## 关于测试
通过新开进程的方式创建节点,如果通过线程创建,会出现重复注册rpc问题
# todo list
消息通讯异常的处理

+ 29
- 5
cmd/main.go View File

@ -2,11 +2,16 @@ package main
import (
"flag"
"fmt"
"os"
"os/signal"
"simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes"
"go.uber.org/zap"
"strconv"
"strings"
"syscall"
"go.uber.org/zap"
)
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
@ -18,23 +23,42 @@ func main() {
}
}()
// 设置一个通道来捕获中断信号
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
port := flag.String("port", ":9091", "rpc listen port")
cluster := flag.String("cluster", "127.0.0.1:9092,127.0.0.1:9093", "comma sep")
id := flag.Int("id", 1, "node ID")
id := flag.String("id", "1", "node ID")
pipe := flag.String("pipe", "", "input from scripts")
isLeader := flag.Bool("isleader", false, "init node state")
// 参数解析
flag.Parse()
clusters := strings.Split(*cluster, ",")
node := nodes.Init(*id, clusters, *pipe)
idClusterPairs := make(map[string]string)
idCnt := 1
selfi, err := strconv.Atoi(*id)
if err != nil {
log.Error("figure id only")
}
for _, addr := range clusters {
if idCnt == selfi {
idCnt++ // 命令行cluster按id排序传入,记录时跳过自己的id,先保证所有节点互相记录的id一致
}
idClusterPairs[strconv.Itoa(idCnt)] = addr
idCnt++
}
node := nodes.Init(*id, idClusterPairs, *pipe)
log.Info("id: " + strconv.Itoa(*id) + "节点开始监听: " + *port + "端口")
log.Info("id: " + *id + "节点开始监听: " + *port + "端口")
// 监听rpc
node.Rpc(*port)
// 开启 raft
nodes.Start(node, *isLeader)
select {}
sig := <-sigs
fmt.Println("接收到信号:", sig)
}

+ 94
- 0
internal/client/client_node.go View File

@ -0,0 +1,94 @@
package clientPkg
import (
"net/rpc"
"simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes"
"go.uber.org/zap"
)
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
type Client struct {
// 连接的server端节点(node1)
ServerId string
Address string
}
type Status = uint8
const (
Ok Status = iota + 1
NotFound
Fail
)
func (client *Client) Write(kv nodes.LogEntry) Status {
log.Info("client write request key :" + kv.Key)
c, err := rpc.DialHTTP("tcp", client.Address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return Fail
}
defer func(server *rpc.Client) {
err := c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}(c)
var reply nodes.ServerReply
callErr := c.Call("Node.WriteKV", kv, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
return Fail
}
if reply.Isconnect { // 发送成功
return Ok
} else { // 失败
return Fail
}
}
func (client *Client) Read(key string, value *string) Status { // 查不到value为空
log.Info("client read request key :" + key)
if value == nil {
return Fail
}
c, err := rpc.DialHTTP("tcp", client.Address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return Fail
}
defer func(server *rpc.Client) {
err := c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}(c)
var reply nodes.ServerReply
callErr := c.Call("Node.ReadKey", key, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
return Fail
}
if reply.Isconnect { // 发送成功
if reply.HaveValue {
*value = reply.Value
return Ok
} else {
return NotFound
}
} else { // 失败
return Fail
}
}

+ 31
- 30
internal/nodes/init.go View File

@ -21,15 +21,15 @@ func newNode(address string) *Public_node_info {
}
}
func Init(id int, nodeAddr []string, pipe string) *Node {
ns := make(map[int]*Public_node_info)
for k, v := range nodeAddr {
ns[k] = newNode(v)
func Init(id string, nodeAddr map[string]string, pipe string) *Node {
ns := make(map[string]*Public_node_info)
for id, addr := range nodeAddr {
ns[id] = newNode(addr)
}
// 创建节点
return &Node{
self: id,
selfId: id,
nodes: ns,
pipeAddr: pipe,
}
@ -51,31 +51,31 @@ func Start(node *Node, isLeader bool) {
// candidate发布一个监听输入线程后,变成leader
node.state = Leader
go func() {
if node.pipeAddr == "" {
log.Error("暂不支持非管道读入")
}
pipe, err := os.Open(node.pipeAddr)
if err != nil {
log.Error("Failed to open pipe")
}
defer pipe.Close()
// 不断读取管道中的输入
buffer := make([]byte, 256)
for {
n, err := pipe.Read(buffer)
if err != nil && err != io.EOF {
log.Error("Error reading from pipe")
if node.pipeAddr == "" { // 客户端远程调用server_node方法
log.Info("请运行客户端进程进行读写")
} else { // 命令行提供了管道,支持管道(键盘)输入
pipe, err := os.Open(node.pipeAddr)
if err != nil {
log.Error("Failed to open pipe")
}
if n > 0 {
input := string(buffer[:n])
log.Info("send : " + input)
// 将用户输入封装成一个 LogEntry
kv := LogEntry{input, ""}
node.log = append(node.log, kv)
// 广播给其它节点
node.BroadCastKV(kv)
defer pipe.Close()
// 不断读取管道中的输入
buffer := make([]byte, 256)
for {
n, err := pipe.Read(buffer)
if err != nil && err != io.EOF {
log.Error("Error reading from pipe")
}
if n > 0 {
input := string(buffer[:n])
log.Info("send : " + input)
// 将用户输入封装成一个 LogEntry
kv := LogEntry{input, ""}
node.log = append(node.log, kv)
// 广播给其它节点
node.BroadCastKV(kv)
}
}
}
}()
@ -92,9 +92,10 @@ func (node *Node) Rpc(port string) {
log.Fatal("rpc register failed", zap.Error(err))
}
rpc.HandleHTTP()
l, e := net.Listen("tcp", port)
if e != nil {
log.Fatal("listen error:", zap.Error(err))
log.Fatal("listen error:", zap.Error(e))
}
go func() {
err := http.Serve(l, nil)

+ 12
- 11
internal/nodes/node.go View File

@ -2,6 +2,7 @@ package nodes
import (
"net/rpc"
"go.uber.org/zap"
)
@ -20,10 +21,10 @@ type Public_node_info struct {
type Node struct {
// 当前节点id
self int
selfId string
// 除当前节点外其他节点信息
nodes map[int]*Public_node_info
nodes map[string]*Public_node_info
//管道名
pipeAddr string
@ -37,16 +38,16 @@ type Node struct {
func (node *Node) BroadCastKV(kv LogEntry) {
// 遍历所有节点
for i := range node.nodes {
go func (index int, kv LogEntry) {
for id, _ := range node.nodes {
go func(id string, kv LogEntry) {
var reply KVReply
node.sendKV(index, kv, &reply)
} (i, kv)
node.sendKV(id, kv, &reply)
}(id, kv)
}
}
func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) {
client, err := rpc.DialHTTP("tcp", node.nodes[index].address)
func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) {
client, err := rpc.DialHTTP("tcp", node.nodes[id].address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return
@ -73,10 +74,11 @@ func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) {
// RPC call
func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error {
log.Info("receive: " + kv.Key)
reply.Reply = true;
log.Info("node_" + node.selfId + " receive: " + kv.Key)
reply.Reply = true
return nil
}
// func (node *Node) broadcastHeartbeat() {
// // 遍历所有节点
// for i := range raft.nodes {
@ -104,4 +106,3 @@ func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error {
// }(i, hb)
// }
// }

+ 37
- 0
internal/nodes/server_node.go View File

@ -0,0 +1,37 @@
package nodes
import (
)
// leader node作为server为client注册的方法
type ServerReply struct{
Isconnect bool
HaveValue bool
Value string
}
// RPC call
func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error {
log.Info("server write : " + kv.Key)
node.log = append(node.log, kv)
// 广播给其它节点
node.BroadCastKV(kv)
reply.Isconnect = true
return nil
}
// RPC call
func (node *Node) ReadKey(key string, reply *ServerReply) error {
log.Info("server read : " + key)
// 先只读leader自己
for _, kv := range node.log {
if kv.Key == key {
reply.Value = kv.Value
reply.HaveValue = true
reply.Isconnect = true
return nil
}
}
reply.HaveValue = false
reply.Isconnect = true
return nil
}

+ 1
- 1
scripts/build.sh View File

@ -1 +1 @@
go build -o raftnode ./cmd/main.go
go build -o main ./cmd/main.go

+ 4
- 4
scripts/run.sh View File

@ -4,19 +4,19 @@
RUN_TIME=10
# 需要传递数据的管道
PIPE_NAME="/tmp/raft_input_pipe"
PIPE_NAME="/tmp/input_pipe"
# 启动节点1
echo "Starting Node 1..."
timeout $RUN_TIME ./raftnode -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true &
timeout $RUN_TIME ./main -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true &
# 启动节点2
echo "Starting Node 2..."
timeout $RUN_TIME ./raftnode -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" &
timeout $RUN_TIME ./main -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" &
# 启动节点3
echo "Starting Node 3..."
timeout $RUN_TIME ./raftnode -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"&
timeout $RUN_TIME ./main -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"&
echo "All nodes started successfully!"
# 创建一个管道用于进程间通信

+ 87
- 0
test/server_client_test.go View File

@ -0,0 +1,87 @@
package test
import (
"fmt"
"os"
"os/exec"
"simple-kv-store/internal/client"
"simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes"
"strconv"
"strings"
"syscall"
"testing"
"time"
"go.uber.org/zap"
)
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
func TestServerClient(t *testing.T) {
n := 5
var clusters []string
for i := 0; i < n; i++ {
port := fmt.Sprintf("%d", uint16(9090)+uint16(i))
addr := "127.0.0.1:" + port
clusters = append(clusters, addr)
}
var cmds []*exec.Cmd
for i := 0; i < n; i++ {
tmpClusters := append(clusters[:i], clusters[i+1:]...)
port := fmt.Sprintf(":%d", uint16(9090)+uint16(i))
var isleader string
if i == 0 {
isleader = "true"
} else {
isleader = "false"
}
cmd := exec.Command(
"../main",
"-id", strconv.Itoa(i + 1),
"-port", port,
"-cluster", strings.Join(tmpClusters, ","),
"-isleader=" + isleader,
)
cmds = append(cmds, cmd)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// 执行命令
err := cmd.Start()
if err != nil {
fmt.Println("启动进程出错:", err)
return
}
}
time.Sleep(time.Second)
// client启动
c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"}
s := c.Write(nodes.LogEntry{Key: "1", Value: "hello"})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
var value string
s = c.Read("1", &value)
if s != clientPkg.Ok {
t.Errorf("Read test1 fail")
}
s = c.Read("2", &value)
if s != clientPkg.NotFound {
t.Errorf("Read test2 fail")
}
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
}

Loading…
Cancel
Save