c++, levelDB源码阅读

leveldb源码阅读(二)—— Varint和Arena的实现

一、Varint

Varint是在leveldb中广泛使用的一种变长的整数类型,Varint其实和unicode的实现非常相似,并且是little-endian。如果当前字节的最高位是1,则表示后面的字节也属于这个整数,如果最高位是0,则表示该整数结束了。所以,一个无符号4个字节的整型数,使用Varint可能以1,2,3,4,5个字节来表示。

比如130:

无符号整型表示起来为:00000000 00000000 00000000 10000010
使用Varint表示为:10000010 00000001             #这里注意Varint是little-endian

因为牺牲了每个字节的最高位作为标志位,所以遇到大的数可能需要5个字节。

某个无符号整型:10000001 00000001 00000001 00000001
使用Varint表示:10000001 10000010 10000100 10001000 00001000

但是平均下来,Varint是绝对会节省大量的存储空间的。

那么leveldb中Varint是如何编码和解码的呢?

char* EncodeVarint32(char* dst, uint32_t v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  static const int B = 128; //10000000
  if (v < (1<<7)) {  v < 2^7
    *(ptr++) = v;
  } else if (v < (1<<14)) { v < 2^14
    *(ptr++) = v | B;
    *(ptr++) = v>>7;
  } else if (v < (1<<21)) { //v < 2^21
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  }
  return reinterpret_cast<char*>(ptr);
}

上面这一段对uint32_t的变量进行Varint32编码的例子,编码的结果保存在dst里,返回的ptr是dst的最后一个字节的指针。使用if-else对1、2、3、4、5个字节的情况分别做了处理。主要就是移位和或运算符的使用。跟着代码走一遍就能理解。

const char* GetVarint32PtrFallback(const char* p,
                                   const char* limit,
                                   uint32_t* value) {
  uint32_t result = 0;
  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
    uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
    p++;
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
    }
  }
  return NULL;
}
inline const char* GetVarint32Ptr(const char* p,
                                  const char* limit,
                                  uint32_t* value) {
  if (p < limit) {
    uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
    if ((result & 128) == 0) {
      //如果低8位是0xxxxxxx,little-endian,也就是小于128
      *value = result;    //直接赋值
      return p + 1;
    }
  }
  return GetVarint32PtrFallback(p, limit, value);
}

上面GetVarint32Ptr是解码Varint32,当值大于128时,则调用GetVarint32PtrFallback。在则调用GetVarint32PtrFallback中,利用循环一个字节一个字节的取出,最终将value指针指向结果result。

二、Arena

Arena是leveldb中管理内存分配的类。所有的内存分配都通过Arena申请,可以根据申请的内存大小使用不同的内存分配策略,也可以避免过多的内存碎片问题,并且在内存释放时统一使用Arena来释放,方便管理。

Arena类的实现并不复杂。首先看一下成员变量。

  // Allocation state
  char* alloc_ptr_;                 //指向当前块中剩余的内存起点
  size_t alloc_bytes_remaining_;    //当前块中剩余的内存

  // Array of new[] allocated memory blocks
  std::vector<char*> blocks_;       //用来保存所有new出来的char数组,释放内存时也会使用到

  // Total memory usage of the arena.
  port::AtomicPointer memory_usage_;    //通过Arena申请的内存

然后就是public的成员函数:

// Return a pointer to a newly allocated memory block of "bytes" bytes.
  char* Allocate(size_t bytes);

  // Allocate memory with the normal alignment guarantees provided by malloc
  char* AllocateAligned(size_t bytes);

  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
  }

功能上很简单,Allocate用来申请指定大小的内存,AllocateAligned用来申请保证内存对齐的内存空间,MemoryUsage用来获取内存使用情况。

我对内存对齐的理解:现在假定内存对齐的最小单位是8个字节,那么如果你只申请14个字节,内存对齐会多给你分配2个字节凑成16个字节。内存对齐的好处就是访问速度的提升,在CPU一次读取8个字节的情况下。在没有内存对齐的情况下,你要读取第6~12个字节,就需要读取两次,第一次0~7个字节,通过移位取出6~7,再读取第8~15个字节,取出8~12。这样就需要读取两次。而如果在内存对齐的情况下,你需要读取的6~12个字节应该被放在8~15个字节中。只需读取一次即可。

私有的成员函数有:

char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);

这两个函数提供了具体的内存分配的实现。

具体分析这5个成员函数。

1.Allocate(size_t bytes)

inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);
  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
  }
  return AllocateFallback(bytes);
}

