主要内容:
目前 Zset 支持的操作如下:
ZAdd
,添加元素到指定的有序集合中。如果元素已存在则更新,如果有序列表不存在则创建一个新的有序列表。ZRem
,从指定的有序集合中删除元素。如果删除后有序集合不再持有元素则自动移除。ZScore
,按给定的 key 和 member 获取对应的 score。ZCard
,获取指定的有序列表的元素个数。ZCount
,获取有序列表中,score 在[min, max]
之间的元素个数。ZRange
,获取有序列表中,score 在[min, max]
之间的元素内容。除了这些功能之外,有序集合内的元素都支持单独的过期时间。
Zset 有序集合的索引的数据结构,是由跳表和哈希表共同组合而成的。先用一个哈希表来存储所有的 key 和有序集合,有序集合本身是用哈希表和跳表共同存储同一类型的节点,哈希表以 member 为键,负责“集合”的特征;跳表以 score 为键,负责“有序”的特征。
gotype zsetNode struct {
indexNode
score int
}
// 这个其实不应该这么写 这个应该写成 int 的类型别名比较好
type zsetScore struct {
score int
}
type zset struct {
dict map[string]*zsetNode
skipList *skipList.SkipList[*zsetNode]
expireNum int // 过期计数 计算有过期删除的这个需求的 member 的数量 即 expiredAt 字段不为-1的 member 数量 如果它是0 获取有序集合信息的时候就不需要检查是否过期
}
type ZSetIndex struct {
index map[string]*zset
mutex sync.RWMutex
activeFile *storage.RecordFile
archivedFile map[uint32]*storage.RecordFile
fileIOMode storage.FileIOType
baseFolderPath string
fileMaxSize int64
syncDuration time.Duration
}
单独的过期时间这一点,实现思路上大体和之前的类似,即并不在该元素过期时就立刻删除,而是在再次使用到该元素时再判断是否过期。这样的思路有一个问题,即支持的操作中有一些是需要过滤掉所有过期的元素的,比如ZCard
,因此我选择额外维护了一个expireNum
信息,用以存储有序集合中过期元素的数量。如果它是0的话,就不需要针对过期元素进行过滤。
go// refreshZset 对有序集合进行循环 删除过期元素
func (z *zset) refreshZset() {
// 这块不加锁 因为调用者已经把锁加好了
var node *zsetNode
for key := range z.dict {
node = z.dict[key]
if node.expiredAt != -1 && node.expiredAt < time.Now().UnixMilli() {
// 过期
delete(z.dict, key)
_ = z.skipList.DeleteNode(zsetScore{score: node.score})
z.expireNum -= 1
}
}
}
// ZRange 获取 score 在 [min, max] 区间内的所有 member
func (zi *ZSetIndex) ZRange(key []byte, min, max int) ([][]byte, error) {
...
if targetZset.expireNum != 0 {
zi.mutex.RUnlock()
zi.mutex.Lock()
targetZset.refreshZset()
zi.mutex.Unlock()
zi.mutex.RLock()
}
...
return result, nil
}
剩下的别的就没有什么特别要说的了,就是对数据结构的各种操作的封装外加写入文件。
目前 List 支持的操作如下:
LInsert
,向列表中插入元素,插入位置由 index 指定。比如说列表里有1 2 3 4
这几个元素,如果 index 指定为2插入一个10,那么插入后列表是1 2 10 3 4
。LPop
,弹出操作,比如说列表里有1 2 3 4
这几个元素,弹出后列表是2 3 4
。此外,该函数也承担删除整个列表的操作。只要列表中只有一个元素或者没有元素,该函数都会从 index 中移除列表。LPush
,压入操作,比如说列表里有1 2 3 4
这几个元素,压入10列表是10 1 2 3 4
。该函数也承担创建一个列表的操作,如果列表不存在,就创建一个列表。LSet
,修改更新操作。该方法不支持修改更新时间。LRem
,删除操作,删除的对象由 count 和 value 一起指定。如果count > 0
就是从列表表头开始搜索,删除 count 个符合条件的元素;如果count < 0
就是从列表表尾开始搜索,删除 count 个符合条件的元素;如果count = 0
就是删除列表中所有符合条件的元素。注意,当 count != 0 时,如果被删除的元素个数不满足 count 就会返回RemoveCountIsNotEnough
错误,返回该错误时,并不会影响索引中存储的值。LIndex
,按 index 进行查询操作。比如说列表里有1 2 3 4
这几个元素,查询 index 为2的元素,返回的就是3。LLen
,查询列表长度。LRange
,按范围查询列表内的元素,查询范围是[start, end)
。比如说列表里有1 2 3 4
这几个元素,查询 start 为1 end 为3的元素区间,返回的就是2 3
。提示
我知道 Redis 里的列表其实是双端队列,只是我实在懒得写另一头的弹出压入,反正代码上是差不了太多的。
除了这些功能之外,有序集合内的所有元素都支持独立的过期时间。因为 List 是形似队列的,我故意将其设计为删除元素后其他元素补位的形式。也就是说,如果1 2 3 4
,我删除3后,查询 index 为2的元素,仍然能返回有效值,并且返回的是4。
List 的索引的数据结构为哈希表套切片,这一点无需多说了。但是为了支持元素的过期时间,我加了一些东西。
gotype expiredInfo struct {
key []byte
expiredNode *indexNode
}
type ListIndex struct {
index map[string][]*indexNode
mutex sync.RWMutex
activeFile *storage.RecordFile
archivedFile map[uint32]*storage.RecordFile
fileIOMode storage.FileIOType
baseFolderPath string
fileMaxSize int64
syncDuration time.Duration
expiredAtChan chan *expiredInfo
closeMonitor chan int
}
在维护列表的有序性的同时还维护每个元素的单独过期时间实在是有点挑战。在一开始的时候,我设想的是和别的数据结构差不多的意思,即不单独进行过期删除的操作,只在用到该元素的时候判断是否过期,只不过后来我意识到这事行不通,太容易影响到别的元素。假设列表中有1 2 3 4
4个元素,其中元素2已经过期。如果我要更新索引为2的元素,理论上我应该更新元素4,但是因为过期的元素还没有被删掉,反而会更新到元素3上面去。
如果只在用到该元素的时候判断是否过期会因为影响别的元素的索引所以行不通,那么我在每次读写列表的时候都扫一遍整个列表如何?这一点也不太能行得通,不仅是因为这一点是挺消耗时间的(在这里提消耗时间,上面的有序集合没提的原因是,有序数组内只有3个命令是需要扫描整个数据结构的,而在这里是所有的命令都得扫一遍,那时间差距可就太大了),并且这个思路本身就有一点行不通,因为问题出在还原列表上。
这个项目的文件存储机制是存储操作的,也就是说我在还原列表的时候,拿到的是一条条的操作记录。还是之前的那个例子,假设列表中有1 2 3 4
4个元素,其中元素2已经过期,要更新索引为2的元素。这些操作放在底层文件记录上是完全看不出来是元素2先过期还是更新操作先来,也就造成了还原列表不准确。
所以要想设计元素单独过期,要达成的效果有两个:
所以我想了想,还是用协程来处理这件事:
go// BuildListIndex 给定当前活跃文件和归档文件 重新构建List类型的索引 该方法只会在数据库启动时被调用 如果不存在旧的文件 则新建一个活跃文件
func BuildListIndex(activeFile *storage.RecordFile, archivedFile map[uint32]*storage.RecordFile, fileIOMode storage.FileIOType, baseFolderPath string, fileMaxSize int64, syncDuration time.Duration) (*ListIndex, error) {
...
ReadFileFished:
result.refreshList()
result.activeFile.StartSyncRoutine(result.syncDuration)
go result.expiredChanMonitor()
return result, nil
}
// expiredChanMonitor 监控 channel 如果有过期消息发过来就处理 具体包括修改 index 向文件中写入过期 entry
func (li *ListIndex) expiredChanMonitor() {
var expiredNode *expiredInfo
var e error
var i int
for {
select {
case expiredNode = <-li.expiredAtChan:
li.mutex.RLock()
key := string(expiredNode.key)
targetList, ok := li.index[key]
if !ok {
li.mutex.RUnlock()
continue
}
i = 0
for ; i < len(targetList); i++ {
if targetList[i] == expiredNode.expiredNode { // 指针比较总比字节数组循环快吧
// 找到过期值
break
}
}
li.mutex.RUnlock()
li.mutex.Lock()
// 写入文件
_, e = li.writeEntry(&storage.Entry{
Key: expiredNode.key,
Value: expiredNode.expiredNode.value,
EntryType: storage.TypeListExpired,
ExpiredAt: 0,
})
if e != nil {
logger.GenerateErrorLog(false, false, e.Error())
continue
}
// 正式开始删除
for ; i < len(targetList)-1; i++ {
targetList[i] = targetList[i+1]
}
// 写回去
li.index[key] = targetList[:len(targetList)-1]
li.mutex.Unlock()
case <-li.closeMonitor:
return
}
}
}
// delayExpiredMessage 延迟发送过期消息到 channel 中
//
// 比如说列表里有1 2 3 4这几个元素 如果 index 指定为2过期 那么过期处理后列表是1 2 4
func (li *ListIndex) delayExpiredMessage(key []byte, expiredNode *indexNode) {
time.Sleep(time.Until(time.UnixMilli(expiredNode.expiredAt)))
li.expiredAtChan <- &expiredInfo{
key: key,
expiredNode: expiredNode,
}
}
// CloseIndex 关闭 List 索引 同时停止定时Sync 关闭文件 关闭内部 channel
func (li *ListIndex) CloseIndex() (err error) {
defer func() {
if e := recover(); e != nil {
panicString := fmt.Sprintf("%s", e)
logger.GenerateErrorLog(true, false, panicString)
err = errors.New(panicString)
}
}()
li.mutex.Lock()
defer li.mutex.Unlock()
for _, v := range li.archivedFile {
if v.IsSyncing {
v.StopSyncRoutine()
}
e := v.Close()
if e != nil {
if errors.Is(e, os.ErrClosed) {
continue
}
return e
}
}
li.closeMonitor <- 1
close(li.closeMonitor)
close(li.expiredAtChan)
return nil
}
func (li *ListIndex) LInsert(key []byte, index int, value []byte, expiredAt int64) error {
...
if expiredAt != -1 {
// 有实际的过期时间
go li.delayExpiredMessage(key, targetSlice[index])
}
return nil
}
先是启动一个协程来监听并且处理过期元素,在每次插入新的可过期的元素时,延迟到过期发生那一刻向监听协程发送过期消息(当然我传得是地址而不是别的,因为索引和元素值都有可能在过期之前改变)。同时,底层文件存储上,我加了一个新的操作类型TypeListExpired
,专门用来设置过期消息的。还有一点,还原列表之后我还是需要对所有的元素再遍历一次。因为有些过期是有可能发生在数据库关闭期间的,而且就算是没过期,也需要使其加入上面的过期机制中一并处理。
经测量,这套方法的超时误差大概在2ms左右。
goconst (
TypeDelete EntryType = iota + 1 // 标识记录删除信息的entry
TypeRecord // 标识记录信息的entry
TypeLInsert // 以下为 list 准备的 entry type
TypeLPop
TypeLPush
TypeListExpired // 过期标识 专门问 list 用的
)
提示
这里其实还有改进的空间。假设在同一时刻下,有元素过期的同时还有操作请求,势必会抢锁,我得确保必须优先处理元素过期再说操作请求的事,但是目前的代码并没有实现这一点。
要实现这一点,可以引入sync.Cond
来解决协程抢锁优先级的问题。
提示
还有一个 Set 集合我没写,有点不想写了,它的索引数据结构很 Hash 一模一样,都是哈希表嵌套,差别不是很大。
本文作者:御坂19327号
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!