2024-05-26
Go & 后端
00

目录

1 Logger设计与实现
2 String数据类型可用操作
3 StringIndex实现

主要内容:

  1. Logger设计与实现
  2. String数据类型可用操作
  3. StringIndex实现

1 Logger设计与实现

先说说我认知中的Logger的基本要求:

  1. 在代码的全局范围内都能够被调用,调用之后能够输出想要输出的调试信息。
  2. 每一次记录,都可以记录等级,记录log发生的时间,记录从调用处传过来的log信息。
  3. 有并发保护,毕竟全局范围内调用,指不定哪块就有两个发生时间完全一致的调试信息。
  4. 当有error或者panic时,我能够选择是否输出堆栈信息和发生错误的参数。

基于这些条件,Logger的基本结构就有了:

  1. Logger包内提供两个对外的函数GenerateInfoLog和GenerateErrorLog两个函数,分别用于记录普通级别信息的日志和错误级别的日志。
  2. Logger包内有一个带缓冲区的channel,上述的两个函数会作为生产者将日志信息写入一个LogInfo结构体内再放入该channel,channel另一头是消费者ListenLoggerChannel,该函数会运行在另一个线程上,持续对channel进行select。如果有新的LogInfo,该函数会将其转换为字符串写入文件并且输出到控制台。
  3. 关于获取堆栈信息,GenerateErrorLog函数会通过runtime包来获取。
go
// logger.go package logger import ( "fmt" "os" "path/filepath" "runtime" "strconv" "strings" "time" ) var logInputChannel = make(chan LogInfo, 10) type LogLevel string const ( Error LogLevel = "E" Panic = "P" Info = "I" ) // LogInfo 传递Log信息的结构体 type LogInfo struct { level LogLevel timeString string message string } func GenerateInfoLog(message string) { log := LogInfo{ level: Info, timeString: time.Now().Format("2006-01-02 15:04:05"), } pc, _, _, ok := runtime.Caller(1) if !ok { log.message = "Can not Get Caller Function, Message: " + message } else { log.message = runtime.FuncForPC(pc).Name() + " :" + message } logInputChannel <- log return } func GenerateErrorLog(isPanic bool, needStackTrace bool, message string, keyParams ...string) { log := LogInfo{ timeString: time.Now().Format("2006-01-02 15:04:05"), } if isPanic { log.level = Panic } else { log.level = Error } param := strings.Join(keyParams, " ") if needStackTrace { // 需要全部堆栈信息 log.message = "Message: " + message + ", parameters: " + param + "\n" log.message += "Stack Trace: \n" pcs := make([]uintptr, 100) n := runtime.Callers(1, pcs) // 获取当前调用堆栈的程序计数器 参数1指定要跳过的堆栈帧数 不包括Callers函数本身 返回的n是实际的程序计数器数量 pcs = pcs[:n] // 调整切片容量 frames := runtime.CallersFrames(pcs) // 获取CallersFrames结构 该结构能够对程序计数器进行迭代 for frame, more := frames.Next(); more; frame, more = frames.Next() { // 迭代 log.message += frame.File + ": " + strconv.Itoa(frame.Line) + ", Function: " + frame.Function + "\n" } } else { // 不需要 pc, _, _, ok := runtime.Caller(1) if !ok { log.message = "Can not Get Caller Function, Message: " + message + ", parameters: " + param } else { log.message = runtime.FuncForPC(pc).Name() + " :" + message + ", parameters: " + param } } logInputChannel <- log } func (li *LogInfo) toByteArray() []byte { return []byte(fmt.Sprintf("%s %s: %s \n", li.level, li.timeString, li.message)) } // Logger 记录log信息的结构体 type Logger struct { loggerFile *os.File isStop bool } // NewLogger 传入log文件存储的路径 以获取一个新的Logger func NewLogger(logPath string) (*Logger, error) { result := &Logger{} f, e := os.OpenFile(GenerateLogFilePath(logPath), os.O_CREATE|os.O_RDWR, 0644) if e != nil { return nil, e } result.loggerFile = f result.ListenLoggerChannel() return result, e } func (logger *Logger) StopLogger() { logger.isStop = true } // ListenLoggerChannel 开始监听channel以接收log信息 写入log文件并且打印 func (logger *Logger) ListenLoggerChannel() { go func() { var ( log LogInfo bytes []byte e error ) for { // 循环监听 select { case log = <- logInputChannel: // 记录log bytes = log.toByteArray() _, e = logger.loggerFile.Write(bytes) if e != nil { // 写入logger失败 准备关闭logger fmt.Println("Can Not Write Log Cause of: ", e.Error()) fmt.Println("Will Close Logger!") logger.isStop = true } fmt.Println(string(log.level) + " " + log.timeString + " " + log.message) default: if logger.isStop { close(logInputChannel) // 销毁channel e = logger.loggerFile.Sync() if e != nil { fmt.Println("Can Not Sync Log File Cause of: ", e.Error()) } e = logger.loggerFile.Close() // 关闭文件 if e != nil { fmt.Println("Can Not Close Log File Cause of: ", e.Error()) } return } } } }() } func GenerateLogFilePath(path string) string { fileName := "log." + time.Now().Format("2006_01_02_15_04_05") + ".misaka" return filepath.Join(path, fileName) }

