c++11实现的跨平台定时器

一、引言

在一个比较大的系统当中,很多地方都需要定时功能,比如某些存在内存中的数据,需要定时的做持久化(实时持久化可能带来很大的性能问题),如果这个系统要求跨平台。那么实现一个跨平台的定时器就很有必要了。
c++11的标准中,实现了多线程,头文件为<thread>,也有了系统时间相关的库,头文件为<chrono>,这使得使用c++11实现跨平台的定时器有了可能。在这之前,多线程一般都是使用操作系统相关的库。

二、 定时器的基本功能

在定时器的实现之前,需要事先计划好一个定时器的基本功能。大概的功能如下:
1.添加一个定时事件
2.删除一个指定的定时事件
3.定时事件支持只执行一次或者重复执行
4.定时器启动指令
这个部分的内容在Linux 下定时器的实现方式分析中有很详细的介绍

三、 数据结构知识预备

在实现定时器中,需要有一个容器去存储所有的定时事件,比如说最容易想到的数组,如果我们用数组保存定时事件,那么在找出当前需要执行的定时事件的伪代码就是

    for(i = 0 ; i < events_array_size; ++i){
        if(events[i].time <= current_time){
            exec_in_child_thread(events);
        }
    }

这里就是遍历所有的事件,将已经到了执行时间的事件放到子线程中去执行。

在这里有一个可以优化效率问题:注意到每一次查询是否有事件到了执行时间,都会遍历整个数组,也就是O(n)的复杂度,并且查询周期也是非常短的,所以会有非常大的性能问题。因此,我们可以将这里的数组变成有序的,每一次只需查询第一个事件,如果还没到执行时间,那么之后的事件就不用遍历了。这样平均下来就是O(1)的时间复杂度。考虑到我们在插入,删除时始终要保证数组的有序,可以采用最小堆。也就是堆顶始终是最先到执行时间的一项。

四、c++11多线程的知识预备

4.1 join()还是detach()

c++11中创建一个线程很容易

#include <iostream>
#include <thread>

void function_1(){
    std::cout << "hello world" << std::endl;
    while(true){}
}

int main(){
    std::thread t1(function_1);
    t1.join();      //加入运行

    std::cout << "hahah" << std::endl;

    return 0;
}

这段代码就是创建了一个t1的线程,来执行function_1函数。注意到我们这里使用了t1.join(),字面上的意思是将子线程加入的主线程来执行,这样导致的结果是主线程会在t1.join()这里停止,直到t1执行结束后再往下执行。在这里function_1中存在死循环,所以hahah永远不会被输出。而如果使用t1.detach()的话,即将子线程从主线程分离出去,那么主线程就会一直向下执行,在这里就会执行到return 0,然后主线程销毁,detach出去的子线程可能就没有任何显示的机会了(查过资料说detach出去子线程的行为的undefined的)。

在这个定时器的实现中,当然不能使用join(),因为会阻塞程序的正常运行。

4.2 如何使得定时器的使用是线程安全的。

在一个程序中,可能很多地方会使用到定时器去触发某个动作,但是为了效率,肯定不会创建多个定时器实例,因此定时器的实现应该是单例的。因为需要单例,因此定时器的创建,定时器中定时事件的添加和删除都是线程安全的(因为同一时间可能会有多个线程操作定时器)。

4.2.1 线程安全的单例模式

单例模式的实现方法有很多种,因为能力问题,不对这一点多做叙述。可以看这一篇单例模式(Singleton)及其C++实现

我这里使用的是使用static来创建局部可见的全局变量

static Timer* getInstance(std::chrono::milliseconds tick){
    static Timer timer(tick);
    return &timer;
}

4.2.2 使用锁保证定时事件的添加和删除是线程安全的。

在往定时器中添加和删除定时事件时,就是在做最小堆的添加和删除。这个中间线程不安全的现象为:以添加为例,往最小堆数组末尾加入一个数据,然后再逐层向上调整。如果在这个调整的过程中,另外一个线程执行了删除操作,两个调整过程就可能同时发生,造成不可预知的错误,使得最小堆并不有序。

所以添加和删除操作应该是互斥的,只有其中一个操作执行结束,另一个操作才可以开始。c++中可以使用信号量mutex和锁locker轻松的完成互斥的需求。示例代码如下:

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <fstream>

class LofFile{
    public:
        LofFile(){
            f.open("log.txt");
        }

        void shared_print(std::string id,int value){
            std::lock(m_mutex,m_mutex2);
            std::lock_guard<std::mutex> locker(m_mutex,std::adopt_lock);
            std::lock_guard<std::mutex> locker2(m_mutex2,std::adopt_lock);
            std::cout << "from" << id << ":" << value << std::endl;
        }

