2024-05-16
Go & 后端
00

目录

1 Hash索引构建
2 Redis服务器实现
3 在Windows上交叉编译Linux可执行文件

主要内容:

  1. Hash索引构建
  2. Redis服务器实现
  3. 在Windows上交叉编译Linux可执行文件

1 Hash索引构建

根据上一篇定义的Hash数据类型的可用操作,确定Hash索引的操作。

提示

之前确定的Hash可用操作:

  1. HSet:hset key field value,设置值。如果设置成功则返回1,否则返回0。
  2. HSetNX:hsetnx key field value,设置值,但是它只有在field不存在时才能设置成功。
  3. HGet:hget key field,获取值,如果key或者field不存在则返回nil。
  4. HDel:hdel key field [field ...],删除值,返回被删除的值的数量。
  5. HLen:hlen key,获取值的field个数。
  6. HExists:hexists key field,判断field是否存在,存在返回1,反之返回0。
  7. HStrLen:hstrlen key field,获取value的长度,如果field不存在则返回0。

Hash数据类型的主要索引结构为双层map,第一层map负责key的映射,第二层map负责field到value的映射。另外在这里加锁以加入并发保护。之后就是在双层map的基础上实现这7个方法并且各自创建entry再写入文件的事情了。

另外,hash和别的一些数据结构也有个问题,那就是都是两个东西(key和field)确定一个键,但是entry中只有一个key位置,这里需要对key和field进行编码,让它们在一起。

