Browse Source

增加logId

pull/1/head
augurier 6 months ago
parent
commit
aa54845d65
7 changed files with 72 additions and 30 deletions
  1. +1
    -1
      README.md
  2. +1
    -1
      cmd/main.go
  3. +12
    -5
      internal/nodes/init.go
  4. +5
    -0
      internal/nodes/log.go
  5. +17
    -7
      internal/nodes/node.go
  6. +7
    -5
      internal/nodes/server_node.go
  7. +29
    -11
      test/server_client_test.go

+ 1
- 1
README.md View File

@ -7,7 +7,7 @@
# 环境与运行 # 环境与运行
使用环境是wsl+ubuntu 使用环境是wsl+ubuntu
go mod download安装依赖 go mod download安装依赖
./scripts/build.sh 会在根目录下编译出raftnode
./scripts/build.sh 会在根目录下编译出main
./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。 ./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。
# 注意 # 注意

+ 1
- 1
cmd/main.go View File

@ -59,6 +59,6 @@ func main() {
nodes.Start(node, *isLeader) nodes.Start(node, *isLeader)
sig := <-sigs sig := <-sigs
fmt.Println("接收到信号:", sig)
fmt.Println("node_" + *id + "接收到信号:", sig)
} }

+ 12
- 5
internal/nodes/init.go View File

@ -7,6 +7,7 @@ import (
"net/rpc" "net/rpc"
"os" "os"
"simple-kv-store/internal/logprovider" "simple-kv-store/internal/logprovider"
"strconv"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@ -32,6 +33,8 @@ func Init(id string, nodeAddr map[string]string, pipe string) *Node {
selfId: id, selfId: id,
nodes: ns, nodes: ns,
pipeAddr: pipe, pipeAddr: pipe,
maxLogId: 0,
log: make(map[int]LogEntry),
} }
} }
@ -68,13 +71,17 @@ func Start(node *Node, isLeader bool) {
log.Error("Error reading from pipe") log.Error("Error reading from pipe")
} }
if n > 0 { if n > 0 {
input := string(buffer[:n])
log.Info("send : " + input)
input := string(buffer[:n])
// 将用户输入封装成一个 LogEntry // 将用户输入封装成一个 LogEntry
kv := LogEntry{input, ""}
node.log = append(node.log, kv)
kv := LogEntry{input, ""} // 目前键盘输入key,value 0
logId := node.maxLogId
node.maxLogId++
node.log[logId] = kv
log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input)
// 广播给其它节点 // 广播给其它节点
node.BroadCastKV(kv)
node.BroadCastKV(logId, kv)
// 持久化
} }
} }
} }

+ 5
- 0
internal/nodes/log.go View File

@ -7,4 +7,9 @@ type LogEntry struct {
type KVReply struct { type KVReply struct {
Reply bool Reply bool
}
type LogIdAndEntry struct {
LogId int
Entry LogEntry
} }

+ 17
- 7
internal/nodes/node.go View File

@ -2,6 +2,7 @@ package nodes
import ( import (
"net/rpc" "net/rpc"
"strconv"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -33,20 +34,23 @@ type Node struct {
state State state State
// 简单的kv存储 // 简单的kv存储
log []LogEntry
log map[int]LogEntry
// leader用来标记新log
maxLogId int
} }
func (node *Node) BroadCastKV(kv LogEntry) {
func (node *Node) BroadCastKV(logId int, kv LogEntry) {
// 遍历所有节点 // 遍历所有节点
for id, _ := range node.nodes { for id, _ := range node.nodes {
go func(id string, kv LogEntry) { go func(id string, kv LogEntry) {
var reply KVReply var reply KVReply
node.sendKV(id, kv, &reply)
node.sendKV(id, logId, kv, &reply)
}(id, kv) }(id, kv)
} }
} }
func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) {
func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) {
client, err := rpc.DialHTTP("tcp", node.nodes[id].address) client, err := rpc.DialHTTP("tcp", node.nodes[id].address)
if err != nil { if err != nil {
log.Error("dialing: ", zap.Error(err)) log.Error("dialing: ", zap.Error(err))
@ -60,7 +64,8 @@ func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) {
} }
}(client) }(client)
callErr := client.Call("Node.ReceiveKV", kv, reply) // RPC
arg := LogIdAndEntry{logId, kv}
callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC
if callErr != nil { if callErr != nil {
log.Error("dialing: ", zap.Error(callErr)) log.Error("dialing: ", zap.Error(callErr))
} }
@ -73,8 +78,13 @@ func (node *Node) sendKV(id string, kv LogEntry, reply *KVReply) {
} }
// RPC call // RPC call
func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error {
log.Info("node_" + node.selfId + " receive: " + kv.Key)
func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error {
log.Info("node_" + node.selfId + " receive: logId = "+ strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key)
entry, ok := node.log[arg.LogId]
if !ok {
node.log[arg.LogId] = entry
}
// 持久化
reply.Reply = true reply.Reply = true
return nil return nil
} }

+ 7
- 5
internal/nodes/server_node.go View File

@ -1,8 +1,7 @@
package nodes package nodes
import (
import "strconv"
)
// leader node作为server为client注册的方法 // leader node作为server为client注册的方法
type ServerReply struct{ type ServerReply struct{
Isconnect bool Isconnect bool
@ -11,10 +10,13 @@ type ServerReply struct{
} }
// RPC call // RPC call
func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error {
log.Info("server write : " + kv.Key)
node.log = append(node.log, kv)
logId := node.maxLogId
node.maxLogId++
node.log[logId] = kv
// 广播给其它节点 // 广播给其它节点
node.BroadCastKV(kv)
log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key)
node.BroadCastKV(logId, kv)
reply.Isconnect = true reply.Isconnect = true
return nil return nil
} }

+ 29
- 11
test/server_client_test.go View File

@ -19,6 +19,7 @@ import (
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
func TestServerClient(t *testing.T) { func TestServerClient(t *testing.T) {
// 登记结点信息
n := 5 n := 5
var clusters []string var clusters []string
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -27,6 +28,7 @@ func TestServerClient(t *testing.T) {
clusters = append(clusters, addr) clusters = append(clusters, addr)
} }
// 结点启动
var cmds []*exec.Cmd var cmds []*exec.Cmd
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
tmpClusters := append(clusters[:i], clusters[i+1:]...) tmpClusters := append(clusters[:i], clusters[i+1:]...)
@ -57,25 +59,41 @@ func TestServerClient(t *testing.T) {
} }
} }
time.Sleep(time.Second)
time.Sleep(time.Second) // 等待启动完毕
// client启动 // client启动
c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"} c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"}
s := c.Write(nodes.LogEntry{Key: "1", Value: "hello"})
if s != clientPkg.Ok {
t.Errorf("write test fail")
// 写入
var s clientPkg.Status
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
s := c.Write(nodes.LogEntry{Key: key, Value: "hello"})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
} }
var value string
s = c.Read("1", &value)
if s != clientPkg.Ok {
t.Errorf("Read test1 fail")
// 读写入数据
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
var value string
s = c.Read(key, &value)
if s != clientPkg.Ok {
t.Errorf("Read test1 fail")
}
} }
s = c.Read("2", &value)
if s != clientPkg.NotFound {
t.Errorf("Read test2 fail")
// 读未写入数据
for i := 10; i < 15; i++ {
key := strconv.Itoa(i)
var value string
s = c.Read(key, &value)
if s != clientPkg.NotFound {
t.Errorf("Read test2 fail")
}
} }
// 通知进程结束
for _, cmd := range cmds { for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM) err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil { if err != nil {

Loading…
Cancel
Save