Browse Source

增加客户端找主

ld
augurier 6 months ago
parent
commit
e86202ca9b
5 changed files with 102 additions and 1520 deletions
  1. +52
    -12
      internal/client/client_node.go
  2. +21
    -0
      internal/nodes/server_node.go
  3. +15
    -11
      test/restart_node_test.go
  4. +14
    -11
      test/server_client_test.go
  5. +0
    -1486
      test/test.log

+ 52
- 12
internal/client/client_node.go View File

@ -1,6 +1,7 @@
package clientPkg package clientPkg
import ( import (
"math/rand"
"net/rpc" "net/rpc"
"simple-kv-store/internal/logprovider" "simple-kv-store/internal/logprovider"
"simple-kv-store/internal/nodes" "simple-kv-store/internal/nodes"
@ -12,7 +13,7 @@ var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel)
type Client struct { type Client struct {
// 连接的server端节点群 // 连接的server端节点群
Address [] string
Address map[string]string
} }
type Status = uint8 type Status = uint8
@ -23,23 +24,31 @@ const (
Fail Fail
) )
func getRandomAddress(addressMap map[string]string) string {
keys := make([]string, 0, len(addressMap))
// 获取所有 key
for key := range addressMap {
keys = append(keys, key)
}
// 随机选一个 key
randomKey := keys[rand.Intn(len(keys))]
return addressMap[randomKey]
}
func (client *Client) FindActiveNode() *rpc.Client { func (client *Client) FindActiveNode() *rpc.Client {
var err error var err error
var c *rpc.Client var c *rpc.Client
i := 0
for { // 直到找到一个可连接的节点(保证至少一个节点活着)
c, err = rpc.DialHTTP("tcp", client.Address[i])
for { // 直到找到一个可连接的节点(保证至少一个节点活着)
addr := getRandomAddress(client.Address)
c, err = rpc.DialHTTP("tcp", addr)
if err != nil { if err != nil {
log.Error("dialing: ", zap.Error(err)) log.Error("dialing: ", zap.Error(err))
i++
} else { } else {
break
}
if i == len(client.Address) {
log.Fatal("没找到存活节点")
return c
} }
} }
return c
} }
func (client *Client) CloseRpcClient(c *rpc.Client) { func (client *Client) CloseRpcClient(c *rpc.Client) {
@ -48,7 +57,7 @@ func (client *Client) CloseRpcClient(c *rpc.Client) {
log.Error("client close err: ", zap.Error(err)) log.Error("client close err: ", zap.Error(err))
} }
} }
//
func (client *Client) Write(kvCall nodes.LogEntryCall) Status { func (client *Client) Write(kvCall nodes.LogEntryCall) Status {
log.Info("client write request key :" + kvCall.LogE.Key) log.Info("client write request key :" + kvCall.LogE.Key)
@ -108,8 +117,39 @@ func (client *Client) Read(key string, value *string) Status { // 查不到value
client.CloseRpcClient(c) client.CloseRpcClient(c)
return NotFound return NotFound
} }
}
}
func (client *Client) FindLeader() string {
var arg struct{}
var reply nodes.FindLeaderReply
reply.Isleader = false
c := client.FindActiveNode()
var err error
for !reply.Isleader { // 根据存活节点的反馈,直到找到leader
callErr := c.Call("Node.FindLeader", &arg, &reply) // RPC
if callErr != nil { // dial和call之间可能崩溃,重新找存活节点
log.Error("dialing: ", zap.Error(callErr))
client.CloseRpcClient(c)
c = client.FindActiveNode()
continue
}
if !reply.Isleader { // 对方不是leader,根据反馈找leader
addr := client.Address[reply.LeaderId]
client.CloseRpcClient(c)
c, err = rpc.DialHTTP("tcp", addr)
for err != nil { // 重新找下一个存活节点
c = client.FindActiveNode()
}
} else { // 成功
client.CloseRpcClient(c)
return reply.LeaderId
}
} }
log.Fatal("客户端会一直找存活节点,不会运行到这里")
return "fault"
} }

