主要内容:
项目本身分成缓存+内存中的索引+硬盘中的文件三部分,对外的用户只提供一个Set和Get接口。数据进来先进缓存,然后再进数据库,以期提供最快的读写速度。内存中的索引根据进入的数据类型不同,进入各自的索引,并且再次进入硬盘中的文件。项目整体采用bitcask模型,它的流程图是这样的:
不同的索引所使用的数据结构不同。例如String类型的数据用的就是跳表,list用的就是map等等。每个索引内部保存的是从key到entry的映射
跳表是一种基于链表的数据结构,它是有序的,它的目的是做到比普通链表查询更快的同时,保留链表插入快的特点,但是它的缺点是,由于使用了多层链表来链接一个链表的内容,所以它比普通链表占用内存更大。
它的示意图和实际的结构图是这样的:
可以在示意图中看到,由于多层索引的机制,从1开始寻找5节点时不再需要经过4个节点,而是直接从1到5即可,大大缩短了寻找的时间。但是这个示意图仅仅是作为参考,它不是实际跳表在内存中的结构。
提示
跳表和平衡树一样是需要维护的,否则就会退化。平衡树如果操作过多而不去及时修正,它的效率就会逐渐退化为和链表无异。跳表也是一样的,以上图为例,在什么操作都没有的情况下,访问4的顺序是1 -> 3 -> 4。如果删除3,则访问4的顺序和访问一个链表相同,这就是跳表的退化。
跳表避免退化的方式是随机晋升。图中的索引晋升是每2个节点就晋升一次。如果更换为每次随机一个0到1之间的小数,并使其和0.5进行比较,每新增一个节点,就以这种方式决定它的索引应该有的高度。这样构建的跳表,在节点足够多时,随机索引的效率就能无限逼近严格以每2个节点就晋升一次而构建的索引的效率;而删除任何的元素,剩余的索引在每个高度上的比重也不会改变,这样就避免了退化。
根据实际的结构图可知,每个节点都会存储一个索引的数组,数组里保存节点在每层索引上应该指向的下一个节点,这个数组根据该节点的索引高度来定制长度。
另外还有图没画上去的一个事是,跳表需要头节点。这个头节点的索引数组的长度是跳表的索引的最高高度,它指向每层索引的第一个节点。如果这层索引没元素就指向nil。
在上面的分析中,已经可以知道跳表所需要的结构:
另外,它还需要一个随机函数来随机索引高度,一个构造函数和一个toString方便随时检查跳表内容。
goconst (
indexUpgradeProbability = 0.5 // 索引晋升概率
indexMaxHeight = 32 // 索引最高高度
// 这种情况下跳表存储节点最好不要超过2^32个
)
type (
// SkipList 跳表的简单实现
SkipList struct {
head *skipListNode // 头节点
length uint32 // 跳表当前长度
height uint32 // 跳表当前高度 注意这个高度是1的时候 对应的索引是在index[0]
mtx sync.RWMutex // 读写锁 上并发保护用的
}
// index 具体的索引 指向节点在某一层的下一个节点
index struct {
nextNode *skipListNode
}
// skipListNode 跳表节点
skipListNode struct {
key string // 键
value any // 值 值可以是任何类型
indexLevel []index // 存储该节点在任意层的索引
}
)
// NewSkipList 跳表的构造函数 默认索引晋升概率0.5 默认索引最高高度为32
func NewSkipList() *SkipList {
return &SkipList{
length: 0,
head: &skipListNode{
indexLevel: make([]index, indexMaxHeight),
},
height: 1,
}
}
// randomLevel 为每个新加入的节点随机一个索引高度
func (sl *SkipList) randomLevel() int {
l := 1
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for r.Float64() < indexUpgradeProbability && l < indexMaxHeight {
l += 1
}
return l
}
// ToString 将跳表所存储的所有节点的键和索引高度信息转换为字符串的形式并返回 仅作为检查使用 pass
func (sl *SkipList) ToString() string {
pointer := sl.head
result := make([]string, sl.length + 1)
for i := 1; i <= int(sl.length); i++ {
pointer = pointer.indexLevel[0].nextNode
result[i] = "Key " + strconv.Itoa(i) + ": " + pointer.key + ", Index Height: " + strconv.Itoa(len(pointer.indexLevel))
}
return strings.Join(result, "\n")
}
参考链表的新增节点操作,链表需要先找到要新增节点的位置的前一个节点,然后再操作添加。跳表也是一样的,只是这里要找到的是新节点的随机索引高度下的每一层新增节点的位置的前一个节点。
就像这个图,对于新增的节点9而言,它的索引高度是4,那么它要找的所有“前一个节点”包括header,1,7和8,一层对应一个“前一个节点”。找到所有节点之后,对每个节点按链表的方式进行新增即可。记得并发保护和更新高度,如果新增的节点的索引高度比目前的所有节点的索引高度都要高,就必须更新高度。
go// AddNode 向跳表中添加节点 节点键值不允许相同 pass
func (sl *SkipList) AddNode(key string, value any) (err error) {
// 准备阶段
indexHeight := sl.randomLevel()
newNode := &skipListNode{
indexLevel: make([]index, indexHeight),
key: key,
value: value,
}
update := make([]*index, indexHeight)
pointer := sl.head
// 确定update 确定每层索引插入位置
sl.mtx.RLock()
for i := range update {
pointer = sl.head
for pointer.indexLevel[i].nextNode != nil {
if strings.Compare(key, pointer.indexLevel[i].nextNode.key) < 0 { // 参数s1 s2 如果结果为-1 说明s1 < s2
update[i] = &pointer.indexLevel[i]
break
} else if strings.Compare(key, pointer.indexLevel[i].nextNode.key) == 0 { // 如果结果为0 说明s1 == s2
return errors.New("Add Key: " + key + " is Existed! \n")
}
pointer = pointer.indexLevel[i].nextNode
}
if update[i] == nil { // 说明插入的元素是当前高度索引中最大的
update[i] = &pointer.indexLevel[i]
}
}
sl.mtx.RUnlock()
// 开始插入
sl.mtx.Lock()
for i := range update {
pointer = update[i].nextNode
update[i].nextNode = newNode
newNode.indexLevel[i].nextNode = pointer
}
if uint32(indexHeight) > sl.height { // 更新索引最高高度
sl.height = uint32(indexHeight)
}
sl.length += 1
sl.mtx.Unlock()
return nil
}
删其实和新增很像,它也要寻找每一层索引上的“前一个节点”,只是这次不知道要删除的元素的索引高度,所以必须所有高度的索引都要检查一遍。另外,由于第一层索引会包括所有节点,因此对第一层索引检查时如果找不到节点,就说明节点不存在。最后也别忘记并发保护和重置高度,如果当前节点和跳表保存的索引高度一致,就需要重置高度。
go// DeleteNode 删除指定节点 pass
func (sl *SkipList) DeleteNode(key string) (err error) {
sl.mtx.RLock()
update := make([]*index, sl.height) // 存储要修改的各层索引
for i := uint32(0); i <= sl.height - 1; i++ { // 开始寻找各层的指向要删除的节点的索引 从最底层找起
pointer := &sl.head.indexLevel[i]
for pointer.nextNode != nil && pointer.nextNode.key < key {
pointer = &pointer.nextNode.indexLevel[i]
}
if i == 0 { // 只有在最下面的索引才能检查该元素是否存在
if pointer.nextNode == nil || pointer.nextNode.key != key {
err = errors.New("Delete Key: " + key + " is not Existed! \n")
sl.mtx.RUnlock()
return
}
}
if pointer.nextNode != nil && pointer.nextNode.key == key { // 只有确定了是要删除的节点才能进行更新 上面循环完成的两种情况都要考虑 不然就会空指针
update[i] = pointer
}
}
deleteNode := update[0].nextNode // 具体要删除的节点
sl.mtx.RUnlock() // 寻找节点完成 开始删除
sl.mtx.Lock()
for i, v := range update {
if v == nil {
continue
} else {
v.nextNode = deleteNode.indexLevel[i].nextNode // 修改指向
}
}
err = nil
sl.length -= 1
if update[sl.height - 1] != nil { // 如果删除的节点的索引高度和当前跳表的索引高度一致 就有高度下降的可能 需要重置高度
for i := range sl.head.indexLevel {
if sl.head.indexLevel[i].nextNode == nil {
sl.height = uint32(i)
}
}
}
sl.mtx.Unlock()
return
}
先说查,查分两种,因为跳表是有序的,因此它可以查单节点的值,也可以查节点区间的所有值。这再加上改的需求,就可以写一个查单个节点本身的方法来避免重复代码。
查单节点,就是从最高的节点开始,一步一步向下找的过程。注意在一层索引循环到尽头时,如果这层索引不是最下面一层,就要下降一层。其他的和在链表中查询节点区别不是很大。
go// internalQueryNode 仅用于内部的查询指定节点的方法 如果存在则返回对应节点 如果不存在则返回空和一个error pass
func (sl *SkipList) internalQueryNode(key string) (result *skipListNode, err error) {
// 加读锁
sl.mtx.RLock()
defer sl.mtx.RUnlock()
// 先找到最高的索引
pointer := sl.head
height := sl.height
// 从最高的索引开始 一步一步向下开始寻找值
for {
if pointer.indexLevel[height - 1].nextNode == nil { // 如果当前层的索引循环完成仍然找不到值 就尝试使索引下降一个高度
if height == 1 {
break
} else {
height -= 1 // 索引已经到最低 无法再下降 说明找不到元素
continue
}
}
if strings.Compare(key, pointer.indexLevel[height - 1].nextNode.key) < 0 { // key较小 尝试使索引下降一个高度
if height == 1 {
break
} else {
height -= 1
continue
}
} else if strings.Compare(key, pointer.indexLevel[height - 1].nextNode.key) > 0 { // key较大 按当前高度索引继续向前寻找
pointer = pointer.indexLevel[height - 1].nextNode
} else {
result = pointer.indexLevel[height - 1].nextNode
err = nil
return
}
}
err = errors.New("Query Key: " + key + " is not Existed! \n")
result = nil
return
}
// QueryNode 跳表的单节点查询方法 返回给定键所对应的值 pass
func (sl *SkipList) QueryNode(key string) (value any, err error) {
result, e := sl.internalQueryNode(key)
if e != nil {
return nil, e
} else {
return result.value, nil
}
}
单节点的写完,多节点就好说了。先按单节点找到起始节点本身,然后按照最底层的索引一个节点一个节点找过去,同时记录过程中的节点的值,直到找到终点节点为止即可。
go// QueryNodeInterval 按给定的key1到key2的范围进行区间查询 返回这个区间(含两侧)内的所有value pass
func (sl *SkipList) QueryNodeInterval(key1 string, key2 string) (values []any, err error) {
if strings.Compare(key1, key2) > 0 { // key1大于key2时交换 强制key1必须小于key2
key1, key2 = key2, key1
} else if strings.Compare(key1, key2) == 0 { // key1等于key2时等于请求一个元素
value, e := sl.QueryNode(key1)
if e != nil {
return nil, e
} else {
return []any{value}, nil
}
}
node1, e := sl.internalQueryNode(key1) // 先找起点
if e != nil {
return nil, e
}
values = append(values, node1.value)
sl.mtx.RLock()
node2 := node1.indexLevel[0].nextNode
for node2 != nil { // 在找终点的过程中顺便添加值
if node2.key != key2 {
values = append(values, node2.value)
node2 = node2.indexLevel[0].nextNode
} else {
break
}
}
if node2 == nil {
e = errors.New("Query Key2: " + key2 + " is not Existed! \n")
return nil, e
}
values = append(values, node2.value)
sl.mtx.RUnlock()
return values, nil
}
最后是改,在已经有一个寻找单节点本身的方法下,这个方法实现就极为容易了。
go// SetNode 修改指定节点的值
func (sl *SkipList) SetNode(key string, value any) (err error) {
setNode, e := sl.internalQueryNode(key)
if e != nil {
return e
} else {
setNode.value = value
return nil
}
}
go// skipList.go
package skipList
import (
"errors"
"math/rand"
"strconv"
"strings"
"sync"
"time"
)
const (
indexUpgradeProbability = 0.5 // 索引晋升概率
indexMaxHeight = 32 // 索引最高高度
// 这种情况下跳表存储节点最好不要超过2^32个
)
type (
// SkipList 跳表的简单实现
SkipList struct {
head *skipListNode // 头节点
length uint32 // 跳表当前长度
height uint32 // 跳表当前高度 注意这个高度是1的时候 对应的索引是在index[0]
mtx sync.RWMutex // 读写锁 上并发保护用的
}
// index 具体的索引 指向节点在某一层的下一个节点
index struct {
nextNode *skipListNode
}
// skipListNode 跳表节点
skipListNode struct {
key string // 键
value any // 值 值可以是任何类型
indexLevel []index // 存储该节点在任意层的索引
}
)
// NewSkipList 跳表的构造函数 默认索引晋升概率0.5 默认索引最高高度为32
func NewSkipList() *SkipList {
return &SkipList{
length: 0,
head: &skipListNode{
indexLevel: make([]index, indexMaxHeight),
},
height: 1,
}
}
// randomLevel 为每个新加入的节点随机一个索引高度
func (sl *SkipList) randomLevel() int {
l := 1
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for r.Float64() < indexUpgradeProbability && l < indexMaxHeight {
l += 1
}
return l
}
// ToString 将跳表所存储的所有节点的键和索引高度信息转换为字符串的形式并返回 仅作为检查使用 pass
func (sl *SkipList) ToString() string {
pointer := sl.head
result := make([]string, sl.length + 1)
for i := 1; i <= int(sl.length); i++ {
pointer = pointer.indexLevel[0].nextNode
result[i] = "Key " + strconv.Itoa(i) + ": " + pointer.key + ", Index Height: " + strconv.Itoa(len(pointer.indexLevel))
}
return strings.Join(result, "\n")
}
// internalQueryNode 仅用于内部的查询指定节点的方法 如果存在则返回对应节点 如果不存在则返回空和一个error pass
func (sl *SkipList) internalQueryNode(key string) (result *skipListNode, err error) {
// 加读锁
sl.mtx.RLock()
defer sl.mtx.RUnlock()
// 先找到最高的索引
pointer := sl.head
height := sl.height
// 从最高的索引开始 一步一步向下开始寻找值
for {
if pointer.indexLevel[height - 1].nextNode == nil { // 如果当前层的索引循环完成仍然找不到值 就尝试使索引下降一个高度
if height == 1 {
break
} else {
height -= 1 // 索引已经到最低 无法再下降 说明找不到元素
continue
}
}
if strings.Compare(key, pointer.indexLevel[height - 1].nextNode.key) < 0 { // key较小 尝试使索引下降一个高度
if height == 1 {
break
} else {
height -= 1
continue
}
} else if strings.Compare(key, pointer.indexLevel[height - 1].nextNode.key) > 0 { // key较大 按当前高度索引继续向前寻找
pointer = pointer.indexLevel[height - 1].nextNode
} else {
result = pointer.indexLevel[height - 1].nextNode
err = nil
return
}
}
err = errors.New("Query Key: " + key + " is not Existed! \n")
result = nil
return
}
// QueryNode 跳表的单节点查询方法 返回给定键所对应的值 pass
func (sl *SkipList) QueryNode(key string) (value any, err error) {
result, e := sl.internalQueryNode(key)
if e != nil {
return nil, e
} else {
return result.value, nil
}
}
// QueryNodeInterval 按给定的key1到key2的范围进行区间查询 返回这个区间(含两侧)内的所有value pass
func (sl *SkipList) QueryNodeInterval(key1 string, key2 string) (values []any, err error) {
if strings.Compare(key1, key2) > 0 { // key1大于key2时交换 强制key1必须小于key2
key1, key2 = key2, key1
} else if strings.Compare(key1, key2) == 0 { // key1等于key2时等于请求一个元素
value, e := sl.QueryNode(key1)
if e != nil {
return nil, e
} else {
return []any{value}, nil
}
}
node1, e := sl.internalQueryNode(key1) // 先找起点
if e != nil {
return nil, e
}
values = append(values, node1.value)
sl.mtx.RLock()
node2 := node1.indexLevel[0].nextNode
for node2 != nil { // 在找终点的过程中顺便添加值
if node2.key != key2 {
values = append(values, node2.value)
node2 = node2.indexLevel[0].nextNode
} else {
break
}
}
if node2 == nil {
e = errors.New("Query Key2: " + key2 + " is not Existed! \n")
return nil, e
}
values = append(values, node2.value)
sl.mtx.RUnlock()
return values, nil
}
// SetNode 修改指定节点的值
func (sl *SkipList) SetNode(key string, value any) (err error) {
setNode, e := sl.internalQueryNode(key)
if e != nil {
return e
} else {
setNode.value = value
return nil
}
}
// DeleteNode 删除指定节点 pass
func (sl *SkipList) DeleteNode(key string) (err error) {
sl.mtx.RLock()
update := make([]*index, sl.height) // 存储要修改的各层索引
for i := uint32(0); i <= sl.height - 1; i++ { // 开始寻找各层的指向要删除的节点的索引 从最底层找起
pointer := &sl.head.indexLevel[i]
for pointer.nextNode != nil && pointer.nextNode.key < key {
pointer = &pointer.nextNode.indexLevel[i]
}
if i == 0 { // 只有在最下面的索引才能检查该元素是否存在
if pointer.nextNode == nil || pointer.nextNode.key != key {
err = errors.New("Delete Key: " + key + " is not Existed! \n")
sl.mtx.RUnlock()
return
}
}
if pointer.nextNode != nil && pointer.nextNode.key == key { // 只有确定了是要删除的节点才能进行更新 上面循环完成的两种情况都要考虑 不然就会空指针
update[i] = pointer
}
}
deleteNode := update[0].nextNode // 具体要删除的节点
sl.mtx.RUnlock() // 寻找节点完成 开始删除
sl.mtx.Lock()
for i, v := range update {
if v == nil {
continue
} else {
v.nextNode = deleteNode.indexLevel[i].nextNode // 修改指向
}
}
err = nil
sl.length -= 1
if update[sl.height - 1] != nil { // 如果删除的节点的索引高度和当前跳表的索引高度一致 就有高度下降的可能 需要重置高度
for i := range sl.head.indexLevel {
if sl.head.indexLevel[i].nextNode == nil {
sl.height = uint32(i)
}
}
}
sl.mtx.Unlock()
return
}
// AddNode 向跳表中添加节点 节点键值不允许相同 pass
func (sl *SkipList) AddNode(key string, value any) (err error) {
// 准备阶段
indexHeight := sl.randomLevel()
newNode := &skipListNode{
indexLevel: make([]index, indexHeight),
key: key,
value: value,
}
update := make([]*index, indexHeight)
pointer := sl.head
// 确定update 确定每层索引插入位置
sl.mtx.RLock()
for i := range update {
pointer = sl.head
for pointer.indexLevel[i].nextNode != nil {
if strings.Compare(key, pointer.indexLevel[i].nextNode.key) < 0 { // 参数s1 s2 如果结果为-1 说明s1 < s2
update[i] = &pointer.indexLevel[i]
break
} else if strings.Compare(key, pointer.indexLevel[i].nextNode.key) == 0 { // 如果结果为0 说明s1 == s2
return errors.New("Add Key: " + key + " is Existed! \n")
}
pointer = pointer.indexLevel[i].nextNode
}
if update[i] == nil { // 说明插入的元素是当前高度索引中最大的
update[i] = &pointer.indexLevel[i]
}
}
sl.mtx.RUnlock()
// 开始插入
sl.mtx.Lock()
for i := range update {
pointer = update[i].nextNode
update[i].nextNode = newNode
newNode.indexLevel[i].nextNode = pointer
}
if uint32(indexHeight) > sl.height { // 更新索引最高高度
sl.height = uint32(indexHeight)
}
sl.length += 1
sl.mtx.Unlock()
return nil
}
go// skipList_test.go
package skipList
import (
"strconv"
"strings"
"testing"
)
func TestSkipList_AddNode(t *testing.T) {
time := 2000
sl := NewSkipList()
for i := 0; i < time; i++ {
testValue := "TestValue" + strconv.Itoa(i)
err := sl.AddNode(strconv.Itoa(i), &testValue)
if err != nil {
t.Log(err)
}
}
t.Log("add node is completed")
s := sl.ToString()
t.Log(s)
}
func TestSkipList_QueryNode(t *testing.T) {
time := 100000
sl := NewSkipList()
for i := 0; i < time; i++ {
testValue := "TestValue" + strconv.Itoa(i)
err := sl.AddNode(strconv.Itoa(i), &testValue)
if err != nil {
t.Log(err)
}
}
for i := 0; i < time + 5; i++ {
queryValue, e := sl.QueryNode(strconv.Itoa(i))
if e != nil {
t.Log(e.Error())
continue
}
if *queryValue.(*string) != "TestValue" + strconv.Itoa(i) {
t.Errorf("Error :%d %s", i, *queryValue.(*string))
} else {
t.Log("Success: " + strconv.Itoa(i))
}
}
}
func TestSkipList_DeleteNode(t *testing.T) {
time := 2000
sl := NewSkipList()
for i := 0; i < time; i++ {
testValue := "TestValue" + strconv.Itoa(i)
err := sl.AddNode(strconv.Itoa(i), &testValue)
if err != nil {
t.Log(err)
}
}
for i := 0; i < time + 5; i++ {
err := sl.DeleteNode(strconv.Itoa(i))
_, err2 := sl.QueryNode(strconv.Itoa(i))
if err2 != nil {
t.Log(err2)
}
if err != nil {
t.Log(err)
} else {
t.Log("Success: " + strconv.Itoa(i))
}
}
}
func temp(p []any) []string {
result := make([]string, len(p))
for i, v := range p {
result[i] = *(v.(*string))
}
return result
}
func TestSkipList_QueryNodeInterval(t *testing.T) {
time := 2000
sl := NewSkipList()
for i := 0; i < time; i++ {
testValue := "TestValue" + strconv.Itoa(i)
err := sl.AddNode(strconv.Itoa(i), &testValue)
if err != nil {
t.Log(err)
}
}
for i := 0; i < time; i++ {
value, err := sl.QueryNodeInterval(strconv.Itoa(i), strconv.Itoa(i + 5))
if err != nil {
t.Log(err)
} else {
t.Log("Success: " + strings.Join(temp(value), ", "))
}
}
}
func TestSkipList_SetNode(t *testing.T) {
time := 2000
sl := NewSkipList()
for i := 0; i < time; i++ {
testValue := "TestValue" + strconv.Itoa(i)
err := sl.AddNode(strconv.Itoa(i), &testValue)
if err != nil {
t.Log(err)
}
}
t.Log(sl.ToString())
for i := 0; i < time; i++ {
testValue := "TestValue" + strconv.Itoa(i + 5)
err := sl.SetNode(strconv.Itoa(i), &testValue)
if err != nil {
t.Log(err)
} else {
result, e := sl.QueryNode(strconv.Itoa(i))
if e == nil {
t.Log("Success: " + strconv.Itoa(i) + ", Value: " + *result.(*string))
} else {
t.Log("Fail: " + e.Error())
}
}
}
}
本文作者:御坂19327号
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!