Swoole Server架构分析

一.简介

首页这里引用一下swoole的官方介绍:

swoole:面向生产环境的 PHP 异步网络通信引擎
使 PHP 开发人员可以编写高性能的异步并发 TCP、UDP、Unix Socket、HTTP,WebSocket 服务。Swoole 可以广泛应用于互联网、移动通信、企业软件、云计算、网络游戏、物联网(IOT)、车联网、智能家居等领域。 使用 PHP + Swoole 作为网络通信框架,可以使企业 IT 研发团队的效率大大提升,更加专注于开发创新产品。

通过上述的介绍,我们是可以得出几点信息的

  • 1.swoole可以投入生产环境
  • 2.使用php编写
  • 3.异步网络通信引擎,支持大量的网络协议,并且具有很高的网络性能

swoole server与传统的php运行模式是完全不同的,它是常驻内存的,省去了大量的php脚本的初始化。

二.swoole server的是怎么运行的

swoole进程/线程模型

这是一张官方的swoole运行时进程/线程模型。

1.Master进程

Master进程是一个多线程模型,其中包括Master线程,Reactor线程组,心跳检测线程,UDP收包线程。

以http server为例,Master线程负责监听(listen)端口,然后接受(accept)新的连接,然后将这个连接分配给一个Reactor线程,由这个Reactor线程监听此连接,一旦此连接可读时,读取数据,解析协议,然后将请求投递到worker进程中去执行。

Master进程是使用select/poll进行IO事件循环的,这是因为Master进程中的文件描述符只有几个(listenfd等),Reactor线程使用的是epoll,因为Reactor线程中会监听大量连接的可读事件,使用epoll可以支持大量的文件描述符。

2.Manager进程

Manager进程是专门用来管理Worker进程组和Task进程组的。它会Fork出指定数量的Worker进程和Task进程,并且有以下职能:

  • 子进程结束运行时,manager进程负责回收此子进程,避免成为僵尸进程。并创建新的子进程
  • 服务器关闭时,manager进程将发送信号给所有子进程,通知子进程关闭服务
  • 服务器reload时,manager进程会逐个关闭/重启子进程

3.Worker进程和Task进程

Worker进程接收Reactor线程投递过来的数据,执行php代码,然后生成数据交给Reactor线程,由Reactor线程通过tcp将数据返回给客户端。(如果是UDP,Worker进程直接将数据发送给客户端)。

Worker进程中执行的php代码和我们平时写php是一样的,它等同于php-fpm。但是众所周知,php-fpm下,php在处理异步操作时是很无力的,swoole提供的Task进程可以很好的解决这个问题。Worker进程可以将一些异步任务投递给Task进程,然后直接返回,处理其他的由Reactor线程投递过来的事件。

Task进程以完全同步阻塞的方式运行,一个Task进程在执行任务期间,是不接受从Worker进程投递的任务的,当Task进程执行完任务后,会异步地通知worker进程告诉它此任务已经完成。

所以介绍完上述的一些概念后,再引用一张官方的swoole执行流程图。
swoole运行流程图

这里需要注意,在文档上说的是:Workder进程组和Task进程组是由Manager进程Fork出来的,但是流程图上画的是在启动服务器时Fork出主进程和Worker进程组以及Tasker进程组。

三、使用swoole和传统php开发的优缺点

在说这个话题之前,需要先了解一下CGI,FASTCGI。

1.CGI
CGI的全称是Common Gateway Interface,通用网关接口,它使得任何一个拥有标准输入输出的程序拥有提供web server的能力。假设我们写了一个Hello World的c++程序,这个程序接受输入{text},输出{text},Hello World。

以nginx作为接受http请求为例,nginx接受一个http请求,Fork出一个进程,将http请求带来的text参数作为输入,执行完hello world程序,将输出{text},Hello World作为输出,销毁这个Fork出来的进程,由nginx返回给客户端。

这种方式虽然简单,但是要不断的Fork进程,销毁进程。

2.FASTCGI
FASTCGI,顾名思义,它是CGI的改进版,是一个常驻型的CGI服务。我们常用的php-fpm就是这种模式运行的,php-fpm负责Forl多个进程,每个进程中都运行了php的解释器。可以在终端下看一下php-fpm的进程:
php-fpm进程

一个php-fpm主进程,pid是1263,Fork出了3个子进程。在nginx+php-fpm的组合中,nginx负责接受http请求,将请求封装好交给php-fpm,php-fpm将请求按照一定的规则交给一个子进程去执行,这个子进程中的php解释器加载php代码运行。也是因为这个原因,传统的php只能作为web server。

然后我们发现,nginx+php-fpm的组合和我们Reactor+Worker子进程的运行方式非常相似。

3.swoole的运行方式
这里以swoole作为http server为例(传统php几乎都是作为web服务)。

首先swoole是实现了http server的,也就是说不需要nginx作为http服务器了,当然swoole并不是为了取代nginx,实际上swoole当前实现的http server功能有限,比如说只支持Get和Post,所有往往swoole前面还要运行一个nginx来作为前端代理服务器。

其次,swoole是内存常驻的。和php-fpm的常驻服务不同,php-fpm中常驻的是php的解释器,这个解释器会重复加载php代码,初始化环境,而swoole只在启动的时候加载,这样一来,性能就自然而然的提高了。这一点可以在开发中很明显的体现出来,php-fpm下,修改的php代码会即时生效,而使用swoole则需要重启swoole的server才能使代码生效。

通过上面的一些说明,就可以很明显的得出swoole和传统php开发的优缺点了。

swoole server优点:
– swoole性能更高
– 可以做为tcp,udp服务器
– 在高io高并发的服务器要求下,swoole的运行模式是完全可以胜任的

swoole server缺点:
– 更难上手。这要求开发人员对于多进程的运行模式有更清晰的认识
– 更容易内存泄露。在处理全局变量,静态变量的时候一定要小心,这种不会被GC清理的变量会存在整个生命周期中,如果没有正确的处理,很容易消耗完所有的内存。而以往的php-fpm下,php代码执行完内存就会被完全释放。
– 无法做密集计算。当然这一点是php甚至是所有动态语言都存在的问题。写在这里是因为防止误导读者以为使用swoole后,php可以用来做密集计算。

png格式分析与压缩原理

1.简介

 png图片格式采用了一种很灵活的编码设计,支持多种编码方式来展示一张图片:灰度图片、索引彩色图像、真彩色图像、带α通道数据的灰度图像、带α通道数据的真彩色图像。因为png的编码方式灵活,使得其压缩的发挥空间非常大。

2.几种编码方式的说明

注意:这里的一个通道并不等同于1个字节或2个字节,因为png IDAT块的编码单位是bit而不是byte,具体一个通道占用的位数是靠bit depth来决定的

a.灰度图片

灰度图片一般是用来生成我们常见的黑白照片,bit depth可选值为1、 2、 4、 8、16,其一个像素点用一个通道来表示(0~255)或(0~65535),因此这种图片的大小往往非常小

b.索引彩色图像

索引彩色图像很有意思,它可以和真彩色图像一样拥有表达彩色图片的能力,但是一张索引彩色图像上最多只有256种颜色,每种颜色用三个字节(RGB,0~255)表示。它的bit depth可选值为1、 2、4、8,它有一个调色板区域,用来记录索引颜色的值,然后像素点数据区域用一个通道(0~255)来表示调色板区域记录的颜色索引。所以对于色彩不多的图片来说,采用索引彩色来编码是很适合的。

c.真彩色图像

真彩色图像最好理解,它的bit depth为8或16,它的每个像素点都用3个字节(3个通道)来表示,分别为R(0~255),G(0~255),B(0~255),可以表达256256256=16777216种颜色,或者使用6个字节(3个通道)来表示,分别为R(0~65535),G(0~65535),B(0~65535),可以表达655356553565535种颜色。

d.带α通道数据的灰度图像

就是在灰度图片上增加了α通道,共2个通道,支持透明(0~255)或者(0~65535),但是它的bit depth为8或16,因此这种编码方式一个像素点是2个字节或4个字节

e.带α通道数据的真彩色图像

就是在真彩色图像上增加了α通道,同四个通道,支持透明(0~255)或者(0~65535),它的bit depth为8或16,因此这种编码方式一个像素点是4个字节或8个字节

3.png图片具体编码说明

png图片的编码一般如下:header,chunk,chunk,chunk….chunk。

header代表png图片的头,png文件头一般都是由固定的8个字节组成
89 50 4E 47 OD 0A 1A 0A
图片软件可以通过这8个字节来判断这个文件是不是png格式。

chunk代表png图片的数据块,数据块是有不同的类型的,记录了不同的信息。一张png图片可能包含多个数据块。
数据块的类型如下(关键部分高亮显示):

PNG文件格式中的数据块(表1.1)

数据块符号 数据块名称 多数据块 可选否 位置限制
IHDR 文件头数据块 第一块
cHRM 基色和白色点数据块 在PLTE和IDAT之前
gAMA 图像γ数据块 在PLTE和IDAT之前
sBIT 样本有效位数据块 在PLTE和IDAT之前
PLTE 调色板数据块 在IDAT之前
bKGD 背景颜色数据块 在PLTE之后IDAT之前
hIST 图像直方图数据块 在PLTE之后IDAT之前
tRNS 图像透明数据块 在PLTE之后IDAT之前
oFFs (专用公共数据块) 在IDAT之前
pHYs 物理像素尺寸数据块 在IDAT之前
sCAL (专用公共数据块) 在IDAT之前
IDAT 图像数据块 与其他IDAT连续
tIME 图像最后修改时间数据块 无限制
tEXt 文本信息数据块 无限制
zTXt 压缩文本数据块 无限制
fRAc (专用公共数据块) 无限制
gIFg (专用公共数据块) 无限制
gIFt (专用公共数据块) 无限制
gIFx (专用公共数据块) 无限制
IEND 图像结束数据 最后一个数据块

