2024-04-19
Go & 后端
00

目录

1 Day5 分布式节点
1 HTTP客户端
2 集成进geecache中
2 项目整体示意&流程图
3 测试文件

主要内容:

  1. 分布式节点 节点间通讯

1 Day5 分布式节点

想要实现分布式节点,先要明白GeeCache整体的服务过程,它的工作过程应该大致是这样的:

224d301f6fa4b8b8784c4b9c0ce20f8.jpg

要想要实现图中这个流程,就意味着一个缓存就得配一个HTTP服务端+客户端,这个客户端要有三个功能:第一、存储所有可用的节点;第二、对于给定的key,要能通过一致性哈希找到对应的远程节点,同时这个远程节点不能是自己;第三、请求这个远程节点。这三个功能完成之后,主缓存就可以直接调用函数来访问远程节点,而不用关心具体是哪个远程节点了。

1 HTTP客户端

先抽象出两个接口:

go
// src/geecache/peer.go package geecache // PeerPicker 接口 根据key挑选远程节点 type PeerPicker interface { PickPeer(key string) (peerGetter PeerCacheValueGetter, ok bool) } // PeerCacheValueGetter 接口 根据key和给定的group获取缓存值 type PeerCacheValueGetter interface { GetCacheFromPeer(group string, key string) ([]byte, error) }

第二个接口,原来的名字PeerGetter和方法Get实在太过于不明确了,我给改了个名字,之前的一些方法,接口也是,有一些实在容易分不清的也改名了。感觉作者是不是对Get这个词有点太喜欢了。

提示

我曾想过这里的这两个接口是否有一些多余,不过Github上的讨论区里作者的回复比较合理:

Q:第一步就抽象出接口,感觉不是很好理解。根据这篇文章https://blog.chewxy.com/2018/03/18/golang-interfaces/ ,是否可以考虑延迟定义接口。即首先定义类型/struct,在会使用到接口的时候再定义接口。

A:一般抽象出接口是为了扩展性,很多场景下需要优先抽象接口。比如 RPC 通信需要支持不同的编解码方式,那首先想到的是一个支持编解码的结构体需要支持哪些方法,即接口。GeeRPC第一天 服务端与消息编码 在这篇文章中体现了这种思考方式。

我觉得你说的也是有道理的,如果是比较确定的业务,优先 struct,需要扩展时再抽象接口更符合直觉一些。不过对于实现框架的童鞋来说,可扩展性是第一位的,所以一般都会优先设计接口。比如 go-micro 这个微服务框架,所有的参数都是接口类型的,这个框架是完全可插拔的,允许用户替换任意的类,只要 struct 实现了接口就行。

之后就是实现HTTP客户端的功能,并在原有的HTTPPool上追加功能。