go
// util.go // EncodeKeyAndField 为了Hash等类型提供 将Key和Field编码在一起形成一个新的Key func EncodeKeyAndField(key string, field string) []byte { header := make([]byte, 10) index := 0 index += binary.PutVarint(header, int64(len(key))) index += binary.PutVarint(header[index:], int64(len(field))) result := make([]byte, index+len(key)+len(field)) copy(result[:index], header[:]) copy(result[index:], key) copy(result[index+len(key):], field) return result } // DecodeKeyAndField 为了Hash等类型提供 将由EncodeKeyAndField编码的结果解码为key和field func DecodeKeyAndField(input []byte) (key string, field string, e error) { index := 0 kSize, n := binary.Varint(input) if n <= 0 { logger.GenerateErrorLog(false, false, logger.DecodeKeyAndFieldFailed.Error(), TurnByteArrayToString(input)) return "", "", logger.DecodeKeyAndFieldFailed } index += n fSize, n := binary.Varint(input[index:]) if n <= 0 { logger.GenerateErrorLog(false, false, logger.DecodeKeyAndFieldFailed.Error(), TurnByteArrayToString(input)) return "", "", logger.DecodeKeyAndFieldFailed } index += n key = string(input[index : int64(index)+kSize]) index += int(kSize) field = string(input[index : int64(index)+fSize]) e = nil return }
go
// hashindex.go package index import ( "MisakaDB/logger" "MisakaDB/storage" "MisakaDB/util" "errors" "sync" "time" ) type HashIndex struct { index map[string]map[string]*IndexNode mutex sync.RWMutex activeFile *storage.RecordFile archivedFile map[uint32]*storage.RecordFile fileIOMode storage.FileIOType baseFolderPath string fileMaxSize int64 syncDuration time.Duration } // BuildHashIndex 给定当前活跃文件和归档文件 重新构建Hash类型的索引 该方法只会在数据库启动时被调用 如果不存在旧的文件 则新建一个活跃文件 func BuildHashIndex(activeFile *storage.RecordFile, archivedFile map[uint32]*storage.RecordFile, fileIOMode storage.FileIOType, baseFolderPath string, fileMaxSize int64, syncDuration time.Duration) (*HashIndex, error) { result := &HashIndex{ activeFile: activeFile, archivedFile: archivedFile, fileIOMode: fileIOMode, baseFolderPath: baseFolderPath, fileMaxSize: fileMaxSize, index: make(map[string]map[string]*IndexNode), syncDuration: syncDuration, } var offset int64 var fileLength int64 var entryLength int64 var entry *storage.Entry var e error // 如果活跃文件都读取不到的话 肯定也没有归档文件了 直接返回即可 if activeFile == nil { result.activeFile, e = storage.NewRecordFile(result.fileIOMode, storage.Hash, 1, result.baseFolderPath, result.fileMaxSize) if e != nil { return nil, e } result.archivedFile = make(map[uint32]*storage.RecordFile) // 如果activeFile为空 那么传进来的archivedFile一定也为空 这时再赋值会报错 result.archivedFile[1] = result.activeFile result.activeFile.StartSyncRoutine(syncDuration) // 开始定时同步 return result, nil } // 读取所有归档文件的entry 因为活跃文件也在这个归档文件里 所以不再单独读取活跃文件 for i := uint32(1); i <= uint32(len(archivedFile)); i++ { recordFile, ok := archivedFile[uint32(i)] if ok == false { continue } offset = 0 fileLength, e = recordFile.Length() if e != nil { return nil, e } for offset < fileLength { entry, entryLength, e = recordFile.ReadIntoEntry(offset) if e != nil { return nil, e } e = result.handleEntry(entry, recordFile.GetFileID(), offset) if e != nil { return nil, e } offset += entryLength } } result.activeFile.StartSyncRoutine(syncDuration) return result, nil } // CloseIndex 关闭Hash索引 同时停止定时Sync 关闭文件 func (hi *HashIndex) CloseIndex() error { hi.mutex.Lock() defer hi.mutex.Unlock() for _, v := range hi.archivedFile { if v.IsSyncing { v.StopSyncRoutine() } e := v.Close() if e != nil { return e } } return nil } // HSet 给定key field value 设定值 如果key field都存在即为更新值 func (hi *HashIndex) HSet(key string, field string, value string, expiredAt int64) error { entry := &storage.Entry{ EntryType: storage.TypeRecord, ExpiredAt: expiredAt, Key: util.EncodeKeyAndField(key, field), Value: []byte(value), } indexNode := &IndexNode{ expiredAt: expiredAt, } hi.mutex.Lock() defer hi.mutex.Unlock() // 写入文件 同时记录offset和fileID offset, e := hi.writeEntry(entry) if e != nil { return e } indexNode.offset = offset indexNode.fileID = hi.activeFile.GetFileID() indexNode.value = []byte(value) // 最后写入索引 if _, ok := hi.index[key]; ok { hi.index[key][field] = indexNode } else { hi.index[key] = make(map[string]*IndexNode) hi.index[key][field] = indexNode } return nil } // HSetNX 同HSet 但是只有在field不存在时才能写入 否则返回 func (hi *HashIndex) HSetNX(key string, field string, value string, expiredAt int64) error { // HExist和HSet都已加锁 所以这里没有锁操作 ok, e := hi.HExist(key, field) if e != nil { return e } if ok == true { logger.GenerateErrorLog(false, false, logger.FieldIsExisted.Error(), key, field, value) return logger.FieldIsExisted } else { return hi.HSet(key, field, value, expiredAt) } } // HGet 根据给定的key和field尝试获取value func (hi *HashIndex) HGet(key string, field string) (string, error) { hi.mutex.RLock() _, ok := hi.index[key] if ok != true { logger.GenerateErrorLog(false, false, logger.KeyIsNotExisted.Error(), key, field) hi.mutex.RUnlock() return "", logger.KeyIsNotExisted } indexNode, ok := hi.index[key][field] if ok != true { logger.GenerateErrorLog(false, false, logger.FieldIsNotExisted.Error(), key, field) return "", logger.FieldIsNotExisted } // 如果过期时间为-1则说明永不过期 if indexNode.expiredAt < time.Now().Unix() && indexNode.expiredAt != -1 { logger.GenerateInfoLog(logger.ValueIsExpired.Error() + " {" + field + ": " + string(indexNode.value) + "}") // 读取的Entry过期 删索引 hi.mutex.RUnlock() hi.mutex.Lock() delete(hi.index[key], field) hi.mutex.Unlock() return "", logger.ValueIsExpired } hi.mutex.RUnlock() return string(indexNode.value), nil } // HDel 根据key和field尝试删除键值对 如果deleteField为true 则认为删的是hash里面的键值对 反之则认为删除的是整个hash func (hi *HashIndex) HDel(key string, field string, deleteField bool) error { fieldIsExist, e := hi.HExist(key, field) // 查key和field if e != nil { return e } hi.mutex.Lock() defer hi.mutex.Unlock() if deleteField { // 删键值对 if fieldIsExist != true { // 查field logger.GenerateErrorLog(false, false, logger.FieldIsNotExisted.Error(), key, field) return logger.FieldIsNotExisted } entry := &storage.Entry{ EntryType: storage.TypeDelete, Key: util.EncodeKeyAndField(key, field), Value: []byte{}, ExpiredAt: 0, } // 尝试写入删除Entry 删除Entry不需要记录offset _, e = hi.writeEntry(entry) if e != nil { return e } // 然后修改索引 delete(hi.index[key], field) return nil } else { // 删hash entry := &storage.Entry{ EntryType: storage.TypeDelete, Key: util.EncodeKeyAndField(key, ""), Value: []byte{}, ExpiredAt: 0, } // 尝试写入删除Entry 删除Entry不需要记录offset _, e = hi.writeEntry(entry) if e != nil { return e } // 然后修改索引 delete(hi.index, key) return nil } } // HLen 根据给定的key 寻找field的个数 func (hi *HashIndex) HLen(key string) (int, error) { hi.mutex.RLock() defer hi.mutex.RUnlock() _, ok := hi.index[key] if ok != true { logger.GenerateErrorLog(false, false, logger.KeyIsNotExisted.Error(), key) return 0, logger.KeyIsNotExisted } return len(hi.index[key]), nil } // HExist 根据给定的key和field判断field是否存在 func (hi *HashIndex) HExist(key string, field string) (bool, error) { hi.mutex.RLock() defer hi.mutex.RUnlock() _, ok := hi.index[key] if ok != true { logger.GenerateErrorLog(false, false, logger.KeyIsNotExisted.Error(), key) return false, logger.KeyIsNotExisted } _, ok = hi.index[key][field] return ok, nil } // HStrLen 根据给定的key和field 确定value的长度 func (hi *HashIndex) HStrLen(key string, field string) (int, error) { // 不加锁原因同HSetNX value, e := hi.HGet(key, field) if e != nil { return 0, e } return len(value), nil } // writeEntry 尝试将entry写入文件 如果活跃文件写满则自动新开一个文件继续尝试写入 如果写入成功则返回nil和写入前的offset func (hi *HashIndex) writeEntry(entry *storage.Entry) (int64, error) { offset := hi.activeFile.GetOffset() e := hi.activeFile.WriteEntryIntoFile(entry) // 如果文件已满 if errors.Is(e, logger.FileBytesIsMaxedOut) { // 先结束旧文件的定时同步 hi.activeFile.StopSyncRoutine() // 开一个新的文件 这个新的活跃文件的序号自动在之前的活跃文件上 + 1 hi.activeFile, e = storage.NewRecordFile(hi.fileIOMode, storage.Hash, hi.activeFile.GetFileID()+1, hi.baseFolderPath, hi.fileMaxSize) if e != nil { return 0, e } // 这个新的活跃文件写入归档文件映射中 hi.archivedFile[hi.activeFile.GetFileID()] = hi.activeFile // 先开启定时同步 hi.activeFile.StartSyncRoutine(hi.syncDuration) // 再尝试写入 offset = hi.activeFile.GetOffset() e = hi.activeFile.WriteEntryIntoFile(entry) if e != nil { return 0, e } } else if e != nil { return 0, e } return offset, nil } // handleEntry 接收Entry 并且写入Hash索引 // attention 它只对索引进行操作 func (hi *HashIndex) handleEntry(entry *storage.Entry, fileID uint32, offset int64) error { key, field, e := util.DecodeKeyAndField(entry.Key) if e != nil { return e } hi.mutex.Lock() defer hi.mutex.Unlock() switch entry.EntryType { case storage.TypeDelete: { if field != "" { // 一个是按key删掉整个hash 一个实按field删一个value delete(hi.index[key], field) } else { delete(hi.index, key) } } case storage.TypeRecord: // attention 这里之所以只对RecordEntry进行检查 是因为对map的delete函数 如果传入的map本身就是空或者要删除的键找不到值 它就直接返回了 并不会报错 // 所以DeleteEntry不需要过期检查 过期就过期吧 过期了也只是no-op而已 // 如果过期时间为-1则说明永不过期 if entry.ExpiredAt < time.Now().Unix() && entry.ExpiredAt != -1 { // attention 过期logger return nil } if _, ok := hi.index[key]; ok { hi.index[key][field] = &IndexNode{ value: entry.Value, expiredAt: entry.ExpiredAt, fileID: fileID, offset: offset, } } else { hi.index[key] = make(map[string]*IndexNode) hi.index[key][field] = &IndexNode{ value: entry.Value, expiredAt: entry.ExpiredAt, fileID: fileID, offset: offset, } } } return nil }