2 String数据类型可用操作

  1. set [key] [value]
  2. set [key] [value] expired [time] time是Unix时间戳
  3. setnx [key] [value]
  4. setnx [key] [value] expired [time]
  5. get [key]
  6. getrange [key] [start] [end] start和end都是数字
  7. getset [key] [value]
  8. append [key] [appendValue]
  9. del [key]

3 StringIndex实现

还是和Hash类型一样的步骤,按照上面的可用操作一个一个来,以跳表为主索引结构,写就完事了。

提示

String数据类型在Redis服务器那块的解析已经在前一次的记录中了。

警告

跳表目前在StringIndex中的表现极差,插入速度极其慢,甚至到了内存比硬盘慢的地步。经过分析个人认为原因有以下两点:

  1. strings.Compare函数占用了大量的时间。当初设计跳表时没多想,为了Redis考虑,键设置为字符串,顺序是字典序。但是没想到该函数在插入过程中极其耗时,甚至已经到了不可接受的程度:

image.png

  1. 实现跳表的代码中,添加节点的代码不合理。添加节点的过程中,确定update数组的过程是每次都从头节点确定。实际上并不需要,每次只需要从上一次确定的update节点的下一层索引开始即可。

关于这两点问题,之后会做出调整:

  1. 优化跳表,跳表采用int作为键。
  2. 实现ART树,并使其作为String数据类型的主索引结构。

以下是还没作出调整的StringIndex。

