diff --git a/internal/client/client_node.go b/internal/client/client_node.go index 1ef1de6..4d2d4e8 100644 --- a/internal/client/client_node.go +++ b/internal/client/client_node.go @@ -11,6 +11,8 @@ import ( var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) type Client struct { + ClientId string // 每个client唯一标识 + NextLogId int // 连接的server端节点群 PeerIds []string Transport nodes.Transport @@ -24,6 +26,10 @@ const ( Fail ) +func NewClient(clientId string, peerIds []string, transport nodes.Transport) *Client { + return &Client{ClientId: clientId, NextLogId: 0, PeerIds: peerIds, Transport: transport} +} + func getRandomAddress(peerIds []string) string { // 随机选一个 id randomKey := peerIds[rand.Intn(len(peerIds))] @@ -52,8 +58,11 @@ func (client *Client) CloseRpcClient(c nodes.ClientInterface) { } } -func (client *Client) Write(kvCall nodes.LogEntryCall) Status { - log.Info("client write request key :" + kvCall.LogE.Key) +func (client *Client) Write(kv nodes.LogEntry) Status { + log.Info("client write request key :" + kv.Key) + kvCall := nodes.LogEntryCall{LogE: kv, + Id: nodes.LogEntryCallId{ClientId: client.ClientId, LogId: client.NextLogId}} + client.NextLogId++ var reply nodes.ServerReply reply.Isleader = false diff --git a/internal/nodes/init.go b/internal/nodes/init.go index 77c70f9..77afdeb 100644 --- a/internal/nodes/init.go +++ b/internal/nodes/init.go @@ -37,6 +37,7 @@ func InitRPCNode(SelfId string, port string, nodeAddr map[string]string, db *lev Db: db, Storage: rstorage, Transport: &HTTPTransport{NodeMap: nodeAddr}, + SeenRequests: make(map[LogEntryCallId]bool), } node.initLeaderState() if isRestart { @@ -90,6 +91,7 @@ func InitThreadNode(SelfId string, peerIds []string, db *leveldb.DB, rstorage *R Db: db, Storage: rstorage, Transport: threadTransport, + SeenRequests: make(map[LogEntryCallId]bool), } node.initLeaderState() if isRestart { diff --git a/internal/nodes/log.go b/internal/nodes/log.go index 08c0830..f8b0447 100644 --- a/internal/nodes/log.go +++ b/internal/nodes/log.go @@ -20,9 +20,15 @@ func (RLogE *RaftLogEntry) print() string { } type LogEntryCall struct { + Id LogEntryCallId LogE LogEntry } +type LogEntryCallId struct { + ClientId string + LogId int +} + type KVReply struct { Reply bool } diff --git a/internal/nodes/node.go b/internal/nodes/node.go index b1e1c8b..bd12ff6 100644 --- a/internal/nodes/node.go +++ b/internal/nodes/node.go @@ -59,5 +59,8 @@ type Node struct { // 通信方式 Transport Transport + + // 已经处理过的客户端请求 + SeenRequests map[LogEntryCallId]bool } diff --git a/internal/nodes/server_node.go b/internal/nodes/server_node.go index 03a79c9..0d1ca23 100644 --- a/internal/nodes/server_node.go +++ b/internal/nodes/server_node.go @@ -25,6 +25,13 @@ func (node *Node) WriteKV(kvCall *LogEntryCall, reply *ServerReply) error { return nil } + if node.SeenRequests[kvCall.Id] { + log.Sugar().Infof("Leader [%s] 已处理过client[%s]的请求 %d, 跳过", node.SelfId, kvCall.Id.ClientId, kvCall.Id.LogId) + reply.Isleader = true + return nil + } + node.SeenRequests[kvCall.Id] = true + // 自己是leader,修改自己的记录并广播 node.MaxLogId++ logId := node.MaxLogId diff --git a/test/restart_node_test.go b/test/restart_node_test.go index 5efcfbe..b8d4093 100644 --- a/test/restart_node_test.go +++ b/test/restart_node_test.go @@ -45,14 +45,15 @@ func TestNodeRestart(t *testing.T) { time.Sleep(time.Second) // 等待启动完毕 // client启动, 连接任意节点 - cWrite := clientPkg.Client{PeerIds: peerIds, Transport: &nodes.HTTPTransport{NodeMap: addressMap}} + transport := &nodes.HTTPTransport{NodeMap: addressMap} + cWrite := clientPkg.NewClient("0", peerIds, transport) // 写入 var s clientPkg.Status for i := 0; i < 5; i++ { key := strconv.Itoa(i) newlog := nodes.LogEntry{Key: key, Value: "hello"} - s := cWrite.Write(nodes.LogEntryCall{LogE: newlog}) + s := cWrite.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -80,7 +81,7 @@ func TestNodeRestart(t *testing.T) { // client启动 - cRead := clientPkg.Client{PeerIds: peerIds, Transport: &nodes.HTTPTransport{NodeMap: addressMap}} + cRead := clientPkg.NewClient("0", peerIds, transport) // 读写入数据 for i := 0; i < 5; i++ { key := strconv.Itoa(i) diff --git a/test/server_client_test.go b/test/server_client_test.go index 8751c67..fc29ddc 100644 --- a/test/server_client_test.go +++ b/test/server_client_test.go @@ -45,14 +45,15 @@ func TestServerClient(t *testing.T) { time.Sleep(time.Second) // 等待启动完毕 // client启动 - c := clientPkg.Client{PeerIds: peerIds, Transport: &nodes.HTTPTransport{NodeMap: addressMap}} + transport := &nodes.HTTPTransport{NodeMap: addressMap} + c := clientPkg.NewClient("0", peerIds, transport) // 写入 var s clientPkg.Status for i := 0; i < 10; i++ { key := strconv.Itoa(i) newlog := nodes.LogEntry{Key: key, Value: "hello"} - s := c.Write(nodes.LogEntryCall{LogE: newlog}) + s := c.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") } diff --git a/threadTest/network_partition_test.go b/threadTest/network_partition_test.go index 94b765b..610e22d 100644 --- a/threadTest/network_partition_test.go +++ b/threadTest/network_partition_test.go @@ -120,12 +120,12 @@ func TestSingelPartition(t *testing.T) { } // client启动 - c := clientPkg.Client{PeerIds: peerIds, Transport: threadTransport} + c := clientPkg.NewClient("0", peerIds, threadTransport) var s clientPkg.Status for i := 0; i < 5; i++ { key := strconv.Itoa(i) newlog := nodes.LogEntry{Key: key, Value: "hello"} - s = c.Write(nodes.LogEntryCall{LogE: newlog}) + s = c.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -204,12 +204,12 @@ func TestQuorumPartition(t *testing.T) { } // client启动 - c := clientPkg.Client{PeerIds: peerIds, Transport: threadTransport} + c := clientPkg.NewClient("0", peerIds, threadTransport) var s clientPkg.Status for i := 0; i < 5; i++ { key := strconv.Itoa(i) newlog := nodes.LogEntry{Key: key, Value: "hello"} - s = c.Write(nodes.LogEntryCall{LogE: newlog}) + s = c.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") } diff --git a/threadTest/restart_node_test.go b/threadTest/restart_node_test.go index 9dc3b95..6c96ead 100644 --- a/threadTest/restart_node_test.go +++ b/threadTest/restart_node_test.go @@ -33,13 +33,13 @@ func TestNodeRestart(t *testing.T) { time.Sleep(time.Second) // 等待启动完毕 // client启动, 连接任意节点 - cWrite := clientPkg.Client{PeerIds: peerIds, Transport: threadTransport} + cWrite := clientPkg.NewClient("0", peerIds, threadTransport) // 写入 var s clientPkg.Status for i := 0; i < 5; i++ { key := strconv.Itoa(i) newlog := nodes.LogEntry{Key: key, Value: "hello"} - s = cWrite.Write(nodes.LogEntryCall{LogE: newlog}) + s = cWrite.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -58,7 +58,7 @@ func TestNodeRestart(t *testing.T) { // client启动 - cRead := clientPkg.Client{PeerIds: peerIds, Transport: threadTransport} + cRead := clientPkg.NewClient("0", peerIds, threadTransport) // 读写入数据 for i := 0; i < 5; i++ { key := strconv.Itoa(i) diff --git a/threadTest/server_client_test.go b/threadTest/server_client_test.go index 295f5a6..c95c60a 100644 --- a/threadTest/server_client_test.go +++ b/threadTest/server_client_test.go @@ -33,14 +33,14 @@ func TestServerClient(t *testing.T) { time.Sleep(time.Second) // 等待启动完毕 // client启动 - c := clientPkg.Client{PeerIds: peerIds, Transport: threadTransport} + c := clientPkg.NewClient("0", peerIds, threadTransport) // 写入 var s clientPkg.Status for i := 0; i < 10; i++ { key := strconv.Itoa(i) newlog := nodes.LogEntry{Key: key, Value: "hello"} - s = c.Write(nodes.LogEntryCall{LogE: newlog}) + s = c.Write(newlog) if s != clientPkg.Ok { t.Errorf("write test fail") } @@ -67,3 +67,73 @@ func TestServerClient(t *testing.T) { } } } + +func TestRepeatClientReq(t *testing.T) { + // 登记结点信息 + n := 5 + var peerIds []string + for i := 0; i < n; i++ { + peerIds = append(peerIds, strconv.Itoa(i + 1)) + } + + // 结点启动 + var quitCollections []chan struct{} + var nodeCollections []*nodes.Node + ctx := nodes.NewCtx() + threadTransport := nodes.NewThreadTransport(ctx) + for i := 0; i < n; i++ { + n, quitChan := ExecuteNodeI(strconv.Itoa(i + 1), false, peerIds, threadTransport) + quitCollections = append(quitCollections, quitChan) + nodeCollections = append(nodeCollections, n) + } + + // 通知所有node结束 + defer func(){ + for _, quitChan := range quitCollections { + close(quitChan) + } + }() + + time.Sleep(time.Second) // 等待启动完毕 + // client启动 + c := clientPkg.NewClient("0", peerIds, threadTransport) + for i := 0; i < n; i++ { + ctx.SetBehavior("", nodeCollections[i].SelfId, nodes.RetryRpc, 0, 2) + } + + // 写入 + var s clientPkg.Status + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + newlog := nodes.LogEntry{Key: key, Value: "hello"} + s = c.Write(newlog) + if s != clientPkg.Ok { + t.Errorf("write test fail") + } + } + + time.Sleep(time.Second) // 等待写入完毕 + // 读写入数据 + for i := 0; i < 10; i++ { + key := strconv.Itoa(i) + var value string + s = c.Read(key, &value) + if s != clientPkg.Ok || value != "hello" { + t.Errorf("Read test1 fail") + } + } + + // 读未写入数据 + for i := 10; i < 15; i++ { + key := strconv.Itoa(i) + var value string + s = c.Read(key, &value) + if s != clientPkg.NotFound { + t.Errorf("Read test2 fail") + } + } + + for i := 0; i < n; i++ { + CheckLogNum(t, nodeCollections[i], 10) + } +}