警告

注意,上面的HashIndex是在Logger完成后才添加进来的,但下面的HashIndex_test不是,所以下面这个仅供参考,实际要用的时候需要修改。

go
// hashindex_test.go package index import ( "MisakaDB/logger" "MisakaDB/storage" "strconv" "testing" "time" ) func TestBuildHashIndex(t *testing.T) { activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 65536) if e != nil { t.Error(e) return } hashIndex, e := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second) if e != nil { t.Error(e) return } e = hashIndex.HSet("testKey1", "testField1", "testValue1", 32503637532) if e != nil { t.Error(e) return } value, e := hashIndex.HGet("testKey1", "testField1") if e != nil { t.Error(e) return } t.Log(value) } func TestBuildHashIndex2(t *testing.T) { activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 65536) if e != nil { t.Error(e) return } hashIndex, e := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second) if e != nil { t.Error(e) return } value, e := hashIndex.HGet("testKey1", "testField1") if e != nil { t.Error(e) return } t.Log(value) } func TestBuildHashIndex3(t *testing.T) { activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 65536) if e != nil { t.Error(e) return } hashIndex, e := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second) if e != nil { t.Error(e) return } e = hashIndex.HDel("testKey1", "testField1", true) if e != nil { t.Error(e) return } _, e = hashIndex.HGet("testKey1", "testField1") if e != nil { t.Error(e) return } } func TestBuildHashIndex4(t *testing.T) { activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 65536) if e != nil { t.Error(e) return } hashIndex, e := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second) if e != nil { t.Error(e) return } e = hashIndex.HSet("testKey1", "testField1", "testValue1", 32503637532) if e != nil { t.Error(e) return } value, e := hashIndex.HExist("testKey1", "testField1") if e != nil { t.Error(e) return } t.Log(value) v, e := hashIndex.HLen("testKey1") if e != nil { t.Error(e) return } t.Log(v) va, e := hashIndex.HStrLen("testKey1", "testField1") if e != nil { t.Error(e) return } t.Log(va) } func TestBuildHashIndex5(t *testing.T) { l, _ := logger.NewLogger("D:\\MisakaDBLog") l.ListenLoggerChannel() startTime := time.Now() activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 50000000) if e != nil { t.Error(e) return } hashIndex, e := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 50000000, time.Second) if e != nil { t.Error(e) return } endTime := time.Now() t.Log(endTime.Sub(startTime).Seconds()) testData := make(map[string]map[string]string) for i := 0; i < 1000; i++ { testData["testKey"+strconv.Itoa(i)] = make(map[string]string) for j := 0; j < 10000; j++ { testData["testKey"+strconv.Itoa(i)]["testField"+strconv.Itoa(j)] = "testValue" + strconv.Itoa(j) } } t.Log("Test Data is Ready!") startTime = time.Now() for key, fieldMap := range testData { for field, value := range fieldMap { e = hashIndex.HSet(key, field, value, 32503637532) if e != nil { t.Error(e) return } } } endTime = time.Now() t.Log(endTime.Sub(startTime).Seconds()) startTime = time.Now() var getValue string count := 0 for key, fieldMap := range testData { for field, value := range fieldMap { getValue, e = hashIndex.HGet(key, field) if e != nil { t.Error(e) return } if getValue != value { t.Log(value + "---" + getValue) count += 1 } } } endTime = time.Now() t.Log(endTime.Sub(startTime).Seconds()) t.Log(count) }

