diff --git a/.gitignore b/.gitignore index b6d444b..7091e8e 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,4 @@ go.work .idea/* main -*/leveldb +leveldb diff --git a/internal/client/client_node.go b/internal/client/client_node.go index 345ed05..7eaf99d 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -24,8 +24,8 @@ const ( Fail ) -func (client *Client) Write(kv nodes.LogEntry) Status { - log.Info("client write request key :" + kv.Key) +func (client *Client) Write(kvCall nodes.LogEntryCall) Status { + log.Info("client write request key :" + kvCall.LogE.Key) c, err := rpc.DialHTTP("tcp", client.Address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -40,7 +40,7 @@ func (client *Client) Write(kv nodes.LogEntry) Status { }(c) var reply nodes.ServerReply - callErr := c.Call("Node.WriteKV", kv, &reply) // RPC + callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC if callErr != nil { log.Error("dialing: ", zap.Error(callErr)) return Fail diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 4ef68fe..0a38451 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -82,7 +82,8 @@ func Start(node *Node, isLeader bool) { log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input) // 广播给其它节点 - node.BroadCastKV(logId, kv) + kvCall := LogEntryCall{kv, Normal} + node.BroadCastKV(logId, kvCall) // 持久化 node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) } diff --git a/internal/nodes/log.go b/internal/nodes/log.go index 95c67d0..3d13831 100644 --- a/internal/nodes/log.go +++ b/internal/nodes/log.go @@ -1,10 +1,21 @@ package nodes +const ( + Normal State = iota + 1 + Delay + Fail +) + type LogEntry struct { Key string Value string } +type LogEntryCall struct { + LogE LogEntry + CallState State +} + type KVReply struct { Reply bool } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index 1c99767..f1a17c8 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -1,8 +1,10 @@ package nodes import ( + "math/rand" "net/rpc" "strconv" + "time" "github.com/syndtr/goleveldb/leveldb" "go.uber.org/zap" @@ -38,22 +40,33 @@ type Node struct { log map[int]LogEntry // leader用来标记新log - maxLogId int + maxLogId int db *leveldb.DB } -func (node *Node) BroadCastKV(logId int, kv LogEntry) { +func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) { // 遍历所有节点 for id, _ := range node.nodes { - go func(id string, kv LogEntry) { + go func(id string, kv LogEntryCall) { var reply KVReply - node.sendKV(id, logId, kv, &reply) - }(id, kv) + node.sendKV(id, logId, kvCall, &reply) + }(id, kvCall) } } -func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { +func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) { + switch kvCall.CallState { + case Fail: + log.Info("模拟发送失败") + // 这么写向所有的node发送都失败,也可以随机数确定是否失败 + case Delay: + log.Info("模拟发送延迟") + // 随机延迟0-5ms + time.Sleep(time.Millisecond * time.Duration(rand.Intn(5))) + default: + } + client, err := rpc.DialHTTP("tcp", node.nodes[id].address) if err != nil { log.Error("dialing: ", zap.Error(err)) @@ -67,16 +80,16 @@ func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) { } }(client) - arg := LogIdAndEntry{logId, kv} + arg := LogIdAndEntry{logId, kvCall.LogE} callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC if callErr != nil { - log.Error("dialing node_" + id + "fail: ", zap.Error(callErr)) + log.Error("dialing node_"+id+"fail: ", zap.Error(callErr)) } } // RPC call func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { - log.Info("node_" + node.selfId + " receive: logId = "+ strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) + log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key) entry, ok := node.log[arg.LogId] if !ok { node.log[arg.LogId] = entry @@ -86,4 +99,3 @@ func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error { reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定 return nil } - diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index a7681a7..58eed40 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -13,15 +13,15 @@ type ServerReply struct{ Value string } // RPC call -func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error { +func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error { logId := node.maxLogId node.maxLogId++ - node.log[logId] = kv - // 广播给其它节点 - node.db.Put([]byte(kv.Key), []byte(kv.Value), nil) - log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key) - node.BroadCastKV(logId, kv) + node.log[logId] = kvCall.LogE + node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil) + log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kvCall.LogE.Key) + // 广播给其它节点 + node.BroadCastKV(logId, kvCall) reply.Isconnect = true return nil } diff --git a/test/restart_follower_test.go b/test/restart_follower_test.go index 4bbce54..323507d 100644 --- a/test/restart_follower_test.go +++ b/test/restart_follower_test.go @@ -46,7 +46,8 @@ func TestFollowerRestart(t *testing.T) { var s clientPkg.Status for i := 0; i < 5; i++ { key := strconv.Itoa(i) - s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -61,7 +62,8 @@ func TestFollowerRestart(t *testing.T) { // 继续写入 for i := 5; i < 10; i++ { key := strconv.Itoa(i) - s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"}) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) if s != clientPkg.Ok { t.Errorf("write test fail") } diff --git a/test/server_client_test.go b/test/server_client_test.go index 74599a5..ab106c3 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -46,7 +46,8 @@ func TestServerClient(t *testing.T) { var s clientPkg.Status for i := 0; i < 10; i++ { key := strconv.Itoa(i) - s := c.Write(nodes.LogEntry{Key: key, Value: "hello" + key}) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s := c.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal}) if s != clientPkg.Ok { t.Errorf("write test fail") }