png 文件中,每个数据块由4个部分组成 length | type(name) | data | CRC, 说明如下

length: 4 bytes, data的长度,不包括type和CRC
type: 4 bytes, ASCII码([A-Z,a-z])
CRC: 4bytes
CRC(cyclic redundancy check)域中的值是对Chunk Type Code域和Chunk Data域中的数据进行计算得到的。CRC具体算法定义在ISO 3309和ITU-T V.42中,其值按下面的CRC码生成多项式进行计算: x32+x26+x23+x22+x16+x12+x11+x10+x8+x7+x5+x4+x2+x+1

除了高亮的关键数据块,还有下面比较有意思的数据块
tRNS

tRNS contains transparency information. For indexed images, it stores alpha channel values for one or more palette entries. For truecolor and grayscale images, it stores a single pixel value that is to be regarded as fully transparent.

tRNS包含透明度信息,
1.在索引图片中,它可以保存一或多个调色板像素对的alpha通道值。用来指明哪些像素需要透明,透明度是多少。然后剩下的像素透明度都被默认成255,完全不透明。

2.在灰度图片中,包含单个灰度级别值,格式如下:
Gray: 2 bytes, range 0 .. (2^bitdepth)-1
被指定的灰度值都会被当成是透明的,其他灰度值则完全不透明(如果bit depth小于16,则取最低有效位,其他位为0)

3.在真彩色图片中,包含单个RGB颜色值,格式如下:
Red: 2 bytes, range 0 .. (2^bitdepth)-1
Green: 2 bytes, range 0 .. (2^bitdepth)-1
Blue: 2 bytes, range 0 .. (2^bitdepth)-1
被指定的颜色值都会被当成是透明的,其他颜色值则完全不透明(如果bit depth小于16,则取最低有效位,其他位为0)

在带alpha通道的图片中,tRNS是被禁止使用的。

a.IHDR数据块

IHDR数据块存储了png图像的基本信息,一个图像中只能有一个IHDR数据块,并且也是png图像的第一个数据块。共有13个字节。记录的信息如下:

表1.2

域的名称 字节数 说明
Width 4 bytes 图像宽度,以像素为单位
Height 4 bytes 图像高度,以像素为单位
Bit depth 1 byte 图像深度.索引彩色图像: 1,2,4或8 , 灰度图像: 1,2,4,8或16 真彩色图像: 8或16
ColorType 1 byte 颜色类型. [0]:灰度图像, 1,2,4,8或16 [2]:真彩色图像,8或16 [3]:索引彩色图像,1,2,4或8 [4]:带α通道数据的灰度图像,8或16 [6]:带α通道数据的真彩色图像,8或16
Compression method 1 byte 压缩方法(LZ77派生算法)
Filter method 1 byte 滤波器方法
Interlace method 1 byte 隔行扫描方法. 0:非隔行扫描 1: Adam7(由Adam M. Costello开发的7遍隔行扫描方法)

从这个表中可以得出,一张png图片最大的宽高(4294967300 × 4294967300)

这里的Bit depth一开始很难理解,引用维基百科两段话:

Pixels in PNG images are numbers that may be either indices of sample data in the palette or the sample data itself. The palette is a separate table contained in the PLTE chunk. Sample data for a single pixel consists of a tuple of between one and four numbers. Whether the pixel data represents palette indices or explicit sample values, the numbers are referred to as channels and every number in the image is encoded with an identical format.

The permitted formats encode each number as an unsigned integral value using a fixed number of bits, referred to in the PNG specification as the bit depth. Notice that this is not the same as color depth, which is commonly used to refer to the total number of bits in each pixel, not each channel. The permitted bit depths are summarized in the table along with the total number of bits used for each pixel.

The number of channels depends on whether the image is grayscale or color and whether it has an alpha channel. PNG allows the following combinations of channels, called the color type.

大意是:png图片的像素(Pixels)是调色板中样本数据的索引或者样本数据其本身,调色板是在PLTE数据块中的。单个像素的样本数据包含1~4个数字的元组。像素数据表示调色板索引或者明确的样本值,每个数字代表每个通道,每个数字都有唯一可识别的编码。

允许编码每个数字的格式是一个使用固定位数的unsigned整数值,在PNG中称为bit depth(位深)。位深和color depth(颜色深度)不同,颜色深度通常代表一个像素的总位数,不是每个通道的总位数。

通道数取决于这个图片是否是灰度图还是有alpha通道的色彩图

因此可以知道bit depth是一个通道的位数,bit depth在索引彩色图片和灰度图中使用,使得图片的一个像素点的大小可以小于一个字节

b.PLTE数据块(调色板数据块)

  • a.PLTE数据块在索引彩色图像中必须存在,这里记录了整张图片所有的颜色,最多为256个。这个是bit depth规定的,由表1.2可知,索引彩色图像的bit depth为1,2,4,8,共为1、4、16、256种颜色。每种颜色是3个字节(RGB),所以PLTE数据块的length肯定是3的倍数,否则就是非法的。

  • b.PLTE数据块在真彩色图像或带α通道数据的真彩色图像中是可选的

  • c.PLTE数据块在灰度图像或带α通道数据的灰度图像中是不能存在的

c.IDAT数据块

这部分的数据块存放的就是图像的一个个像素(使用压缩算法之后生成的像素),一张图像中可以存在多个IDAT数据块,这种编码方式虽然稍微增加了图像的大小,但是可以使得PNG图像以一种流的方式生成。例如在浏览器中打开一张非常大的PNG图像,如果网络很慢,我们可以看到图片是从上到下一点点打开的,这是因为即使整张图片没有加载完,仍然可以显示局部内容。

IDAT数据块是最重要的一部分,这里的数据会被隔行扫描方法(Interlace method)滤波器(filtering)压缩算法(compression)处理。这三个部分会在下面详细介绍。

并且要注意的是,IDAT数据块存储的数据并不是以字节为最小单位,在灰度图片和索引彩色图片中,一个字节可能会包含多个像素点,由bit depth指定。

d.IENT数据块

这段数据块标记PNG文件或者数据流已经结束,并且必须要放在文件的尾部。

正常情况下, png 文件的结尾为如下12个字符:

00 00 00 00 49 45 4E 44 AE 42 60 82
由于数据块结构的定义,IEND数据块的长度总是0(00 00 00 00,除非人为加入信息),数据标识总是IEND(49 45 4E 44),因此,CRC码也总是AE 42 60 82

4.隔行扫描方法

隔行扫描方法是为了图片被更先进的展示出来。在图片传输过程中,可以让图片的展示有淡入的效果。虽然平均下来稍微增加了存储的大小,但是给带来用户更快的显示效果。

  • a.这个方法取值为0时,代表像素点是从左到右顺序存储,扫描线扫描时从上到下即可。
  • b.这个方法取值为1时,被称为Adam7算法

    Adam7分为7个扫描步骤,可见下图:

    5.过滤器

    过滤器中只有一种过滤方法,但是有多种过滤类型。过滤方法并不会影响数据的大小,也不会影响丢失任何信息,过滤的目的只有一个,为压缩方法提供更好压缩的数据,IDAT数据中的每一行第一个字节定义了过滤类型,过滤类型的介绍如下:

    a.过滤类型0:None

    也就是不做任何过滤

    b. 过滤类型1:Sub

    记录当前像素和左边像素的差值。左边起第一个像素是标准值,不做任何过滤

    c. 过滤类型2:Up

    记录X – B的值,即当前像素和上边像素点差值。如果当前行是第1行,则当前行数标准值,不做任何过滤。

    d.过滤类型3:Average

    记录当前像素与左边像素和上边像素的平均值的差值。

    • 如果当前行数第一行:做特殊的Sub过滤,左边起第一个像素是标准值,不做任何过滤。其他像素记录该像素与左边像素的二分之一的值的差值。
  • 如果当前行数不是第一行:左边起第一个像素记录该像素与上边像素的二分之一的值的差值,其他像素做正常的Average过滤。

    e.过滤类型4:Paeth

    记录X – Pr的值,这种过滤方式比较复杂,Pr的计算方式(伪代码)如下:

p = a + b - c
pa = abs(p - a)
pb = abs(p - b)
pc = abs(p - c)
if pa <= pb and pa <= pc then Pr = a
else if pb <= pc then Pr = b
else Pr = c
return Pr

如果当前行数第一行:做Sub过滤。
如果当前行数不是第一行:左边起第一个像素记录该像素与上边像素的差值,其他像素做正常的Peath过滤。

6.压缩算法

压缩算法当前只定义了一种,值为0,是使用一个滑动窗口的deflate/inflate压缩,这个算法可以调用zlib库来实现

7.关于压缩

对于一张图片,我们可以使用以上知识来完成一次无损压缩,以及去除所有的辅助数据块。但是很多场景下,常规的无损压缩并不能带来很高的压缩比,这个时候可以考虑使用有损压缩,大概的思路就是将总体颜色数减少到256以下(使用相近的颜色来代替),然后使用索引色彩来编码整张图片,带来很高的压缩比

实际操作验证

一级压缩图片(958.8k):https://www.myway5.com/wp-content/uploads/2017/11/output_1.png
九级压缩图片(857.0k):https://www.myway5.com/wp-content/uploads/2017/11/output_9.png
有损压缩图片(273.0k):https://www.myway5.com/wp-content/uploads/2017/11/test_tiny.png
原图(1.5m):https://www.myway5.com/wp-content/uploads/2017/11/test.png)

使用vim打开原图:

8950 4e47 0d0a 1a0a是png固定的头,

0000 000d是iHDR数据块的长度,为13,4948 4452是数据块的type,为IHDR,之后紧跟着是data, 0000 02bc是图片的宽度,0000 03a5是高度,08是Bit depth,也就是一个通道是8位,06是color type,这里表示图片是真彩色,00是压缩方法,png中目前只有一种,也就是LZ77派生的算法,00是滤波器方法,表示不使用,00是隔行扫描方法,代表不扫描。8f 1434 a4是四个字节的CRC校验码。