2 Redis服务器实现

这个项目解析Redis协议是使用的Redcon库,该第三方库给的示例是这样的:

go
func main() { startTime := time.Now() activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 50000000) if e != nil { fmt.Println(e) return } hashIndex, e := index.BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 50000000, time.Second) if e != nil { fmt.Println(e) return } endTime := time.Now() fmt.Println(endTime.Sub(startTime).Seconds()) testData := make(map[string]map[string]string) for i := 0; i < 1000; i++ { testData["testKey"+strconv.Itoa(i)] = make(map[string]string) for j := 0; j < 10000; j++ { testData["testKey"+strconv.Itoa(i)]["testField"+strconv.Itoa(j)] = "testValue" + strconv.Itoa(j) } } fmt.Println("Test Data is Ready!") startTime = time.Now() for key, fieldMap := range testData { for field, value := range fieldMap { e = hashIndex.HSet(key, field, value, 32503637532) if e != nil { fmt.Println(e) return } } } endTime = time.Now() fmt.Println(endTime.Sub(startTime).Seconds()) startTime = time.Now() var getValue string count := 0 for key, fieldMap := range testData { for field, value := range fieldMap { getValue, e = hashIndex.HGet(key, field) if e != nil { fmt.Println(e) return } if getValue != value { fmt.Println(value + "---" + getValue) count += 1 } } } endTime = time.Now() fmt.Println(endTime.Sub(startTime).Seconds()) fmt.Println(count) }