Allocate是一个内联函数。内联函数的好处在于编译时会直接在调用处展开,对于程序执行速度有一定提升,但是也会导致程序大小变大。
assert(bytes > 0);是一个断言语句,断言语句可以在编译时由编译器去除,但是在调试的时候可以帮助程序员发现问题所在。这里用来保证没有出现0个字节分配的情况。
后面的判断语句用来判断当前块的剩余空间是否够分配,如果够则分配。如果不够则交给AllocateFallback()处理。返回的是分配的空间起点。

2.AllocateAligned(size_t bytes)

char* Arena::AllocateAligned(size_t bytes) {
  const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
  assert((align & (align-1)) == 0);   // Pointer size should be a power of 2
  size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
  size_t slop = (current_mod == 0 ? 0 : align - current_mod);
  size_t needed = bytes + slop;
  char* result;
  if (needed <= alloc_bytes_remaining_) {
    result = alloc_ptr_ + slop;
    alloc_ptr_ += needed;
    alloc_bytes_remaining_ -= needed;
  } else {
    // AllocateFallback always returned aligned memory
    result = AllocateFallback(bytes);
  }
  assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
  return result;
}

AllocateAligned(size_t bytes)用来做内存对齐的分配。
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;用来获取分配的最小单位,如果指针大小大于8,则取指针大小,否则使用8作为最小的对齐单位。
assert((align & (align-1)) == 0);这是一个很有意思的判断,可以保证只有2的n次方才能满足。

size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);current_mod是余数,比如当前指针地址是15,align是8,那么余数为7,这个余数就是alloc_prt_&(align-1)的值。
size_t slop = (current_mod == 0 ? 0 : align - current_mod);因为当前指针可能不是align的整数倍,slop即代表当前指针需要偏移的大小,来保证是align的整数倍。
size_t needed = bytes + slop; needed代表需要分配的空间。

之后的判断则和Allocate()函数做法相同了。需要注意的是返回的result指针是align的整数倍,这样就能保证以最少的内存读取次数来获取想要的数据。

3.AllocateFallback(size_t bytes)

char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlockSize / 4) {
    //如果需要分配的bytes大于块大小的1/4,则单独在新的块中分配,以此避免浪费过多的剩下的空间(这样能保证浪费的空间小与kBlockSize / 4)
    char* result = AllocateNewBlock(bytes);
    return result;
  }

  // 重新开辟新的块空间进行分配,上一个块剩下的空间都浪费了。
  alloc_ptr_ = AllocateNewBlock(kBlockSize);
  alloc_bytes_remaining_ = kBlockSize;

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;
}

上面两个成员函数在当前块分配空间不足时,都将分配任务交给AllocateFallback处理。

4.AllocateNewBlock(size_t block_bytes)。

char* Arena::AllocateNewBlock(size_t block_bytes) {
  char* result = new char[block_bytes];
  blocks_.push_back(result);
  memory_usage_.NoBarrier_Store(
      reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
  return result;
}

使用new char来申请一个新的块,并将块的起始指针存入blocks_中。更新memory_usage的大小。

5.MemoryUsage()
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}

获取memory_usage_中存储的使用空间数据。

三、Memeory Barrier(内存屏障)

memory_usage_的类型是AtomicPointer,这是一个原子类型,针对不同平台有不同的实现。可以用来线程安全的存储和获取值。

AtomicPointer类实现了几个函数:

NoBarrier_Load();
NoBarrier_Store(void* v)
Acquire_Load();
Release_Store(void* v)

这里面涉及到一个Memory Barrier(内存屏障,Memory fence)的概念。

引用一个wikipedia的例子:

Processor #1:

while (f == 0);
// Memory fence required here
print x;

Processor #2:

x = 42;
// Memory fence required here
f = 1;

一个两核心的CPU按照上述例子执行。可能会出现很多种情况:
虽然我们希望的是打印“42”。但是如果核心#2存储操作乱序执行(out-of-order),就有可能f在x之前被更新了。这样打印语句就可能打印出“0”。同样的,核心#1读取操作可能也乱序执行,x就有可能在f被判断之前读取了,打印语句可能打印出一个无法预期的值。对于大多数程序来说,上面的这些情况都是不可以接受的。使用memory barrier可以避免out-of-order的问题。

简而言之就是,在多CPU共享内存空间的情况下,两条语句的执行顺序已经无法保证了。需要memory barrier来保证执行顺序的正确。

当然,单CPU是不会出现这个问题的。

现在再来解释上面的4个函数的作用。

NoBarrier_Load();           //不使用内存屏障的读取
NoBarrier_Store(void* v)    //不使用内存屏障的存储
Acquire_Load();             //使用内存屏障的读取
Release_Store(void* v)      //使用内存屏障的存储
Be the First to comment.

Leave a Comment

电子邮件地址不会被公开。 必填项已用*标注