00 0000 01是sRGB数据块的长度(???),为1,73 5247 42是type,为sRGB,00是存储的一个字节数据,aece 1ce9是循环校验码

之后还有一个iDoT数据块,再紧接着才是IDAT数据块,下面就不分析了。

再打开9级压缩的图片

简单看一下,sRGB和iDOT数据块已经不存在了,IDAT数据块的长度发生了改变,其实就是被压缩了。这种压缩是无损的,9级压缩和1级压缩相差十几k的大小,但是压缩的越小,耗时也越长。

最后去看在tinypng上进行有损压缩的图片:

它多了一个PLTE字段,意味着这张图变成了索引彩色图片,索引彩色图片最关键的一点就是总颜色数不能超过256。如果这张图片的总颜色数没有256个,那么甚至可以说这次图片的压缩是无损的。但是一旦颜色数超过256,就需要将多个相近的颜色处理成一个颜色,也就是有损的压缩

参考

  • [1] png文件结构分析:http://www.360doc.com/content/11/0428/12/1016783_112894280.shtml
  • [2] php imagecreatefrom* 系列函数之 png:http://drops.xmd5.com/static/drops/tips-16034.html
  • [3] Portable Network Graphics: https://en.wikipedia.org/wiki/Portable_Network_Graphics
  • [4] png的故事:获取图片信息和像素内容: https://www.qcloud.com/community/article/864088
  • [5] https://www.w3.org/TR/2003/REC-PNG-20031110/#figure48

leveldb源代码阅读(四)- table cache的实现

一、前言

缓存在整个计算机体系中,占据着举足轻重的地位,往往被用于提升软件的运行速度。在计算机系统中,最典型的当属CPU高速缓存了,CPU高速缓存是介于CPU寄存器和内存之间,CPU向内存中请求数据时,会先检查CPU高速缓存中是否存在数据,如果不存在,则会将内存中的数据放入高速缓存中,再将高速缓存中的数据读入CPU。这个过程中,缓存之所以能够大大的提升系统速度,是因为程序在运行的时候对内存的访问具有局部性的特点,这种局部性我理解为程序在运行时对某一块的内存请求会非常频繁,而这一块内存在第一次请求之后就会被缓存,所以会大大提升之后的数据读取速度。所以,缓存设计的是否合理有效,在于缓存的命中率高不高。

在leveldb中,为了提升对数据的检索速度,也设计了缓存,对应的代码在db/table_cache.h和db/table_cache.cc中,这个table cache的实现主要借助于Cache类,关于Cache类,是可以用户自定义实现的,但leveldb也有一个内置的Cache的实现,文件是util/cache.cc。主要是LRU(最近最少使用)算法,原理是“如果一个数据最近被使用,那么将来被使用的概率同样也很大”。同样也实现了一个HashTable,leveldb中提供的数据显示在g++ 4.4.3下比built-in的哈希表性能稍高,大概5%左右。

二、概览

2.1 hash table数据结构

哈希表是一个很常见的结构了,存储的是key-value结构,一个key-value对常被称作entry,它最大的特点是查找快。在网上找了一张hash table的图
hash table

首先它是一个长为len的数组,每个数组中的元素都是一个链表。如图所示,john Smith和Sandra Dee都被hash到152这个地方,所以在152这个地方使用链表来存储这两个entry.查找的时候Sandra Dee的时候,先找到152这个地方,在遍历链表,直到找到key是Sandra的entry。

2.2 LRU算法

在leveldb中,LRU算法的实现是使用两个双向环形链表,一个链表(in-use)存储当前正在使用的数据,另一个链表(lru)按照访问时间先后顺序存储缓存数据,每个数据都可以在in-use和lru之间切换。当我们需要使用LRU算法来淘汰数据时,只需要在lru上淘汰排序靠后的数据即可。

2.3 分片LRU缓存

分片LRU缓存很简单,其实就是同时创建多个LRU缓存对象,然后使用hash将特定的缓存数据放置到相应的LRU缓存对象中。这个方式可以避免一个LRU缓存中存储过多的数据。

三、详细分析

3.1 Hash Table

leveldb中实现了HashTable,使用的是最经典的数组+链表的实现。通过hash值定位到数组中的某一处,然后在这一处的链表上遍历查找。

HashTable的长度是哈希算法效率的最大影响因素,如果存在较多的hash冲突,则在链表上遍历查找的时间花费会很长。所以这个长度的选择是很重要的,比如在Java中的HashMap,实现中有一个负载因子0.7,如果数组上已经有值的数量超过总长度的0.7就会对整个哈希表resize。在leveldb中,这个resize的阈值是当前所有Insert进来的元素个数超过了数组的长度length,就会进行resize。这里简单的分析一下最重要的查找和resize过程。

  • 哈希表查找过程
  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];     //这里hash & (length_ - 1)会定位到小于length的位置,ptr就是hash到的位置
    while (*ptr != NULL &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
           //hash到特定位置后,如果当前位置的hash和当前hash不一样,或者key不一样,并且指针也不为空,则继续向下找,直到找到
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }
  • 哈希表resize的过程
   void Resize() {
    uint32_t new_length = 4;        //哈希表的长度从4开始,逐倍增长
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);      //申请新的哈希表的所需的空间
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {    //将旧的哈希表的数据重新计算复制到新的哈希表中
      LRUHandle* h = list_[i];
      while (h != NULL) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_; //删除旧的哈希表
    list_ = new_list;
    length_ = new_length;
  }
};

3.2 LRUCache的实现

这里的LRUCache的私有成员变量包括以下几个

  // Initialized before use.
  size_t capacity_;         //LRUCache的大小。

  // mutex_ protects the following state.
  mutable port::Mutex mutex_;       //互斥变量,用来同步访问
  size_t usage_;                    //LRUCache已经使用的大小

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  // Entries have refs==1 and in_cache==true.
  LRUHandle lru_;                   //LRU链表的头,lru.prev代表新的节点,lru.next代表旧的节点,节点的refs == 1,in_cache == true

  // Dummy head of in-use list.
  // Entries are in use by clients, and have refs >= 2 and in_cache==true.
  LRUHandle in_use_;                //正在使用的节点链表的头,refs >= 2,in_cache == true

  HandleTable table_;               //哈希表,用来在缓存中实现快速查找

LRU的构造函数,直接构造出空的环形链表

LRUCache::LRUCache()
    : usage_(0) {
  // Make empty circular linked lists.
  lru_.next = &lru_;
  lru_.prev = &lru_;
  in_use_.next = &in_use_;
  in_use_.prev = &in_use_;
}

LRU中节点的引用和解引用

//增加引用时,如果在缓存中,则移动到in_use中
void LRUCache::Ref(LRUHandle* e) {
  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}

//解引用时会出现两种情况。1.节点不再需要,使用deleter来删除节点 2.不再被使用,移动到缓存
void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) { // Deallocate.
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e);
  } else if (e->in_cache && e->refs == 1) {  // No longer in use; move to lru_ list.
    LRU_Remove(e);
    LRU_Append(&lru_, e);
  }
}

缓存的插入