        void shared_print2(std::string id,int value){
            std::lock(m_mutex,m_mutex2);
            std::lock_guard<std::mutex> locker2(m_mutex2,std::adopt_lock);      //lock_guard是为了防止语句执行中出现异常,锁不被释放。这样做可以保证在退出代码块后解锁
            std::lock_guard<std::mutex> locker(m_mutex,std::adopt_lock);
            std::cout << "from" << id << ":" << value << std::endl;
        }

    protected:
    private:
        std::mutex m_mutex;     //使用信号量来解决资源竞争
        std::mutex m_mutex2;
        std::ofstream f;

};

void function_1(LofFile& log){
    for(int i = 0; i > -100; i--){
        log.shared_print("From t1:",i);
    }
};

int main(){
    LofFile log;
    std::thread t1(function_1,std::ref(log));//线程开始运行
    for(int i = 0; i < 100; i++){
        log.shared_print2("From main",i);
    }

    t1.join();
    return 0;
}

五、总结与实现

分析到这里,一个定时器的基本要素已经具备了。
1.用来存储定时事件,并且添加和删除操作都是线程安全的最小堆
2.可以多线程实现定时事件的执行从而不阻塞主线程。

下面贴出所有的实现代码:

5.1 最小堆的实现。

SortedHeap.hpp

/**
 * 排序堆的实现
 * 堆使用的是完全二叉树的结构,使用数组(数组从零开始)保存,那么最后一个非叶子节点为(n - 1) / 2
 * 堆的构建一直都是在有序的基础上的,那么每次调整只需比较i和(i - 1) / 2的元素,依次上推
 * 支持任意类的排序
 * 当前还不支持多线程环境下的使用
 * author:jiangpengfie
 * date:2017-05-09
 */
#ifndef SORTEDHEAP_H
#define SORTEDHEAP_H
#include <iostream>
#include <vector>
#include <functional>
#include <memory>
#include <mutex>
#include <condition_variable>
#include "src/core/util/util.h"


template<class T>
class SortedHeap{
    private:
        struct HeapNode{
            unsigned int id;
            T obj;
            HeapNode(unsigned int id,T t):obj(t){
                this->id = id;
            }
        };
        std::vector<HeapNode> heap;
        unsigned int autoIncrementId;
        std::function<bool(T& ,T&)> cmp;    //比较函数,实现选择构造最大堆还是最小堆
        std::mutex mu1;                
        std::mutex mu2;                

        /**
         * 插入节点后调整堆中不符合的节点
         */
        void adjustAfterInsert();

        /**
         * pop出堆顶元素后调整堆中不符合的节点
         */
        void adjustAfterPopTop();

        /**
         * 删除节点后调整堆中不符合的节点
         * @param i 删除的节点id
         */
        void adjustAfterDelete(int id);         

        void swap(HeapNode& t1,HeapNode& t2);

        void deleteNodeByPos(const unsigned int pos);
    public:
        /**
         * 构造函数
         * @param cmp 用来比较
         */
        SortedHeap(std::function<bool(T&,T&)> cmp);
        /**
         * 插入节点
         * @param node 插入的节点
         */
        unsigned int insertNode(T& node);
        /**
         * 删除节点,时间复杂度为O(n)
         * @param id  要删除的节点id
         */
        void deleteNode(unsigned int id);

        /**
         * pop最小的节点
         * @return T* 返回的最顶部的节点指针
         */
        std::unique_ptr<T> popTopNode();

        /**
         * 获取最顶部的节点
         * @return T 最顶部的节点指针
         */
        std::unique_ptr<T> getTopNode();

        /**
         * 删除顶部的节点
         *
         */
        void deleteTopNode();
};

template<typename T>
SortedHeap<T>::SortedHeap(std::function<bool(T&,T&)> cmp){
    this->cmp = cmp;
    this->autoIncrementId = 0;
}

template<typename T>
void SortedHeap<T>::swap(HeapNode& t1,HeapNode& t2){
    HeapNode tmp = t1;
    t1 = t2;
    t2 = tmp;
}

template<typename T>
void SortedHeap<T>::adjustAfterInsert(){
    int last = this->heap.size() - 1;
    int flag = true;
    //从插入的节点位置开始向上调整
    while(last > 0 && flag){
        if(this->cmp(this->heap[last].obj,this->heap[(last - 1) / 2].obj)){
            this->swap(this->heap[(last - 1) / 2],this->heap[last]);
        }else{
            //不需要调整了
            flag = false;
        }
        last = (last - 1) / 2;
    }
}

