diff --git a/.gitignore b/.gitignore index 3f067b7..b6d444b 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ go.work .idea/* main +*/leveldb diff --git a/cmd/main.go b/cmd/main.go index 03e763d..b0b8bd8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "syscall" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) @@ -32,6 +33,7 @@ func main() { id := flag.String("id", "1", "node ID") pipe := flag.String("pipe", "", "input from scripts") isLeader := flag.Bool("isleader", false, "init node state") + isNewDb := flag.Bool("isNewDb", true, "new test or restart") // 参数解析 flag.Parse() @@ -49,10 +51,28 @@ func main() { idClusterPairs[strconv.Itoa(idCnt)] = addr idCnt++ } - node := nodes.Init(*id, idClusterPairs, *pipe) - log.Info("id: " + *id + "节点开始监听: " + *port + "端口") + if *isNewDb { + os.RemoveAll("leveldb/simple-kv-store" + *id) + } + // 打开或创建每个结点自己的数据库 + db, err := leveldb.OpenFile("leveldb/simple-kv-store" + *id, nil) + if err != nil { + log.Fatal("Failed to open database: ", zap.Error(err)) + } + defer db.Close() // 确保数据库在使用完毕后关闭 + iter := db.NewIterator(nil, nil) + defer iter.Release() + // 计数 + count := 0 + for iter.Next() { + count++ + } + fmt.Printf(*id + "结点目前有数据:%d\n", count) + + node := nodes.Init(*id, idClusterPairs, *pipe, db) + log.Info("id: " + *id + "节点开始监听: " + *port + "端口") // 监听rpc node.Rpc(*port) // 开启 raft diff --git a/go.mod b/go.mod index d9c592b..0981ec0 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.20 require go.uber.org/zap v1.24.0 require ( + github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 081dfe1..9b63927 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,18 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= @@ -10,4 +20,12 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 0a860de..4ef68fe 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -10,6 +10,7 @@ import ( "strconv" "time" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) @@ -22,7 +23,7 @@ func newNode(address string) *Public_node_info { } } -func Init(id string, nodeAddr map[string]string, pipe string) *Node { +func Init(id string, nodeAddr map[string]string, pipe string, db *leveldb.DB) *Node { ns := make(map[string]*Public_node_info) for id, addr := range nodeAddr { ns[id] = newNode(addr) @@ -30,11 +31,12 @@ func Init(id string, nodeAddr map[string]string, pipe string) *Node { // 创建节点 return &Node{ - selfId: id, - nodes: ns, + selfId: id, + nodes: ns, pipeAddr: pipe, maxLogId: 0, - log: make(map[int]LogEntry), + log: make(map[int]LogEntry), + db: db, } } @@ -71,7 +73,7 @@ func Start(node *Node, isLeader bool) { log.Error("Error reading from pipe") } if n > 0 { - input := string(buffer[:n]) + input := string(buffer[:n]) // 将用户输入封装成一个 LogEntry kv := LogEntry{input, ""} // 目前键盘输入key,value 0 logId := node.maxLogId @@ -82,6 +84,7 @@ func Start(node *Node, isLeader bool) { // 广播给其它节点 node.BroadCastKV(logId, kv) // 持久化 + node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) } } } @@ -99,7 +102,7 @@ func (node *Node) Rpc(port string) { log.Fatal("rpc register failed", zap.Error(err)) } rpc.HandleHTTP() - + l, e := net.Listen("tcp", port) if e != nil { log.Fatal("listen error:", zap.Error(e)) diff --git a/internal/nodes/node.go b/internal/nodes/node.go index fc465d3..1c99767 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -4,6 +4,7 @@ import ( "net/rpc" "strconv" + "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" ) @@ -38,6 +39,8 @@ type Node struct { // leader用来标记新log maxLogId int + + db *leveldb.DB } func (node *Node) BroadCastKV(logId int, kv LogEntry) { @@ -67,13 +70,7 @@ func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { arg := LogIdAndEntry{logId, kv} callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC if callErr != nil { - log.Error("dialing: ", zap.Error(callErr)) - } - - if reply.Reply { // 发送成功 - - } else { // 失败 - + log.Error("dialing node_" + id + "fail: ", zap.Error(callErr)) } } @@ -85,34 +82,8 @@ func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { node.log[arg.LogId] = entry } // 持久化 - reply.Reply = true + node.db.Put([]byte(arg.Entry.Key), []byte(arg.Entry.Value), nil) + reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定 return nil } -// func (node *Node) broadcastHeartbeat() { -// // 遍历所有节点 -// for i := range raft.nodes { -// // request 参数 -// hb := Heartbeat{ -// Term: raft.currTerm, -// LeaderId: raft.self, -// CommitIndex: raft.commitIndex, -// } - -// prevLogIndex := raft.nextIndex[i] - 1 - -// // 如果有日志未同步则发送 -// if raft.getLastIndex() > prevLogIndex { -// hb.PrevLogIndex = prevLogIndex -// hb.PrevLogTerm = raft.log[prevLogIndex].CurrTerm -// hb.Entries = raft.log[prevLogIndex:] -// // log.Info("will send log entries", zap.Any("logEntries", hb.Entries)) -// } - -// go func(index int, hb Heartbeat) { -// var reply HeartbeatReply -// // 向某一个节点发送 heartbeat -// raft.sendHeartbeat(index, hb, &reply) -// }(i, hb) -// } -// } diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index 568e472..a7681a7 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -1,6 +1,10 @@ package nodes -import "strconv" +import ( + "strconv" + + "github.com/syndtr/goleveldb/leveldb" +) // leader node作为server为client注册的方法 type ServerReply struct{ @@ -15,6 +19,7 @@ func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { node.maxLogId++ node.log[logId] = kv // 广播给其它节点 + node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key) node.BroadCastKV(logId, kv) reply.Isconnect = true @@ -24,15 +29,13 @@ func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { func (node *Node) ReadKey(key string, reply *ServerReply) error { log.Info("server read : " + key) // 先只读leader自己 - for _, kv := range node.log { - if kv.Key == key { - reply.Value = kv.Value - reply.HaveValue = true - reply.Isconnect = true - return nil - } + value, err := node.db.Get([]byte(key), nil) + if err == leveldb.ErrNotFound { + reply.HaveValue = false + } else { + reply.HaveValue = true + reply.Value = string(value) } - reply.HaveValue = false reply.Isconnect = true return nil } diff --git a/test/common.go b/test/common.go new file mode 100644 index 0000000..554927d --- /dev/null +++ b/test/common.go @@ -0,0 +1,45 @@ +package test + +import ( + "fmt" + "os" + "os/exec" + "strconv" + "strings" +) + +func ExecuteNodeI(i int, isLeader bool, isNewDb bool, clusters []string) *exec.Cmd { + tmpClusters := append(clusters[:i], clusters[i+1:]...) + port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) + + var isleader string + if isLeader { + isleader = "true" + } else { + isleader = "false" + } + var isnewdb string + if isNewDb { + isnewdb = "true" + } else { + isnewdb = "false" + } + cmd := exec.Command( + "../main", + "-id", strconv.Itoa(i + 1), + "-port", port, + "-cluster", strings.Join(tmpClusters, ","), + "-isleader=" + isleader, + "-isNewDb=" + isnewdb, + ) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // 执行命令 + err := cmd.Start() + if err != nil { + fmt.Println("启动进程出错:", err) + return nil + } + return cmd +} \ No newline at end of file diff --git a/test/restart_follower_test.go b/test/restart_follower_test.go new file mode 100644 index 0000000..4bbce54 --- /dev/null +++ b/test/restart_follower_test.go @@ -0,0 +1,110 @@ +package test + +import ( + "fmt" + "os/exec" + "simple-kv-store/internal/client" + "simple-kv-store/internal/nodes" + "strconv" + "syscall" + "testing" + "time" +) + +func TestFollowerRestart(t *testing.T) { + // 登记结点信息 + n := 3 + var clusters []string + for i := 0; i < n; i++ { + port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) + addr := "127.0.0.1:" + port + clusters = append(clusters, addr) + } + + // 结点启动 + var cmds []*exec.Cmd + for i := 0; i < n; i++ { + var cmd *exec.Cmd + if i == 0 { + cmd = ExecuteNodeI(i, true, true, clusters) + } else { + cmd = ExecuteNodeI(i, false, true, clusters) + } + + if cmd == nil { + return + } else { + cmds = append(cmds, cmd) + } + } + + time.Sleep(time.Second) // 等待启动完毕 + // client启动, 连接leader + cWrite := clientPkg.Client{Address: clusters[0], ServerId: "1"} + + // 写入 + var s clientPkg.Status + for i := 0; i < 5; i++ { + key := strconv.Itoa(i) + s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + } + time.Sleep(time.Second) // 等待写入完毕 + // 模拟最后一个结点崩溃 + err := cmds[n - 1].Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + // 继续写入 + for i := 5; i < 10; i++ { + key := strconv.Itoa(i) + s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + } + // 恢复结点 + cmd := ExecuteNodeI(n - 1, false, false, clusters) + if cmd == nil { + t.Errorf("recover test1 fail") + return + } else { + cmds[n - 1] = cmd + } + time.Sleep(time.Second) // 等待启动完毕 + + // client启动, 连接节点n-1(去读它的数据) + cRead := clientPkg.Client{Address: clusters[n - 1], ServerId: "n"} + // 读崩溃前写入数据 + for i := 0; i < 5; i++ { + key := strconv.Itoa(i) + var value string + s = cRead.Read(key, &value) + if s != clientPkg.Ok { + t.Errorf("Read test1 fail") + } + } + + // 读未写入数据 + for i := 5; i < 15; i++ { + key := strconv.Itoa(i) + var value string + s = cRead.Read(key, &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } + } + + // 通知进程结束 + for _, cmd := range cmds { + err := cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + fmt.Println("Error sending signal:", err) + return + } + } + +} diff --git a/test/server_client_test.go b/test/server_client_test.go index 7e6613c..74599a5 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -2,22 +2,15 @@ package test import ( "fmt" - "os" "os/exec" "simple-kv-store/internal/client" - "simple-kv-store/internal/logprovider" "simple-kv-store/internal/nodes" "strconv" - "strings" "syscall" "testing" "time" - - "go.uber.org/zap" ) -var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) - func TestServerClient(t *testing.T) { // 登记结点信息 n := 5 @@ -30,33 +23,19 @@ func TestServerClient(t *testing.T) { // 结点启动 var cmds []*exec.Cmd - for i := 0; i < n; i++ { - tmpClusters := append(clusters[:i], clusters[i+1:]...) - port := fmt.Sprintf(":%d", uint16(9090)+uint16(i)) - - var isleader string + for i := 0; i < n; i++ { + var cmd *exec.Cmd if i == 0 { - isleader = "true" + cmd = ExecuteNodeI(i, true, true, clusters) } else { - isleader = "false" + cmd = ExecuteNodeI(i, false, true, clusters) } - cmd := exec.Command( - "../main", - "-id", strconv.Itoa(i + 1), - "-port", port, - "-cluster", strings.Join(tmpClusters, ","), - "-isleader=" + isleader, - ) - cmds = append(cmds, cmd) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - // 执行命令 - err := cmd.Start() - if err != nil { - fmt.Println("启动进程出错:", err) + + if cmd == nil { return - } + } else { + cmds = append(cmds, cmd) + } } time.Sleep(time.Second) // 等待启动完毕 @@ -67,7 +46,7 @@ func TestServerClient(t *testing.T) { var s clientPkg.Status for i := 0; i < 10; i++ { key := strconv.Itoa(i) - s := c.Write(nodes.LogEntry{Key: key, Value: "hello"}) + s := c.Write(nodes.LogEntry{Key: key, Value: "hello" + key}) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -78,7 +57,7 @@ func TestServerClient(t *testing.T) { key := strconv.Itoa(i) var value string s = c.Read(key, &value) - if s != clientPkg.Ok { + if s != clientPkg.Ok && value != "hello" + key { t.Errorf("Read test1 fail") } }