Cache::Handle* LRUCache::Insert(
    const Slice& key, uint32_t hash, void* value, size_t charge,
    void (*deleter)(const Slice& key, void* value)) {
  MutexLock l(&mutex_);

  LRUHandle* e = reinterpret_cast<LRUHandle*>(
      malloc(sizeof(LRUHandle)-1 + key.size()));    //这个地方申请内存需要注意,在LRUHandle中,key_data只有1个字节,其实是整个key的开头一个字节,所以申请的空间实际上是包含整个key的
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->in_cache = false;
  e->refs = 1;  // for the returned handle.
  memcpy(e->key_data, key.data(), key.size());

  if (capacity_ > 0) {      //如果capacity大于0,也就是需要进行缓存
    e->refs++;  // for the cache's reference.
    e->in_cache = true;
    LRU_Append(&in_use_, e);
    usage_ += charge;
    FinishErase(table_.Insert(e));
  } // else don't cache.  (Tests use capacity_==0 to turn off caching.)

    //当使用的内存大于容量时,则要移除旧的缓存,直到缓存小于指定的容量
  while (usage_ > capacity_ && lru_.next != &lru_) {
    LRUHandle* old = lru_.next;
    assert(old->refs == 1);
    bool erased = FinishErase(table_.Remove(old->key(), old->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }

  return reinterpret_cast<Cache::Handle*>(e);
}

缓存的查找

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* e = table_.Lookup(key, hash);      //直接借用哈希表完成缓存的快速查找
  if (e != NULL) {
    Ref(e);
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

3.3 分片缓存的实现(ShardedLRUCache)

分片缓存的实现其实就是借助上面的LRUCaChe,只不过同时拥有多个LRUCache,然后特定的缓存数据会被缓存到相应的LRUCache中。

static const int kNumShardBits = 4;                     //可以理解为缓存片数量的容量因子(这里是4个bits,所以共有16个缓存片)
static const int kNumShards = 1 << kNumShardBits;       //一共有多少个缓存片

//通过Shard函数可以将任意一个hash分配到16个缓存片中的任意一个)
static uint32_t Shard(uint32_t hash) {
    return hash >> (32 - kNumShardBits);
}

所有的缓存存储和读取都会通过上面的Shard函数来找到特定的缓存片,从而实现了分片缓存。

五、总结

在这个部分,400行左右的代码就实现了哈希表,LRU缓存,分片LRU缓存。了解到LRU缓存的具体实现方式,特别是LRU中Ref和UnRef的实现非常精炼,很好的解决了一个数据在LRU缓存中被引用时脱离缓存,不使用时进入LRU缓存的功能。同时将LRU缓存简单的封装,就可以实现分片LRU缓存。

ubuntu上使用chrome进行手机页面调试的方案

虽然大多数时候我们都可以使用chrome的开发者工具,通过模拟手机来调试手机页面,但是对于一些特殊的动作是无法在电脑上做的,比如说手机的多点触控操作,在页面上模拟双指缩放时没法使用鼠标完成,因此需要通过手机来进行真机调试,但是调试的同时,我们希望可以看到调试信息,希望可以实时修改查看。这个时候就可以借助chrome的远程设备使用手机chrome打开页面,在电脑chrome中调试。具体的调试过程在
远程调试 Android 设备使用入门
可以看到。下面只是记录ubuntu下使用该方案遇到的问题。

一.打开远程调试界面

打开chrome的控制台,点击右上角的列表按钮,找到more tool->remote device即可。

二.找不到已连接的手机

这个在windows上一般比较容易,因为基本上各种安全软件都会自动帮助用户连上,在Ubuntu下我们需要使用adb来连接手机。使用adb start-server来开启服务监听手机的连接。如果出现权限错误,可以adb kill-server,再adb start-server。只要手机上弹出连接确认框,即说明已经连上了。

三.Inspect的页面空白

这个问题困扰了我很久,因为我在windows上是没有问题的,后来查了一下,是因为chrome需要翻墙才行,在windows上我使用的是shadowsocks的pac方案,linux上则是自己建立的翻墙规则,导致有遗漏。所以如果出现Inspect页面空白,可以尝试全局翻墙

四.如果在手机上访问开发机器上的服务

这个问题有这种解决方案。
1.如果手机和电脑在同一个局域网内,可以直接访问电脑的ip地址即可。但是http服务监听的地址不能只是localhost,应该是电脑的ip地址或者0.0.0.0
2.使用远程设备的端口转发功能,设置如下图
端口转发设置
成功的界面如下
端口转发设置成功
这样,我在手机上访问localhost:4200,相当于在电脑上访问localhost:4200,在手机上访问localhost:1025,相当于在电脑上访问localhost:80。需要注意的是,端口转发的左侧输入框端口号必须大于1024,这是因为小于等于1024的端口号是被划分出来特殊使用的。

树的可视化以及家谱绘制的算法

一、前言

树的可视化意思就是将一颗树(数据结构的树)用图形展现出来。树的可视化有很多种用途,比如说很常见的组织结构图就是树的可视化的应用。家谱也可以算是一颗树,但是与树的可视化稍微不同的是,会有配偶这个角色,同时一个家谱可能并不是从一个根节点向下(可能存在某个家族向上追溯到某一代,之前的资料已经完全丢失了,那么这个家谱的起源应该是这一代人,而不是具体到某一个人)。

二、难点

在树的可视化过程中,难点就在于如何确定每一个节点的位置(X,Y),Y坐标相对来说是比较好确定的,可以根据它在一颗树中的第几层来确定Y的值;X的值既受到同一层其他的节点位置的影响,同时也受到其子代位置的影响。因为其子代可能会和其兄弟节点的子代交叉。所以这篇文章主要讲解的是每个点的位置的计算。

家谱的绘制和树的可视化也有前言中的一些不同之处,因为也不能完全的将树的可视化算法生搬硬套。

下面我将一步步的讲解整个树的可视化算法的过程,以及如何将这个算法应用到家谱的绘制上,并给出具体的实现代码(因为是网页上有这个需求,所以代码是js写的)。算法的主要提出者是John Q. Walker II,这里有他对这个算法的相关讲解。POSITIONING NODES FOR GENERAL TREES

三、算法介绍

图1:
示意图

1、节点属性定义

var Node = function(key,image,description){
    this.key = key;
    this.parent = null;         //父辈
    this.offspring = null;      //指向最左边的子辈
    this.leftSibling = null;    //左兄弟
    this.rightSibling = null;   //右兄弟
    this.prelim = 0;            //x坐标的预定义
    this.modifier = 0;          //调整的大小
    this.level = 0;
    this.width = 60;            //每个节点的最终宽度,如果存在配偶这个宽度是会发生改变的
    this.height = 80;           
    this.nodeWidth = 60;        //每个节点的基本宽度,不会变
    this.nodeHeight = 80;
    this.x = 0;
    this.y = 0;
    this.spouses = [];
    this.image = image||"assets/post-photo.jpg";
    this.description = description || "";
};

这里首先介绍几个重要的属性:

parent,offspring,leftSibling,rightSibling:分别是当前节点的父节点指针、最左边的孩子节点的指针、左兄弟、右兄弟。以上图为例,M点的parent是N,offspring是H,leftSibling是G,rightSibling是null。

prelim:当前点的X的预定义位置。如同在难点中所说,X的位置受到很多因素的影响,所以需要多次计算。prelim只是预定义的位置,不是最终的位置。

modifier:调整值,不过这个调整值并不是说当前点的X还要调整多少,而是当前点的后代的X还要调整的大小。

level:当前点所处的层级。

width、height、nodeWidth、nodeHeight:这些参数用来表示节点的宽和高,width和nodeWidth的区别在于,nodeWidth是画出来的节点的宽度,是一个基本属性,不会改变,width则表示这个点最终会占用的宽度。在树的可视化算法中,只需要nodeWidth和nodeHeight,width和height是专门为家谱的绘制增加的。

x:当前点的X,如何确定X的大小呢?以M为例,M.X = M.prelim + N.modifier + O.modifier。

y:当前点的y,以M为例: M.y = M.level * (每层的间隔 + M.height);

spouses:这个是专门为家谱的绘制准备的。表示当前点的配偶,一个点的配偶可以有多个,所以使用数组。

2、算法的大概流程

算法大概可以划分为:

  • 1.倒序遍历整颗树,初次确定每个点的prelim和modifier。
  • 2.从最底层逐级向上检查是否存在交叉,如果存在交叉则调整(这一步和我参考的算法不同,它是从上向下检查,但是我在使用中发现,如果存在子树的子树交叉的情况,那么在高层做了调整之后,这里再做一次调整就可能出现再次重叠的情况,所以改用从底层向上检查)。
    (勘误:这个地方不对,应该还是从上向下遍历,这样从上方调整后,即使下方还有重叠,也可以再次检测并调整)
  • 3.最后确定根节点的位置

3、算法详细介绍

这里我们先定义几个常量:

SiblingSeparation = 4,   //兄弟节点之间的间隔
SubtreeSeparation = 6,   //子树之间的间隔
width = nodeWidth = 2   //节点的宽度

还是以图1所示的树为例,这里先做一遍倒序遍历,初步确定每一个节点的位置:

A:因为A的左节点是null,所以

A.prelim = 0;
A.modifier = 0;

B:

B.prelim = 0;
B.modifier = 0;

C:因为C有左节点B,所以C要在B的基础上做向右的偏移,偏移量为B的prelim加上B的宽度加上间隔大小(这个地方的处理和John Q. Walker II的算法不同,是为了家谱的显示做的更改)。所以

C.prelim = B.prelim + B.width + SiblingSeparation = 0 + 2 + 4 = 6;
C.modifier = 0;

D:因为D是B、C的父节点,同时是A的右兄弟,所以D的位置由A确定后,为了保证D应该在B和C中间,应该对B、C做调整,但是我们并不会再回头对B、C做处理,而是计算D的modifier,以此来表示D的后代需要调整的位置。并且D.modifer应该等于D.prelim – (B.prelim + C.prelim)/2。

D.prelim = A.prelim + A.width +  SiblingSeparation= 0 + 2 + 4 = 6;
D.modifier = D.prelim - (B.prelim + C.prelim) / 2 = 6 - (0 + 6) / 2 = 3;

E:因为E没有左兄弟,所以直接通过A、D来确定位置即可

E.prelim = (A.prelim + D.prelim) / 2 = (0 + 6) / 2  = 3;
E.modifier = 0;

F:

F.prelim = E.prelim + E.width + SiblingSeparation = 3 + 2 + 4 = 9;
F.modifier = 0;

G:

G.prelim = 0;
G.modifier = 0;

H:

H.prelim = 0;
H.modifier = 0;

I:

I.prelim = H.prelim + H.width + SiblingSeparation = 0 + 2 + 4 = 6;
I.modifier = 0;

J:

J.prelim = I.prelim + I.width + SiblingSeparation = 6 + 2 + 4 = 12;
J.modifier = 0;

K:

K.prelim = J.prelim + J.width + SiblingSeparation = 12 + 2 + 4 = 18;
K.modifier = 0;

L:

L.prelim = K.prelim + k.width + SiblingSeparation = 18 + 2 + 4 = 24;
L.modifier = 0;

M:

M.prelim = G.prelim + G.width + SiblingSeparation = 6;
M.modifer = M.prelim - (H.prelim + L.prelim) / 2 = -6;

N:

N.prelim = F.prelim + F.width + SiblingSeparation = 9 + 2 + 4 = 15;
N.modifier = N.prelim - (G.prelim + M.prelim) / 2 =  12;

走到这里我们就不再向上走了,因为根节点的位置肯定是有E和N来最终确定。所以这个时候应该来调整整颗树,检查是否存在节点位置重叠的情况。检查是否重叠是一个递归,所以使用递归检查会降低很多复杂度。但是这里我们走一遍过程,并非是严格按照代码执行过程。

我们先检查最底层,也就是这颗树的第4层:
检查C和H是否有重叠(当然是最右和最左做对比了):
C.X = D.modifier + E.modifier + C.prelim = 3 + 0 + 6 = 9;
H.X = M.modifier + N.modifier + H.prelim = -6 + 12 + 0 = 6;

所以H在C的左边3处,显然是重叠了的。所以需要将H所在的子树全部向右移offset = SubtreeSeparation - (H.X-C.X) = 9;我们一直向上找,直到C和H的祖先是兄弟时,也就是E和N,修改N.prelim、N.modifier就代表将N和其子树全体右移。

N.prelim = N.prelim + offset = 15 + 9 = 24;
N.modifier = N.modifier + offset = 12 + 9 = 21;

这时候E和N之间距离变得更大了,但是E和F的距离与F和N的距离是不同的,为了让树显示的好看一点

F.prelim = F.prelim + offset / 2 = 9 + 9 / 2 = 13.5;

F的子树也应该调整一下(虽然这里并没有)

F.modifier = F.modifier + offset / 2 = 0 + 9 / 2 = 4.5;

我们再往上看,发现已经没有重叠的了。

这时候确定根节点O的位置

O.prelim = (E.prelim + N.prelim) / 2 = (3 + 24) / 2 = 13.5;
O.modifier = 0;

四、如何将树的可视化算法应该到家谱的绘制上。

在之前已经介绍过家谱和树的结构有稍许不同,所以我们尽量将家谱做一些处理,使其变成标准的树结构。

  • 1.配偶问题。
    将某个节点的配偶不当成一个节点看待,而是这个节点的一部分,所以我们只需在有配偶的时候改变节点的width就可以了。
  • 2.不只一个根节点的问题
    既然有不只一个根节点存在,那么我们就自己定义一个虚拟的节点作为根节点,将第一层所有的点的parent都指向这个根节点即可。这样就是一个非常标准的树结构了。

五、源代码

源代码放在了github上,地址为https://github.com/joyme123/candraw

leveldb源代码阅读(三)-memtable的实现

一、序言

在之前的文章中提到,leveldb所有的记录都是由一个个log文件逐渐转变来的,与存储在磁盘上的log文件对应的就是内存中的memtable。memtable和log文件存储的内容是一致的。

二、SkipList

memtable中的键值对是通过SkipList这种数据结构存储的,SkipList相对于普通的List来说,普通的List查找时间复杂度为O(n),而SkipList可以达到O(log(n)),同时,SkipList相对与红黑树,在多线程上,需要锁住的数据量更小,性能更优。

SkipList示意图

比如上图就是一个SkipList结构,如果我们需要搜索45这个点,我们首先在最高层(第二层)找到30,然后发现下一个点是57,已经大于45了,于是来到下一层(第一层)从30开始,向后找,就能找到45了,在这个过程中,我们跳过了很多个点。因此这种数据结构得名SkipList。

2.1 SkipList的基本信息

SkipList是memtable中用到的最主要的数据结构。
SkipList的线程安全特点:写操作需要外部的同步,比如信号量这种。读操作需要保证在读的时候SkipList不会被销毁。除此之外,读操作没有任何的内部锁或者同步。
SkipList的基本原则:
(1)分配节点在SkipList销毁之前不会被删除。因为在代码中我们没有任何的删除节点的操作。
(2)一个节点的内容,除了next/prev指针,在这个节点被存储到SkipList之后其他数据都是不变的。只有Insert()会修改list的内容,初始化一个节点并使用release-stores去将这个节点存放到一个或多个list中需要谨慎对待。

2.2 SkipList的成员函数

SkipList中,公共的成员函数只有两个
void Insert(const Key& key);
bool Contains(const Key& key) const;

顾名思义,Insert用来向SkipList中插入key,Contains用来检查SkipList中是否包含某个key。

私有的成员函数如下:

  Node* NewNode(const Key& key, int height);
  int RandomHeight();
  bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }

  // Return true if key is greater than the data stored in "n"
  bool KeyIsAfterNode(const Key& key, Node* n) const;

  // Return the earliest node that comes at or after key.
  // Return NULL if there is no such node.
  //
  // If prev is non-NULL, fills prev[level] with pointer to previous
  // node at "level" for every level in [0..max_height_-1].
  Node* FindGreaterOrEqual(const Key& key, Node** prev) const;

  // Return the latest node with a key < key.
  // Return head_ if there is no such node.
  Node* FindLessThan(const Key& key) const;

  // Return the last node in the list.
  // Return head_ if list is empty.
  Node* FindLast() const;

2.3 SkipList的节点Node实现

// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }

  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  }
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.
    next_[n].Release_Store(x);
  }

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  }
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);
    next_[n].NoBarrier_Store(x);
  }

 private:
  // Array of length equal to the node height.  next_[0] is lowest level link.
  port::AtomicPointer next_[1];
};

