wokerman 启动分析
@(学习)[workerman, php]
前期想说的
也是最近才看的代码,遇到不懂得地方就去google,所以这篇文章里面穿插了很多参考资料,可以直接点击阅览。
需要了解一些知识pcntl
、posix
、libevent
,然后我们从服务的启动开始来看。
启动
runAll
顾名思义,运行所有的,注释中也写了,Run all worker instances,运行所有的实例,也就是说脚本中可以同时new
多个worker服务,这也是后面一个重要的$workers
包含了每一个$worker
都是一个服务实例。然后会根据每一个实例初始化count
个子进程。
/** * Run all worker instances. * * @return void */public static function runAll(){ // 判断是否命令行模式 self::checkSapiEnv(); // 创建目录、设置权限、绑定时钟信号脚本 self::init(); // 解析cli的命令,完成start、reload、restart、kill、stop等 self::parseCommand(); // 是否开启守护进程 self::daemonize(); // 对socket进行了一系列的配置 self::initWorkers(); // 注册信号处理器 self::installSignal(); // 保存主进程的pid self::saveMasterPid(); // 每个$worker服务fork出count个子进程,然后给每个子进程绑定loop循环监听事件tcp self::forkWorkers(); self::displayUI(); self::resetStd(); // 主进程来监听子进程状态 self::monitorWorkers();}
生成实例的id
根据对象生成hashid,并且同一个对象生成的id是一样的。
$this->workerId = spl_object_hash($this);
并且将当前对象$this
存储到静态数组中self::$_workers
创建资源环境
$this->_context = stream_context_create($context_option);
校验是否命令行模式
只有命令行模式可以执行
if(php_sapi_name() != 'cli') {}
获取初始化运行的脚本文件的绝对路径
debug_backtrace
返回的数组的最后一个元素就是初始化的文件,第一个元素就是当前执行该命令的文件。
$backtrace = debug_backtrace();$self::startFile = $backtrace[count($backtrace) -1 ]['file']
生成pid的存储文件
将debug_backtrace
获取到的绝对路径通过_
进行分割作为pid的文件名。
创建日志文件
如果路径没定义直接指定../workerman.log
touch(self::$logFile);chmod(self::$logFile,0622); // 可读写、写、写
初始化当前主进程状态
self:$_status = self::STATUS_STARTING;
脚本使用self::$_status
来存储当前运行的状态(current status),一共有四种状态
// 开始const STATUS_STARTING = 1;// 运行const STATUS_RUNNING = 2;// 关闭const STATUS_SHUTDOWN=4;// 重新加载const STATUS_RELOADING=8;
临时文件
slef::$_statisticsFile = sys_get_temp_dir().'/workerman.status';
设置进程标题
// 设置当前进程的标题cli_set_process_title($title);// 如果不存在上面的方法,那么使用proc_title扩展if(extension_loaded('proctitle') && function_exists('setproctitle')) { setproctitle($title);}
填充idMap
每个$worker_id
都是当前脚本中要初始化的实例,每个服务要开启$worker->count
个子进程,先用0来填充数组。后面每开启一个子进程,会将子进程的pid存储到idMap中,用来后面主进程监控子进程,如果子进程意外终止,主进程可以重新佛。
self::$_idMap[$worker_id] = array_fill(0, $worker->count, 0);
进程信号处理器
pcntl_signal
安装一个信号处理器,用来监听信号SIGALRM
。如果不安装SIGALRM
信号,当进程接收到SIGALRM
信号时会默认终止进程。
pcntl_signal(SIGALRM, ['\Workerman\Lib\Timer','signalHandle'],false);
解析cli参数
允许进程脚本接受参数,用来完成相应的指令。
xxx.php start -d 会开启守护进程xxx.php start 会进入debug模式
判断进程是否已经存在
从pid文件中读取存在的进程号,如果进程号存在,并且进程也存活。如果传入的命令是start
并且pid文件中的进程id和当前脚本执行的不一样,那么说明脚本重复执行了,报错。
posix_kill($master_pid,0)
用来判断进程是否存在,posix_kill
本意是向该进程发送信号。 // Get master process PID.$master_pid = @file_get_contents(self::$pidFile);$master_is_alive = $master_pid && @posix_kill($master_pid, 0);// Master is still alive?if ($master_is_alive) { // 如果已经有进程存在,并且当前执行的进程和已存在的进程不一样,报错。 if ($command === 'start' && posix_getpid() != $master_pid) { self::log("Workerman[$start_file] already running"); exit; }}
命令处理
使用
pcntl_signal
注册了信号,然后使用posix_kill
发送信号,使用pcntl_signal_dispatch
来处理信号。
kill
先发送kill -SIGINT
,千分之一毫秒之后发送kill -SIGKILL
status
删掉临时文件
向进程发送
SIGUSR2
信号从临时文件中读取内容
结束脚本
restart、stop
向进程发送终止信号
$master_pid && posix_kill($master_pid, SIGINT);
while(1)循环判断是否进程已经被终止,如果超过时间5s,那么写入日志终止失败,并且结束当前脚本。(没有结束进程,只是结束了当前的命令脚本)
reload
发送特殊信号SIGUSR1
,但是这个信号不会被立即执行,而是需要等待pcntl_signal_dispatch
来进行信号分发。
posix_kill($master_pid,SIGUSR1);
例如用户现在输入了php worker.php reload
,那么会使所有的进程重新进行加载配置。
当前脚本解析参数
判断进程是否存活,如果进程pid存在,但是进程没有存活,那么报错,说not run
进程存活,注册信号
SIGUSR1
退出当前脚本
运行中的父进程开始触发所有的信号,由于之前已经安装了信号处理方法,所以会触发
self::reload()
方法。
启动守护进程
fork两次并不是为了避免僵尸进程,而是为了避免svr4系统重新打开终端
方法通用,可以稍作修改,用到别的地方。用这个方法做过一个队列监听,redis的list启动一个rpop。
umask(0)
将默认权限的掩码修改为0,即将要创建的所有的文佳你的权限都是777$pid = pcntl_fork()
启动子进程,判断$pid
是否存在,只有在父进程中pcntl_fork()
才会返回id,我们要将父进程kill掉。posix_setsid()
将当前子进程设置为会话组leader再次创建子进程,为了防止在SVR4
的系统下重新打开控制终端。 protected static function daemonize(){ if (!self::$daemonize) { return; } // 将默认权限掩码修改为0,意味着即将要创建的文件的权限都是777 umask(0); // 子进程 $pid = pcntl_fork(); // 子进程创建失败 if (-1 === $pid) { throw new Exception('fork fail'); } elseif ($pid > 0) { //说明当前进程是父进程,只有在父进程中fork才会返回pid // 关闭父进程,让子进程成为孤儿进程被init进程收养 exit(0); } // 将子进程作为进程组的leader,开启一个新的会话,脱离之前的会话和进程组。即使用户logout也不会终止 if (-1 === posix_setsid()) { throw new Exception("setsid fail"); } // Fork again avoid SVR4 system regain the control of terminal. // 避免svr4系统重新获取控制终端 $pid = pcntl_fork(); if (-1 === $pid) { throw new Exception("fork fail"); } elseif (0 !== $pid) { // 如果不是孙子进程,直接干掉。让孙子进程成为孤儿进程被init进程(1号进程收养) exit(0); }}
初始化所有的woker实例
获取进程的当前用户信息
$user_info = posix_getpwuid(posix_getuid());
开启监听
创建socket服务,启动一个本地的socket服务。如果是unix域socket的话,$local_socket
需要是本地的文件。个人理解就是通过某个端口来监听某个协议。 $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
将包含socket的流导入到socket的扩展资源中
Imports a stream that encapsulates a socket into a socket extension resource.
php提供两种socket:php提供了两种类型的socket,stream_socket 和 sockets,二者api不兼容。stream_socket是php内置的,可以直接使用,并且api和stream 的api通用(可以调用fread fwrite...)。sockets需要php安装sockets扩展才能使用。
设置socket
设置心跳检测,减少传输延迟,设置为非阻塞模式。
设置心跳
SO_KEEPALIVE 保持连接检测对方主机是否崩溃,避免(服务器)永远阻塞于TCP连接的输入
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
设置最小化传输延迟
// 最小化传输延迟,而不是追求最小化报文数量@socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
设置为非阻塞模式
在非阻塞模式下,调用 fgets() 总是会立即返回;而在阻塞模式下,将会一直等到从资源流里面获取到数据才能返回。
stream_set_blocking($this->_mainSocket,0);
注册信号处理器
// stoppcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false);// reload 重新加载的时候发送R1信号pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false);// status 查看状态的时候发送R2信号pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false);// ignorepcntl_signal(SIGPIPE, SIG_IGN, false);
保存当前进程的pid
如果已经开启了守护进程,那么获取的是当前孙子进程(守护进程)的pid,将其写入到pid文件中。
将每个woker实例(服务)创建count个子进程
遍历整个$workers,将进程按照里面的每一个实例的参数fork count个数的子进程。并且将子进程的号码,记录到父进程中
// 创建子进程$pid = pcntl_fork();// Get available worker id.$id = self::getId($worker->workerId, 0);// For master process.// 如果当前进程是父进程if ($pid > 0) { // 将子进程的号码,记录到父进程中(重要) self::$_pidMap[$worker->workerId][$pid] = $pid; // 父进程将自己的pid写入到idMap的第一位(非常重要),在此之后,同一个实例下每个进程的id都不一样。 self::$_idMap[$worker->workerId][$id] = $pid;} // For child processes.
让每个子进程下都开始启动对应worker的服务
$worker->run();
设置当前状态为运行中
self::$_status = self::STATUS_RUNNING;
设置globalEvent
self::getEventLoopName
从三个事件扩展中选择一个libevent,event,ev
事件监听
向socket添加事件监听回调函数
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
displayUI
终端下打印UI界面
再次重置标准化输入输出
self::resetStd();
监控所有子进程
pcntl_signal()函数仅仅是注册信号和它的处理方法,真正接收到信号并调用其处理方法的是pcntl_signal_dispatch()函数,而posix_kill会发送信号。
只有父进程才会监控所有的子进程,因为子进程运行的是run方法
触发所有的信号
等待子进程退出
将退出的子进程从实例维护的pidMap中移除
将退出的子进程从对应的idMap中进行重置为0,取消占取的位置
如果脚本仍然在执行,但是子进程退出了,那么重启子进程
获取所有的子进程号,如果子进程都退出了,那么结束父进程
关于pcntl_wait
pcntl_signal(SIGUSR1,'signalHandle1');
用来注册信号;posix_kill($pid,SIGUSR1)
向指定进程发送信号(返回结果是即时的,但是并不会触发信号所绑定的行为);pcntl_signal_dispatch()
用来触发收到的信号的回调函数。
pcntl_wait($status,WUNTRACED)
开启阻塞模式来监控子进程是否退出,之后子进程退出之后,才会执行后面的操作。
0) { $status = 0; // 等待子进程终止 pcntl_wait($status,WUNTRACED); echo "wait已执行 {$pid}?\n"; // 执行信号的回调函数 pcntl_signal_dispatch();}else { // 发送信号2 posix_kill($pid,SIGUSR2); // 处理信号2的回调函数 pcntl_signal_dispatch(); // 父进程处理信号2 sleep(3); // 子进程发送信号,看是否pcntl_wait会执行(结果,wait没有执行) posix_kill($pid,SIGUSR1); // 继续等待,观察是否pcntl_wait等子进程结束后执行 sleep(3); echo "子进程终止\n"; // 这个时候wait往下执行了,看来wait等待子进程让父进程进行了挂起操作}
输出结果
信号2回调子进程终止wait已执行 18425?信号2回调信号1回调