go
// src/geecache/http.go // httpClient HTTP客户端 向远程节点发送请求 一个远程节点对应一个客户端 type httpClient struct { baseURL string } // GetCacheFromPeer 实现PeerCacheValueGetter接口 从远程节点获得缓存 func (h *httpClient) GetCacheFromPeer(group string, key string) (result []byte, err error) { url := fmt.Sprintf("%v%v/%v", h.baseURL, url.QueryEscape(group), url.QueryEscape(key)) resp, err := http.Get(url) if err != nil { result = nil return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { result = nil err = fmt.Errorf("server returned error: %v", resp.Status) } bytes, err := io.ReadAll(resp.Body) // 读取响应体 原有的ioutil.ReadAll方法被弃用 if err != nil { result = nil err = fmt.Errorf("reading response body error: %v", err) } result = bytes err = nil return } var _ PeerCacheValueGetter = (*httpClient)(nil) // 检查接口是否被完整实现
go
// src/geecache/http.go package geecache import ( "GeeCache/src/geecache/consistenthash" "fmt" "io" "log" "net/http" "net/url" "strings" "sync" ) const ( defaultBasePath = "/_geecache/" // 默认资源地址 // New defaultReplicas = 50 // 默认真实节点和虚拟节点倍数 ) // HTTPPool 一个缓存对应一个HTTP池 记录自身的地址和URL type HTTPPool struct { selfAddr string // 记录缓存自身的地址 包括端口 basePath string // 记录URL // New mu sync.Mutex // 锁 // New peers *consistenthash.Map // 记录所有远程节点 // New httpGetters map[string]*httpClient // 集成一个HTTP客户端 } // NewHTTPPool HTTPPool的构造方法 func NewHTTPPool(selfAddr string) (result *HTTPPool) { result = &HTTPPool{ selfAddr: selfAddr, basePath: defaultBasePath, } return } // Log 记录信息 参数v可传多个值 这些值会按format来进行格式化 再进入log func (pool *HTTPPool) Log(format string, v ...interface{}) { log.Printf("[Server %s] %s", pool.selfAddr, fmt.Sprintf(format, v...)) } // ServeHTTP HTTP服务端的Handler方法 对有效的缓存请求进行响应 func (pool *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, pool.basePath) { // 检查请求是否有效 panic("HTTPPool is serving unexpected path: " + r.URL.Path) } pool.Log("%s %s", r.Method, r.URL.Path) // log记录该次请求的信息 parts := strings.SplitN(r.URL.Path[len(pool.basePath):], "/", 2) if len(parts) != 2 { // 检查请求是否有效 http.Error(w, "bad request", http.StatusBadRequest) // 400 return } groupName := parts[0] key := parts[1] group := GetGroup(groupName) if group == nil { // 请求的缓存不存在 http.Error(w, "no such group:" + groupName, http.StatusNotFound) // 404 return } view, err := group.GetFromCache(key) if err != nil { // 缓存请求失败 http.Error(w, err.Error(), http.StatusInternalServerError) // 500 return } w.Header().Set("Content-Type", "application/octet-stream") _, err = w.Write(view.GetByteCopy()) if err != nil { // 写入响应失败 http.Error(w, "write into responseWriter error: " + err.Error(), http.StatusInternalServerError) // 500 return } } // New // SetNewPeer 在本节点初始化远程节点信息 func (p *HTTPPool) Set(peers ...string) { p.mu.Lock() // attention 加锁必要性? defer p.mu.Unlock() p.peers = consistenthash.NewMap(nil, defaultReplicas) p.peers.AddRealNode(peers...) p.httpGetters = make(map[string]*httpClient, len(peers)) for _, peer := range peers { p.httpGetters[peer] = &httpClient{baseURL: peer + p.basePath} } } // New // PickPeer 根据一致性哈希挑选合适的远程节点 func (p *HTTPPool) PickPeer(key string) (PeerCacheValueGetter, bool) { p.mu.Lock() defer p.mu.Unlock() if peer := p.peers.GetRealNodeByKey(key); peer != "" && peer != p.selfAddr { // 注意这里 这里排除了自身节点 p.Log("PickPeer picked %s", peer) return p.httpGetters[peer], true } return nil, false } var _ PeerPicker = (*HTTPPool)(nil) // 检查PeerPicker接口是否已完全实现

2 集成进geecache中