注意到Node实现了两个版本的Next和SetNext,带有NoBarrier_的是多进程不安全的,因为在少数情况下会用到,因此做了额外的实现。另外,可以注意这里只实现了Next()方法,并没有Prev()指向上一个节点,可以知道SkipList并不是一个双向链表,但是SkipList的迭代器是支持Prev()方法的,所以这是一个值得注意的地方。

Node节点的创建是通过NewNode()方法

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);
}

NewNode中使用了arena来分配对齐的内存,分配的内存大小是sizeof(Node)+sizeof(port::AtomicPointer) * (height - 1)。这里可以注意到申请的空间不仅仅是一个Node的大小,还包括sizeof(port::AtomicPointer) * (height – 1),这是因为SkipList是一个多层的结构,所以如果当前Node是3层,那么就需要3个Next指针,在Node的代码中,默认只申请了一个Next指针空间,所以需要额外的多申请(height – 1)个AtomicPointer的空间。

2.4 SkipList的迭代器(Iterator)

SkipList实现了自己的迭代器,迭代器的声明如下:

class Iterator {
   public:
    // Initialize an iterator over the specified list.
    // The returned iterator is not valid.
    explicit Iterator(const SkipList* list);

    // 如果迭代器位于一个有效的节点上,就会返回true.
    bool Valid() const;

    // 返回当前位置的key.
    // REQUIRES: Valid()
    const Key& key() const;

    // 前进到下一个位置
    // REQUIRES: Valid()
    void Next();

    // 后退到上一个位置
    // REQUIRES: Valid()
    void Prev();

    // 移到第一个key >= target的entry
    void Seek(const Key& target);

    // 移到这个list的第一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToFirst();

    // 移到这个list的最后一个entry
    // 如果list非空,迭代器的最终状态是Valid()
    void SeekToLast();

   private:
    const SkipList* list_; 
    Node* node_;
    // Intentionally copyable
  };

其中Prev()的实现并不是通过prev指针,因为上文中已经介绍了Node只有next指针。Prev()是通过FindLessThan来实现,找到小于指定Node的最后一个值。这里有一个问题,为什么不使用prev指针,难道不会大大的降低时间复杂度吗?看一下FindLessThan的具体实现。

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    assert(x == head_ || compare_(x->key, key) < 0);
    Node* next = x->Next(level);
    if (next == NULL || compare_(next->key, key) >= 0) {
      if (level == 0) {
        return x;
      } else {
        // Switch to next list
        level--;
      }
    } else {
      x = next;
    }
  }
}

其实就是SkipList查找过程的实现。时间复杂度应该是O(log(n)).相比于prev指针的话,在空间占用上是少一点的。具体为什么这样实现还得继续往下看。

SkipList还以同样的方式实现了FindGreaterOrEqualFindLast

template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
    const {
  Node* x = head_;
  int level = GetMaxHeight() - 1;
  while (true) {
    Node* next = x->Next(level);
    if (KeyIsAfterNode(key, next)) {
      // Keep searching in this list
      x = next;
    } else {
      if (prev != NULL) prev[level] = x;
      if (level == 0) {
        return next;
      } else {
        // Switch to next list
        level--;
      }
    }
  }
}

FindGreaterOrEqual的实现还是比较有意思的,在返回比指定key大于或等于的节点的同时,还可以选择性的将该节点的prev指针保存到Node** prev中,返回给用户。其实这也是Iterator中Prev()的实现不采用prev指针的原因,因为有了FindGreaterOrEqual这样的实现,SkipList的内部实现中根本不需要Prev()来将指针向前移动,同时又大大的节约了内存。

下面看一下SkipList的Insert的实现。

void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized.
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  // Our data structure does not allow duplicate insertion
  assert(x == NULL || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    }
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (NULL), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since NULL sorts after all
    // keys.  In the latter case the reader will use the new node.
    max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
  }

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);
  }
}

首先,Insert方法中没有做任何的多线程同步,所以需要调用者在外部进行写入同步(防止多个线程同步写入)。

插入的过程是先找到需要插入的位置,然后调用RandomHeight()来随机得到当前节点的高度,之后向普通链表一样执行插入操作。

注意到这里有一个多线程读写的问题。也就是当我们改变max_height_的值的时候,并没有改变head_指针。这种情况下有两种可能,一是读到了新的max_height_,但是head_指针还没有更改,这时候指针会立刻下降到下一级,SkipList的多级只是为了提高查找速度,所以在这里并没有其他的副作用;二是正常的情况,不作讨论。

2.Memtable

Memtable的实现主要就是依赖于SkipList,分析了SKipList之后,Memtable的插入,查找操作都是SkipList的插入和查找。

  // Increase reference count.
  void Ref() { ++refs_; }

  // Drop reference count.  Delete if no more references exist.
  void Unref() {
    --refs_;
    assert(refs_ >= 0);
    if (refs_ <= 0) {
      delete this;
    }
  }

Memtable的实现使用了类似于计数器引用的机制,不过这是手动实现的,所以需要用户在使用Memtable时调用Ref()来增加引用计数,引用结束后调用Unref来减少引用计数,当引用计数为0时则销毁自己。

总结

Memtable的设计还是很让人佩服的,至少对我来说是这样。了解到了SkipList这样的数据结构,也了解到内存屏障这种多进程情况下代码执行乱序的问题,还有内存对齐的必要、内存对齐的实现。同时也了解了一些C++编写程序的独特风格(我写的C++一直像Java,反而少了一点C++的美)

leveldb源码阅读(二)—— Varint和Arena的实现

一、Varint

