package clientPkg import ( "math/rand" "simple-kv-store/internal/logprovider" "simple-kv-store/internal/nodes" "time" "go.uber.org/zap" ) var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) type Client struct { ClientId string // 每个client唯一标识 NextLogId int // 连接的server端节点群 PeerIds []string Transport nodes.Transport } type Status = uint8 const ( Ok Status = iota + 1 NotFound Fail ) func NewClient(clientId string, peerIds []string, transport nodes.Transport) *Client { return &Client{ClientId: clientId, NextLogId: 0, PeerIds: peerIds, Transport: transport} } func getRandomAddress(peerIds []string) string { // 随机选一个 id randomKey := peerIds[rand.Intn(len(peerIds))] return randomKey } func (client *Client) FindActiveNode() nodes.ClientInterface { var err error var c nodes.ClientInterface for { // 直到找到一个可连接的节点(保证至少一个节点活着) peerId := getRandomAddress(client.PeerIds) c, err = client.Transport.DialHTTPWithTimeout("tcp", "", peerId) if err != nil { log.Error("dialing: ", zap.Error(err)) } else { log.Sugar().Infof("client发现活跃节点[%s]", peerId) return c } } } func (client *Client) CloseRpcClient(c nodes.ClientInterface) { err := c.Close() if err != nil { log.Error("client close err: ", zap.Error(err)) } } func (client *Client) Write(kv nodes.LogEntry) Status { defer logprovider.DebugTraceback("client") log.Info("client write request key :" + kv.Key) kvCall := nodes.LogEntryCall{LogE: kv, Id: nodes.LogEntryCallId{ClientId: client.ClientId, LogId: client.NextLogId}} client.NextLogId++ c := client.FindActiveNode() var err error timeout := time.Second deadline := time.Now().Add(timeout) for { // 根据存活节点的反馈,直到找到leader if time.Now().After(deadline) { log.Error("系统繁忙,疑似出错") return Fail } var reply nodes.ServerReply reply.Isleader = false callErr := client.Transport.CallWithTimeout(c, "Node.WriteKV", &kvCall, &reply) // RPC if callErr != nil { // dial和call之间可能崩溃,重新找存活节点 log.Error("dialing: ", zap.Error(callErr)) client.CloseRpcClient(c) c = client.FindActiveNode() continue } if !reply.Isleader { // 对方不是leader,根据反馈找leader leaderId := reply.LeaderId client.CloseRpcClient(c) if leaderId == "" { // 这个节点不知道leader是谁,再随机找 c = client.FindActiveNode() } else { // dial leader c, err = client.Transport.DialHTTPWithTimeout("tcp", "", leaderId) if err != nil { // dial失败,重新找下一个存活节点 c = client.FindActiveNode() } } } else { // 成功 client.CloseRpcClient(c) return Ok } } } func (client *Client) Read(key string, value *string) Status { // 查不到value为空 log.Info("client read request key :" + key) if value == nil { return Fail } var c nodes.ClientInterface for { c = client.FindActiveNode() var reply nodes.ServerReply callErr := client.Transport.CallWithTimeout(c, "Node.ReadKey", &key, &reply) // RPC if callErr != nil { log.Error("dialing: ", zap.Error(callErr)) client.CloseRpcClient(c) continue } // 目前一定发送成功 if reply.HaveValue { *value = reply.Value client.CloseRpcClient(c) return Ok } else { client.CloseRpcClient(c) return NotFound } } } func (client *Client) FindLeader() string { var arg struct{} var reply nodes.FindLeaderReply reply.Isleader = false c := client.FindActiveNode() var err error for !reply.Isleader { // 根据存活节点的反馈,直到找到leader callErr := client.Transport.CallWithTimeout(c, "Node.FindLeader", &arg, &reply) // RPC if callErr != nil { // dial和call之间可能崩溃,重新找存活节点 log.Error("dialing: ", zap.Error(callErr)) client.CloseRpcClient(c) c = client.FindActiveNode() continue } if !reply.Isleader { // 对方不是leader,根据反馈找leader client.CloseRpcClient(c) c, err = client.Transport.DialHTTPWithTimeout("tcp", "", reply.LeaderId) for err != nil { // 重新找下一个存活节点 c = client.FindActiveNode() } } else { // 成功 client.CloseRpcClient(c) return reply.LeaderId } } log.Fatal("客户端会一直找存活节点,不会运行到这里") return "fault" }