#1 ld

Merged
李度 merged 4 commits from ld into master 6 months ago
  1. +2
    -1
      .gitignore
  2. +3
    -1
      README.md
  3. +49
    -5
      cmd/main.go
  4. +2
    -0
      go.mod
  5. +18
    -0
      go.sum
  6. +94
    -0
      internal/client/client_node.go
  7. +42
    -30
      internal/nodes/init.go
  8. +16
    -0
      internal/nodes/log.go
  9. +43
    -49
      internal/nodes/node.go
  10. +42
    -0
      internal/nodes/server_node.go
  11. +1
    -1
      scripts/build.sh
  12. +4
    -4
      scripts/run.sh
  13. +45
    -0
      test/common.go
  14. +112
    -0
      test/restart_follower_test.go
  15. +85
    -0
      test/server_client_test.go

+ 2
- 1
.gitignore View File

@ -24,4 +24,5 @@ go.work
.idea .idea
.idea/* .idea/*
raftnode
main
leveldb

+ 3
- 1
README.md View File

@ -7,7 +7,7 @@
# 环境与运行 # 环境与运行
使用环境是wsl+ubuntu 使用环境是wsl+ubuntu
go mod download安装依赖 go mod download安装依赖
./scripts/build.sh 会在根目录下编译出raftnode
./scripts/build.sh 会在根目录下编译出main
./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。 ./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。
# 注意 # 注意
@ -15,6 +15,8 @@ go mod download安装依赖
如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口 如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口
lsof -i :9091查看pid lsof -i :9091查看pid
kill -9 <pid>杀死进程 kill -9 <pid>杀死进程
## 关于测试
通过新开进程的方式创建节点,如果通过线程创建,会出现重复注册rpc问题
# todo list # todo list
消息通讯异常的处理 消息通讯异常的处理

+ 49
- 5
cmd/main.go View File

@ -2,11 +2,17 @@ package main
import ( import (
"flag" "flag"
"fmt"
"os"
"os/signal"
"simple-kv-store/internal/logprovider" "simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes" "simple-kv-store/internal/nodes"
"go.uber.org/zap"
"strconv" "strconv"
"strings" "strings"
"syscall"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap"
) )
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
@ -18,23 +24,61 @@ func main() {
} }
}() }()
// 设置一个通道来捕获中断信号
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
port := flag.String("port", ":9091", "rpc listen port") port := flag.String("port", ":9091", "rpc listen port")
cluster := flag.String("cluster", "127.0.0.1:9092,127.0.0.1:9093", "comma sep") cluster := flag.String("cluster", "127.0.0.1:9092,127.0.0.1:9093", "comma sep")
id := flag.Int("id", 1, "node ID")
id := flag.String("id", "1", "node ID")
pipe := flag.String("pipe", "", "input from scripts") pipe := flag.String("pipe", "", "input from scripts")
isLeader := flag.Bool("isleader", false, "init node state") isLeader := flag.Bool("isleader", false, "init node state")
isNewDb := flag.Bool("isNewDb", true, "new test or restart")
// 参数解析 // 参数解析
flag.Parse() flag.Parse()
clusters := strings.Split(*cluster, ",") clusters := strings.Split(*cluster, ",")
node := nodes.Init(*id, clusters, *pipe)
idClusterPairs := make(map[string]string)
idCnt := 1
selfi, err := strconv.Atoi(*id)
if err != nil {
log.Error("figure id only")
}
for _, addr := range clusters {
if idCnt == selfi {
idCnt++ // 命令行cluster按id排序传入,记录时跳过自己的id,先保证所有节点互相记录的id一致
}
idClusterPairs[strconv.Itoa(idCnt)] = addr
idCnt++
}
if *isNewDb {
os.RemoveAll("leveldb/simple-kv-store" + *id)
}
// 打开或创建每个结点自己的数据库
db, err := leveldb.OpenFile("leveldb/simple-kv-store" + *id, nil)
if err != nil {
log.Fatal("Failed to open database: ", zap.Error(err))
}
defer db.Close() // 确保数据库在使用完毕后关闭
iter := db.NewIterator(nil, nil)
defer iter.Release()
log.Info("id: " + strconv.Itoa(*id) + "节点开始监听: " + *port + "端口")
// 计数
count := 0
for iter.Next() {
count++
}
fmt.Printf(*id + "结点目前有数据:%d\n", count)
node := nodes.Init(*id, idClusterPairs, *pipe, db)
log.Info("id: " + *id + "节点开始监听: " + *port + "端口")
// 监听rpc // 监听rpc
node.Rpc(*port) node.Rpc(*port)
// 开启 raft // 开启 raft
nodes.Start(node, *isLeader) nodes.Start(node, *isLeader)
select {}
sig := <-sigs
fmt.Println("node_" + *id + "接收到信号:", sig)
} }

+ 2
- 0
go.mod View File

@ -5,6 +5,8 @@ go 1.20
require go.uber.org/zap v1.24.0 require go.uber.org/zap v1.24.0
require ( require (
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
) )

+ 18
- 0
go.sum View File

@ -1,8 +1,18 @@
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
@ -10,4 +20,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

+ 94
- 0
internal/client/client_node.go View File

@ -0,0 +1,94 @@
package clientPkg
import (
"net/rpc"
"simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes"
"go.uber.org/zap"
)
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
type Client struct {
// 连接的server端节点(node1)
ServerId string
Address string
}
type Status = uint8
const (
Ok Status = iota + 1
NotFound
Fail
)
func (client *Client) Write(kvCall nodes.LogEntryCall) Status {
log.Info("client write request key :" + kvCall.LogE.Key)
c, err := rpc.DialHTTP("tcp", client.Address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return Fail
}
defer func(server *rpc.Client) {
err := c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}(c)
var reply nodes.ServerReply
callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
return Fail
}
if reply.Isconnect { // 发送成功
return Ok
} else { // 失败
return Fail
}
}
func (client *Client) Read(key string, value *string) Status { // 查不到value为空
log.Info("client read request key :" + key)
if value == nil {
return Fail
}
c, err := rpc.DialHTTP("tcp", client.Address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
return Fail
}
defer func(server *rpc.Client) {
err := c.Close()
if err != nil {
log.Error("client close err: ", zap.Error(err))
}
}(c)
var reply nodes.ServerReply
callErr := c.Call("Node.ReadKey", key, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
return Fail
}
if reply.Isconnect { // 发送成功
if reply.HaveValue {
*value = reply.Value
return Ok
} else {
return NotFound
}
} else { // 失败
return Fail
}
}

+ 42
- 30
internal/nodes/init.go View File

@ -7,8 +7,10 @@ import (
"net/rpc" "net/rpc"
"os" "os"
"simple-kv-store/internal/logprovider" "simple-kv-store/internal/logprovider"
"strconv"
"time" "time"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -21,17 +23,20 @@ func newNode(address string) *Public_node_info {
} }
} }
func Init(id int, nodeAddr []string, pipe string) *Node {
ns := make(map[int]*Public_node_info)
for k, v := range nodeAddr {
ns[k] = newNode(v)
func Init(id string, nodeAddr map[string]string, pipe string, db *leveldb.DB) *Node {
ns := make(map[string]*Public_node_info)
for id, addr := range nodeAddr {
ns[id] = newNode(addr)
} }
// 创建节点 // 创建节点
return &Node{ return &Node{
self: id,
nodes: ns,
selfId: id,
nodes: ns,
pipeAddr: pipe, pipeAddr: pipe,
maxLogId: 0,
log: make(map[int]LogEntry),
db: db,
} }
} }
@ -51,31 +56,37 @@ func Start(node *Node, isLeader bool) {
// candidate发布一个监听输入线程后,变成leader // candidate发布一个监听输入线程后,变成leader
node.state = Leader node.state = Leader
go func() { go func() {
if node.pipeAddr == "" {
log.Error("暂不支持非管道读入")
}
if node.pipeAddr == "" { // 客户端远程调用server_node方法
log.Info("请运行客户端进程进行读写")
} else { // 命令行提供了管道,支持管道(键盘)输入
pipe, err := os.Open(node.pipeAddr)
if err != nil {
log.Error("Failed to open pipe")
}
defer pipe.Close()
pipe, err := os.Open(node.pipeAddr)
if err != nil {
log.Error("Failed to open pipe")
}
defer pipe.Close()
// 不断读取管道中的输入
buffer := make([]byte, 256)
for {
n, err := pipe.Read(buffer)
if err != nil && err != io.EOF {
log.Error("Error reading from pipe")
}
if n > 0 {
input := string(buffer[:n])
// 将用户输入封装成一个 LogEntry
kv := LogEntry{input, ""} // 目前键盘输入key,value 0
logId := node.maxLogId
node.maxLogId++
node.log[logId] = kv
// 不断读取管道中的输入
buffer := make([]byte, 256)
for {
n, err := pipe.Read(buffer)
if err != nil && err != io.EOF {
log.Error("Error reading from pipe")
}
if n > 0 {
input := string(buffer[:n])
log.Info("send : " + input)
// 将用户输入封装成一个 LogEntry
kv := LogEntry{input, ""}
node.log = append(node.log, kv)
// 广播给其它节点
node.BroadCastKV(kv)
log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input)
// 广播给其它节点
kvCall := LogEntryCall{kv, Normal}
node.BroadCastKV(logId, kvCall)
// 持久化
node.db.Put([]byte(kv.Key), []byte(kv.Value), nil)
}
} }
} }
}() }()
@ -92,9 +103,10 @@ func (node *Node) Rpc(port string) {
log.Fatal("rpc register failed", zap.Error(err)) log.Fatal("rpc register failed", zap.Error(err))
} }
rpc.HandleHTTP() rpc.HandleHTTP()
l, e := net.Listen("tcp", port) l, e := net.Listen("tcp", port)
if e != nil { if e != nil {
log.Fatal("listen error:", zap.Error(err))
log.Fatal("listen error:", zap.Error(e))
} }
go func() { go func() {
err := http.Serve(l, nil) err := http.Serve(l, nil)

+ 16
- 0
internal/nodes/log.go View File

@ -1,10 +1,26 @@
package nodes package nodes
const (
Normal State = iota + 1
Delay
Fail
)
type LogEntry struct { type LogEntry struct {
Key string Key string
Value string Value string
} }
type LogEntryCall struct {
LogE LogEntry
CallState State
}
type KVReply struct { type KVReply struct {
Reply bool Reply bool
} }
type LogIdAndEntry struct {
LogId int
Entry LogEntry
}

+ 43
- 49
internal/nodes/node.go View File

@ -1,7 +1,12 @@
package nodes package nodes
import ( import (
"math/rand"
"net/rpc" "net/rpc"
"strconv"
"time"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -20,10 +25,10 @@ type Public_node_info struct {
type Node struct { type Node struct {
// 当前节点id // 当前节点id
self int
selfId string
// 除当前节点外其他节点信息 // 除当前节点外其他节点信息
nodes map[int]*Public_node_info
nodes map[string]*Public_node_info
//管道名 //管道名
pipeAddr string pipeAddr string
@ -32,21 +37,37 @@ type Node struct {
state State state State
// 简单的kv存储 // 简单的kv存储
log []LogEntry
log map[int]LogEntry
// leader用来标记新log
maxLogId int
db *leveldb.DB
} }
func (node *Node) BroadCastKV(kv LogEntry) {
func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) {
// 遍历所有节点 // 遍历所有节点
for i := range node.nodes {
go func (index int, kv LogEntry) {
for id, _ := range node.nodes {
go func(id string, kv LogEntryCall) {
var reply KVReply var reply KVReply
node.sendKV(index, kv, &reply)
} (i, kv)
node.sendKV(id, logId, kvCall, &reply)
}(id, kvCall)
} }
} }
func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) {
client, err := rpc.DialHTTP("tcp", node.nodes[index].address)
func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) {
switch kvCall.CallState {
case Fail:
log.Info("模拟发送失败")
// 这么写向所有的node发送都失败,也可以随机数确定是否失败
case Delay:
log.Info("模拟发送延迟")
// 随机延迟0-5ms
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)))
default:
}
client, err := rpc.DialHTTP("tcp", node.nodes[id].address)
if err != nil { if err != nil {
log.Error("dialing: ", zap.Error(err)) log.Error("dialing: ", zap.Error(err))
return return
@ -59,49 +80,22 @@ func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) {
} }
}(client) }(client)
callErr := client.Call("Node.ReceiveKV", kv, reply) // RPC
arg := LogIdAndEntry{logId, kvCall.LogE}
callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC
if callErr != nil { if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
}
if reply.Reply { // 发送成功
} else { // 失败
log.Error("dialing node_"+id+"fail: ", zap.Error(callErr))
} }
} }
// RPC call // RPC call
func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error {
log.Info("receive: " + kv.Key)
reply.Reply = true;
func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error {
log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key)
entry, ok := node.log[arg.LogId]
if !ok {
node.log[arg.LogId] = entry
}
// 持久化
node.db.Put([]byte(arg.Entry.Key), []byte(arg.Entry.Value), nil)
reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定
return nil return nil
} }
// func (node *Node) broadcastHeartbeat() {
// // 遍历所有节点
// for i := range raft.nodes {
// // request 参数
// hb := Heartbeat{
// Term: raft.currTerm,
// LeaderId: raft.self,
// CommitIndex: raft.commitIndex,
// }
// prevLogIndex := raft.nextIndex[i] - 1
// // 如果有日志未同步则发送
// if raft.getLastIndex() > prevLogIndex {
// hb.PrevLogIndex = prevLogIndex
// hb.PrevLogTerm = raft.log[prevLogIndex].CurrTerm
// hb.Entries = raft.log[prevLogIndex:]
// // log.Info("will send log entries", zap.Any("logEntries", hb.Entries))
// }
// go func(index int, hb Heartbeat) {
// var reply HeartbeatReply
// // 向某一个节点发送 heartbeat
// raft.sendHeartbeat(index, hb, &reply)
// }(i, hb)
// }
// }

+ 42
- 0
internal/nodes/server_node.go View File

@ -0,0 +1,42 @@
package nodes
import (
"strconv"
"github.com/syndtr/goleveldb/leveldb"
)
// leader node作为server为client注册的方法
type ServerReply struct{
Isconnect bool
HaveValue bool
Value string
}
// RPC call
func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error {
logId := node.maxLogId
node.maxLogId++
node.log[logId] = kvCall.LogE
node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil)
log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kvCall.LogE.Key)
// 广播给其它节点
node.BroadCastKV(logId, kvCall)
reply.Isconnect = true
return nil
}
// RPC call
func (node *Node) ReadKey(key string, reply *ServerReply) error {
log.Info("server read : " + key)
// 先只读leader自己
value, err := node.db.Get([]byte(key), nil)
if err == leveldb.ErrNotFound {
reply.HaveValue = false
} else {
reply.HaveValue = true
reply.Value = string(value)
}
reply.Isconnect = true
return nil
}

+ 1
- 1
scripts/build.sh View File

@ -1 +1 @@
go build -o raftnode ./cmd/main.go
go build -o main ./cmd/main.go

+ 4
- 4
scripts/run.sh View File

@ -4,19 +4,19 @@
RUN_TIME=10 RUN_TIME=10
# 需要传递数据的管道 # 需要传递数据的管道
PIPE_NAME="/tmp/raft_input_pipe"
PIPE_NAME="/tmp/input_pipe"
# 启动节点1 # 启动节点1
echo "Starting Node 1..." echo "Starting Node 1..."
timeout $RUN_TIME ./raftnode -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true &
timeout $RUN_TIME ./main -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true &
# 启动节点2 # 启动节点2
echo "Starting Node 2..." echo "Starting Node 2..."
timeout $RUN_TIME ./raftnode -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" &
timeout $RUN_TIME ./main -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" &
# 启动节点3 # 启动节点3
echo "Starting Node 3..." echo "Starting Node 3..."
timeout $RUN_TIME ./raftnode -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"&
timeout $RUN_TIME ./main -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"&
echo "All nodes started successfully!" echo "All nodes started successfully!"
# 创建一个管道用于进程间通信 # 创建一个管道用于进程间通信

+ 45
- 0
test/common.go View File

@ -0,0 +1,45 @@
package test
import (
"fmt"
"os"
"os/exec"
"strconv"
"strings"
)
func ExecuteNodeI(i int, isLeader bool, isNewDb bool, clusters []string) *exec.Cmd {
tmpClusters := append(clusters[:i], clusters[i+1:]...)
port := fmt.Sprintf(":%d", uint16(9090)+uint16(i))
var isleader string
if isLeader {
isleader = "true"
} else {
isleader = "false"
}
var isnewdb string
if isNewDb {
isnewdb = "true"
} else {
isnewdb = "false"
}
cmd := exec.Command(
"../main",
"-id", strconv.Itoa(i + 1),
"-port", port,
"-cluster", strings.Join(tmpClusters, ","),
"-isleader=" + isleader,
"-isNewDb=" + isnewdb,
)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// 执行命令
err := cmd.Start()
if err != nil {
fmt.Println("启动进程出错:", err)
return nil
}
return cmd
}

+ 112
- 0
test/restart_follower_test.go View File

@ -0,0 +1,112 @@
package test
import (
"fmt"
"os/exec"
"simple-kv-store/internal/client"
"simple-kv-store/internal/nodes"
"strconv"
"syscall"
"testing"
"time"
)
func TestFollowerRestart(t *testing.T) {
// 登记结点信息
n := 3
var clusters []string
for i := 0; i < n; i++ {
port := fmt.Sprintf("%d", uint16(9090)+uint16(i))
addr := "127.0.0.1:" + port
clusters = append(clusters, addr)
}
// 结点启动
var cmds []*exec.Cmd
for i := 0; i < n; i++ {
var cmd *exec.Cmd
if i == 0 {
cmd = ExecuteNodeI(i, true, true, clusters)
} else {
cmd = ExecuteNodeI(i, false, true, clusters)
}
if cmd == nil {
return
} else {
cmds = append(cmds, cmd)
}
}
time.Sleep(time.Second) // 等待启动完毕
// client启动, 连接leader
cWrite := clientPkg.Client{Address: clusters[0], ServerId: "1"}
// 写入
var s clientPkg.Status
for i := 0; i < 5; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
}
time.Sleep(time.Second) // 等待写入完毕
// 模拟最后一个结点崩溃
err := cmds[n - 1].Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
// 继续写入
for i := 5; i < 10; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
}
// 恢复结点
cmd := ExecuteNodeI(n - 1, false, false, clusters)
if cmd == nil {
t.Errorf("recover test1 fail")
return
} else {
cmds[n - 1] = cmd
}
time.Sleep(time.Second) // 等待启动完毕
// client启动, 连接节点n-1(去读它的数据)
cRead := clientPkg.Client{Address: clusters[n - 1], ServerId: "n"}
// 读崩溃前写入数据
for i := 0; i < 5; i++ {
key := strconv.Itoa(i)
var value string
s = cRead.Read(key, &value)
if s != clientPkg.Ok {
t.Errorf("Read test1 fail")
}
}
// 读未写入数据
for i := 5; i < 15; i++ {
key := strconv.Itoa(i)
var value string
s = cRead.Read(key, &value)
if s != clientPkg.NotFound {
t.Errorf("Read test2 fail")
}
}
// 通知进程结束
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
}

+ 85
- 0
test/server_client_test.go View File

@ -0,0 +1,85 @@
package test
import (
"fmt"
"os/exec"
"simple-kv-store/internal/client"
"simple-kv-store/internal/nodes"
"strconv"
"syscall"
"testing"
"time"
)
func TestServerClient(t *testing.T) {
// 登记结点信息
n := 5
var clusters []string
for i := 0; i < n; i++ {
port := fmt.Sprintf("%d", uint16(9090)+uint16(i))
addr := "127.0.0.1:" + port
clusters = append(clusters, addr)
}
// 结点启动
var cmds []*exec.Cmd
for i := 0; i < n; i++ {
var cmd *exec.Cmd
if i == 0 {
cmd = ExecuteNodeI(i, true, true, clusters)
} else {
cmd = ExecuteNodeI(i, false, true, clusters)
}
if cmd == nil {
return
} else {
cmds = append(cmds, cmd)
}
}
time.Sleep(time.Second) // 等待启动完毕
// client启动
c := clientPkg.Client{Address: "127.0.0.1:9090", ServerId: "1"}
// 写入
var s clientPkg.Status
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := c.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
}
// 读写入数据
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
var value string
s = c.Read(key, &value)
if s != clientPkg.Ok && value != "hello" + key {
t.Errorf("Read test1 fail")
}
}
// 读未写入数据
for i := 10; i < 15; i++ {
key := strconv.Itoa(i)
var value string
s = c.Read(key, &value)
if s != clientPkg.NotFound {
t.Errorf("Read test2 fail")
}
}
// 通知进程结束
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
}

Loading…
Cancel
Save