该第三方库已经将请求的具体参数解析好了,只需要在handler函数中接收请求,调用索引的方法即可。当然对于本项目来说,Redis服务器启动之前,还需要先读取文件以还原索引。所以有以下代码:

警告

和上面一样,这里已经是实现了Logger和StringIndex的代码了,阅读时注意。

go
// storage.go package storage import ( "MisakaDB/logger" "os" "path/filepath" ) // RecordFilesInit 按路径读取所有文件 并且转换为RecordFile 按数据类型进行分类 默认情况下编号最大的文件是活跃文件 // attention 活跃文件也存在于归档文件中 等到活跃文件写满之后 再开一个活跃文件存入归档文件即可 之前的活跃文件自动成为归档文件 func RecordFilesInit(path string, fileMaxSize int64) (activeFiles map[FileForData]*RecordFile, archiveFiles map[FileForData]map[uint32]*RecordFile, e error) { var filesPath []string var walkFunc = func(path string, info os.FileInfo, err error) error { if !info.IsDir() { filesPath = append(filesPath, path) } return nil } e = filepath.Walk(path, walkFunc) if e != nil { logger.GenerateErrorLog(false, false, e.Error(), path) return nil, nil, e } archiveFiles = make(map[FileForData]map[uint32]*RecordFile) activeFiles = make(map[FileForData]*RecordFile) var recordFile *RecordFile for _, i := range filesPath { // 读取文件 recordFile, e = LoadRecordFileFromDisk(i, fileMaxSize) if e != nil { return nil, nil, e } // 先写入归档文件 记得先检查双重hash是否有nil if _, ok := archiveFiles[recordFile.dataType]; ok { archiveFiles[recordFile.dataType][recordFile.fileID] = recordFile } else { archiveFiles[recordFile.dataType] = make(map[uint32]*RecordFile) archiveFiles[recordFile.dataType][recordFile.fileID] = recordFile } // 再看fileID大小写入活跃文件 if activeFiles[recordFile.dataType] == nil { activeFiles[recordFile.dataType] = recordFile } else { if activeFiles[recordFile.dataType].fileID < recordFile.fileID { activeFiles[recordFile.dataType] = recordFile } } } e = nil return }
go
// misakaDataBase.go package main import ( "MisakaDB/index" "MisakaDB/logger" "MisakaDB/storage" "errors" "fmt" "github.com/tidwall/redcon" "runtime/debug" "strconv" "strings" "time" ) // 以下为可选配置项 const ( MisakaDataBaseFolderPath = "D:\\MisakaDBTest" // 数据库进行数据持久化时 文件的保存位置 注意该路径下不可以有其他人任何文件 RecordFileMaxSize = 65536 // 文件的最大存储字节数 RecordFileIOMode = storage.TraditionalIOFile // 对文件的读写方式 可以是传统的IO 也可以是Mmap MisakaServerAddr = ":23456" // 数据库的端口 LoggerPath = "D:\\MisakaDBLog" // 数据库的Log保存位置 该位置下有无其它文件都可以 SyncDuration = 1000 // 持久化文件定时同步的时间间隔 单位为毫秒 ) // 下面这是Linux版的路径 方便我切换 //const ( // MisakaDataBaseFolderPath = "/home/MisakaDB" // LoggerPath = "/home/MisakaDBLog" //) type MisakaDataBase struct { server *redcon.Server logger *logger.Logger hashIndex *index.HashIndex stringIndex *index.StringIndex } func Init() (*MisakaDataBase, error) { database := &MisakaDataBase{} var e error // 初始化logger database.logger, e = logger.NewLogger(LoggerPath) if e != nil { return nil, e } logger.GenerateInfoLog("Logger is Ready!") // 读取文件 activeFiles, archiveFiles, e := storage.RecordFilesInit(MisakaDataBaseFolderPath, RecordFileMaxSize) if e != nil { return nil, e } // 开始构建索引 for key, value := range activeFiles { if key == storage.Hash { database.hashIndex, e = index.BuildHashIndex(value, archiveFiles[storage.Hash], RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond * SyncDuration) if e != nil { return nil, e } logger.GenerateInfoLog("Hash Index is Ready!") } if key == storage.String { database.stringIndex, e = index.BuildStringIndex(value, archiveFiles[storage.String], RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond * SyncDuration) if e != nil { return nil, e } logger.GenerateInfoLog("String Index is Ready!") } } // 开始检查索引是否构建 如果否 构建一个空的索引 // 这是防activeFiles本身 if database.hashIndex == nil { database.hashIndex, e = index.BuildHashIndex(nil, nil, RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond * SyncDuration) if e != nil { logger.GenerateErrorLog(false, false, e.Error(), "Build Empty Hash Index Failed!") return nil, e } logger.GenerateInfoLog("Hash Index is Ready!") } if database.stringIndex == nil { database.stringIndex, e = index.BuildStringIndex(nil, nil, RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond * SyncDuration) if e != nil { logger.GenerateErrorLog(false, false, e.Error(), "Build Empty String Index Failed!") return nil, e } logger.GenerateInfoLog("String Index is Ready!") } // 初始化服务器 e = database.ServerInit() if e != nil { logger.GenerateErrorLog(false, false, e.Error(), "Server Init Failed!") return nil, e } logger.GenerateInfoLog("Server is Ready!") return database, nil } func (db *MisakaDataBase) Destroy() error { // 关闭服务器 e := db.server.Close() if e != nil { logger.GenerateErrorLog(false, false, e.Error()) return e } // 关闭索引 索引里会挨个关闭文件的 e = db.hashIndex.CloseIndex() if e != nil { return e } e = db.stringIndex.CloseIndex() if e != nil { return e } // 关闭logger db.logger.StopLogger() return nil } func (db *MisakaDataBase) ServerInit() error { // redcon是多线程的 而且应该是线程安全的 // 创建一个Server需要三个回调函数: // 1 通过连接接收请求时调用的函数 // 2 接受连接时调用的函数 // 3 断开连接时调用的函数 var ( e error expired int ) db.server = redcon.NewServer(MisakaServerAddr, func(conn redcon.Conn, cmd redcon.Command) { // 捕捉panic defer func() { p := recover() if p != nil { stackTrace := debug.Stack() // 获取引发panic位置的堆栈信息 logger.GenerateErrorLog(true, false, string(stackTrace), fmt.Sprintf("%v", p)) } return }() switch strings.ToLower(string(cmd.Args[0])) { default: // 命令不能识别 conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") logger.GenerateInfoLog(conn.RemoteAddr() + ": Unknown Query: " + string(cmd.Args[0])) return case "ping": conn.WriteString("PONG") logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: ping") return case "quit": conn.WriteString("OK") e = conn.Close() if e != nil { logger.GenerateErrorLog(false, false, e.Error()) } return // string部分的命令解析 case "set": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: set") if len(cmd.Args) == 3 { // set key value e = db.stringIndex.Set(string(cmd.Args[1]), string(cmd.Args[2]), -1) if e != nil { conn.WriteError(e.Error()) return } conn.WriteString("OK") } else if len(cmd.Args) == 5 { // set key value expired time expired, e = strconv.Atoi(string(cmd.Args[4])) if e != nil { conn.WriteError("Cannot Read Expired As Number: " + e.Error()) return } e = db.stringIndex.Set(string(cmd.Args[1]), string(cmd.Args[2]), int64(expired)) if e != nil { conn.WriteError(e.Error()) return } conn.WriteString("OK") return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "setnx": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: setnx") if len(cmd.Args) == 3 { // setnx key value e = db.stringIndex.SetNX(string(cmd.Args[1]), string(cmd.Args[2]), -1) if e != nil { if errors.Is(logger.KeyIsExisted, e) { conn.WriteInt(0) return } else { conn.WriteError(e.Error()) return } } conn.WriteInt(1) } else if len(cmd.Args) == 5 { // setnx key value expired time expired, e = strconv.Atoi(string(cmd.Args[4])) if e != nil { conn.WriteError("Cannot Read Expired As Number: " + e.Error()) return } e = db.stringIndex.SetNX(string(cmd.Args[1]), string(cmd.Args[2]), int64(expired)) if e != nil { conn.WriteError(e.Error()) return } conn.WriteString("OK") return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "get": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: get") if len(cmd.Args) == 2 { // get key var result string result, e = db.stringIndex.Get(string(cmd.Args[1])) if errors.Is(logger.KeyIsNotExisted, e) { conn.WriteString("nil") return } else if e != nil { conn.WriteError(e.Error()) return } conn.WriteString(result) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "getrange": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: getrange") if len(cmd.Args) == 4 { // getrange key start end var ( result string start int end int ) start, e = strconv.Atoi(string(cmd.Args[2])) if e != nil { conn.WriteError("Cannot Read Start As Number: " + e.Error()) return } end, e = strconv.Atoi(string(cmd.Args[3])) if e != nil { conn.WriteError("Cannot Read End As Number: " + e.Error()) return } result, e = db.stringIndex.GetRange(string(cmd.Args[1]), start, end) if errors.Is(logger.KeyIsNotExisted, e) { conn.WriteString("nil") } else if e != nil { conn.WriteError(e.Error()) return } conn.WriteString(result) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "getset": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: getset") if len(cmd.Args) == 3 { // getset key value var result string result, e = db.stringIndex.GetSet(string(cmd.Args[1]), string(cmd.Args[2])) if e != nil { conn.WriteError(e.Error()) return } conn.WriteString(result) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "append": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: append") if len(cmd.Args) == 3 { // append key appendValue e = db.stringIndex.Append(string(cmd.Args[1]), string(cmd.Args[2])) if e != nil { conn.WriteError(e.Error()) return } conn.WriteInt(len(cmd.Args[2])) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "del": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: del") if len(cmd.Args) == 2 { // del key e = db.stringIndex.Del(string(cmd.Args[1])) if e != nil { if errors.Is(logger.KeyIsNotExisted, e) { conn.WriteInt(0) } else { conn.WriteError(e.Error()) } return } conn.WriteInt(1) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } // hash部分的命令解析 case "hset": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hset") if len(cmd.Args) == 4 { // hset key field value e = db.hashIndex.HSet(string(cmd.Args[1]), string(cmd.Args[2]), string(cmd.Args[3]), -1) if e != nil { conn.WriteError(e.Error()) return } conn.WriteString("OK") return } else if len(cmd.Args) == 6 { // 设置过期时间 // todo } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "hsetnx": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hsetnx") if len(cmd.Args) == 4 { // hset key field value e = db.hashIndex.HSetNX(string(cmd.Args[1]), string(cmd.Args[2]), string(cmd.Args[3]), -1) if e != nil { conn.WriteError(e.Error()) return } conn.WriteString("OK") return } else if len(cmd.Args) == 6 { // 设置过期时间 // todo } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "hget": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hget") if len(cmd.Args) == 3 { // hget key field var result string result, e = db.hashIndex.HGet(string(cmd.Args[1]), string(cmd.Args[2])) if errors.Is(logger.KeyIsNotExisted, e) || errors.Is(logger.FieldIsNotExisted, e) { conn.WriteString("nil") } else if e != nil { conn.WriteError(e.Error()) return } conn.WriteString(result) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "hdel": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hdel") if len(cmd.Args) == 3 { // hdel key field e = db.hashIndex.HDel(string(cmd.Args[1]), string(cmd.Args[2]), true) if e != nil { conn.WriteError(e.Error()) return } conn.WriteInt(1) return } else if len(cmd.Args) == 2 { // hdel key e = db.hashIndex.HDel(string(cmd.Args[1]), "", false) if e != nil { conn.WriteError(e.Error()) return } conn.WriteInt(1) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "hlen": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hlen") if len(cmd.Args) == 2 { // hlen key var result int result, e = db.hashIndex.HLen(string(cmd.Args[1])) if e != nil { conn.WriteError(e.Error()) return } conn.WriteInt(result) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "hexists": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hexists") if len(cmd.Args) == 2 { // hexists key field var result bool result, e = db.hashIndex.HExist(string(cmd.Args[1]), string(cmd.Args[2])) if e != nil { conn.WriteError(e.Error()) return } if result { conn.WriteInt(1) } else { conn.WriteInt(0) } return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } case "hstrlen": logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hstrlen") if len(cmd.Args) == 2 { // hstrlen key field var result int result, e = db.hashIndex.HStrLen(string(cmd.Args[1]), string(cmd.Args[2])) if errors.Is(logger.FieldIsNotExisted, e) { conn.WriteInt(0) return } else if e != nil { conn.WriteError(e.Error()) return } conn.WriteInt(result) return } else { // 参数数量错误 conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") return } } }, func(conn redcon.Conn) bool { logger.GenerateInfoLog("DataBase Connection Accept: " + conn.RemoteAddr()) return true }, func(conn redcon.Conn, err error) { logger.GenerateInfoLog("DataBase Connection Closed: " + conn.RemoteAddr()) return }, ) return nil } func (db *MisakaDataBase) StartServe() error { logger.GenerateInfoLog("Server start Listen And Serve!") return db.server.ListenAndServe() // 翻源码可知: // ListenAndServe -> ListenServeAndSignal -> serve -> 如果有tcp连接 -> go handle // 所以ListenServeAndSignal是阻塞线程监听的 }
go
// main.go package main import ( "fmt" "runtime" ) func main() { db, e := Init() if e != nil { fmt.Println(e.Error()) return } e = db.StartServe() if e != nil { fmt.Println(e.Error()) } buffer := make([]byte, 10000000) bytesNum := runtime.Stack(buffer, false) fmt.Println(string(buffer[:bytesNum])) }

3 在Windows上交叉编译Linux可执行文件

bash
$env:GOOS="linux" go build -ldflags "-s -w" -o MisakaDataBase MisakaDB

本文作者:御坂19327号

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!