Varint是在leveldb中广泛使用的一种变长的整数类型,Varint其实和unicode的实现非常相似,并且是little-endian。如果当前字节的最高位是1,则表示后面的字节也属于这个整数,如果最高位是0,则表示该整数结束了。所以,一个无符号4个字节的整型数,使用Varint可能以1,2,3,4,5个字节来表示。

比如130:

无符号整型表示起来为:00000000 00000000 00000000 10000010
使用Varint表示为:10000010 00000001             #这里注意Varint是little-endian

因为牺牲了每个字节的最高位作为标志位,所以遇到大的数可能需要5个字节。

某个无符号整型:10000001 00000001 00000001 00000001
使用Varint表示:10000001 10000010 10000100 10001000 00001000

但是平均下来,Varint是绝对会节省大量的存储空间的。

那么leveldb中Varint是如何编码和解码的呢?

char* EncodeVarint32(char* dst, uint32_t v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  static const int B = 128; //10000000
  if (v < (1<<7)) {  v < 2^7
    *(ptr++) = v;
  } else if (v < (1<<14)) { v < 2^14
    *(ptr++) = v | B;
    *(ptr++) = v>>7;
  } else if (v < (1<<21)) { //v < 2^21
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  }
  return reinterpret_cast<char*>(ptr);
}

上面这一段对uint32_t的变量进行Varint32编码的例子,编码的结果保存在dst里,返回的ptr是dst的最后一个字节的指针。使用if-else对1、2、3、4、5个字节的情况分别做了处理。主要就是移位和或运算符的使用。跟着代码走一遍就能理解。

const char* GetVarint32PtrFallback(const char* p,
                                   const char* limit,
                                   uint32_t* value) {
  uint32_t result = 0;
  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
    uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
    p++;
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
    }
  }
  return NULL;
}
inline const char* GetVarint32Ptr(const char* p,
                                  const char* limit,
                                  uint32_t* value) {
  if (p < limit) {
    uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
    if ((result & 128) == 0) {
      //如果低8位是0xxxxxxx,little-endian,也就是小于128
      *value = result;    //直接赋值
      return p + 1;
    }
  }
  return GetVarint32PtrFallback(p, limit, value);
}

上面GetVarint32Ptr是解码Varint32,当值大于128时,则调用GetVarint32PtrFallback。在则调用GetVarint32PtrFallback中,利用循环一个字节一个字节的取出,最终将value指针指向结果result。

二、Arena

Arena是leveldb中管理内存分配的类。所有的内存分配都通过Arena申请,可以根据申请的内存大小使用不同的内存分配策略,也可以避免过多的内存碎片问题,并且在内存释放时统一使用Arena来释放,方便管理。

Arena类的实现并不复杂。首先看一下成员变量。

  // Allocation state
  char* alloc_ptr_;                 //指向当前块中剩余的内存起点
  size_t alloc_bytes_remaining_;    //当前块中剩余的内存

  // Array of new[] allocated memory blocks
  std::vector<char*> blocks_;       //用来保存所有new出来的char数组,释放内存时也会使用到

  // Total memory usage of the arena.
  port::AtomicPointer memory_usage_;    //通过Arena申请的内存

然后就是public的成员函数:

// Return a pointer to a newly allocated memory block of "bytes" bytes.
  char* Allocate(size_t bytes);

  // Allocate memory with the normal alignment guarantees provided by malloc
  char* AllocateAligned(size_t bytes);

  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
  }

功能上很简单,Allocate用来申请指定大小的内存,AllocateAligned用来申请保证内存对齐的内存空间,MemoryUsage用来获取内存使用情况。

我对内存对齐的理解:现在假定内存对齐的最小单位是8个字节,那么如果你只申请14个字节,内存对齐会多给你分配2个字节凑成16个字节。内存对齐的好处就是访问速度的提升,在CPU一次读取8个字节的情况下。在没有内存对齐的情况下,你要读取第6~12个字节,就需要读取两次,第一次0~7个字节,通过移位取出6~7,再读取第8~15个字节,取出8~12。这样就需要读取两次。而如果在内存对齐的情况下,你需要读取的6~12个字节应该被放在8~15个字节中。只需读取一次即可。

私有的成员函数有:

char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);

这两个函数提供了具体的内存分配的实现。

具体分析这5个成员函数。

1.Allocate(size_t bytes)

inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}

Allocate是一个内联函数。内联函数的好处在于编译时会直接在调用处展开,对于程序执行速度有一定提升,但是也会导致程序大小变大。
assert(bytes > 0);是一个断言语句,断言语句可以在编译时由编译器去除,但是在调试的时候可以帮助程序员发现问题所在。这里用来保证没有出现0个字节分配的情况。
后面的判断语句用来判断当前块的剩余空间是否够分配,如果够则分配。如果不够则交给AllocateFallback()处理。返回的是分配的空间起点。

2.AllocateAligned(size_t bytes)

char* Arena::AllocateAligned(size_t bytes) {
  const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
  assert((align & (align-1)) == 0);   // Pointer size should be a power of 2
  size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
  size_t slop = (current_mod == 0 ? 0 : align - current_mod);
  size_t needed = bytes + slop;
  char* result;
  if (needed <= alloc_bytes_remaining_) {
    result = alloc_ptr_ + slop;
    alloc_ptr_ += needed;
    alloc_bytes_remaining_ -= needed;
  } else {
    // AllocateFallback always returned aligned memory
    result = AllocateFallback(bytes);
  }
  assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
  return result;
}

AllocateAligned(size_t bytes)用来做内存对齐的分配。
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;用来获取分配的最小单位,如果指针大小大于8,则取指针大小,否则使用8作为最小的对齐单位。
assert((align & (align-1)) == 0);这是一个很有意思的判断,可以保证只有2的n次方才能满足。

size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);current_mod是余数,比如当前指针地址是15,align是8,那么余数为7,这个余数就是alloc_prt_&(align-1)的值。
size_t slop = (current_mod == 0 ? 0 : align - current_mod);因为当前指针可能不是align的整数倍,slop即代表当前指针需要偏移的大小,来保证是align的整数倍。
size_t needed = bytes + slop; needed代表需要分配的空间。

之后的判断则和Allocate()函数做法相同了。需要注意的是返回的result指针是align的整数倍,这样就能保证以最少的内存读取次数来获取想要的数据。

3.AllocateFallback(size_t bytes)

char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlockSize / 4) {
    //如果需要分配的bytes大于块大小的1/4,则单独在新的块中分配,以此避免浪费过多的剩下的空间(这样能保证浪费的空间小与kBlockSize / 4)
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // 重新开辟新的块空间进行分配,上一个块剩下的空间都浪费了。
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}

上面两个成员函数在当前块分配空间不足时,都将分配任务交给AllocateFallback处理。

4.AllocateNewBlock(size_t block_bytes)。