go
// src/geecache/geecache.go package geecache import ( "fmt" "log" "sync" ) // Getter 接口 规定了一个Get方法 该方法用于规定缓存未命中时从哪里获得新的缓存 type Getter interface { Get(key string) ([]byte, error) } // GetterFunc 函数类型 专门用来实现Getter接口的函数类型 type GetterFunc func(key string) ([]byte, error) // Get GetterFunc类下的 从Getter接口的Get函数实现而来的函数 func (function GetterFunc) Get(key string) ([]byte, error) { return function(key) } // Group 缓存对外交互的核心数据结构 type Group struct { name string // 该缓存的标识 getter Getter // 缓存未能命中时的回调函数 类型是Getter接口 mainCache cache // 缓存主体 是具有并发保护的LRU缓存 // New peers PeerPicker // 这是实现了PeerPicker的HTTPPool // attention 为什么要将远程节点集成进HTTPPool 而不是节点本身? 是否可以优化? } // 全局变量 var ( mu sync.RWMutex // 互斥锁的高级版本 读写锁 在原有的锁功能上 增加读写特性 当读锁锁定时 只会阻止写 同理当写锁锁定时 会同时阻止读写 groups = make(map[string]*Group) // 保存多个Group缓存 ) // NewGroup 构造函数 func NewGroup(name string, cacheBytes int64, getter Getter) *Group { if getter == nil { panic("nil getter") } mu.Lock() defer mu.Unlock() group := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = group return group } // GetGroup 获取Group缓存 func GetGroup(name string) *Group { mu.RLock() g := groups[name] mu.RUnlock() return g } // GetFromCache 从缓存中获取值 func (g *Group) GetFromCache(key string) (ByteView, error) { if key == "" { return ByteView{}, fmt.Errorf("key is required") } if v, isOk := g.mainCache.get(key); isOk { // 缓存命中 log.Println("[GeeCache] hit") return v, nil } // 缓存未命中 调用load函数 return g.load(key) } // load 缓存未命中时 从别的地方加载缓存 func (g *Group) load(key string) (value ByteView, err error) { // New if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { // 先从存储着远程节点信息的HTTPPool中选出具体的远程节点 if value, err := g.getFromPeer(peer, key); err != nil { // 再根据这个具体的远程节点开始请求 return value, nil } log.Println("[GeeCache] Failed to get from peer", err) } } return g.getFromLocal(key) } // getFromLocal 从本地加载缓存 在这里调用Getter的Get函数 并且通过populateCache存入缓存 func (g *Group) getFromLocal(key string) (ByteView, error) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cacheBytes: cloneBytes(bytes)} g.populateCache(key, value) return value, nil } // populateCache 新的缓存值 存入缓存 func (g *Group) populateCache(key string, value ByteView) { g.mainCache.add(key, value) } // New // RegisterPeers 将初始化完成的HTTPPool注入到group中 仅一次 func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers == nil { panic("RegisterPeerPicker called more than once") } g.peers = peers } // New // getFromPeer 从远程节点获得缓存 func (g *Group) getFromPeer(peer PeerCacheValueGetter, key string) (ByteView, error) { bytes, err := peer.GetCacheFromPeer(g.name, key) if err != nil { return ByteView{}, err } return ByteView{cacheBytes: bytes}, nil }

到此,GeeCache主体部分就完成了,之后几天的内容就是对GeeCache进行改进。

2 项目整体示意&流程图

aa203d2d34545afb6afb03262027c2a.jpg

3 测试文件

go
package main import ( "GeeCache/src/geecache" "flag" "fmt" "log" "net/http" ) var db = map[string]string{ "Tom": "630", "Jack": "589", "Sam": "567", } func createGroup() *geecache.Group { return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc( func(key string) ([]byte, error) { log.Println("[SlowDB] search key", key) if v, ok := db[key]; ok { log.Printf("[SlowDB] searched key: %s", v) return []byte(v), nil } return nil, fmt.Errorf("%s not exist", key) })) } func startCacheServer(addr string, addrs []string, gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) peers.SetNewPeer(addrs...) gee.RegisterPeers(peers) log.Println("geecache is running at", addr) log.Fatal(http.ListenAndServe(addr[7:], peers)) } func startAPIServer(apiAddr string, gee *geecache.Group) { http.Handle("/api", http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key") view, err := gee.GetFromCache(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/octet-stream") w.Write(view.GetByteCopy()) })) log.Println("fontend server is running at", apiAddr) log.Fatal(http.ListenAndServe(apiAddr[7:], nil)) } func main() { var port int var api bool flag.IntVar(&port, "port", 8001, "Geecache server port") flag.BoolVar(&api, "api", false, "Start a api server?") flag.Parse() apiAddr := "http://localhost:9999" addrMap := map[int]string{ 8001: "http://localhost:8001", 8002: "http://localhost:8002", 8003: "http://localhost:8003", } var addrs []string for _, v := range addrMap { addrs = append(addrs, v) } gee := createGroup() if api { go startAPIServer(apiAddr, gee) } startCacheServer(addrMap[port], []string(addrs), gee) }

测试结果:

image.png

注意不要让它跑在协程下,会出问题。

本文作者:御坂19327号

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!