李度、马也驰 25spring数据库系统 p1仓库
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

101 lines
2.1 KiB

package nodes
import (
"math/rand"
"net/rpc"
"strconv"
"time"
"github.com/syndtr/goleveldb/leveldb"
"go.uber.org/zap"
)
// State is the state a node is currently in (see Node.state).
type State = uint8
// Possible node states. Numbering starts at 1 (iota + 1): Follower = 1,
// Candidate = 2, Leader = 3, so the zero value of State matches none of them.
const (
Follower State = iota + 1
Candidate
Leader
)
// Public_node_info holds what a node records about another node; Node.nodes
// maps node ids to values of this type.
type Public_node_info struct {
// NOTE(review): presumably whether the peer is currently connected; it is
// not read in the code visible here — confirm against the callers.
connect bool
// Network address of the peer; sendKV passes it to rpc.DialHTTP over "tcp".
address string
}
// Node is a single participant in the cluster. It keeps its own identity and
// state, the list of its peers, an in-memory log and a LevelDB handle.
type Node struct {
// Id of this node.
selfId string
// Information about every node other than this one, keyed by node id.
nodes map[string]*Public_node_info
// Pipe name.
pipeAddr string
// Current state of this node.
state State
// Simple in-memory key/value store: log entries keyed by log id.
log map[int]LogEntry
// Used by the leader to tag new log entries.
maxLogId int
// LevelDB handle; ReceiveKV writes each received key/value pair to it.
db *leveldb.DB
}
// BroadCastKV sends the log entry carried by kvCall, tagged with logId, to
// every node in node.nodes. Each peer is contacted from its own goroutine via
// sendKV; the method returns immediately without waiting for those goroutines,
// and the per-peer replies are discarded.
func (node *Node) BroadCastKV(logId int, kvCall LogEntryCall) {
	for peerID := range node.nodes {
		// Pin the loop variable so each goroutine sees its own peer id even
		// on toolchains where the range variable is shared across iterations.
		peerID := peerID
		go func() {
			var resp KVReply
			node.sendKV(peerID, logId, kvCall, &resp)
		}()
	}
}
// sendKV delivers one log entry to the peer identified by id by invoking the
// remote "Node.ReceiveKV" method over net/rpc (HTTP transport on TCP).
//
// kvCall.CallState selects a simulated network condition:
//   - Fail:  the send is dropped; nothing is dialed and reply is left untouched.
//   - Delay: the send is delayed by a random 0-4 ms before dialing.
//   - anything else: the entry is sent immediately.
//
// Errors (unknown peer, dial failure, call failure, close failure) are logged
// and not returned; on success the remote side fills in *reply.
func (node *Node) sendKV(id string, logId int, kvCall LogEntryCall, reply *KVReply) {
	switch kvCall.CallState {
	case Fail:
		log.Info("模拟发送失败")
		// Written this way, sending fails for every node; a random number
		// could be used instead to decide per send. Return here so that the
		// simulated failure really suppresses the RPC.
		return
	case Delay:
		log.Info("模拟发送延迟")
		// rand.Intn(5) is in [0, 5), so the delay is 0-4 ms.
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)))
	}

	// Guard against an id that is not (or no longer) in the peer table;
	// dereferencing a missing entry would panic inside the goroutine.
	peer, ok := node.nodes[id]
	if !ok || peer == nil {
		log.Error("unknown node_" + id)
		return
	}

	client, err := rpc.DialHTTP("tcp", peer.address)
	if err != nil {
		log.Error("dialing: ", zap.Error(err))
		return
	}
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			log.Error("client close err: ", zap.Error(closeErr))
		}
	}()

	arg := LogIdAndEntry{logId, kvCall.LogE}
	if callErr := client.Call("Node.ReceiveKV", arg, reply); callErr != nil { // RPC
		log.Error("dialing node_"+id+"fail: ", zap.Error(callErr))
	}
}
// ReceiveKV is the RPC handler invoked by sendKV on the receiving node.
//
// It records arg.Entry in the in-memory log under arg.LogId (an entry already
// present for that id is left unchanged) and then persists the entry's
// key/value pair to LevelDB.
//
// reply.Reply is set to true only after the write to LevelDB succeeded. If the
// write fails, the error is returned so that the caller's client.Call reports
// it; as the RPC contract goes, success is decided by the error return value,
// not by the reply.
//
// NOTE(review): no locking is visible here, while net/rpc may serve calls
// concurrently — confirm whether node.log needs a mutex.
func (node *Node) ReceiveKV(arg LogIdAndEntry, reply *KVReply) error {
	log.Info("node_" + node.selfId + " receive: logId = " + strconv.Itoa(arg.LogId) + ", key = " + arg.Entry.Key)

	// Store the received entry itself; the value returned by a failed map
	// lookup is only the zero value and must not be written back.
	if _, ok := node.log[arg.LogId]; !ok {
		node.log[arg.LogId] = arg.Entry
	}

	// Persist the pair.
	if err := node.db.Put([]byte(arg.Entry.Key), []byte(arg.Entry.Value), nil); err != nil {
		return err
	}

	reply.Reply = true // an RPC call needs a reply value
	return nil
}