Browse Source

封装了发送状态的模拟

pull/1/head
augurier 6 months ago
parent
commit
5ff390ae99
8 changed files with 51 additions and 24 deletions
  1. +1
    -1
      .gitignore
  2. +3
    -3
      internal/client/client_node.go
  3. +2
    -1
      internal/nodes/init.go
  4. +11
    -0
      internal/nodes/log.go
  5. +22
    -10
      internal/nodes/node.go
  6. +6
    -6
      internal/nodes/server_node.go
  7. +4
    -2
      test/restart_follower_test.go
  8. +2
    -1
      test/server_client_test.go

+ 1
- 1
.gitignore View File

@ -25,4 +25,4 @@ go.work
.idea/*
main
*/leveldb
leveldb

+ 3
- 3
internal/client/client_node.go View File

@ -24,8 +24,8 @@ const (
Fail
)
func (client *Client) Write(kv nodes.LogEntry) Status {
log.Info("client write request key :" + kv.Key)
func (client *Client) Write(kvCall nodes.LogEntryCall) Status {
log.Info("client write request key :" + kvCall.LogE.Key)
c, err := rpc.DialHTTP("tcp", client.Address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
@ -40,7 +40,7 @@ func (client *Client) Write(kv nodes.LogEntry) Status {
}(c)
var reply nodes.ServerReply
callErr := c.Call("Node.WriteKV", kv, &reply) // RPC
callErr := c.Call("Node.WriteKV", kvCall, &reply) // RPC
if callErr != nil {
log.Error("dialing: ", zap.Error(callErr))
return Fail

+ 2
- 1
internal/nodes/init.go View File

@ -82,7 +82,8 @@ func Start(node *Node, isLeader bool) {
log.Info("send : logId = " + strconv.Itoa(logId) + ", key = " + input)
// 广播给其它节点
node.BroadCastKV(logId, kv)
kvCall := LogEntryCall{kv, Normal}
node.BroadCastKV(logId, kvCall)
// 持久化
node.db.Put([]byte(kv.Key), []byte(kv.Value), nil)
}

+ 11
- 0
internal/nodes/log.go View File

@ -1,10 +1,21 @@
package nodes
const (
Normal State = iota + 1
Delay
Fail
)
type LogEntry struct {
Key string
Value string
}
type LogEntryCall struct {
LogE LogEntry
CallState State
}
type KVReply struct {
Reply bool
}

+ 22
- 10
internal/nodes/node.go View File

@ -1,8 +1,10 @@
package nodes
import (
"math/rand"
"net/rpc"
"strconv"
"time"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap"
@ -38,22 +40,33 @@ type Node struct {
log map[int]LogEntry
// leader用来标记新log
maxLogId int
maxLogId int
db *leveldb.DB
}
func (node *Node) BroadCastKV(logId int, kv LogEntry) {
func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) {
// 遍历所有节点
for id, _ := range node.nodes {
go func(id string, kv LogEntry) {
go func(id string, kv LogEntryCall) {
var reply KVReply
node.sendKV(id, logId, kv, &reply)
}(id, kv)
node.sendKV(id, logId, kvCall, &reply)
}(id, kvCall)
}
}
func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) {
func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) {
switch kvCall.CallState {
case Fail:
log.Info("模拟发送失败")
// 这么写向所有的node发送都失败,也可以随机数确定是否失败
case Delay:
log.Info("模拟发送延迟")
// 随机延迟0-5ms
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)))
default:
}
client, err := rpc.DialHTTP("tcp", node.nodes[id].address)
if err != nil {
log.Error("dialing: ", zap.Error(err))
@ -67,16 +80,16 @@ func (node *Node) sendKV(id string, logId int, kv LogEntry, reply *KVReply) {
}
}(client)
arg := LogIdAndEntry{logId, kv}
arg := LogIdAndEntry{logId, kvCall.LogE}
callErr := client.Call("Node.ReceiveKV", arg, reply) // RPC
if callErr != nil {
log.Error("dialing node_" + id + "fail: ", zap.Error(callErr))
log.Error("dialing node_"+id+"fail: ", zap.Error(callErr))
}
}
// RPC call
func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error {
log.Info("node_" + node.selfId + " receive: logId = "+ strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key)
log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key)
entry, ok := node.log[arg.LogId]
if !ok {
node.log[arg.LogId] = entry
@ -86,4 +99,3 @@ func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error {
reply.Reply = true // rpc call需要有reply,但实际上调用是否成功是error返回值决定
return nil
}

+ 6
- 6
internal/nodes/server_node.go View File

@ -13,15 +13,15 @@ type ServerReply struct{
Value string
}
// RPC call
func (node *Node) WriteKV(kv LogEntry, reply *ServerReply) error {
func (node *Node) WriteKV(kvCall LogEntryCall, reply *ServerReply) error {
logId := node.maxLogId
node.maxLogId++
node.log[logId] = kv
// 广播给其它节点
node.db.Put([]byte(kv.Key), []byte(kv.Value), nil)
log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kv.Key)
node.BroadCastKV(logId, kv)
node.log[logId] = kvCall.LogE
node.db.Put([]byte(kvCall.LogE.Key), []byte(kvCall.LogE.Value), nil)
log.Info("server write : logId = " + strconv.Itoa(logId) + ", key = " + kvCall.LogE.Key)
// 广播给其它节点
node.BroadCastKV(logId, kvCall)
reply.Isconnect = true
return nil
}

+ 4
- 2
test/restart_follower_test.go View File

@ -46,7 +46,8 @@ func TestFollowerRestart(t *testing.T) {
var s clientPkg.Status
for i := 0; i < 5; i++ {
key := strconv.Itoa(i)
s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"})
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}
@ -61,7 +62,8 @@ func TestFollowerRestart(t *testing.T) {
// 继续写入
for i := 5; i < 10; i++ {
key := strconv.Itoa(i)
s := cWrite.Write(nodes.LogEntry{Key: key, Value: "hello"})
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := cWrite.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}

+ 2
- 1
test/server_client_test.go View File

@ -46,7 +46,8 @@ func TestServerClient(t *testing.T) {
var s clientPkg.Status
for i := 0; i < 10; i++ {
key := strconv.Itoa(i)
s := c.Write(nodes.LogEntry{Key: key, Value: "hello" + key})
newlog := nodes.LogEntry{Key: key, Value: "hello"}
s := c.Write(nodes.LogEntryCall{LogE: newlog, CallState: nodes.Normal})
if s != clientPkg.Ok {
t.Errorf("write test fail")
}

Loading…
Cancel
Save