主要内容:
想要实现分布式节点,先要明白GeeCache整体的服务过程,它的工作过程应该大致是这样的:
要想要实现图中这个流程,就意味着一个缓存就得配一个HTTP服务端+客户端,这个客户端要有三个功能:第一、存储所有可用的节点;第二、对于给定的key,要能通过一致性哈希找到对应的远程节点,同时这个远程节点不能是自己;第三、请求这个远程节点。这三个功能完成之后,主缓存就可以直接调用函数来访问远程节点,而不用关心具体是哪个远程节点了。
先抽象出两个接口:
go// src/geecache/peer.go
package geecache
// PeerPicker 接口 根据key挑选远程节点
type PeerPicker interface {
PickPeer(key string) (peerGetter PeerCacheValueGetter, ok bool)
}
// PeerCacheValueGetter 接口 根据key和给定的group获取缓存值
type PeerCacheValueGetter interface {
GetCacheFromPeer(group string, key string) ([]byte, error)
}
第二个接口,原来的名字PeerGetter和方法Get实在太过于不明确了,我给改了个名字,之前的一些方法,接口也是,有一些实在容易分不清的也改名了。
感觉作者是不是对Get这个词有点太喜欢了。
提示
我曾想过这里的这两个接口是否有一些多余,不过Github上的讨论区里作者的回复比较合理:
Q:第一步就抽象出接口,感觉不是很好理解。根据这篇文章https://blog.chewxy.com/2018/03/18/golang-interfaces/ ,是否可以考虑延迟定义接口。即首先定义类型/struct,在会使用到接口的时候再定义接口。
A:一般抽象出接口是为了扩展性,很多场景下需要优先抽象接口。比如 RPC 通信需要支持不同的编解码方式,那首先想到的是一个支持编解码的结构体需要支持哪些方法,即接口。GeeRPC第一天 服务端与消息编码 在这篇文章中体现了这种思考方式。
我觉得你说的也是有道理的,如果是比较确定的业务,优先 struct,需要扩展时再抽象接口更符合直觉一些。不过对于实现框架的童鞋来说,可扩展性是第一位的,所以一般都会优先设计接口。比如 go-micro 这个微服务框架,所有的参数都是接口类型的,这个框架是完全可插拔的,允许用户替换任意的类,只要 struct 实现了接口就行。
之后就是实现HTTP客户端的功能,并在原有的HTTPPool上追加功能。
go// src/geecache/http.go
// httpClient HTTP客户端 向远程节点发送请求 一个远程节点对应一个客户端
type httpClient struct {
baseURL string
}
// GetCacheFromPeer 实现PeerCacheValueGetter接口 从远程节点获得缓存
func (h *httpClient) GetCacheFromPeer(group string, key string) (result []byte, err error) {
url := fmt.Sprintf("%v%v/%v", h.baseURL, url.QueryEscape(group), url.QueryEscape(key))
resp, err := http.Get(url)
if err != nil {
result = nil
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
result = nil
err = fmt.Errorf("server returned error: %v", resp.Status)
}
bytes, err := io.ReadAll(resp.Body) // 读取响应体 原有的ioutil.ReadAll方法被弃用
if err != nil {
result = nil
err = fmt.Errorf("reading response body error: %v", err)
}
result = bytes
err = nil
return
}
var _ PeerCacheValueGetter = (*httpClient)(nil) // 检查接口是否被完整实现
go// src/geecache/http.go
package geecache
import (
"GeeCache/src/geecache/consistenthash"
"fmt"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"
)
const (
defaultBasePath = "/_geecache/" // 默认资源地址
// New
defaultReplicas = 50 // 默认真实节点和虚拟节点倍数
)
// HTTPPool 一个缓存对应一个HTTP池 记录自身的地址和URL
type HTTPPool struct {
selfAddr string // 记录缓存自身的地址 包括端口
basePath string // 记录URL
// New
mu sync.Mutex // 锁
// New
peers *consistenthash.Map // 记录所有远程节点
// New
httpGetters map[string]*httpClient // 集成一个HTTP客户端
}
// NewHTTPPool HTTPPool的构造方法
func NewHTTPPool(selfAddr string) (result *HTTPPool) {
result = &HTTPPool{
selfAddr: selfAddr,
basePath: defaultBasePath,
}
return
}
// Log 记录信息 参数v可传多个值 这些值会按format来进行格式化 再进入log
func (pool *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", pool.selfAddr, fmt.Sprintf(format, v...))
}
// ServeHTTP HTTP服务端的Handler方法 对有效的缓存请求进行响应
func (pool *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, pool.basePath) { // 检查请求是否有效
panic("HTTPPool is serving unexpected path: " + r.URL.Path)
}
pool.Log("%s %s", r.Method, r.URL.Path) // log记录该次请求的信息
parts := strings.SplitN(r.URL.Path[len(pool.basePath):], "/", 2)
if len(parts) != 2 { // 检查请求是否有效
http.Error(w, "bad request", http.StatusBadRequest) // 400
return
}
groupName := parts[0]
key := parts[1]
group := GetGroup(groupName)
if group == nil { // 请求的缓存不存在
http.Error(w, "no such group:" + groupName, http.StatusNotFound) // 404
return
}
view, err := group.GetFromCache(key)
if err != nil { // 缓存请求失败
http.Error(w, err.Error(), http.StatusInternalServerError) // 500
return
}
w.Header().Set("Content-Type", "application/octet-stream")
_, err = w.Write(view.GetByteCopy())
if err != nil { // 写入响应失败
http.Error(w, "write into responseWriter error: " + err.Error(), http.StatusInternalServerError) // 500
return
}
}
// New
// SetNewPeer 在本节点初始化远程节点信息
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock() // attention 加锁必要性?
defer p.mu.Unlock()
p.peers = consistenthash.NewMap(nil, defaultReplicas)
p.peers.AddRealNode(peers...)
p.httpGetters = make(map[string]*httpClient, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpClient{baseURL: peer + p.basePath}
}
}
// New
// PickPeer 根据一致性哈希挑选合适的远程节点
func (p *HTTPPool) PickPeer(key string) (PeerCacheValueGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.GetRealNodeByKey(key); peer != "" && peer != p.selfAddr { // 注意这里 这里排除了自身节点
p.Log("PickPeer picked %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}
var _ PeerPicker = (*HTTPPool)(nil) // 检查PeerPicker接口是否已完全实现
go// src/geecache/geecache.go
package geecache
import (
"fmt"
"log"
"sync"
)
// Getter 接口 规定了一个Get方法 该方法用于规定缓存未命中时从哪里获得新的缓存
type Getter interface {
Get(key string) ([]byte, error)
}
// GetterFunc 函数类型 专门用来实现Getter接口的函数类型
type GetterFunc func(key string) ([]byte, error)
// Get GetterFunc类下的 从Getter接口的Get函数实现而来的函数
func (function GetterFunc) Get(key string) ([]byte, error) {
return function(key)
}
// Group 缓存对外交互的核心数据结构
type Group struct {
name string // 该缓存的标识
getter Getter // 缓存未能命中时的回调函数 类型是Getter接口
mainCache cache // 缓存主体 是具有并发保护的LRU缓存
// New
peers PeerPicker // 这是实现了PeerPicker的HTTPPool
// attention 为什么要将远程节点集成进HTTPPool 而不是节点本身? 是否可以优化?
}
// 全局变量
var (
mu sync.RWMutex // 互斥锁的高级版本 读写锁 在原有的锁功能上 增加读写特性 当读锁锁定时 只会阻止写 同理当写锁锁定时 会同时阻止读写
groups = make(map[string]*Group) // 保存多个Group缓存
)
// NewGroup 构造函数
func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil getter")
}
mu.Lock()
defer mu.Unlock()
group := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = group
return group
}
// GetGroup 获取Group缓存
func GetGroup(name string) *Group {
mu.RLock()
g := groups[name]
mu.RUnlock()
return g
}
// GetFromCache 从缓存中获取值
func (g *Group) GetFromCache(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}
if v, isOk := g.mainCache.get(key); isOk { // 缓存命中
log.Println("[GeeCache] hit")
return v, nil
}
// 缓存未命中 调用load函数
return g.load(key)
}
// load 缓存未命中时 从别的地方加载缓存
func (g *Group) load(key string) (value ByteView, err error) {
// New
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok { // 先从存储着远程节点信息的HTTPPool中选出具体的远程节点
if value, err := g.getFromPeer(peer, key); err != nil { // 再根据这个具体的远程节点开始请求
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getFromLocal(key)
}
// getFromLocal 从本地加载缓存 在这里调用Getter的Get函数 并且通过populateCache存入缓存
func (g *Group) getFromLocal(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cacheBytes: cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}
// populateCache 新的缓存值 存入缓存
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
// New
// RegisterPeers 将初始化完成的HTTPPool注入到group中 仅一次
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers == nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
// New
// getFromPeer 从远程节点获得缓存
func (g *Group) getFromPeer(peer PeerCacheValueGetter, key string) (ByteView, error) {
bytes, err := peer.GetCacheFromPeer(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{cacheBytes: bytes}, nil
}
到此,GeeCache主体部分就完成了,之后几天的内容就是对GeeCache进行改进。
gopackage main
import (
"GeeCache/src/geecache"
"flag"
"fmt"
"log"
"net/http"
)
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
log.Printf("[SlowDB] searched key: %s", v)
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr)
peers.SetNewPeer(addrs...)
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers))
}
func startAPIServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.GetFromCache(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.GetByteCopy())
}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}
func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()
apiAddr := "http://localhost:9999"
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}
gee := createGroup()
if api {
go startAPIServer(apiAddr, gee)
}
startCacheServer(addrMap[port], []string(addrs), gee)
}
测试结果:
注意不要让它跑在协程下,会出问题。
本文作者:御坂19327号
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!