主要内容:
根据上一篇定义的Hash数据类型的可用操作,确定Hash索引的操作。
提示
之前确定的Hash可用操作:
- hset key field value:设置值。如果设置成功则返回1,否则返回0。
- hsetnx key field value:设置值,但是它只有在field不存在时才能设置成功。
- hget key field:获取值,如果key或者field不存在则返回nil。
- hdel key field [field ...]:删除值,返回被删除的值的数量。
- hlen key:获取值的field个数。
- hexists key field:判断field是否存在,存在返回1,反之返回0。
- hstrlen key field:获取value的长度,如果field不存在则返回0。

Hash数据类型的主要索引结构为双层map,第一层map负责key的映射,第二层map负责field到value的映射。另外在这里加锁以加入并发保护。之后就是在双层map的基础上实现这7个方法并且各自创建entry再写入文件的事情了。
另外,hash和别的一些数据结构也有个问题,那就是都是两个东西(key和field)确定一个键,但是entry中只有一个key位置,这里需要对key和field进行编码,让它们在一起。
// util.go
// EncodeKeyAndField packs key and field into a single byte slice usable as an
// entry key (needed by Hash and similar two-part-key types).
// Layout: varint(len(key)) | varint(len(field)) | key bytes | field bytes.
func EncodeKeyAndField(key string, field string) []byte {
	// BUGFIX: each signed varint may need up to binary.MaxVarintLen64 (10)
	// bytes; the original 10-byte buffer could overflow with two varints.
	header := make([]byte, 2*binary.MaxVarintLen64)
	index := binary.PutVarint(header, int64(len(key)))
	index += binary.PutVarint(header[index:], int64(len(field)))
	result := make([]byte, index+len(key)+len(field))
	copy(result, header[:index])
	copy(result[index:], key)
	copy(result[index+len(key):], field)
	return result
}
// DecodeKeyAndField reverses EncodeKeyAndField, splitting the combined key
// back into its key and field parts. It returns DecodeKeyAndFieldFailed for
// input that is not a valid encoding.
func DecodeKeyAndField(input []byte) (key string, field string, e error) {
	kSize, n := binary.Varint(input)
	if n <= 0 {
		logger.GenerateErrorLog(false, false, logger.DecodeKeyAndFieldFailed.Error(), TurnByteArrayToString(input))
		return "", "", logger.DecodeKeyAndFieldFailed
	}
	index := n
	fSize, n := binary.Varint(input[index:])
	if n <= 0 {
		logger.GenerateErrorLog(false, false, logger.DecodeKeyAndFieldFailed.Error(), TurnByteArrayToString(input))
		return "", "", logger.DecodeKeyAndFieldFailed
	}
	index += n
	// BUGFIX: validate the decoded sizes before slicing; the original sliced
	// unconditionally and would panic on corrupted or truncated input.
	if kSize < 0 || fSize < 0 || int64(index)+kSize+fSize > int64(len(input)) {
		logger.GenerateErrorLog(false, false, logger.DecodeKeyAndFieldFailed.Error(), TurnByteArrayToString(input))
		return "", "", logger.DecodeKeyAndFieldFailed
	}
	key = string(input[index : index+int(kSize)])
	index += int(kSize)
	field = string(input[index : index+int(fSize)])
	return key, field, nil
}
// hashindex.go
package index
import (
	"errors"
	"sort"
	"sync"
	"time"

	"MisakaDB/logger"
	"MisakaDB/storage"
	"MisakaDB/util"
)
// HashIndex is the in-memory index for the Hash data type, backed by
// append-only record files on disk.
type HashIndex struct {
	index          map[string]map[string]*IndexNode // two-level map: key -> field -> index node
	mutex          sync.RWMutex                     // guards index against concurrent access
	activeFile     *storage.RecordFile              // file currently being appended to
	archivedFile   map[uint32]*storage.RecordFile   // fileID -> file; also contains the active file
	fileIOMode     storage.FileIOType               // traditional IO or mmap
	baseFolderPath string                           // folder holding the data files
	fileMaxSize    int64                            // size limit before a new active file is opened
	syncDuration   time.Duration                    // interval of the periodic sync routine
}
// BuildHashIndex rebuilds the Hash index from the given active and archived
// files. It is only called at database startup. If no old files exist, a new
// active file is created instead.
func BuildHashIndex(activeFile *storage.RecordFile, archivedFile map[uint32]*storage.RecordFile, fileIOMode storage.FileIOType, baseFolderPath string, fileMaxSize int64, syncDuration time.Duration) (*HashIndex, error) {
	result := &HashIndex{
		activeFile:     activeFile,
		archivedFile:   archivedFile,
		fileIOMode:     fileIOMode,
		baseFolderPath: baseFolderPath,
		fileMaxSize:    fileMaxSize,
		index:          make(map[string]map[string]*IndexNode),
		syncDuration:   syncDuration,
	}
	// No active file means there can be no archived files either: start fresh.
	if activeFile == nil {
		var e error
		result.activeFile, e = storage.NewRecordFile(result.fileIOMode, storage.Hash, 1, result.baseFolderPath, result.fileMaxSize)
		if e != nil {
			return nil, e
		}
		// The incoming archivedFile must be nil here, so allocate a fresh map;
		// the active file always lives in the archived map as well.
		result.archivedFile = make(map[uint32]*storage.RecordFile)
		result.archivedFile[1] = result.activeFile
		result.activeFile.StartSyncRoutine(syncDuration) // start the periodic sync
		return result, nil
	}
	// Replay every archived file in ascending fileID order. The active file is
	// part of the archived map, so it is replayed here too and needs no extra pass.
	// BUGFIX: the original looped fileID 1..len(archivedFile), silently skipping
	// files whenever the IDs had gaps (e.g. IDs {1,3} with len == 2).
	fileIDs := make([]uint32, 0, len(archivedFile))
	for fileID := range archivedFile {
		fileIDs = append(fileIDs, fileID)
	}
	sort.Slice(fileIDs, func(i, j int) bool { return fileIDs[i] < fileIDs[j] })
	for _, fileID := range fileIDs {
		recordFile := archivedFile[fileID]
		fileLength, e := recordFile.Length()
		if e != nil {
			return nil, e
		}
		var offset int64
		for offset < fileLength {
			entry, entryLength, e := recordFile.ReadIntoEntry(offset)
			if e != nil {
				return nil, e
			}
			e = result.handleEntry(entry, recordFile.GetFileID(), offset)
			if e != nil {
				return nil, e
			}
			offset += entryLength
		}
	}
	result.activeFile.StartSyncRoutine(syncDuration)
	return result, nil
}
// CloseIndex shuts the Hash index down: every file's periodic sync routine is
// stopped and the file is closed. The active file is part of the archived map,
// so it is covered by the loop as well.
func (hi *HashIndex) CloseIndex() error {
	hi.mutex.Lock()
	defer hi.mutex.Unlock()
	for _, recordFile := range hi.archivedFile {
		if recordFile.IsSyncing {
			recordFile.StopSyncRoutine()
		}
		if e := recordFile.Close(); e != nil {
			return e
		}
	}
	return nil
}
// HSet stores value under key/field with the given expiry; writing an existing
// key/field pair overwrites it. expiredAt == -1 means the value never expires.
func (hi *HashIndex) HSet(key string, field string, value string, expiredAt int64) error {
	entry := &storage.Entry{
		EntryType: storage.TypeRecord,
		ExpiredAt: expiredAt,
		Key:       util.EncodeKeyAndField(key, field),
		Value:     []byte(value),
	}
	hi.mutex.Lock()
	defer hi.mutex.Unlock()
	// Append the entry to the data file first, remembering where it landed.
	offset, e := hi.writeEntry(entry)
	if e != nil {
		return e
	}
	// Then publish the location to the in-memory index.
	if hi.index[key] == nil {
		hi.index[key] = make(map[string]*IndexNode)
	}
	hi.index[key][field] = &IndexNode{
		value:     []byte(value),
		expiredAt: expiredAt,
		fileID:    hi.activeFile.GetFileID(),
		offset:    offset,
	}
	return nil
}
// HSetNX behaves like HSet but only succeeds when the field does not exist
// yet; otherwise it returns FieldIsExisted.
// HExist and HSet both lock internally, so no lock is taken here.
// NOTE(review): the check-then-set is not atomic — a concurrent HSet can slip
// in between the HExist and the HSet.
func (hi *HashIndex) HSetNX(key string, field string, value string, expiredAt int64) error {
	ok, e := hi.HExist(key, field)
	// BUGFIX: a missing key means the field cannot exist either, so it must not
	// abort the operation. The original returned KeyIsNotExisted here, making
	// it impossible to HSetNX into a brand-new key.
	if e != nil && !errors.Is(e, logger.KeyIsNotExisted) {
		return e
	}
	if ok {
		logger.GenerateErrorLog(false, false, logger.FieldIsExisted.Error(), key, field, value)
		return logger.FieldIsExisted
	}
	return hi.HSet(key, field, value, expiredAt)
}
// HGet returns the value stored under key/field. An expired value is removed
// from the index and reported as ValueIsExpired.
func (hi *HashIndex) HGet(key string, field string) (string, error) {
	hi.mutex.RLock()
	fields, ok := hi.index[key]
	if !ok {
		hi.mutex.RUnlock()
		logger.GenerateErrorLog(false, false, logger.KeyIsNotExisted.Error(), key, field)
		return "", logger.KeyIsNotExisted
	}
	indexNode, ok := fields[field]
	if !ok {
		// BUGFIX: the original returned here without RUnlock, leaking the read
		// lock and deadlocking every later writer.
		hi.mutex.RUnlock()
		logger.GenerateErrorLog(false, false, logger.FieldIsNotExisted.Error(), key, field)
		return "", logger.FieldIsNotExisted
	}
	// expiredAt == -1 means the value never expires.
	if indexNode.expiredAt < time.Now().Unix() && indexNode.expiredAt != -1 {
		logger.GenerateInfoLog(logger.ValueIsExpired.Error() + " {" + field + ": " + string(indexNode.value) + "}")
		// The entry is expired: upgrade to a write lock and drop it from the index.
		hi.mutex.RUnlock()
		hi.mutex.Lock()
		delete(hi.index[key], field)
		hi.mutex.Unlock()
		return "", logger.ValueIsExpired
	}
	value := string(indexNode.value)
	hi.mutex.RUnlock()
	return value, nil
}
// HDel deletes either one key/field pair (deleteField == true) or the whole
// hash stored under key (deleteField == false, field is ignored for the
// encoded delete entry).
func (hi *HashIndex) HDel(key string, field string, deleteField bool) error {
	// Checks both key and field; a missing key surfaces as an error here.
	fieldExists, e := hi.HExist(key, field)
	if e != nil {
		return e
	}
	hi.mutex.Lock()
	defer hi.mutex.Unlock()
	if deleteField && !fieldExists {
		logger.GenerateErrorLog(false, false, logger.FieldIsNotExisted.Error(), key, field)
		return logger.FieldIsNotExisted
	}
	// A delete of the whole hash is encoded with an empty field.
	encodedField := field
	if !deleteField {
		encodedField = ""
	}
	entry := &storage.Entry{
		EntryType: storage.TypeDelete,
		Key:       util.EncodeKeyAndField(key, encodedField),
		Value:     []byte{},
		ExpiredAt: 0,
	}
	// Persist the delete entry; its offset is never needed afterwards.
	if _, e = hi.writeEntry(entry); e != nil {
		return e
	}
	// Finally update the in-memory index.
	if deleteField {
		delete(hi.index[key], field)
	} else {
		delete(hi.index, key)
	}
	return nil
}
// HLen returns how many fields the hash stored under key currently holds.
func (hi *HashIndex) HLen(key string) (int, error) {
	hi.mutex.RLock()
	defer hi.mutex.RUnlock()
	fields, ok := hi.index[key]
	if !ok {
		logger.GenerateErrorLog(false, false, logger.KeyIsNotExisted.Error(), key)
		return 0, logger.KeyIsNotExisted
	}
	return len(fields), nil
}
// HExist reports whether field exists under key. A missing key is returned as
// KeyIsNotExisted; a missing field under an existing key is (false, nil).
func (hi *HashIndex) HExist(key string, field string) (bool, error) {
	hi.mutex.RLock()
	defer hi.mutex.RUnlock()
	fields, keyFound := hi.index[key]
	if !keyFound {
		logger.GenerateErrorLog(false, false, logger.KeyIsNotExisted.Error(), key)
		return false, logger.KeyIsNotExisted
	}
	_, fieldFound := fields[field]
	return fieldFound, nil
}
// HStrLen returns the length of the value stored under key/field.
// No locking here: HGet locks internally (same reasoning as HSetNX).
func (hi *HashIndex) HStrLen(key string, field string) (int, error) {
	v, err := hi.HGet(key, field)
	if err != nil {
		return 0, err
	}
	return len(v), nil
}
// writeEntry appends entry to the active file and returns the offset it was
// written at. When the active file is full, a new file (fileID + 1) becomes
// the active file and the write is retried exactly once.
func (hi *HashIndex) writeEntry(entry *storage.Entry) (int64, error) {
	offset := hi.activeFile.GetOffset()
	e := hi.activeFile.WriteEntryIntoFile(entry)
	if e == nil {
		return offset, nil
	}
	if !errors.Is(e, logger.FileBytesIsMaxedOut) {
		return 0, e
	}
	// The active file is full: stop its periodic sync before rotating.
	hi.activeFile.StopSyncRoutine()
	// Open the successor file; its ID is the previous active file's ID + 1.
	hi.activeFile, e = storage.NewRecordFile(hi.fileIOMode, storage.Hash, hi.activeFile.GetFileID()+1, hi.baseFolderPath, hi.fileMaxSize)
	if e != nil {
		return 0, e
	}
	// The fresh active file is registered in the archived map as well.
	hi.archivedFile[hi.activeFile.GetFileID()] = hi.activeFile
	// Restart the periodic sync, then retry the write once.
	hi.activeFile.StartSyncRoutine(hi.syncDuration)
	offset = hi.activeFile.GetOffset()
	if e = hi.activeFile.WriteEntryIntoFile(entry); e != nil {
		return 0, e
	}
	return offset, nil
}
// handleEntry applies a single replayed entry to the in-memory Hash index.
// attention: it touches only the index, never the files.
func (hi *HashIndex) handleEntry(entry *storage.Entry, fileID uint32, offset int64) error {
	key, field, e := util.DecodeKeyAndField(entry.Key)
	if e != nil {
		return e
	}
	hi.mutex.Lock()
	defer hi.mutex.Unlock()
	switch entry.EntryType {
	case storage.TypeDelete:
		// Delete entries need no expiry check: delete on a nil map or a missing
		// key is a no-op, so an expired delete simply does nothing.
		// An empty field marks "delete the whole hash"; otherwise only the one
		// field is removed.
		if field == "" {
			delete(hi.index, key)
		} else {
			delete(hi.index[key], field)
		}
	case storage.TypeRecord:
		// ExpiredAt == -1 means the record never expires.
		if entry.ExpiredAt != -1 && entry.ExpiredAt < time.Now().Unix() {
			// attention: expired records are skipped silently (no log yet)
			return nil
		}
		if hi.index[key] == nil {
			hi.index[key] = make(map[string]*IndexNode)
		}
		hi.index[key][field] = &IndexNode{
			value:     entry.Value,
			expiredAt: entry.ExpiredAt,
			fileID:    fileID,
			offset:    offset,
		}
	}
	return nil
}
警告
注意,上面的HashIndex是在Logger完成后才添加进来的,但下面的HashIndex_test不是,所以下面这个仅供参考,实际要用的时候需要修改。
// hashindex_test.go
package index
import (
"MisakaDB/logger"
"MisakaDB/storage"
"strconv"
"testing"
"time"
)
// TestBuildHashIndex writes one key/field pair and reads it straight back.
func TestBuildHashIndex(t *testing.T) {
	activeFiles, archiveFiles, err := storage.RecordFilesInit("D:\\MisakaDBTest", 65536)
	if err != nil {
		t.Fatal(err)
	}
	hashIndex, err := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second)
	if err != nil {
		t.Fatal(err)
	}
	if err = hashIndex.HSet("testKey1", "testField1", "testValue1", 32503637532); err != nil {
		t.Fatal(err)
	}
	got, err := hashIndex.HGet("testKey1", "testField1")
	if err != nil {
		t.Fatal(err)
	}
	t.Log(got)
}
// TestBuildHashIndex2 reopens the files written by the previous test and
// checks that the value survives a restart.
func TestBuildHashIndex2(t *testing.T) {
	activeFiles, archiveFiles, err := storage.RecordFilesInit("D:\\MisakaDBTest", 65536)
	if err != nil {
		t.Fatal(err)
	}
	hashIndex, err := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second)
	if err != nil {
		t.Fatal(err)
	}
	got, err := hashIndex.HGet("testKey1", "testField1")
	if err != nil {
		t.Fatal(err)
	}
	t.Log(got)
}
// TestBuildHashIndex3 deletes the field written by the earlier tests.
// NOTE(review): the final HGet is expected to succeed or the test fails, yet
// after a successful HDel it should normally return an error — confirm intent.
func TestBuildHashIndex3(t *testing.T) {
	activeFiles, archiveFiles, err := storage.RecordFilesInit("D:\\MisakaDBTest", 65536)
	if err != nil {
		t.Fatal(err)
	}
	hashIndex, err := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second)
	if err != nil {
		t.Fatal(err)
	}
	if err = hashIndex.HDel("testKey1", "testField1", true); err != nil {
		t.Fatal(err)
	}
	if _, err = hashIndex.HGet("testKey1", "testField1"); err != nil {
		t.Fatal(err)
	}
}
// TestBuildHashIndex4 exercises HExist, HLen and HStrLen after an HSet.
func TestBuildHashIndex4(t *testing.T) {
	activeFiles, archiveFiles, err := storage.RecordFilesInit("D:\\MisakaDBTest", 65536)
	if err != nil {
		t.Fatal(err)
	}
	hashIndex, err := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 65536, time.Second)
	if err != nil {
		t.Fatal(err)
	}
	if err = hashIndex.HSet("testKey1", "testField1", "testValue1", 32503637532); err != nil {
		t.Fatal(err)
	}
	exists, err := hashIndex.HExist("testKey1", "testField1")
	if err != nil {
		t.Fatal(err)
	}
	t.Log(exists)
	fieldCount, err := hashIndex.HLen("testKey1")
	if err != nil {
		t.Fatal(err)
	}
	t.Log(fieldCount)
	valueLength, err := hashIndex.HStrLen("testKey1", "testField1")
	if err != nil {
		t.Fatal(err)
	}
	t.Log(valueLength)
}
// TestBuildHashIndex5 is a rough throughput benchmark: it rebuilds the index
// from disk, bulk-writes 1000 keys x 10000 fields, reads everything back and
// logs the duration of each phase plus the number of mismatched values.
func TestBuildHashIndex5(t *testing.T) {
	l, _ := logger.NewLogger("D:\\MisakaDBLog") // error deliberately ignored in this ad-hoc benchmark
	l.ListenLoggerChannel()
	begin := time.Now()
	activeFiles, archiveFiles, err := storage.RecordFilesInit("D:\\MisakaDBTest", 50000000)
	if err != nil {
		t.Fatal(err)
	}
	hashIndex, err := BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 50000000, time.Second)
	if err != nil {
		t.Fatal(err)
	}
	t.Log(time.Since(begin).Seconds())
	testData := make(map[string]map[string]string)
	for i := 0; i < 1000; i++ {
		fields := make(map[string]string)
		for j := 0; j < 10000; j++ {
			fields["testField"+strconv.Itoa(j)] = "testValue" + strconv.Itoa(j)
		}
		testData["testKey"+strconv.Itoa(i)] = fields
	}
	t.Log("Test Data is Ready!")
	begin = time.Now()
	for key, fields := range testData {
		for field, value := range fields {
			if err = hashIndex.HSet(key, field, value, 32503637532); err != nil {
				t.Fatal(err)
			}
		}
	}
	t.Log(time.Since(begin).Seconds())
	begin = time.Now()
	mismatched := 0
	for key, fields := range testData {
		for field, want := range fields {
			got, e := hashIndex.HGet(key, field)
			if e != nil {
				t.Fatal(e)
			}
			if got != want {
				t.Log(want + "---" + got)
				mismatched += 1
			}
		}
	}
	t.Log(time.Since(begin).Seconds())
	t.Log(mismatched)
}
这个项目解析Redis协议使用的是Redcon库,基于该库的调用示例如下:
gofunc main() {
startTime := time.Now()
activeFiles, archiveFiles, e := storage.RecordFilesInit("D:\\MisakaDBTest", 50000000)
if e != nil {
fmt.Println(e)
return
}
hashIndex, e := index.BuildHashIndex(activeFiles[storage.Hash], archiveFiles[storage.Hash], storage.TraditionalIOFile, "D:\\MisakaDBTest", 50000000, time.Second)
if e != nil {
fmt.Println(e)
return
}
endTime := time.Now()
fmt.Println(endTime.Sub(startTime).Seconds())
testData := make(map[string]map[string]string)
for i := 0; i < 1000; i++ {
testData["testKey"+strconv.Itoa(i)] = make(map[string]string)
for j := 0; j < 10000; j++ {
testData["testKey"+strconv.Itoa(i)]["testField"+strconv.Itoa(j)] = "testValue" + strconv.Itoa(j)
}
}
fmt.Println("Test Data is Ready!")
startTime = time.Now()
for key, fieldMap := range testData {
for field, value := range fieldMap {
e = hashIndex.HSet(key, field, value, 32503637532)
if e != nil {
fmt.Println(e)
return
}
}
}
endTime = time.Now()
fmt.Println(endTime.Sub(startTime).Seconds())
startTime = time.Now()
var getValue string
count := 0
for key, fieldMap := range testData {
for field, value := range fieldMap {
getValue, e = hashIndex.HGet(key, field)
if e != nil {
fmt.Println(e)
return
}
if getValue != value {
fmt.Println(value + "---" + getValue)
count += 1
}
}
}
endTime = time.Now()
fmt.Println(endTime.Sub(startTime).Seconds())
fmt.Println(count)
}
该第三方库已经将请求的具体参数解析好了,只需要在handler函数中接收请求,调用索引的方法即可。当然对于本项目来说,Redis服务器启动之前,还需要先读取文件以还原索引。所以有以下代码:
警告
和上面一样,这里已经是实现了Logger和StringIndex的代码了,阅读时注意。
// storage.go
package storage
import (
"MisakaDB/logger"
"os"
"path/filepath"
)
// RecordFilesInit walks path, loads every file found as a RecordFile and
// groups the results by data type. For each data type the file with the
// highest fileID becomes that type's active file.
// attention: the active file is also present in the archived map; once it
// fills up a new active file is created and the old one simply stays archived.
func RecordFilesInit(path string, fileMaxSize int64) (activeFiles map[FileForData]*RecordFile, archiveFiles map[FileForData]map[uint32]*RecordFile, e error) {
	var filesPath []string
	var walkFunc = func(path string, info os.FileInfo, err error) error {
		// BUGFIX: the original ignored err; when Walk reports an error, info
		// may be nil and info.IsDir() would panic. Propagate it instead.
		if err != nil {
			return err
		}
		if !info.IsDir() {
			filesPath = append(filesPath, path)
		}
		return nil
	}
	e = filepath.Walk(path, walkFunc)
	if e != nil {
		logger.GenerateErrorLog(false, false, e.Error(), path)
		return nil, nil, e
	}
	archiveFiles = make(map[FileForData]map[uint32]*RecordFile)
	activeFiles = make(map[FileForData]*RecordFile)
	var recordFile *RecordFile
	for _, filePath := range filesPath {
		// Load the file from disk.
		recordFile, e = LoadRecordFileFromDisk(filePath, fileMaxSize)
		if e != nil {
			return nil, nil, e
		}
		// Insert it into the archived map, creating the inner map on first use.
		if archiveFiles[recordFile.dataType] == nil {
			archiveFiles[recordFile.dataType] = make(map[uint32]*RecordFile)
		}
		archiveFiles[recordFile.dataType][recordFile.fileID] = recordFile
		// The file with the largest fileID of each type is the active file.
		if current := activeFiles[recordFile.dataType]; current == nil || current.fileID < recordFile.fileID {
			activeFiles[recordFile.dataType] = recordFile
		}
	}
	return activeFiles, archiveFiles, nil
}
// misakaDataBase.go
package main
import (
"MisakaDB/index"
"MisakaDB/logger"
"MisakaDB/storage"
"errors"
"fmt"
"github.com/tidwall/redcon"
"runtime/debug"
"strconv"
"strings"
"time"
)
// 以下为可选配置项
const (
MisakaDataBaseFolderPath = "D:\\MisakaDBTest" // 数据库进行数据持久化时 文件的保存位置 注意该路径下不可以有其他人任何文件
RecordFileMaxSize = 65536 // 文件的最大存储字节数
RecordFileIOMode = storage.TraditionalIOFile // 对文件的读写方式 可以是传统的IO 也可以是Mmap
MisakaServerAddr = ":23456" // 数据库的端口
LoggerPath = "D:\\MisakaDBLog" // 数据库的Log保存位置 该位置下有无其它文件都可以
SyncDuration = 1000 // 持久化文件定时同步的时间间隔 单位为毫秒
)
// 下面这是Linux版的路径 方便我切换
//const (
// MisakaDataBaseFolderPath = "/home/MisakaDB"
// LoggerPath = "/home/MisakaDBLog"
//)
type MisakaDataBase struct {
server *redcon.Server
logger *logger.Logger
hashIndex *index.HashIndex
stringIndex *index.StringIndex
}
// Init builds a ready-to-serve database: logger, data files, indexes and the
// redcon server, in that order.
func Init() (*MisakaDataBase, error) {
	database := &MisakaDataBase{}
	var e error
	// Bring the logger up first so everything after it can log.
	database.logger, e = logger.NewLogger(LoggerPath)
	if e != nil {
		return nil, e
	}
	logger.GenerateInfoLog("Logger is Ready!")
	// Load whatever data files already exist on disk.
	activeFiles, archiveFiles, e := storage.RecordFilesInit(MisakaDataBaseFolderPath, RecordFileMaxSize)
	if e != nil {
		return nil, e
	}
	// Rebuild an index for every data type that has files on disk.
	for dataType, activeFile := range activeFiles {
		switch dataType {
		case storage.Hash:
			database.hashIndex, e = index.BuildHashIndex(activeFile, archiveFiles[storage.Hash], RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond*SyncDuration)
			if e != nil {
				return nil, e
			}
			logger.GenerateInfoLog("Hash Index is Ready!")
		case storage.String:
			database.stringIndex, e = index.BuildStringIndex(activeFile, archiveFiles[storage.String], RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond*SyncDuration)
			if e != nil {
				return nil, e
			}
			logger.GenerateInfoLog("String Index is Ready!")
		}
	}
	// Any index not built above (no files on disk for that type) starts empty.
	if database.hashIndex == nil {
		database.hashIndex, e = index.BuildHashIndex(nil, nil, RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond*SyncDuration)
		if e != nil {
			logger.GenerateErrorLog(false, false, e.Error(), "Build Empty Hash Index Failed!")
			return nil, e
		}
		logger.GenerateInfoLog("Hash Index is Ready!")
	}
	if database.stringIndex == nil {
		database.stringIndex, e = index.BuildStringIndex(nil, nil, RecordFileIOMode, MisakaDataBaseFolderPath, RecordFileMaxSize, time.Millisecond*SyncDuration)
		if e != nil {
			logger.GenerateErrorLog(false, false, e.Error(), "Build Empty String Index Failed!")
			return nil, e
		}
		logger.GenerateInfoLog("String Index is Ready!")
	}
	// Finally wire up the redcon server.
	e = database.ServerInit()
	if e != nil {
		logger.GenerateErrorLog(false, false, e.Error(), "Server Init Failed!")
		return nil, e
	}
	logger.GenerateInfoLog("Server is Ready!")
	return database, nil
}
// Destroy shuts the database down in reverse start-up order: server first so
// no new commands arrive, then the indexes (which close their own files), and
// the logger last so everything before it could still log.
func (db *MisakaDataBase) Destroy() error {
	if e := db.server.Close(); e != nil {
		logger.GenerateErrorLog(false, false, e.Error())
		return e
	}
	if e := db.hashIndex.CloseIndex(); e != nil {
		return e
	}
	if e := db.stringIndex.CloseIndex(); e != nil {
		return e
	}
	db.logger.StopLogger()
	return nil
}
// ServerInit creates the redcon server and installs its three callbacks:
//  1. the command handler, called for every request received on a connection
//  2. the accept callback, called when a connection is established
//  3. the closed callback, called when a connection is dropped
// redcon serves each connection on its own goroutine, so the handler must not
// share mutable state between invocations.
// BUGFIX: the original declared `e` and `expired` outside the handler closure,
// sharing them across all connection goroutines — a data race. They are now
// local to each handler invocation.
func (db *MisakaDataBase) ServerInit() error {
	db.server = redcon.NewServer(MisakaServerAddr,
		func(conn redcon.Conn, cmd redcon.Command) {
			var (
				e       error
				expired int
			)
			// Catch panics so one bad request cannot kill the whole server.
			defer func() {
				p := recover()
				if p != nil {
					stackTrace := debug.Stack() // stack trace of the panic site
					logger.GenerateErrorLog(true, false, string(stackTrace), fmt.Sprintf("%v", p))
				}
			}()
			switch strings.ToLower(string(cmd.Args[0])) {
			default:
				// unrecognized command
				conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'")
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Unknown Query: " + string(cmd.Args[0]))
				return
			case "ping":
				conn.WriteString("PONG")
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: ping")
				return
			case "quit":
				conn.WriteString("OK")
				e = conn.Close()
				if e != nil {
					logger.GenerateErrorLog(false, false, e.Error())
				}
				return
			// ----- string commands -----
			case "set":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: set")
				if len(cmd.Args) == 3 {
					// set key value
					e = db.stringIndex.Set(string(cmd.Args[1]), string(cmd.Args[2]), -1)
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString("OK")
				} else if len(cmd.Args) == 5 {
					// set key value expired <timestamp>
					expired, e = strconv.Atoi(string(cmd.Args[4]))
					if e != nil {
						conn.WriteError("Cannot Read Expired As Number: " + e.Error())
						return
					}
					e = db.stringIndex.Set(string(cmd.Args[1]), string(cmd.Args[2]), int64(expired))
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString("OK")
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "setnx":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: setnx")
				if len(cmd.Args) == 3 {
					// setnx key value
					e = db.stringIndex.SetNX(string(cmd.Args[1]), string(cmd.Args[2]), -1)
					if e != nil {
						// BUGFIX: errors.Is takes (err, target); the original had them swapped.
						if errors.Is(e, logger.KeyIsExisted) {
							conn.WriteInt(0)
						} else {
							conn.WriteError(e.Error())
						}
						return
					}
					conn.WriteInt(1)
				} else if len(cmd.Args) == 5 {
					// setnx key value expired <timestamp>
					expired, e = strconv.Atoi(string(cmd.Args[4]))
					if e != nil {
						conn.WriteError("Cannot Read Expired As Number: " + e.Error())
						return
					}
					e = db.stringIndex.SetNX(string(cmd.Args[1]), string(cmd.Args[2]), int64(expired))
					if e != nil {
						// BUGFIX: mirror the 3-argument form — an existing key answers 0, not an error.
						if errors.Is(e, logger.KeyIsExisted) {
							conn.WriteInt(0)
						} else {
							conn.WriteError(e.Error())
						}
						return
					}
					// BUGFIX: SETNX replies with an integer; the original wrote "OK" here.
					conn.WriteInt(1)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "get":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: get")
				if len(cmd.Args) == 2 {
					// get key
					var result string
					result, e = db.stringIndex.Get(string(cmd.Args[1]))
					if errors.Is(e, logger.KeyIsNotExisted) {
						conn.WriteString("nil")
						return
					} else if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString(result)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "getrange":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: getrange")
				if len(cmd.Args) == 4 {
					// getrange key start end
					var (
						result string
						start  int
						end    int
					)
					start, e = strconv.Atoi(string(cmd.Args[2]))
					if e != nil {
						conn.WriteError("Cannot Read Start As Number: " + e.Error())
						return
					}
					end, e = strconv.Atoi(string(cmd.Args[3]))
					if e != nil {
						conn.WriteError("Cannot Read End As Number: " + e.Error())
						return
					}
					result, e = db.stringIndex.GetRange(string(cmd.Args[1]), start, end)
					if errors.Is(e, logger.KeyIsNotExisted) {
						// BUGFIX: the original fell through and sent a second reply after "nil".
						conn.WriteString("nil")
						return
					} else if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString(result)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "getset":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: getset")
				if len(cmd.Args) == 3 {
					// getset key value
					var result string
					result, e = db.stringIndex.GetSet(string(cmd.Args[1]), string(cmd.Args[2]))
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString(result)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "append":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: append")
				if len(cmd.Args) == 3 {
					// append key appendValue
					e = db.stringIndex.Append(string(cmd.Args[1]), string(cmd.Args[2]))
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					// NOTE(review): Redis APPEND replies with the total length after the
					// append; Append only returns an error, so the appended length is
					// reported instead — confirm against StringIndex.
					conn.WriteInt(len(cmd.Args[2]))
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "del":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: del")
				if len(cmd.Args) == 2 {
					// del key
					e = db.stringIndex.Del(string(cmd.Args[1]))
					if e != nil {
						if errors.Is(e, logger.KeyIsNotExisted) {
							conn.WriteInt(0)
						} else {
							conn.WriteError(e.Error())
						}
						return
					}
					conn.WriteInt(1)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			// ----- hash commands -----
			case "hset":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hset")
				if len(cmd.Args) == 4 {
					// hset key field value
					e = db.hashIndex.HSet(string(cmd.Args[1]), string(cmd.Args[2]), string(cmd.Args[3]), -1)
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString("OK")
					return
				} else if len(cmd.Args) == 6 {
					// expiration variant not implemented yet
					// BUGFIX: reply instead of sending nothing, which left the client waiting forever.
					conn.WriteError("ERR expiration for 'hset' is not supported yet")
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "hsetnx":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hsetnx")
				if len(cmd.Args) == 4 {
					// hsetnx key field value
					e = db.hashIndex.HSetNX(string(cmd.Args[1]), string(cmd.Args[2]), string(cmd.Args[3]), -1)
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString("OK")
					return
				} else if len(cmd.Args) == 6 {
					// expiration variant not implemented yet
					// BUGFIX: reply instead of sending nothing, which left the client waiting forever.
					conn.WriteError("ERR expiration for 'hsetnx' is not supported yet")
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "hget":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hget")
				if len(cmd.Args) == 3 {
					// hget key field
					var result string
					result, e = db.hashIndex.HGet(string(cmd.Args[1]), string(cmd.Args[2]))
					if errors.Is(e, logger.KeyIsNotExisted) || errors.Is(e, logger.FieldIsNotExisted) {
						// BUGFIX: the original fell through and sent a second reply after "nil".
						conn.WriteString("nil")
						return
					} else if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteString(result)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "hdel":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hdel")
				if len(cmd.Args) == 3 {
					// hdel key field
					e = db.hashIndex.HDel(string(cmd.Args[1]), string(cmd.Args[2]), true)
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteInt(1)
					return
				} else if len(cmd.Args) == 2 {
					// hdel key — deletes the whole hash (non-standard extension)
					e = db.hashIndex.HDel(string(cmd.Args[1]), "", false)
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteInt(1)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "hlen":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hlen")
				if len(cmd.Args) == 2 {
					// hlen key
					var result int
					result, e = db.hashIndex.HLen(string(cmd.Args[1]))
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteInt(result)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "hexists":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hexists")
				// BUGFIX: hexists takes key AND field (3 args); the original checked for 2.
				if len(cmd.Args) == 3 {
					// hexists key field
					var result bool
					result, e = db.hashIndex.HExist(string(cmd.Args[1]), string(cmd.Args[2]))
					if e != nil {
						conn.WriteError(e.Error())
						return
					}
					if result {
						conn.WriteInt(1)
					} else {
						conn.WriteInt(0)
					}
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			case "hstrlen":
				logger.GenerateInfoLog(conn.RemoteAddr() + ": Query: hstrlen")
				// BUGFIX: hstrlen takes key AND field (3 args); the original checked for 2.
				if len(cmd.Args) == 3 {
					// hstrlen key field
					var result int
					result, e = db.hashIndex.HStrLen(string(cmd.Args[1]), string(cmd.Args[2]))
					if errors.Is(e, logger.FieldIsNotExisted) {
						conn.WriteInt(0)
						return
					} else if e != nil {
						conn.WriteError(e.Error())
						return
					}
					conn.WriteInt(result)
					return
				} else {
					conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command")
					return
				}
			}
		},
		func(conn redcon.Conn) bool {
			logger.GenerateInfoLog("DataBase Connection Accept: " + conn.RemoteAddr())
			return true
		},
		func(conn redcon.Conn, err error) {
			logger.GenerateInfoLog("DataBase Connection Closed: " + conn.RemoteAddr())
		},
	)
	return nil
}
// StartServe begins accepting client connections. It blocks until the server
// stops serving and returns ListenAndServe's error.
func (db *MisakaDataBase) StartServe() error {
	logger.GenerateInfoLog("Server start Listen And Serve!")
	return db.server.ListenAndServe()
	// From the redcon source:
	// ListenAndServe -> ListenServeAndSignal -> serve -> per TCP connection -> go handle
	// so ListenServeAndSignal blocks the calling goroutine while listening.
}
// main.go
package main
import (
"fmt"
"runtime"
)
// Entry point: initialize the database, serve until the listener stops, then
// dump the current goroutine's stack for post-mortem inspection.
func main() {
	db, err := Init()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	if err = db.StartServe(); err != nil {
		fmt.Println(err.Error())
	}
	stackBuffer := make([]byte, 10000000)
	written := runtime.Stack(stackBuffer, false)
	fmt.Println(string(stackBuffer[:written]))
}
$env:GOOS="linux"
go build -ldflags "-s -w" -o MisakaDataBase MisakaDB
本文作者:御坂19327号
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!