+ 21
- 0
internal/nodes/server_node.go View File

@ -57,3 +57,24 @@ func (node *Node) ReadKey(key string, reply *ServerReply) error {
return nil return nil
} }
// RPC call 测试中寻找当前leader
type FindLeaderReply struct{
Isleader bool
LeaderId string
}
func (node *Node) FindLeader(_ struct{}, reply *FindLeaderReply) error {
// 自己不是leader,转交leader地址回复
if node.state != Leader {
reply.Isleader = false
if (node.leaderId == "") {
log.Fatal("还没选出第一个leader")
return nil
}
reply.LeaderId = node.leaderId
return nil
}
reply.LeaderId = node.selfId
reply.Isleader = true
return nil
}

+ 15
- 11
test/restart_node_test.go View File

@ -15,10 +15,12 @@ func TestNodeRestart(t *testing.T) {
// 登记结点信息 // 登记结点信息
n := 5 n := 5
var clusters []string var clusters []string
addressMap := make(map[string]string)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) port := fmt.Sprintf("%d", uint16(9090)+uint16(i))
addr := "127.0.0.1:" + port addr := "127.0.0.1:" + port
clusters = append(clusters, addr) clusters = append(clusters, addr)
addressMap[strconv.Itoa(i + 1)] = addr
} }
// 结点启动 // 结点启动
@ -28,9 +30,20 @@ func TestNodeRestart(t *testing.T) {
cmds = append(cmds, cmd) cmds = append(cmds, cmd)
} }
// 通知所有进程结束
defer func(){
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
}()
time.Sleep(time.Second) // 等待启动完毕 time.Sleep(time.Second) // 等待启动完毕
// client启动, 连接任意节点 // client启动, 连接任意节点
cWrite := clientPkg.Client{Address: clusters}
cWrite := clientPkg.Client{Address: addressMap}
// 写入 // 写入
var s clientPkg.Status var s clientPkg.Status
@ -65,7 +78,7 @@ func TestNodeRestart(t *testing.T) {
// client启动 // client启动
cRead := clientPkg.Client{Address: clusters}
cRead := clientPkg.Client{Address: addressMap}
// 读写入数据 // 读写入数据
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
key := strconv.Itoa(i) key := strconv.Itoa(i)
@ -85,13 +98,4 @@ func TestNodeRestart(t *testing.T) {
t.Errorf("Read test2 fail") t.Errorf("Read test2 fail")
} }
} }
// 通知所有进程结束
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
} }

+ 14
- 11
test/server_client_test.go View File

@ -15,10 +15,12 @@ func TestServerClient(t *testing.T) {
// 登记结点信息 // 登记结点信息
n := 5 n := 5
var clusters []string var clusters []string
addressMap := make(map[string]string)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
port := fmt.Sprintf("%d", uint16(9090)+uint16(i)) port := fmt.Sprintf("%d", uint16(9090)+uint16(i))
addr := "127.0.0.1:" + port addr := "127.0.0.1:" + port
clusters = append(clusters, addr) clusters = append(clusters, addr)
addressMap[strconv.Itoa(i + 1)] = addr
} }
// 结点启动 // 结点启动
@ -28,9 +30,20 @@ func TestServerClient(t *testing.T) {
cmds = append(cmds, cmd) cmds = append(cmds, cmd)
} }
// 通知所有进程结束
defer func(){
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
}()
time.Sleep(time.Second) // 等待启动完毕 time.Sleep(time.Second) // 等待启动完毕
// client启动 // client启动
c := clientPkg.Client{Address: clusters}
c := clientPkg.Client{Address: addressMap}
// 写入 // 写入
var s clientPkg.Status var s clientPkg.Status
@ -62,14 +75,4 @@ func TestServerClient(t *testing.T) {
t.Errorf("Read test2 fail") t.Errorf("Read test2 fail")
} }
} }
// 通知进程结束
for _, cmd := range cmds {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
fmt.Println("Error sending signal:", err)
return
}
}
} }

+ 0
- 1486
test/test.log
File diff suppressed because it is too large
View File


Loading…
Cancel
Save