char* Arena::AllocateNewBlock(size_t block_bytes) {
  char* result = new char[block_bytes];
  blocks_.push_back(result);
  memory_usage_.NoBarrier_Store(
      reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
  return result;
}

使用new char来申请一个新的块,并将块的起始指针存入blocks_中。更新memory_usage的大小。

5.MemoryUsage()

“`
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
“`
获取memory_usage_中存储的使用空间数据。

三、Memeory Barrier(内存屏障)

memory_usage_的类型是AtomicPointer,这是一个原子类型,针对不同平台有不同的实现。可以用来线程安全的存储和获取值。

AtomicPointer类实现了几个函数:

NoBarrier_Load();
NoBarrier_Store(void* v)
Acquire_Load();
Release_Store(void* v)

这里面涉及到一个Memory Barrier(内存屏障,Memory fence)的概念。

引用一个wikipedia的例子:

Processor #1:

while (f 0);
// Memory fence required here
print x;

Processor #2:

x = 42;
// Memory fence required here
f = 1;

一个两核心的CPU按照上述例子执行。可能会出现很多种情况:
虽然我们希望的是打印“42”。但是如果核心#2存储操作乱序执行(out-of-order),就有可能f在x之前被更新了。这样打印语句就可能打印出“0”。同样的,核心#1读取操作可能也乱序执行,x就有可能在f被判断之前读取了,打印语句可能打印出一个无法预期的值。对于大多数程序来说,上面的这些情况都是不可以接受的。使用memory barrier可以避免out-of-order的问题。

简而言之就是,在多CPU共享内存空间的情况下,两条语句的执行顺序已经无法保证了。需要memory barrier来保证执行顺序的正确。

当然,单CPU是不会出现这个问题的。

现在再来解释上面的4个函数的作用。

NoBarrier_Load();           //不使用内存屏障的读取
NoBarrier_Store(void* v)    //不使用内存屏障的存储
Acquire_Load();             //使用内存屏障的读取
Release_Store(void* v)      //使用内存屏障的存储

leveldb源码阅读(一)-log读取和写入

leveldb源码阅读(一)-log读取和写入

标签(空格分隔): leveldb log


一、 引言

leveldb中,所有对数据库的操作都会记录到log中,当log文件达到预定义的大小后,就会被转换成数据表(sstable),所以log文件的生成和读取算是leveldb核心部分的第一步。
这篇文章涉及到的leveldb的源代码文件有:

db/log_format.h
db/log_reader.h
db/log_reader.cc
db/log_writer.h
db/log_writer.cc
leveldb/slice.h
leveldb/status.h

二、日志存储格式——log_format.h

在之前的文章中,了解到log是由一个个block组成,每个block是32KB的大小。

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

每个block中存储的是一条条的记录,如果一个block的末尾最后剩下小于等于6bytes的空间,就会使用一个trailer填充而不是用新的记录填充,这个trailer都是由0字节组成。一条记录包括一个crc32c生成的校验码checksum、记录的长度length、记录的类型type、数据。

如果一条记录跨越了多个block,就会被分段,分段类型有3种——First、Middle、Last

关于这部分的实现是在log_format.h中

enum RecordType {
  //0是保留字,用来表示预分配的文件
  kZeroType = 0,

  kFullType = 1,

  // 一条记录被分段的3种类型
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
//定义了最大的记录类型值
static const int kMaxRecordType = kLastType;
//定义了最大的`block`大小
static const int kBlockSize = 32768;

// 记录头是  checksum (4 bytes), length (2 bytes), type (1 byte).总共7bytes
static const int kHeaderSize = 4 + 2 + 1;

三、日志的写入

在log_writer代码中,实现了Writer类,最主要的就是一个AddRecord(const Slice& slice)

Writer类有两个构造函数,对于只传了一个参数——WritableFile指针的构造函数,使用了explicit显示构造,这样可以防止隐式转换。另外将Writer(const Writer&)void operator=(const Writer&)放在private域,防止拷贝赋值。

Writer有三个成员变量 WritableFile* dest_用来保存可写的文件,int block_offset_用来保存当前块的偏移量, uint32_t type_crc_[kMaxRecordType + 1]用来保存5种记录类型的crc32c计算结果。这里会提前计算好保存进这个数组来减少存储时计算的负载。

接下来就是最重要的AddRecord(const Slice& slice)了。Slice是leveldb中用来保存key或者value的值的,包含一个指针,一个长度。

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();   //取出数据的指针
  size_t left = slice.size();       //取出数据的长度

  // 如果有必要的话,对记录分段,并保存.
  // 注意如果slice是空的,我们也会迭代一次,保存一个长度为0的记录
  Status s;     //用来保存这次操作的结果
  bool begin = true;    //是否是一条记录的开始
  do {
    //计算出当前块剩下的空间
    const int leftover = kBlockSize - block_offset_;
    //使用assert进行断言,这边是一定会>=0的,assert这里只是为了帮助调试程序,实际使用时在编译时通过参数去除assert语句
    assert(leftover >= 0);

    //如果剩下的空间已经不足一条记录的header了
    if (leftover < kHeaderSize) {
      // 切换到新的块
      if (leftover > 0) {
        // 如果剩下的空间>0,需要使用\x00填充
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      //新的块偏移量是0
      block_offset_ = 0;
    }

    // 不变的道理:我们绝对不会让剩下的空间<kHeaderSize.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    //当前还可用的空间
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;

    //记录片段的大小,取可用的空间大小和记录的大小的最小值
    const size_t fragment_length = (left < avail) ? left : avail;

    //当前的记录类型
    RecordType type;

    //如果记录的大小和片段的长度是一样的,表示到达了记录的末尾为true,否则false
    const bool end = (left == fragment_length);

    //记录类型的判断
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    //写入到物理存储中,
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;     //数据指针右移一个片段长度
    left -= fragment_length;    //数据的长度减去一个片段的长度
    begin = false;              //不再是开始了
  } while (s.ok() && left > 0); //如果没有出错,并且数据还剩就再循环

  return s;
}

AddRecord(const Slice& slice)调用了一个私有成员函数EmitPhysicalRecord(RecordType t, const char* ptr, size_t n),看一看这个将记录写入持久化存储是如何实现的。

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // n必须可以用2KB表示,因为一条记录的length是uint16大小的,little-endian
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // 对记录的header进行格式化
  char buf[kHeaderSize];
  //取低位字节
  buf[4] = static_cast<char>(n & 0xff);
  //取高位字节
  buf[5] = static_cast<char>(n >> 8);
  //一位的记录类型
  buf[6] = static_cast<char>(t);

  // 计算记录类型的crc值和有效载荷
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);                 // Adjust for storage
  //填充4位的校验值
  EncodeFixed32(buf, crc);

  // 将记录的header写入
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    //将记录的数据写入
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      //flush一下
      s = dest_->Flush();
    }
  }

  //块的偏移量右移header和数据的长度和
  block_offset_ += kHeaderSize + n;
  return s;
}

上面的代码分析其实就是log部分逻辑上的实现,将一条记录格式化成设计好的格式。这里有一个疑问就是dest_->Append(Slice& slice)dest_->Flush()是如何工作的。所以继续追踪下去,到include/leveldb/env.h中:

// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
 public:
  WritableFile() { }
  virtual ~WritableFile();

  virtual Status Append(const Slice& data) = 0;
  virtual Status Close() = 0;
  virtual Status Flush() = 0;
  virtual Status Sync() = 0;

 private:
  // No copying allowed
  WritableFile(const WritableFile&);
  void operator=(const WritableFile&);
};

这部分是WritableFile的抽象部分,Append、Close、Flush、Sync都是纯虚函数,具体实现必须由用户自己去做。并且实现时必须提供缓冲区,这样调用者可以append小的数据段到文件中。

