教你用100多行写一个数据库(附源码)
本文先容的是觉得中国的IT资深人士写的一个简朴的数据库,没有我们利用的数据库那么强大,可是值得各人警惕。可以用在特定情况中,越发机动利便。
数据库的名字叫WawaDB,是用python实现的。由此可见python是灰常强大啊!
简介
记录日志的需求一般是这样的:
只追加,不修改,写入定时间顺序写入;
大量写,少量读,查询一般查询一个时间段的数据;
MongoDB的牢靠荟萃很好的满意了这个需求,可是MongoDB占内存较量大,有点儿火穿蚊子,小题大做的感受。
WawaDB的思路是每写入1000条日志,在一个索引文件里记录下当前的时间和日志文件的偏移量。
然后定时间询日志时,先把索引加载到内存中,用二分法查出时间点的偏移量,再打开日志文件seek到指定位置,这样就能很快定位用户需要的数据并读取,而不需要遍历整个日志文件。
机能
Core 2 P8400,2.26GHZ,2G内存,32 bit win7
写入测试:
模仿1分钟写入10000条数据,共写入5个小时的数据, 插入300万条数据,每条数据54个字符,用时2分51秒
读取测试:读取指按时间段内包括某个子串的日志
数据范畴 遍历数据量 功效数 用时(秒)
5小时 300万 604 6.6
2小时 120万 225 2.7
1小时 60万 96 1.3
30分钟 30万 44 0.6
索引
只对日志记录的时间做索引, 简介里或许说了下索引的实现,二分查找必定没B Tree效率高,但一般环境下也差不了一个数量级,并且实现出格简朴。
因为是稀疏索引,并不是每条日志都有索引记录它的偏移量,所以读取数据时要往前多读一些数据,防备漏读,等读到真正所需的数据时再真正给用户返回数据。
如下图,好比用户要读取25到43的日志,用二分法找25,找到的是30地址的点,
索引:0 10 20 30 40 50 日志:|………|………|………|………|………|>>>a = [0, 10, 20, 30, 40, 50]>>>bisect.bisect_left(a, 35)>>>3>>>a[3]>>>30>>>bisect.bisect_left(a, 43)>>>5>>>a[5]>>>50
所以我们要往前倒一些,从20(30的前一个刻度)开始读取日志,21,22,23,24读取后因为比25小,所以扔掉, 读到25,26,27,…后返回给用户
读取到40(50的前一个刻度)后就要判定当前数据是否大于43了,假如大于43(返回全开区间的数据),就要遏制读了。
整体下来我们只操纵了大文件的很少一部门就获得了用户想要的数据。
缓冲区
为了淘汰写入日志时大量的磁盘写,索引在append日志时,把buffer配置成了10k,系统默认应该是4k。
同理,为了提高读取日志的效率,读取的buffer也配置了10k,也需要按照你日志的巨细做适当调解。
索引的读写配置成了行buffer,每满一行都要flush到磁盘上,防备读到不完整的索引行(其实实践证明,配置了行buffer,照旧能读到半拉的行)。
查询
啥?要支持SQL,别闹了,100行代码怎么支持SQL呀。
此刻查询是直接传入一个lambada表达式,系统遍历指按时间范畴内的数据行时,满意用户的lambada条件才会返回给用户。
虽然这样会多读取许多用户不需要的数据,并且每行都要举办lambda表达式的运算,不外没步伐,简朴就是美呀。
以前我是把一个需要查询的条件和日志时间,日志文件偏移量都记录在索引里,这样从索引里查找出切合条件的偏移量,然后每条数据都如日志文件里seek一次,read一次。这样长处只有一个,就是读取的数据量少了,但缺点有两个:
索引文件出格大,不利便加载到内存中
每次读取都要先seek,貌似缓冲区用不上,出格慢,比持续读一个段的数据,并用lambda过滤慢四五倍
写入
前面说过了,只append,不修改数据,并且每行日志最前面是时间戳。
多线程
查询数据,可以多线程同时查询,每次查询城市打开一个新的日志文件的描写符,所以并行的多个读取不会斗殴。
写入的话,固然只是append操纵,但不确认多线程对文件举办append操纵是否安详,所以发起用一个行列,一个专用线程举办写入。
锁
没有任何锁。
排序
默认查询出来的数据是定时间正序分列,如需其它排序,可取到内存后用python的sorted函数排序,想怎么排就怎么排。
100多行的数据库代码
# -*- coding:utf-8 -*- import os import time import bisect import itertools from datetime import datetime import logging default_data_dir = './data/' default_write_buffer_size = 1024*10 default_read_buffer_size = 1024*10 default_index_interval = 1000 def ensure_data_dir(): if not os.path.exists(default_data_dir): os.makedirs(default_data_dir) def init(): ensure_data_dir() class WawaIndex: def __init__(self, index_name): self.fp_index = open(os.path.join(default_data_dir, index_name + '.index'), 'a+', 1) self.indexes, self.offsets, self.index_count = [], [], 0 self.__load_index() def __update_index(self, key, offset): self.indexes.append(key) self.offsets.append(offset) def __load_index(self): self.fp_index.seek(0) for line in self.fp_index: try: key, offset = line.split() self.__update_index(key, offset) except ValueError: # 索引假如没有flush的话,大概读到有半行的数据 pass def append_index(self, key, offset): self.index_count += 1 if self.index_count % default_index_interval == 0: self.__update_index(key, offset) self.fp_index.write('%s %s %s' % (key, offset, os.linesep)) def get_offsets(self, begin_key, end_key): left = bisect.bisect_left(self.indexes, str(begin_key)) right = bisect.bisect_left(self.indexes, str(end_key)) left, right = left - 1, right - 1 if left < 0: left = 0 if right < 0: right = 0 if right > len(self.indexes) - 1: right = len(self.indexes) - 1 logging.debug('get_index_range:%s %s %s %s %s %s', self.indexes[0], self.indexes[-1], begin_key, end_key, left, right) return self.offsets[left], self.offsets[right] class WawaDB: def __init__(self, db_name): self.db_name = db_name self.fp_data_for_append = open(os.path.join(default_data_dir, db_name + '.db'), 'a', default_write_buffer_size) self.index = WawaIndex(db_name) def __get_data_by_offsets(self, begin_key, end_key, begin_offset, end_offset): fp_data = open(os.path.join(default_data_dir, self.db_name + '.db'), 'r', default_read_buffer_size) fp_data.seek(int(begin_offset)) line = fp_data.readline() find_real_begin_offset = False will_read_len, read_len = int(end_offset) - int(begin_offset), 0 while line: read_len += len(line) if (not find_real_begin_offset) and (line < str(begin_key)): line = fp_data.readline() continue find_real_begin_offset = True if (read_len >= will_read_len) and (line > str(end_key)): break yield line.rstrip('\r\n') line = fp_data.readline() def append_data(self, data, record_time=datetime.now()): def check_args(): if not data: raise ValueError('data is null') if not isinstance(data, basestring): raise ValueError('data is not string') if data.find('\r') != -1 or data.find('\n') != -1: raise ValueError('data contains linesep') check_args() record_time = time.mktime(record_time.timetuple()) data = '%s %s %s' % (record_time, data, os.linesep) offset = self.fp_data_for_append.tell() self.fp_data_for_append.write(data) self.index.append_index(record_time, offset) def get_data(self, begin_time, end_time, data_filter=None): def check_args(): if not (isinstance(begin_time, datetime) and isinstance(end_time, datetime)): raise ValueError('begin_time or end_time is not datetime') check_args() begin_time, end_time = time.mktime(begin_time.timetuple()), time.mktime(end_time.timetuple()) begin_offset, end_offset = self.index.get_offsets(begin_time, end_time) for data in self.__get_data_by_offsets(begin_time, end_time, begin_offset, end_offset): if data_filter: if data_filter(data): yield data else: yield data def test(): from datetime import datetime, timedelta import uuid, random logging.getLogger().setLevel(logging.NOTSET) def time_test(test_name): def inner(f): def inner2(*args, **kargs): start_time = datetime.now() result = f(*args, **kargs) print '%s take time:%s' % (test_name, (datetime.now() - start_time)) return result return inner2 return inner @time_test('gen_test_data') def gen_test_data(db): now = datetime.now() begin_time = now - timedelta(hours=5) while begin_time < now: print begin_time for i in range(10000): db.append_data(str(random.randint(1,10000))+ ' ' +str(uuid.uuid1()), begin_time) begin_time += timedelta(minutes=1) @time_test('test_get_data') def test_get_data(db): begin_time = datetime.now() - timedelta(hours=3) end_time = begin_time + timedelta(minutes=120) results = list(db.get_data(begin_time, end_time, lambda x: x.find('1024') != -1)) print 'test_get_data get %s results' % len(results) @time_test('get_db') def get_db(): return WawaDB('test') if not os.path.exists('./data/test.db'): db = get_db() gen_test_data(db) #db.index.fp_index.flush() db = get_db() test_get_data(db) init() if __name__ == '__main__': test()