博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
muduo 源码分析 EventLoop 类
阅读量:3940 次
发布时间:2019-05-24

本文共 5450 字,大约阅读时间需要 18 分钟。

文章目录

EventLoop 用于监听文件描述符上的事件,EventLoop不进行事件的分发,只通知相应的事件分发器有事件产生,有相应的事件分发器进行分发。
对于一个服务器来说,可能有多个EventLoop进行事件的监听,但是一个线程只有一个EventLoop。当一个新的文件描述符产生的时候,哪个EventLoop进行监听,有一定的算法进行分配,在EventLoopThreadPool类中有说明。
在这里插入图片描述
EventLoopThreadPool用于管理所有的EventLoop。

构造与析构

EventLoop::EventLoop()  : looping_(false),    quit_(false),    //正在处理事件    eventHandling_(false),    callingPendingFunctors_(false),    iteration_(0),    //返回当前线程的ID     threadId_(CurrentThread::tid()),    poller_(Poller::newDefaultPoller(this)),    //定时器    timerQueue_(new TimerQueue(this)),    //eventfd 用于线程间的唤醒通知,    wakeupFd_(createEventfd()),    //创建一个Channel 分发eventfd文件描述符上产生的事件    wakeupChannel_(new Channel(this, wakeupFd_)),    currentActiveChannel_(NULL){
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_; if (t_loopInThisThread) {
LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else {
t_loopInThisThread = this; } //设置回调函数 wakeupChannel_->setReadCallback( std::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd //设置当前的Channel 的读事件,并加入到EventLoop 循环中 //每个EventLoop 监听 wakeupChannel_->enableReading();}//析构函数EventLoop::~EventLoop(){
LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ << " destructs in thread " << CurrentThread::tid(); //移除所有监听的Channel wakeupChannel_->disableAll(); wakeupChannel_->remove(); //wakeupFd_时EventLoop 所在的线程单独管理的文件描述符, //所以在这单独关闭 ::close(wakeupFd_); t_loopInThisThread = NULL;}

loop 监听所有的文件描述符

void EventLoop::loop(){
assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; // FIXME: what if someone calls quit() before loop() ? LOG_TRACE << "EventLoop " << this << " start looping"; //如果没有人调用quit() 函数, 下面将进入循环处理就绪的事件 while (!quit_) {
//原来的就绪的事件先清空 activeChannels_.clear(); //调用poll 函数开始监听所有的套接字 //当有就绪事件产生的时候返回就是事件发生的事件,并且将所有有就绪事件的Channel 放到 //activeChannels_中 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; //记录日志 if (Logger::logLevel() <= Logger::TRACE) {
printActiveChannels(); } // TODO sort channel by priority //之间正在处理的状态置为true eventHandling_ = true; //poller 中的fillActiveChannel 将就绪的事件已经添加到了 //activeChannels_中 //处理就绪的事件 for (Channel* channel : activeChannels_) {
currentActiveChannel_ = channel;// currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; //执行pendingFunctors_队列中的函数,当前线程要执行的任务的集合 doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false;}

quit 退出监听状态

void EventLoop::quit(){
//将退出的状态置为true quit_ = true; // There is a chance that loop() just executes while(!quit_) and exits, // then EventLoop destructs, then we are accessing an invalid object. // Can be fixed using mutex_ in both places. //如果一直没有事件发生,此时EventLoop 处于poll 的等待状态,则线程无法正常退出 //wakeup 向wakeFd_写入数据,此时唤醒了线程,可以正常退出 if (!isInLoopThread()) {
wakeup(); }}

执行监听套接字以外的任务

例如:添加一个文件描述符,删除一个文件描述符等。

void EventLoop::runInLoop(Functor cb){
//当前线程是不是EventLoop 所在的线程 if (isInLoopThread()) {
cb(); } //处于poll 的等待状态,则将当前事件放入pendingFunctors_队列中 else {
queueInLoop(std::move(cb)); }}void EventLoop::queueInLoop(Functor cb){
{
MutexLockGuard lock(mutex_); pendingFunctors_.push_back(std::move(cb)); } if (!isInLoopThread() || callingPendingFunctors_) {
wakeup(); }}void EventLoop::doPendingFunctors(){
std::vector
functors; callingPendingFunctors_ = true; {
MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } //执行当前线程要执行的任务 for (const Functor& functor : functors) {
functor(); } callingPendingFunctors_ = false;}

关于定时器的函数

//添加一个定时器对象在time 时刻触发一次,调用cd 回调函数TimerId EventLoop::runAt(Timestamp time, TimerCallback cb){
return timerQueue_->addTimer(std::move(cb), time, 0.0);}TimerId EventLoop::runAfter(double delay, TimerCallback cb){
Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, std::move(cb));}//添加一个定时器对象在time 触发,调用cd 回调函数,之后隔一段事件触发一次TimerId EventLoop::runEvery(double interval, TimerCallback cb){
Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(std::move(cb), time, interval);}//取消一个定时器void EventLoop::cancel(TimerId timerId){
return timerQueue_->cancel(timerId);}

更新一个Channel和删除一个Channel

void EventLoop::updateChannel(Channel* channel){
assert(channel->ownerLoop() == this); assertInLoopThread(); //调用Poller的updateChannel函数 poller_->updateChannel(channel);}void EventLoop::removeChannel(Channel* channel){
assert(channel->ownerLoop() == this); assertInLoopThread(); if (eventHandling_) {
assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel);}

wakeup 唤醒正在监听的线程

void EventLoop::wakeup(){
uint64_t one = 1; //用于唤醒一个线程 ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) {
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; }}//唤醒线程后执行的read 函数void EventLoop::handleRead(){
uint64_t one = 1; ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); if (n != sizeof one) {
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; }}

转载地址:http://nanwi.baihongyu.com/

你可能感兴趣的文章
类thinkpad笔记本安装deepinv20后启动黒屏的解决
查看>>
利用本地centos镜像升级centOS
查看>>
FreeBSD常用操作
查看>>
VC及esxi升级的必要性和步骤
查看>>
hp DL338服务器修改ilo管理地址
查看>>
vmware convert P2V 错误二三事
查看>>
让kali2020中的zsh有补完功能
查看>>
python解开压缩文件6位纯数字密码
查看>>
5620系列密码清除
查看>>
vncsever-centos&debian
查看>>
华为snmp模板
查看>>
kvm&xen挂载镜像文件
查看>>
华为路由器配置NAT使内网用户通过外网IP地址方式访问内网服务器示例
查看>>
virt命令
查看>>
15个保障服务器安全的方法:
查看>>
在VMware Workstation 中部署VCSA6.5
查看>>
openstack&ceph
查看>>
ME60 双机热备 奇偶mac负载分担
查看>>
oracle11G安装en
查看>>
关于丢失或者损坏etc/fstab文件后
查看>>