commit d7914a1e30525f94169307f5756f0d4e82aa92d8 Author: augurier <14434658+augurier@user.noreply.gitee.com> Date: Wed Feb 26 18:57:05 2025 +0800 项目初始 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..408abd6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test1 -c` +*.test1 + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work + +#idea files +.idea +.idea/* + +raftnode diff --git a/README.md b/README.md new file mode 100644 index 0000000..8f4d03d --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +# go-raft-kv + +--- + +基于go语言实现分布式kv数据库 + +# 环境与运行 +使用环境是wsl+ubuntu +go mod download安装依赖 +./scripts/build.sh 会在根目录下编译出raftnode +./scripts/run.sh 运行三个节点,目前能在终端进行读入,leader(n1)节点输出send log,其余节点输出receive log。终端输入后如果超时就退出(脚本运行时间可以在其中调整)。 + +# 注意 +脚本第一次运行需要权限获取 chmod +x <脚本> +如果出现tcp listen error可能是因为之前的进程没用正常退出,占用了端口 +lsof -i :9091查看pid +kill -9 杀死进程 + +# todo list +消息通讯异常的处理 +kv本地持久化 +崩溃与恢复(以及对应的测试) \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..255daec --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "flag" + "simple-kv-store/internal/logprovider" + "simple-kv-store/internal/nodes" + "go.uber.org/zap" + "strconv" + "strings" +) + +var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) + +func main() { + defer func() { + if err := recover(); err != nil { + log.Info("i get a panic", zap.Any("panic error", err)) + } + }() + + port := flag.String("port", ":9091", "rpc listen port") + cluster := flag.String("cluster", "127.0.0.1:9092,127.0.0.1:9093", "comma sep") + id := flag.Int("id", 1, "node ID") + pipe := flag.String("pipe", "", "input from scripts") + isLeader := flag.Bool("isleader", false, "init node state") + + // 参数解析 + flag.Parse() + clusters := strings.Split(*cluster, ",") + node := nodes.Init(*id, clusters, *pipe) + + log.Info("id: " + strconv.Itoa(*id) + "节点开始监听: " + *port + "端口") + + // 监听rpc + node.Rpc(*port) + // 开启 raft + nodes.Start(node, *isLeader) + + select {} +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d9c592b --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module simple-kv-store + +go 1.20 + +require go.uber.org/zap v1.24.0 + +require ( + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..081dfe1 --- /dev/null +++ b/go.sum @@ -0,0 +1,13 @@ +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/logprovider/log_fmt.go b/internal/logprovider/log_fmt.go new file mode 100644 index 0000000..afdaa76 --- /dev/null +++ b/internal/logprovider/log_fmt.go @@ -0,0 +1,24 @@ +package logprovider + +import "fmt" + +const ( + JsonLogFormat = "json" + ConsoleLogFormat = "console" +) + +var DefaultLogFormat = JsonLogFormat + +// ConvertToZapFormat converts and validated logprovider format string. +func ConvertToZapFormat(format string) (string, error) { + switch format { + case ConsoleLogFormat: + return ConsoleLogFormat, nil + case JsonLogFormat: + return JsonLogFormat, nil + case "": + return DefaultLogFormat, nil + default: + return "", fmt.Errorf("unknown logprovider format: %s, supported values json, console", format) + } +} diff --git a/internal/logprovider/log_level.go b/internal/logprovider/log_level.go new file mode 100644 index 0000000..552c7d1 --- /dev/null +++ b/internal/logprovider/log_level.go @@ -0,0 +1,14 @@ +package logprovider + +import "go.uber.org/zap/zapcore" + +var DefaultLogLevel = "info" + +// ConvertToZapLevel converts logprovider level string to zapcore.Level. +func ConvertToZapLevel(lvl string) zapcore.Level { + var level zapcore.Level + if err := level.Set(lvl); err != nil { + panic(err) + } + return level +} diff --git a/internal/logprovider/zap.go b/internal/logprovider/zap.go new file mode 100644 index 0000000..01d60de --- /dev/null +++ b/internal/logprovider/zap.go @@ -0,0 +1,55 @@ +package logprovider + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "time" +) + +// CreateDefaultZapLogger creates a logger with default zap configuration +func CreateDefaultZapLogger(level zapcore.Level) (*zap.Logger, error) { + logCfg := DefaultZapLoggerConfig + logCfg.Level = zap.NewAtomicLevelAt(level) + c, err := logCfg.Build() + if err != nil { + return nil, err + } + return c, nil +} + +// DefaultZapLoggerConfig defines default zap logger configuration. +var DefaultZapLoggerConfig = zap.Config{ + Level: zap.NewAtomicLevelAt(ConvertToZapLevel(DefaultLogLevel)), + + Development: false, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + + Encoding: DefaultLogFormat, + + // copied from "zap.NewProductionEncoderConfig" with some updates + EncoderConfig: zapcore.EncoderConfig{ + TimeKey: "ts", + LevelKey: "level", + NameKey: "logger", + CallerKey: "caller", + MessageKey: "msg", + StacktraceKey: "stacktrace", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.LowercaseLevelEncoder, + + // Custom EncodeTime function to ensure we match format and precision of historic capnslog timestamps + EncodeTime: func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString(t.Format("2006-01-02T15:04:05.999999Z0700")) + }, + + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }, + + // Use "/dev/null" to discard all + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, +} diff --git a/internal/nodes/init.go b/internal/nodes/init.go new file mode 100644 index 0000000..9c2117e --- /dev/null +++ b/internal/nodes/init.go @@ -0,0 +1,105 @@ +package nodes + +import ( + "io" + "net" + "net/http" + "net/rpc" + "os" + "simple-kv-store/internal/logprovider" + "time" + + "go.uber.org/zap" +) + +var log, _ = logprovider.CreateDefaultZapLogger(zap.InfoLevel) + +func newNode(address string) *Public_node_info { + return &Public_node_info{ + connect: false, + address: address, + } +} + +func Init(id int, nodeAddr []string, pipe string) *Node { + ns := make(map[int]*Public_node_info) + for k, v := range nodeAddr { + ns[k] = newNode(v) + } + + // 创建节点 + return &Node{ + self: id, + nodes: ns, + pipeAddr: pipe, + } +} + +func Start(node *Node, isLeader bool) { + if isLeader { + node.state = Candidate // 需要身份转变 + } else { + node.state = Follower + } + + go func() { + for { + switch node.state { + case Follower: + + case Candidate: + // candidate发布一个监听输入线程后,变成leader + node.state = Leader + go func() { + if node.pipeAddr == "" { + log.Error("暂不支持非管道读入") + } + + pipe, err := os.Open(node.pipeAddr) + if err != nil { + log.Error("Failed to open pipe") + } + defer pipe.Close() + + // 不断读取管道中的输入 + buffer := make([]byte, 256) + for { + n, err := pipe.Read(buffer) + if err != nil && err != io.EOF { + log.Error("Error reading from pipe") + } + if n > 0 { + input := string(buffer[:n]) + log.Info("send : " + input) + // 将用户输入封装成一个 LogEntry + kv := LogEntry{input, ""} + node.log = append(node.log, kv) + // 广播给其它节点 + node.BroadCastKV(kv) + } + } + }() + case Leader: + time.Sleep(50 * time.Millisecond) + } + } + }() +} + +func (node *Node) Rpc(port string) { + err := rpc.Register(node) + if err != nil { + log.Fatal("rpc register failed", zap.Error(err)) + } + rpc.HandleHTTP() + l, e := net.Listen("tcp", port) + if e != nil { + log.Fatal("listen error:", zap.Error(err)) + } + go func() { + err := http.Serve(l, nil) + if err != nil { + log.Fatal("http server error:", zap.Error(err)) + } + }() +} diff --git a/internal/nodes/log.go b/internal/nodes/log.go new file mode 100644 index 0000000..3880995 --- /dev/null +++ b/internal/nodes/log.go @@ -0,0 +1,10 @@ +package nodes + +type LogEntry struct { + Key string + Value string +} + +type KVReply struct { + Reply bool +} \ No newline at end of file diff --git a/internal/nodes/node.go b/internal/nodes/node.go new file mode 100644 index 0000000..504b212 --- /dev/null +++ b/internal/nodes/node.go @@ -0,0 +1,107 @@ +package nodes + +import ( + "net/rpc" + "go.uber.org/zap" +) + +type State = uint8 + +const ( + Follower State = iota + 1 + Candidate + Leader +) + +type Public_node_info struct { + connect bool + address string +} + +type Node struct { + // 当前节点id + self int + + // 除当前节点外其他节点信息 + nodes map[int]*Public_node_info + + //管道名 + pipeAddr string + + // 当前节点状态 + state State + + // 简单的kv存储 + log []LogEntry +} + +func (node *Node) BroadCastKV(kv LogEntry) { + // 遍历所有节点 + for i := range node.nodes { + go func (index int, kv LogEntry) { + var reply KVReply + node.sendKV(index, kv, &reply) + } (i, kv) + } +} + +func (node *Node) sendKV(index int, kv LogEntry, reply *KVReply) { + client, err := rpc.DialHTTP("tcp", node.nodes[index].address) + if err != nil { + log.Error("dialing: ", zap.Error(err)) + return + } + + defer func(client *rpc.Client) { + err := client.Close() + if err != nil { + log.Error("client close err: ", zap.Error(err)) + } + }(client) + + callErr := client.Call("Node.ReceiveKV", kv, reply) // RPC + if callErr != nil { + log.Error("dialing: ", zap.Error(callErr)) + } + + if reply.Reply { // 发送成功 + + } else { // 失败 + + } +} + +// RPC call +func (node *Node) ReceiveKV(kv LogEntry, reply *KVReply) error { + log.Info("receive: " + kv.Key) + reply.Reply = true; + return nil +} +// func (node *Node) broadcastHeartbeat() { +// // 遍历所有节点 +// for i := range raft.nodes { +// // request 参数 +// hb := Heartbeat{ +// Term: raft.currTerm, +// LeaderId: raft.self, +// CommitIndex: raft.commitIndex, +// } + +// prevLogIndex := raft.nextIndex[i] - 1 + +// // 如果有日志未同步则发送 +// if raft.getLastIndex() > prevLogIndex { +// hb.PrevLogIndex = prevLogIndex +// hb.PrevLogTerm = raft.log[prevLogIndex].CurrTerm +// hb.Entries = raft.log[prevLogIndex:] +// // log.Info("will send log entries", zap.Any("logEntries", hb.Entries)) +// } + +// go func(index int, hb Heartbeat) { +// var reply HeartbeatReply +// // 向某一个节点发送 heartbeat +// raft.sendHeartbeat(index, hb, &reply) +// }(i, hb) +// } +// } + diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 0000000..f682f82 --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1 @@ +go build -o raftnode ./cmd/main.go \ No newline at end of file diff --git a/scripts/run.sh b/scripts/run.sh new file mode 100755 index 0000000..4c250de --- /dev/null +++ b/scripts/run.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +# 设置运行时间限制:s +RUN_TIME=10 + +# 需要传递数据的管道 +PIPE_NAME="/tmp/raft_input_pipe" + +# 启动节点1 +echo "Starting Node 1..." +timeout $RUN_TIME ./raftnode -id 1 -port ":9091" -cluster "127.0.0.1:9092,127.0.0.1:9093" -pipe "$PIPE_NAME" -isleader=true & + +# 启动节点2 +echo "Starting Node 2..." +timeout $RUN_TIME ./raftnode -id 2 -port ":9092" -cluster "127.0.0.1:9091,127.0.0.1:9093" -pipe "$PIPE_NAME" & + +# 启动节点3 +echo "Starting Node 3..." +timeout $RUN_TIME ./raftnode -id 3 -port ":9093" -cluster "127.0.0.1:9091,127.0.0.1:9092" -pipe "$PIPE_NAME"& + +echo "All nodes started successfully!" +# 创建一个管道用于进程间通信 +if [[ ! -p "$PIPE_NAME" ]]; then + mkfifo "$PIPE_NAME" +fi + +# 捕获终端输入并通过管道传递给三个节点 +echo "Enter input to send to nodes:" +start_time=$(date +%s) +while true; do + # 从终端读取用户输入 + read -r user_input + + current_time=$(date +%s) + elapsed_time=$((current_time - start_time)) + + # 如果运行时间大于限制时间,就退出 + if [ $elapsed_time -ge $RUN_TIME ]; then + echo 'Timeout reached, normal exit now' + break + fi + + # 如果输入为空,跳过 + if [[ -z "$user_input" ]]; then + continue + fi + + # 将用户输入发送到管道 + echo "$user_input" > "$PIPE_NAME" + + # 如果输入 "exit",结束脚本 + if [[ "$user_input" == "exit" ]]; then + break + fi +done + +# 删除管道 +rm "$PIPE_NAME" + +# 等待所有节点完成启动 +wait