template<typename T>
void SortedHeap<T>::adjustAfterDelete(int pos){
    //从pos位置开始向下调整
    int last = this->heap.size() - 1;
    if(last == 0)
        return;     //最后一个不需要调整
    bool flag = true;   //标记是否需要调整
    while(pos <= (last - 1) / 2 && flag){
        //一直调整到最后一个非叶子结点
        int topNum = 0;     //记录最小的结点编号

          //(pos + 1) * 2 - 1是左孩子,pos是父
        if(this->cmp(this->heap[(pos + 1) * 2 - 1].obj,this->heap[pos].obj)){
            topNum = (pos + 1) * 2 - 1;
        }else{
            topNum = pos;
        }

        if((pos + 1) * 2 <= last){
            //如果存在右结点
            if(this->cmp(this->heap[(pos + 1) * 2].obj,this->heap[topNum].obj)){
                topNum = (pos + 1) * 2;
            }
        }

        //看看topNum是不是自己
        if(pos == topNum){
            //是自己就不用调整了
            flag = false;
        }else{
            //交换
            this->swap(this->heap[pos],this->heap[topNum]);
        }
        pos = topNum;
    }
}


template<typename T>
void SortedHeap<T>::deleteNodeByPos(const unsigned int pos){
    unsigned int last = this->heap.size() - 1;
    if(pos > last){
        return;
    }
    std::lock(mu1,mu2);             //上锁
    std::lock_guard<std::mutex> locker1(mu1,std::adopt_lock);
    std::lock_guard<std::mutex> locker2(mu2,std::adopt_lock);
    //与最后一个交换
    swap(this->heap[pos],this->heap[last]);
    //删除最后一个
    this->heap.pop_back();      

    this->adjustAfterDelete(pos);
}



template<typename T>
unsigned int SortedHeap<T>::insertNode(T& node){
    HeapNode hNode(this->autoIncrementId++,node);
    std::lock(mu1,mu2);             //上锁
    std::lock_guard<std::mutex> locker1(mu1,std::adopt_lock);
    std::lock_guard<std::mutex> locker2(mu2,std::adopt_lock);
    this->heap.push_back(hNode);     //先将node放在最后一位
    if(this->heap.size() != 1){
        //如果大小不等于1,则在新增节点后调整
        this->adjustAfterInsert();
    }
    return this->autoIncrementId - 1;
}


template<typename T>
void SortedHeap<T>::deleteNode(unsigned int id){
    for(unsigned int i = 0; i < this->heap.size(); i++){
        if(heap[i].id == id){
            //找到了id
            this->deleteNodeByPos(i);
            break;
        }
    }


}

template<typename T>
std::unique_ptr<T> SortedHeap<T>::popTopNode(){
    if(this->heap.size() != 0){
        std::unique_ptr<T> top(new T(this->heap[0].obj));
        this->deleteNodeByPos(0);
        return top;
    }else{
        std::unique_ptr<T> p = nullptr;
        return p;
    }
}

template<typename T>
std::unique_ptr<T> SortedHeap<T>::getTopNode(){
    if(this->heap.size() != 0){
        std::unique_ptr<T> top(new T(this->heap[0].obj));
        return top;
    }else{
        std::unique_ptr<T> p = nullptr;
        return p;
    }
}

template<typename T>
void SortedHeap<T>::deleteTopNode(){
   if(this->heap.size() != 0){
        this->deleteNodeByPos(0);
    }
}

#endif



5.2 定时器的实现

Timer.h

/**
 * 定时器的实现
 * 支持int setTimer(T interval,function action):设置一个定时器,指定间隔interval和回调函数action,返回定时器id
 * 支持void deleteTimer(int timerId):删除一个定时器
 * 数据结构:最小堆模型,按照定时器触发的时间排序
 * author:jiangpengfei
 * date:2017-05-09
 */
#ifndef TIMER_H
#define TIMER_H
#include <iostream>
#include <chrono>
#include <functional>
#include <thread>
#include <memory>
#include "SortedHeap.hpp"

class Timer{
    private:
        std::chrono::milliseconds tick;
        double timeline;     //当前时间线,long double的字节数为12
        bool isStart;        //标志当前定时器的启动状态
        struct SchedulerEvent{
          unsigned int id;                   //定时事件的唯一标示id
          double interval;                   //事件的触发间隔,在重复事件中会用到这个属性
          double deadline;                   //定时事件的触发时间
          std::function<void()> action;      //触发的事件
          bool isRepeat;                     //是否是重复执行事件
          SchedulerEvent( double interval, double timeline,std::function<void()> action,bool isRepeat){
              this->interval = interval;
              this->deadline = interval + timeline;
              this->action = action;
              this->isRepeat = isRepeat;
          }
        };

        SortedHeap<SchedulerEvent> eventQueue;

        /**
         * 执行到达期限的定时器
         */
        void loopForExecute();

        //私有的构造函数
        Timer(std::chrono::milliseconds tick):eventQueue(
            [](SchedulerEvent& a,SchedulerEvent& b){
                return a.deadline < b.deadline;
            }
        ){
            this->timeline = 0;
            this->tick = tick;
            this->isStart = false;
        }

    public:

        //单例模式
        static Timer* getInstance(std::chrono::milliseconds tick){
            static Timer timer(tick);
            return &timer;
        }

        /**
         * 设置定时器
         * @param interval 定时间隔
         * @param action 定时执行的动作
         * @param isRepeat 是否重复执行,默认不重复执行
         * @return unsigned int 定时器的id,可以根据这个id执行删除操作
         */
        unsigned int addEvent(double interval,std::function<void()> action,bool isRepeat = false);

        /**
         * 删除定时器
         * @param timerId 定时器id
         *
         */
        void deleteEvent(unsigned int timerId);

        /**
         * 同步执行启动定时器
         */
         void syncStart();

         /**
         * 异步执行启动定时器
         */
         void asyncStart();

};


#endif

Timer.cpp

#include "src/core/util/Timer.h"

unsigned int Timer::addEvent(double interval,std::function<void()> action,bool isRepeat){
    SchedulerEvent event(interval,this->timeline,action,isRepeat);
    return this->eventQueue.insertNode(event);
}

void Timer::deleteEvent(unsigned int timerId){
    this->eventQueue.deleteNode(timerId);
}

void Timer::loopForExecute(){
    std::unique_ptr<SchedulerEvent> top = this->eventQueue.getTopNode();
    while(top != nullptr && top->deadline <= this->timeline){
        //如果已经到了执行的时间,新开一个子线程执行任务
        std::thread t(top->action);
        t.detach();    //子线程分离

        if(top->isRepeat){
            //如果是重复事件,则重新添加
            this->addEvent(top->interval,top->action,top->isRepeat);
        }

        //从堆中删除
        this->eventQueue.deleteTopNode();
        top = this->eventQueue.getTopNode();
    }
    //执行一次后等待一个周期
    std::this_thread::sleep_for(this->tick);
    //周期增1
    this->timeline++;
}

void Timer::asyncStart(){
    if(!this->isStart){
        std::thread daemon_thread(&Timer::syncStart,this);
        daemon_thread.detach();     //从当前主线程分离
    }
}

void Timer::syncStart(){
    if(!this->isStart){
        while(1)
            this->loopForExecute();
    }
}

测试执行的代码

#include <iostream>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <string>
#include <functional>
#include <thread>
#include <memory>
#include <fstream>
#include "src/core/util/Timer.h"

void myprint(std::string msg){
    std::ofstream of("timer.txt", std::ios::app);
    std::thread::id this_id = std::this_thread::get_id();
    auto t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    of << "From Thread " << this_id << "at time " << std::put_time(std::localtime(&t), "%Y-%m-%d %H.%M.%S") << ":" << msg << std::endl;
}

int main(){
    std::chrono::milliseconds tick(2000);       //1000毫秒作为一个周期
    Timer* timer = Timer::getInstance(tick);
    std::function<void()> f1 = std::bind(myprint,"第一个加入,10tick后执行");
    std::function<void()> f2 = std::bind(myprint,"第二个加入,被删除不执行");
    std::function<void()> f3 = std::bind(myprint,"第三个加入,每5tick重复执行");
    std::function<void()> f4 = std::bind(myprint,"第四个加入,5tick后执行");
    std::function<void()> f5 = std::bind(myprint,"第五个加入,5tick后执行");
    std::function<void()> f6 = std::bind(myprint,"第六个加入,5tick后执行");
    std::function<void()> f7 = std::bind(myprint,"第七个加入,5tick后执行");
    std::function<void()> f8 = std::bind(myprint,"第八个加入,5tick后执行");
    std::function<void()> f9 = std::bind(myprint,"第九个加入,5tick后执行");
    std::function<void()> f10 = std::bind(myprint,"第十个加入,5tick后执行");
    std::function<void()> f11 = std::bind(myprint,"第十一个加入,15tick后执行");
    std::function<void()> f12 = std::bind(myprint,"第十二个在执行后加入,20tick+5s后执行");

    timer->addEvent(10,f1);
    int id = timer->addEvent(11,f2);
    timer->addEvent(5,f3,true);
    timer->addEvent(5,f4);
    timer->addEvent(5,f5);
    timer->addEvent(5,f6);   
    timer->addEvent(5,f7);
    timer->addEvent(5,f8);
    timer->addEvent(5,f9);
    timer->addEvent(5,f10);
    timer->addEvent(15,f11);

    timer->deleteEvent(id);

    myprint("线程开始启动,每tick是2秒");

    //异步执行,程序退出后计时器也会终止,因此在下面使用while循环保证程序不会退出
    timer->asyncStart();
    //timer->syncStart();


    //休眠5秒钟
    std::this_thread::sleep_for(std::chrono::seconds(5));   
    //应该在大概20*tick+5秒后执行,
    //TODO 执行后加入的定时器不对
    timer->addEvent(20,f12);

    getchar();

    return 0;
}