实际上leveldb中也有默认(posix)的实现。在util/env/env_posix.cc中。目前只看Append和Flush具体实现:

  virtual Status Append(const Slice& data) {
    size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
    if (r != data.size()) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  virtual Status Flush() {
    if (fflush_unlocked(file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

fwrite_unlocked和fflush_unlocked都是define的fwrite和fflush方法。

所以这里的设计是很好的案例。通过将WritableFile抽象化,提供给用户自己来实现,从而很轻松的就可以在多平台间移植。

四、日志的读取

在log_reader中,实现了Reader类,用来读取日志文件。
Reader的构造函数定义如下:

Reader(SequentialFile* file, Reporter* reporter, bool checksum,
         uint64_t initial_offset);

file是代表要读取的文件,reporter是用来报告日志文件中发现错误的,checksum是当前文件的校验码,initial_offset是当前文件中开始读取的物理位置。

Reader类中私有的成员变量和函数需要注意的有:

  SequentialFile* const file_;      //要读取的文件
  Reporter* const reporter_;        //报告错误的对象
  bool const checksum_;             //校验值
  char* const backing_store_;       //备份存储
  Slice buffer_;                    //缓冲
  bool eof_;   // Last Read() indicated EOF by returning < kBlockSize

  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;         //最后一个Record的偏移量
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;       //

  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;   //第一个记录的初始偏移量

  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  bool resyncing_;      //在一次查找后我们再同步则是True。特别的,kMiddleType和kLastType会被跳过   

  // Extend record types with the following special values
  enum {
    kEof = kMaxRecordType + 1,
    // 无效的记录值,可能有以下情况
    // * 具有无效的CRC值(checksum不正确)(ReadPhysicalRecord reports a drop)
    // * 长度为0的记录 (No drop is reported)
    // * The record is below constructor's initial_offset (No drop is reported)
    kBadRecord = kMaxRecordType + 2
  };

  bool SkipToInitialBlock();        //跳过直到initial_offset的位置,成功则返回true
  unsigned int ReadPhysicalRecord(Slice* result);   //读取Record,返回记录类型,或者kEof、kBadRecord

  void ReportCorruption(uint64_t bytes, const char* reason);    //汇报丢弃的字节,以及原因
  void ReportDrop(uint64_t bytes, const Status& reason);        //汇报丢弃的字节,以及原因

这些成员函数中着重看一下ReadPhysicalRecord

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}

Reader类中公共的成员函数需要注意的有:
bool ReadRecord(Slice* record, std::string* scratch);
uint64_t LastRecordOffset();

ReadRecord用来读取下一个要读取的Record,如果读取成功,返回true,存储在record中,如果读到了结尾没有一个完整的Record,则存储在scratch中

bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {    //跳到初始的偏移位置
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

leveldb文档翻译(3)-leveldb的日志格式和表格式

日志文件的格式

日志文件的内容是一个32KB大小的块的序列。唯一可能的意外是日志文件结尾可能包含一个不完整的块。

每一个块由以下记录组成

block := record* trailer?
record :=
  checksum: uint32     // crc32c of type and data[] ; little-endian
  length: uint16       // little-endian
  type: uint8          // One of FULL, FIRST, MIDDLE, LAST
  data: uint8[length]

(注:crc32c是一种检验算法)

一条记录绝不会在块的最后6个字节内开始(因为剩下的6个字节已经不够大,checksum,length,type总共需要7个字节)。任何留下的字节形成trailer(拖车?),这部分必须完全是0字节,读取的时候必须跳过。

注:如果当前块恰好有7个字节留下来,一个新的非0长度的记录被增加,写入必须用一个FIRST记录(包含0字节的用户数据,FIRST用来表示这是一条记录的开始部分)去填充块的最后7个字节,之后将所有的用户数据写在后来的块中。

更多的类型可能在以后会加入进来。一些读取程序可能会跳过它们不认识的记录类型,其他读取程序可能会报告有些数据被跳过了。

FULL == 1
FIRST == 2
MIDDLE == 3
LAST == 4

FULL记录包含一个完整的用户记录的内容

FIRST,MIDDLE,LAST是用来表示用户记录被分成多个片段(尤其是块的边界)。FIRST是用户记录的第一个片段,LAST是用户记录的最后一个片段,MIDDLE是所有用户数据的内部片段。

例如:有以下的用户数据

A: length 1000
B: length 97270
C: length 8000

A将会在第一块中存储为FULL记录。

B将会划分成3段:第一段占用第一块剩下的部分,第二段占用整个第二块,第三段占用第三块的前面部分。这将会在第三块中留下6个字节的空间,被留作空白当成trailer。

C将会在第四块中存储为一个FULL记录。

这种记录格式的优点

1.我们不需要任何的同步的启发式算法-直接跳到下一个块的边界并且扫描。如果有一个错误发生,跳到下一块。作为一个边界效益,当一个日志文件的内容的一部分被作为一个记录嵌入到另外一个日志文件中,我们不会变的困惑。

2.在近似边界上分割(比如mapreduce)是简单的:找到下一个块的边界,跳过记录直到我们遇到一个FULL或FIRST记录。

3.对于大的记录我们不需要额外的缓冲区。

对比这个记录格式的缺点

1.没有对微小的记录的包装。这可能通过添加新的记录类型来修复,因此它只是当前实现的短处,不是这种格式的必须。

2.没有压缩。同样的,这也可以通过添加新的记录格式来修复。

表文件的格式

<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer]        (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>

这个文件包含内部的指针。每一个指针都被称作BlockHandle,包含下面的信息:

offset:   varint64
size:     varint64

参考varints,里面有varint64格式的解释。

1.键值对序列是有序存储的,被分成多个数据块。这些数据块从文件开头一个接着一个。每一个数据块被block_builder.cc中的代码格式化,之后有选择的进行压缩。

2.在数据块之后我们存储一束元块。支持的元块类型在下面描述了。更多的元块类型可能在以后会加进来。每一个元块类型也是用block_builder.cc格式化,然后之后被有选择的压缩。

3.一个“元索引”块。它包含每一个其他的元块的入口,入口的键是这个元块的名字,值是一个BlockHandle指向元块。

4.一个“索引”块。这个块包含每个数据块的入口,键是一个大于等于当前数据块的最后一个键的字符串,在连续的数据块的第一个键之前。值是这个数据块的BlockHandle。

5.在文件的最后面是一个填充满长度的脚部,包含了元索引块和索引块的BlockHandle,还有一个魔术数字。

    metaindex_handle: char[p];     // Block handle for metaindex
    index_handle:     char[q];     // Block handle for index
    padding:          char[40-p-q];// zeroed bytes to make fixed length
                                   // (40==2*BlockHandle::kMaxEncodedLength)
    magic:            fixed64;     // == 0xdb4775248b80fb57 (little-endian)

“filter”元块

在数据库打开时如果 FilterPolicy被指定,每个表都会有一个filter块。“元索引”块包含一个入口,将filter.<N>指向了 BlockHandle,对于filter块,<N>是filter 策略的
Name() 方法返回的。

filter块存储了一系列的filters,filter i包含了FilterPolicy::CreateFilter()在所有存储在文件偏移落入到范围[ ibase … (i+1)base-1 ]的块中键的输出。

当前,”base”是2KB,对这个例子来说,如果块X和Y开始在范围 [ 0KB .. 2KB-1 ],所有的在X和Y之间的键将会通过调用FilterPolicy::CreateFilter()被转换成一个filter,得到的filter将会被存储成当前filter块的第一个filter。

filter block是以下格式的:

[filter 0]
[filter 1]
[filter 2]
...
[filter N-1]

[offset of filter 0]                  : 4 bytes
[offset of filter 1]                  : 4 bytes
[offset of filter 2]                  : 4 bytes
...
[offset of filter N-1]                : 4 bytes

[offset of beginning of offset array] : 4 bytes
lg(base)                              : 1 byte

filter块的最后的偏移数组允许高效地mapping一个数据块偏移到相应的filter。

“统计”元块

这个元块包含一束统计。键是统计项的名字。值包含着统计内容。

TODO(预览):记录了以下的统计

data size
index size
key size (uncompressed)
value size (uncompressed)
number of entries
number of data blocks

leveldb文档翻译(2)-leveldb的实现

这部分是来自于leveldb的impl.md文档,详细介绍了leveldb实现方面的设计。

1、文件

leveldb其实是Bigtable的简单的实现。Bigtable论文地址:http://research.google.com/archive/bigtable.html。当然,文件的组织方法和bigtable有一些不同之处,下面会逐个介绍。

1.1 日志文件

一个日志文件(*.log)存储一系列的最近的更新。每一个更新都会添加到当前日志文件的最后。当所有日志文件达到了预定义的大小(默认是4M),它会转换成一个有序的表(参见下面介绍),一个新的日志文件被创建提供给之后的更新。

当前日志文件的副本是保存在一个内存结构中(称为memtable)。每一次读都会向这个memtable中查询,所以读操作其实还是映射到所有记录下来的更新中。

2.有序表

一个有序表(*.ldb)存储了一系列的由键排序的键值对。每个键值对要么键对应着值,要么是键对应着一个删除标记。(删除标记保存在旧的排序表中存储着过时的值)。

排序表的集合是由一系列的等级(level)组织的。排序表如果是由一个日志文件生成,就会被置为一个特别的younglevel(也被称为level-0)。当young文件的数量超过了某个特定的阈值(当前是4),所有的young level文件和所有键范围重叠的level-1文件被合并在一起产生一系列的新的level-1文件(我们每2MB的数据创建一个新的level-1文件)

属于young level的文件可能自己相互包含重叠的键,但属于其他等级的文件有唯一的不重叠的键范围。level-L(L >= 1),当level-L所有的文件大小超过了(10^L)MB(例如,level-1是10MB,level2是100MB),在level-L中取一个文件,和所有在level-(L+1)中与之重叠的文件,合并成level-(L+1)的新的文件集合。这些合并使得新的更新逐渐从young-level的文件转移到最大的等级,这个过程只需要批量的读写操作即可完成(注:最小的时间复杂度)

2.1 Manifest

一个MANIFEST文件中个列举了由各个level组成的有序表的集合,包含了相应的键范围,以及其他重要的元数据。任何时候数据库被打开,一个新的MANIFEST文件(文件名中有一个新的数字代表)被创建。MANIFEST文件和log文件的格式相同,任何发生的改变都会被添加到这个文件中(比如文件添加或者移除)。

2.2 Current

CURRENT是一个简单的文本文件,包含了最新的MANIFEST文件的名字。

2.3 信息日志(Info logs)

信息消息被打印到LOG和LOG.old文件中

2.4 其他

其他文件用来记录一些杂项。(LOCK,*.dbtmp)

3.Level 0

当一个日志文件增长到一个特定的值(默认1MB):
创建一个新的memtable和log文件,并将未来的更新放在这里。
在后台的操作:
将上一个memtable的内容写入到sstable
丢弃上一个memtable
删除旧的日志文件和旧的memtable
将新的sstable添加到young(level 0) level。

4.压实(不是对数据进行压缩,只是将数据整合到一起,并且在这个过程中会去除重复键以及删除的键)

当level L超出了文件大小的限制,我们在另外一个后台线程中压实它。这个压实操作选择level L中的一个文件以及来自下一个level L+1的所有与之重叠的文件。注意:如果一个level-L文件只和一个level-(L+1)文件部分重叠,这个文件的所有部分都被作为压实的输入,然后在压实之后被丢弃。例外:因为level-0是特别的(level-0的文件可能彼此重叠),我们特殊对待从level-0到level-1的压实:当这些level-0文件彼此重叠时,一个level-0的压实会选择多个level-0的文件。

一个压实操作合并选择的文件的内容,产生一系列的level-(L+1)文件。在当前的输出文件超过目标文件大小(2MB)时,我们切换到一个产生的新的level-(L+1)文件。在当前输出文件的键范围增长到足够大,以致于和超过10个level-(L+2)的文件产生重叠时,我们同样切换到产生的新的输出文件。最后一条准则保证了之后的一次level-(L+1)文件压实不会从level-(L+2)中选择太多的数据。

旧的文件被丢弃,新的文件被加入到服务状态。

特定level的压实通过键空间的压实。详细来说,对于每个level L,我们记住最后一次level L的压实的结尾的键。下一次level L的压实将会选择在这个键之后的第一个文件(如果没有这个文件,就从键空间的起点环绕。

压实会丢弃覆盖的值,如果没有更大的数字level包含一个范围重叠当前键的文件,也会丢弃删除标记。

4.1 时间复杂度

level-0的压实将会从level-0中读取最多4个1MB的文件,在最坏情况下读取所有的level-1文件(10MB)。这样一来,我们将会读取14MB,写入14MB。

对于其他等级的压实,我们将会从level L中选择一个2MB的文件。最坏的情况下,将会从level L+1中重叠大概12个文件(10个文件是因为level-(L+1)是level-L的10倍大,其他在两边的两个文件是因为level L的文件范围通常和level-(L+1)的文件范围是不对齐的)。压实因此会读取26MB,写入26MB。假设磁盘IO速度为100MB/s,最差的压实大概消耗0.5s

如果我们减慢后台写入,只取全速100MB/s的10%,一次压实可能耗时5s。如果用户的写入速度为10MB/s,我们可能创建许多level-0的文件(大概5*10MB总共50MB)。这将明显的增加读取消耗,因为每次读取合并了更多的文件增加了负载。

解决方案1:为了减轻这个问题,我们可能希望当level-0文件数很大的时候,增加日志切换的阈值。虽然缺点是这个阈值越大,需要越多的内存来保存相应的memtable。

解决方案2:我们可能希望当level-0的文件数变多时,人为增加写入速度。

解决方案3:我们致力于降低大量文件合并的成本。也许大多数level-0文件在缓存中都有未压缩的块,我们仅仅需要担心O(N)复杂度的合并迭代。

4.2 文件的数量

除了总是生成2MB的文件,我们也可以为更高的等级生成更大的文件,以此减少文件的总数,虽然代价是更多突发的压缩。当然,我们也可以将文件集合分片放在不同的文件夹中。

在2011年2月4日,在ex3文件系统上做了一次测试,该测试显示,在很多文件的目录中做100K次文件打开的平均用时如下:

Files in directory Microseconds to open a file
1000 9
10000 10
100000 16

因此,在现代文件系统中,也许连分片也是不需要的。

5.恢复

  • 读取CURRENT去找到最后一次提交的MANIFEST文件名
  • 读取该MANIFEST文件
  • 清除过期的文件
  • 我们可以在这里打开所有的sstables,但是懒加载可能会更好。
  • 将log块转换成新的level-0 sstable
  • 开始将恢复的序列导向到新的日志文件的新的写入流。

6.文件的垃圾回收

每次恢复的最后以及每次压缩的最后DeleteObsoleteFiles()会被调用。它会找出数据库中所有的文件的名字。然后删除所有的不是当前日志文件的日志文件。再删除所有的不涉及到某些level和不是一个活跃的压缩输出的表文件。