go
// stringIndex.go package index import ( "MisakaDB/customDataStructure/skipList" "MisakaDB/logger" "MisakaDB/storage" "errors" "sync" "time" ) type StringIndex struct { index *skipList.SkipList mutex sync.RWMutex activeFile *storage.RecordFile archivedFile map[uint32]*storage.RecordFile fileIOMode storage.FileIOType baseFolderPath string fileMaxSize int64 syncDuration time.Duration } // BuildStringIndex 给定当前活跃文件和归档文件 重新构建String类型的索引 该方法只会在数据库启动时被调用 如果不存在旧的文件 则新建一个活跃文件 func BuildStringIndex (activeFile *storage.RecordFile, archivedFile map[uint32]*storage.RecordFile, fileIOMode storage.FileIOType, baseFolderPath string, fileMaxSize int64, syncDuration time.Duration) (*StringIndex, error) { result := &StringIndex{ index: skipList.NewSkipList(), activeFile: activeFile, archivedFile: archivedFile, fileIOMode: fileIOMode, baseFolderPath: baseFolderPath, fileMaxSize: fileMaxSize, syncDuration: syncDuration, } var ( e error offset int64 fileLength int64 entryLength int64 entry *storage.Entry ) // 如果活跃文件都读取不到的话 肯定也没有归档文件了 直接返回即可 if activeFile == nil { result.activeFile, e = storage.NewRecordFile(result.fileIOMode, storage.String, 1, result.baseFolderPath, result.fileMaxSize) if e != nil { return nil, e } result.archivedFile = make(map[uint32]*storage.RecordFile) // 如果activeFile为空 那么传进来的archivedFile一定也为空 这时再赋值会报错 result.archivedFile[1] = result.activeFile result.activeFile.StartSyncRoutine(syncDuration) // 开始定时同步 return result, nil } // 有归档文件的话 挨个读取归档文件 构建索引 // 读取所有归档文件的entry 因为活跃文件也在这个归档文件里 所以不再单独读取活跃文件 for i := uint32(1); i <= uint32(len(archivedFile)); i++ { recordFile, ok := archivedFile[i] if ok == false { continue } offset = 0 fileLength, e = recordFile.Length() if e != nil { return nil, e } for offset < fileLength { entry, entryLength, e = recordFile.ReadIntoEntry(offset) if e != nil { return nil, e } e = result.handleEntry(entry, recordFile.GetFileID(), offset) if e != nil { return nil, e } offset += entryLength } } result.activeFile.StartSyncRoutine(syncDuration) return result, nil } // CloseIndex 关闭Hash索引 同时停止定时Sync 关闭文件 func (si *StringIndex) CloseIndex() error { si.mutex.Lock() defer si.mutex.Unlock() for _, v := range si.archivedFile { if v.IsSyncing { v.StopSyncRoutine() } e := v.Close() if e != nil { return e } } return nil } // Set 给定key和value 设定值 如果key存在则为更新值 func (si *StringIndex) Set (key string, value string, expiredAt int64) error { entry := &storage.Entry{ EntryType: storage.TypeRecord, ExpiredAt: expiredAt, Key: []byte(key), Value: []byte(value), } indexNode := &IndexNode{ expiredAt: expiredAt, } si.mutex.Lock() defer si.mutex.Unlock() // 写入文件 offset, e := si.writeEntry(entry) if e != nil { return e } indexNode.offset = offset indexNode.fileID = si.activeFile.GetFileID() indexNode.value = []byte(value) // 写入索引 e = si.index.AddNode(key, indexNode) if e != nil { } return nil } // Get 根据给定的key尝试获取value func (si *StringIndex) Get (key string) (string, error) { si.mutex.RLock() value, e := si.index.QueryNode(key) if e != nil { si.mutex.RUnlock() return "", logger.KeyIsNotExisted } indexNode := assertIndexNodePointer(value) // 如果过期时间为-1则说明永不过期 if indexNode.expiredAt < time.Now().Unix() && indexNode.expiredAt != -1 { logger.GenerateInfoLog(logger.ValueIsExpired.Error() + key) // 读取的Entry过期 删索引 si.mutex.RUnlock() si.mutex.Lock() e = si.index.DeleteNode(key) si.mutex.Unlock() if e != nil { logger.GenerateErrorLog(false, false, e.Error(), si.index.ToString(), key) return "", e } return "", logger.ValueIsExpired } si.mutex.RUnlock() return string(indexNode.value), nil } // GetRange 返回key中字符串值的子字符 func (si *StringIndex) GetRange (key string, start int, end int) (string, error) { if start > end { return "", logger.ParameterIsNotAllowed } value, e := si.Get(key) if e != nil { return "", e } return value[start:end], nil } // GetSet 先按key获取旧的值 然后再设置新的值并且返回旧值 func (si *StringIndex) GetSet (key string, newValue string) (string, error) { si.mutex.RLock() // 先尝试Get value, e := si.index.QueryNode(key) if e != nil { logger.GenerateErrorLog(false, false, e.Error(), key) return "", e } indexNode := assertIndexNodePointer(value) // 如果过期时间为-1则说明永不过期 if indexNode.expiredAt < time.Now().Unix() && indexNode.expiredAt != -1 { logger.GenerateInfoLog(logger.ValueIsExpired.Error() + key) // 读取的Entry过期 删索引 si.mutex.RUnlock() si.mutex.Lock() e = si.index.DeleteNode(key) si.mutex.Unlock() if e != nil { logger.GenerateErrorLog(false, false, e.Error(), si.index.ToString(), key) return "", e } return "", logger.ValueIsExpired } si.mutex.RUnlock() entry := &storage.Entry{ EntryType: storage.TypeRecord, ExpiredAt: indexNode.expiredAt, Key: []byte(key), Value: []byte(newValue), } oldValue := string(indexNode.value) // 再尝试Set si.mutex.Lock() // 先写入文件 offset, e := si.writeEntry(entry) if e != nil { return "", e } // 再更新indexNode indexNode.value = []byte(newValue) indexNode.offset = offset indexNode.fileID = si.activeFile.GetFileID() si.mutex.Unlock() // 之后就不再需要写入索引了 return oldValue, nil } // SetNX 只有在key不存在时设置key的值 func (si *StringIndex) SetNX (key string, value string, expiredAt int64) error { _, e := si.Get(key) if e != nil { return si.Set(key, value, expiredAt) } else { return logger.KeyIsExisted } } // Append 在key存在的情况下 向其已经存在的value追加一个字符串 func (si *StringIndex) Append (key string, appendValue string) error { si.mutex.RLock() value, e := si.index.QueryNode(key) if e != nil { si.mutex.RUnlock() return logger.KeyIsNotExisted } indexNode := assertIndexNodePointer(value) si.mutex.RUnlock() entry := &storage.Entry{ EntryType: storage.TypeRecord, ExpiredAt: indexNode.expiredAt, Key: []byte(key), Value: append(indexNode.value, []byte(appendValue)...), } // 再尝试Set si.mutex.Lock() // 先写入文件 offset, e := si.writeEntry(entry) if e != nil { return e } // 再更新indexNode indexNode.value = append(indexNode.value, []byte(appendValue)...) indexNode.offset = offset indexNode.fileID = si.activeFile.GetFileID() si.mutex.Unlock() return nil } // Del 如果key存在 则删除key对应的value func (si *StringIndex) Del (key string) error { si.mutex.RLock() value, e := si.index.QueryNode(key) if e != nil { si.mutex.RUnlock() return logger.KeyIsNotExisted } indexNode := assertIndexNodePointer(value) si.mutex.RUnlock() entry := &storage.Entry{ EntryType: storage.TypeDelete, Key: []byte(key), Value: indexNode.value, ExpiredAt: 0, } si.mutex.Lock() _, e = si.writeEntry(entry) if e != nil { return e } e = si.index.DeleteNode(key) if e != nil { logger.GenerateErrorLog(false, false, e.Error(), key) return e } si.mutex.Unlock() return nil } // writeEntry 尝试将entry写入文件 如果活跃文件写满则自动新开一个文件继续尝试写入 如果写入成功则返回nil和写入前的offset func (si *StringIndex) writeEntry(entry *storage.Entry) (int64, error) { offset := si.activeFile.GetOffset() e := si.activeFile.WriteEntryIntoFile(entry) // 如果文件已满 if errors.Is(e, logger.FileBytesIsMaxedOut) { // 先结束旧文件的定时同步 si.activeFile.StopSyncRoutine() // 开一个新的文件 这个新的活跃文件的序号自动在之前的活跃文件上 + 1 si.activeFile, e = storage.NewRecordFile(si.fileIOMode, storage.String, si.activeFile.GetFileID()+1, si.baseFolderPath, si.fileMaxSize) if e != nil { return 0, e } // 这个新的活跃文件写入归档文件映射中 si.archivedFile[si.activeFile.GetFileID()] = si.activeFile // 先开启定时同步 si.activeFile.StartSyncRoutine(si.syncDuration) // 再尝试写入 offset = si.activeFile.GetOffset() e = si.activeFile.WriteEntryIntoFile(entry) if e != nil { return 0, e } } else if e != nil { return 0, e } return offset, nil } // handleEntry 接收Entry 并且写入String索引 注意它只对索引进行操作 func (si *StringIndex) handleEntry(entry *storage.Entry, fileID uint32, offset int64) error { var e error si.mutex.Lock() defer si.mutex.Unlock() switch entry.EntryType { case storage.TypeDelete: { e = si.index.DeleteNode(string(entry.Key)) if e != nil { logger.GenerateErrorLog(false, false, e.Error(), si.index.ToString(), string(entry.Key), string(entry.Value)) } } case storage.TypeRecord: // 这里之所以只对RecordEntry进行检查 是因为对map的delete函数 如果传入的map本身就是空或者要删除的键找不到值 它就直接返回了 并不会报错 // 所以DeleteEntry不需要过期检查 过期就过期吧 过期了也只是no-op而已 // 如果过期时间为-1则说明永不过期 if entry.ExpiredAt < time.Now().Unix() && entry.ExpiredAt != -1 { // attention 过期logger return nil } e = si.index.AddNode(string(entry.Key), &IndexNode{ value: entry.Value, expiredAt: entry.ExpiredAt, fileID: fileID, offset: offset, }) if e != nil { logger.GenerateErrorLog(false, false ,e.Error(), si.index.ToString(), string(entry.Key), string(entry.Value)) } } return nil }

本文作者:御坂19327号

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!