package clientPkg
|
|
|
|
import (
|
|
"math/rand"
|
|
"simple-kv-store/internal/logprovider"
|
|
"simple-kv-store/internal/nodes"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// log is the package-level logger used by every function in this file. It is
// created once, at package initialisation, with zap.InfoLevel.
// NOTE(review): the second return value of CreateDefaultZapLogger is discarded;
// if it is an error and creation fails, log may be unusable — confirm against
// the logprovider package.
var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
|
|
|
|
type Client struct {
|
|
ClientId string // 每个client唯一标识
|
|
NextLogId int
|
|
// 连接的server端节点群
|
|
PeerIds []string
|
|
Transport nodes.Transport
|
|
}
|
|
|
|
// Status is the result code returned by the client's Write and Read methods.
// It is an alias of uint8, not a distinct type.
type Status = uint8

// Result codes. Numbering starts at 1, so the zero value of Status does not
// correspond to any of them.
const (
	// Ok (1) reports that the request succeeded.
	Ok Status = iota + 1
	// NotFound (2) reports that a read found no value for the key.
	NotFound
	// Fail (3) reports that the request could not be completed.
	Fail
)
|
|
|
|
func NewClient(clientId string, peerIds []string, transport nodes.Transport) *Client {
|
|
return &Client{ClientId: clientId, NextLogId: 0, PeerIds: peerIds, Transport: transport}
|
|
}
|
|
|
|
// getRandomAddress returns one pseudo-randomly chosen element of peerIds.
// peerIds must be non-empty: rand.Intn panics when its argument is 0.
func getRandomAddress(peerIds []string) string {
	idx := rand.Intn(len(peerIds))
	return peerIds[idx]
}
|
|
|
|
func (client *Client) FindActiveNode() nodes.ClientInterface {
|
|
var err error
|
|
var c nodes.ClientInterface
|
|
for { // 直到找到一个可连接的节点(保证至少一个节点活着)
|
|
peerId := getRandomAddress(client.PeerIds)
|
|
c, err = client.Transport.DialHTTPWithTimeout("tcp", "", peerId)
|
|
if err != nil {
|
|
log.Error("dialing: ", zap.Error(err))
|
|
} else {
|
|
log.Sugar().Infof("client发现活跃节点[%s]", peerId)
|
|
return c
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *Client) CloseRpcClient(c nodes.ClientInterface) {
|
|
err := c.Close()
|
|
if err != nil {
|
|
log.Error("client close err: ", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (client *Client) Write(kv nodes.LogEntry) Status {
|
|
defer logprovider.DebugTraceback("client")
|
|
log.Info("client write request key :" + kv.Key)
|
|
kvCall := nodes.LogEntryCall{LogE: kv,
|
|
Id: nodes.LogEntryCallId{ClientId: client.ClientId, LogId: client.NextLogId}}
|
|
client.NextLogId++
|
|
|
|
c := client.FindActiveNode()
|
|
var err error
|
|
|
|
timeout := time.Second
|
|
deadline := time.Now().Add(timeout)
|
|
|
|
for { // 根据存活节点的反馈,直到找到leader
|
|
if time.Now().After(deadline) {
|
|
log.Error("系统繁忙,疑似出错")
|
|
return Fail
|
|
}
|
|
|
|
var reply nodes.ServerReply
|
|
reply.Isleader = false
|
|
|
|
callErr := client.Transport.CallWithTimeout(c, "Node.WriteKV", &kvCall, &reply) // RPC
|
|
if callErr != nil { // dial和call之间可能崩溃,重新找存活节点
|
|
log.Error("dialing: ", zap.Error(callErr))
|
|
client.CloseRpcClient(c)
|
|
c = client.FindActiveNode()
|
|
continue
|
|
}
|
|
|
|
if !reply.Isleader { // 对方不是leader,根据反馈找leader
|
|
leaderId := reply.LeaderId
|
|
client.CloseRpcClient(c)
|
|
if leaderId == "" { // 这个节点不知道leader是谁,再随机找
|
|
c = client.FindActiveNode()
|
|
} else { // dial leader
|
|
c, err = client.Transport.DialHTTPWithTimeout("tcp", "", leaderId)
|
|
if err != nil { // dial失败,重新找下一个存活节点
|
|
c = client.FindActiveNode()
|
|
}
|
|
}
|
|
} else { // 成功
|
|
client.CloseRpcClient(c)
|
|
return Ok
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *Client) Read(key string, value *string) Status { // 查不到value为空
|
|
log.Info("client read request key :" + key)
|
|
if value == nil {
|
|
return Fail
|
|
}
|
|
var c nodes.ClientInterface
|
|
for {
|
|
c = client.FindActiveNode()
|
|
|
|
var reply nodes.ServerReply
|
|
callErr := client.Transport.CallWithTimeout(c, "Node.ReadKey", &key, &reply) // RPC
|
|
if callErr != nil {
|
|
log.Error("dialing: ", zap.Error(callErr))
|
|
client.CloseRpcClient(c)
|
|
continue
|
|
}
|
|
|
|
// 目前一定发送成功
|
|
if reply.HaveValue {
|
|
*value = reply.Value
|
|
client.CloseRpcClient(c)
|
|
return Ok
|
|
} else {
|
|
client.CloseRpcClient(c)
|
|
return NotFound
|
|
}
|
|
}
|
|
}
|
|
|
|
func (client *Client) FindLeader() string {
|
|
var arg struct{}
|
|
var reply nodes.FindLeaderReply
|
|
reply.Isleader = false
|
|
c := client.FindActiveNode()
|
|
var err error
|
|
|
|
for !reply.Isleader { // 根据存活节点的反馈,直到找到leader
|
|
callErr := client.Transport.CallWithTimeout(c, "Node.FindLeader", &arg, &reply) // RPC
|
|
if callErr != nil { // dial和call之间可能崩溃,重新找存活节点
|
|
log.Error("dialing: ", zap.Error(callErr))
|
|
client.CloseRpcClient(c)
|
|
c = client.FindActiveNode()
|
|
continue
|
|
}
|
|
|
|
if !reply.Isleader { // 对方不是leader,根据反馈找leader
|
|
client.CloseRpcClient(c)
|
|
c, err = client.Transport.DialHTTPWithTimeout("tcp", "", reply.LeaderId)
|
|
for err != nil { // 重新找下一个存活节点
|
|
c = client.FindActiveNode()
|
|
}
|
|
} else { // 成功
|
|
client.CloseRpcClient(c)
|
|
return reply.LeaderId
|
|
}
|
|
}
|
|
log.Fatal("客户端会一直找存活节点,不会运行到这里")
|
|
return "fault"
|
|
}
|
|
|
|
|