本文共 5450 字,大约阅读时间需要 18 分钟。
EventLoop::EventLoop() : looping_(false), quit_(false), //正在处理事件 eventHandling_(false), callingPendingFunctors_(false), iteration_(0), //返回当前线程的ID threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), //定时器 timerQueue_(new TimerQueue(this)), //eventfd 用于线程间的唤醒通知, wakeupFd_(createEventfd()), //创建一个Channel 分发eventfd文件描述符上产生的事件 wakeupChannel_(new Channel(this, wakeupFd_)), currentActiveChannel_(NULL){ LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } //设置回调函数 wakeupChannel_->setReadCallback( std::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd //设置当前的Channel 的读事件,并加入到EventLoop 循环中 //每个EventLoop 监听 wakeupChannel_->enableReading();}//析构函数EventLoop::~EventLoop(){ LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ << " destructs in thread " << CurrentThread::tid(); //移除所有监听的Channel wakeupChannel_->disableAll(); wakeupChannel_->remove(); //wakeupFd_时EventLoop 所在的线程单独管理的文件描述符, //所以在这单独关闭 ::close(wakeupFd_); t_loopInThisThread = NULL;}
void EventLoop::loop(){ assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; // FIXME: what if someone calls quit() before loop() ? LOG_TRACE << "EventLoop " << this << " start looping"; //如果没有人调用quit() 函数, 下面将进入循环处理就绪的事件 while (!quit_) { //原来的就绪的事件先清空 activeChannels_.clear(); //调用poll 函数开始监听所有的套接字 //当有就绪事件产生的时候返回就是事件发生的事件,并且将所有有就绪事件的Channel 放到 //activeChannels_中 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; //记录日志 if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // TODO sort channel by priority //之间正在处理的状态置为true eventHandling_ = true; //poller 中的fillActiveChannel 将就绪的事件已经添加到了 //activeChannels_中 //处理就绪的事件 for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel;// currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; //执行pendingFunctors_队列中的函数,当前线程要执行的任务的集合 doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false;}
void EventLoop::quit(){ //将退出的状态置为true quit_ = true; // There is a chance that loop() just executes while(!quit_) and exits, // then EventLoop destructs, then we are accessing an invalid object. // Can be fixed using mutex_ in both places. //如果一直没有事件发生,此时EventLoop 处于poll 的等待状态,则线程无法正常退出 //wakeup 向wakeFd_写入数据,此时唤醒了线程,可以正常退出 if (!isInLoopThread()) { wakeup(); }}
例如:添加一个文件描述符,删除一个文件描述符等。
void EventLoop::runInLoop(Functor cb){ //当前线程是不是EventLoop 所在的线程 if (isInLoopThread()) { cb(); } //处于poll 的等待状态,则将当前事件放入pendingFunctors_队列中 else { queueInLoop(std::move(cb)); }}void EventLoop::queueInLoop(Functor cb){ { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(std::move(cb)); } if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); }}void EventLoop::doPendingFunctors(){ std::vectorfunctors; callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } //执行当前线程要执行的任务 for (const Functor& functor : functors) { functor(); } callingPendingFunctors_ = false;}
//添加一个定时器对象在time 时刻触发一次,调用cd 回调函数TimerId EventLoop::runAt(Timestamp time, TimerCallback cb){ return timerQueue_->addTimer(std::move(cb), time, 0.0);}TimerId EventLoop::runAfter(double delay, TimerCallback cb){ Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, std::move(cb));}//添加一个定时器对象在time 触发,调用cd 回调函数,之后隔一段事件触发一次TimerId EventLoop::runEvery(double interval, TimerCallback cb){ Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(std::move(cb), time, interval);}//取消一个定时器void EventLoop::cancel(TimerId timerId){ return timerQueue_->cancel(timerId);}
void EventLoop::updateChannel(Channel* channel){ assert(channel->ownerLoop() == this); assertInLoopThread(); //调用Poller的updateChannel函数 poller_->updateChannel(channel);}void EventLoop::removeChannel(Channel* channel){ assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel);}
void EventLoop::wakeup(){ uint64_t one = 1; //用于唤醒一个线程 ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; }}//唤醒线程后执行的read 函数void EventLoop::handleRead(){ uint64_t one = 1; ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; }}
转载地址:http://nanwi.baihongyu.com/