进程池实例(使用消息队列通信)

主要是解决动态创建进程时的效率问题,因为动态创建进程的消耗很大

fork一个进程,就要复制正文段【代码】+数据段

  1. 初始化一个主进程
  2. 主进程创建多个子进程,并且创建多个消息队列。保存每个子进程的消息队列id
  3. 每一个子进程执行从该进程绑定的消息队列中读取数据
  4. 创建socket,绑定socket文件,监听socket连接
  5. 事件循环,监听是否有客户端连接,有连接则读取数据
  6. 写入数据到连接socket 【conn socket】,并关闭连接socket 【conn socket】
  7. 处理中断信号:主进程接收到中断信号(SIGINT)后,发送退出消息到每个子进程
  8. 等待子进程回收,回收完成后删除进程关联的消息队列,删除socket文件
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
<?php
namespace MsqServer;
/**
 * Bookkeeping record for a single worker: the process id obtained when the
 * worker was forked and the message queue the master uses to talk to it.
 */
class Process
{
    /** @var int|null Value returned by pcntl_fork() for this worker. */
    public $_pid = null;

    /** @var mixed|null Queue handle returned by msg_get_queue(). */
    public $_msgid = null;
}
/**
 * Pre-forked worker pool driven over a UNIX domain socket.
 *
 * The master forks a fixed number of workers, each bound to its own System V
 * message queue. It then accepts client connections on a UNIX socket, reads
 * one request per connection and hands it to a worker in round-robin order.
 * On SIGINT the master stops accepting, sends every worker a quit message,
 * reaps the children, removes the queues and deletes the socket file.
 *
 * Note: constructing the object runs the whole server and only returns after
 * shutdown.
 */
class Server
{
    /** Maximum number of bytes read from a client connection. */
    const READ_SIZE = 1024;

    /**
     * Receive buffer for msg_receive(). Payloads are serialize()d before being
     * queued, so this must be larger than READ_SIZE; otherwise a full-size
     * request can never be dequeued and the worker spins on the failure.
     */
    const MSG_MAX_SIZE = 2048;

    /** Message type used for client payloads. */
    const MSG_TYPE_DATA = 1;

    /** Message type used for the shutdown request (kept apart from client data). */
    const MSG_TYPE_QUIT = 2;

    /**
     * Upper bound on workers: ftok() takes a one-character project id, and ids
     * are assigned as consecutive characters starting at "0" (up to "~").
     */
    const MAX_WORKERS = 79;

    /** @var string Path of the UNIX socket file clients connect to. */
    public $_socketFile = "pool.sock";
    /** @var int Number of workers requested. */
    public $_processNum = 3;
    /** @var string Existing file used as the ftok() path for queue keys. */
    public $_keyFile = __FILE__;
    /** @var Process[] Workers that were started successfully, indexed from 0. */
    public $_process = [];
    /** @var int|null Index into $_process of the worker most recently forked. */
    public $_idx;
    /** @var mixed Listening socket handle. */
    public $_sockfd;
    /** @var int Round-robin counter used by selectWorker(). */
    public $_roll = 0;
    /** @var bool Cleared by the SIGINT handler to stop the event loop. */
    public $_run = true;

    /**
     * Start the pool, serve until SIGINT, then clean up.
     *
     * @param int $num Number of workers to fork (1..MAX_WORKERS).
     * @throws \InvalidArgumentException When $num is out of range.
     */
    public function __construct($num = 3)
    {
        // Validate before touching any process/IPC state.
        $num = (int)$num;
        if ($num < 1 || $num > self::MAX_WORKERS) {
            throw new \InvalidArgumentException(
                sprintf("worker count must be between 1 and %d, got %d", self::MAX_WORKERS, $num)
            );
        }
        $this->_processNum = $num;

        pcntl_signal(SIGINT, [$this, "sigHandler"]);
        $this->forkWorker();

        if ($this->_process) {
            $this->Listen();
        } else {
            fprintf(STDOUT, "master has no worker, not listening\n");
        }

        // Reap every worker that was actually started.
        $remaining = count($this->_process);
        while ($remaining > 0) {
            $pid = pcntl_wait($status);
            if ($pid > 0) {
                $remaining--;
                continue;
            }
            if (pcntl_get_last_error() === PCNTL_EINTR) {
                continue;
            }
            // Any other failure (e.g. no children left) would otherwise loop forever.
            break;
        }

        foreach ($this->_process as $p) {
            /** @var Process $p */
            msg_remove_queue($p->_msgid);
        }
        fprintf(STDOUT, "master shutdown\n");
        if (file_exists($this->_socketFile)) {
            unlink($this->_socketFile);
        }
    }

    /**
     * SIGINT handler: request the event loop to stop.
     *
     * @param int $signo Signal number (unused).
     */
    public function sigHandler($signo)
    {
        $this->_run = false;
    }

    /**
     * Create one message queue and fork one worker per requested process.
     *
     * Workers never return from this method: the child branch enters
     * worker(), which ends with exit(). Workers whose queue or fork fails are
     * reported and skipped, so $_process only holds usable workers.
     */
    public function forkWorker()
    {
        for ($i = 0; $i < $this->_processNum; $i++) {
            // ftok() needs a single-character project id; "0".."9" match the
            // ids an integer index used to be coerced to, then continue upward.
            $key = ftok($this->_keyFile, chr(ord('0') + $i));
            $queue = ($key === -1) ? false : msg_get_queue($key);
            if ($queue === false) {
                fprintf(STDOUT, "worker %d: message queue create fail\n", $i);
                continue;
            }

            $process = new Process();
            $process->_msgid = $queue;

            $pid = pcntl_fork();
            if ($pid === -1) {
                fprintf(STDOUT, "worker %d: fork fail %s\n", $i, pcntl_strerror(pcntl_get_last_error()));
                msg_remove_queue($queue);
                continue;
            }

            $process->_pid = $pid;
            $slot = count($this->_process);
            $this->_process[$slot] = $process;
            $this->_idx = $slot;

            if ($pid === 0) {
                $this->worker();
            }
        }
    }

    /**
     * Create, bind and listen on the UNIX socket, then run the event loop.
     *
     * If any setup step fails the error is printed and the workers are told
     * to quit so the master can still shut down cleanly.
     */
    public function Listen()
    {
        // Compare against false: on PHP 8 sockets are objects, not resources.
        $sock = socket_create(AF_UNIX, SOCK_STREAM, 0);
        if ($sock === false) {
            fprintf(STDOUT, "socket create fail %s\n", socket_strerror(socket_last_error()));
            $this->stopWorkers();
            return;
        }
        $this->_sockfd = $sock;

        // A socket file left behind by a previous crashed run would make bind fail.
        if (file_exists($this->_socketFile)) {
            unlink($this->_socketFile);
        }

        if (!socket_bind($sock, $this->_socketFile)) {
            fprintf(STDOUT, "socket bind fail %s\n", socket_strerror(socket_last_error($sock)));
            socket_close($sock);
            $this->stopWorkers();
            return;
        }
        if (!socket_listen($sock, 10)) {
            fprintf(STDOUT, "socket listen fail %s\n", socket_strerror(socket_last_error($sock)));
            socket_close($sock);
            $this->stopWorkers();
            return;
        }
        $this->eventLoop();
    }

    /**
     * Worker main loop: block on this worker's queue, print each message and
     * exit when a quit-type message arrives. Never returns.
     */
    public function worker()
    {
        $pid = posix_getpid();
        fprintf(STDOUT, "child pid %d start \n", $pid);
        /** @var Process $process */
        $process = $this->_process[$this->_idx];
        $msgid = $process->_msgid;

        while (true) {
            $received = msg_receive($msgid, 0, $msgType, self::MSG_MAX_SIZE, $message, true, 0, $errorCode);
            if (!$received) {
                // Interrupted by a signal, or (presumably, when no error code is
                // set) a message that was dequeued but could not be decoded: retry.
                if (empty($errorCode) || $errorCode === PCNTL_EINTR) {
                    continue;
                }
                // Hard failure (e.g. queue removed): stop instead of spinning.
                fprintf(STDOUT, "child pid %d msg_receive fail %s\n", $pid, posix_strerror($errorCode));
                break;
            }

            fprintf(STDOUT, "child pid %d recv %s\n", $pid, $message);
            // Shutdown is signalled by message type, so a client payload that
            // happens to start with "quit" cannot stop the worker.
            if ($msgType === self::MSG_TYPE_QUIT) {
                break;
            }
        }
        fprintf(STDOUT, "child pid %d shutdown \n", $pid);
        exit(0);
    }

    /**
     * Hand a client payload to the next worker in round-robin order.
     *
     * The send is non-blocking; a failure (for example a full queue) is
     * reported and the payload is dropped.
     *
     * @param string $data Payload read from the client.
     */
    public function selectWorker($data)
    {
        $workerCount = count($this->_process);
        if ($workerCount === 0) {
            fprintf(STDOUT, "master has no worker to dispatch to\n");
            return;
        }

        /** @var Process $process */
        $process = $this->_process[$this->_roll++ % $workerCount];
        if (msg_send($process->_msgid, self::MSG_TYPE_DATA, $data, true, false, $errorCode)) {
            fprintf(STDOUT, "master send ok\n");
        } else {
            fprintf(STDOUT, "master send fail %s\n", posix_strerror((int)$errorCode));
        }
    }

    /**
     * Accept and serve connections until SIGINT clears $_run, then close the
     * listening socket and ask every worker to quit.
     */
    public function eventLoop()
    {
        while (true) {
            pcntl_signal_dispatch();
            if (!$this->_run) {
                break;
            }

            // socket_select() overwrites its arrays, so rebuild them every pass.
            $readFds = [$this->_sockfd];
            $writeFds = null;
            $expFds = null;
            $ret = socket_select($readFds, $writeFds, $expFds, null);
            if ($ret === false) {
                // A signal interrupts select; loop back so the handler runs.
                if (socket_last_error() === SOCKET_EINTR) {
                    continue;
                }
                fprintf(STDOUT, "socket select fail %s\n", socket_strerror(socket_last_error()));
                break;
            }
            if ($ret === 0) {
                continue;
            }

            foreach ($readFds as $fd) {
                if ($fd === $this->_sockfd) {
                    $this->handleConnection();
                }
            }
        }
        socket_close($this->_sockfd);
        $this->stopWorkers();
    }

    /**
     * Accept one pending connection, forward its request to a worker, reply
     * "ok" and close the connection.
     */
    private function handleConnection()
    {
        $connfd = socket_accept($this->_sockfd);
        if ($connfd === false) {
            fprintf(STDOUT, "socket accept fail %s\n", socket_strerror(socket_last_error($this->_sockfd)));
            return;
        }

        $data = socket_read($connfd, self::READ_SIZE);
        // Strict checks so a payload such as "0" is still dispatched.
        if ($data !== false && $data !== '') {
            $this->selectWorker($data);
        }
        socket_write($connfd, "ok", 2);
        socket_close($connfd);
    }

    /**
     * Send the quit message to every worker, retrying when the blocking send
     * is interrupted by a signal.
     */
    private function stopWorkers()
    {
        foreach ($this->_process as $p) {
            /** @var Process $p */
            do {
                $sent = msg_send($p->_msgid, self::MSG_TYPE_QUIT, "quit", true, true, $errorCode);
            } while (!$sent && $errorCode === PCNTL_EINTR);

            if ($sent) {
                fprintf(STDOUT, "master send quit ok\n");
            } else {
                fprintf(STDOUT, "master send quit fail %s\n", posix_strerror((int)$errorCode));
            }
        }
    }
}
// Run the pool with three workers; the constructor itself runs the server.
new Server(3);

客户端连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
<?php

// Minimal client: connect to the pool's UNIX socket, send one request and
// print the number of bytes written followed by the server's reply.
$sockFile = "pool.sock";
$sockfd = socket_create(AF_UNIX, SOCK_STREAM, 0);
if ($sockfd === false) {
    // Without a socket there is nothing to connect or close.
    fprintf(STDERR, "socket create fail %s\n", socket_strerror(socket_last_error()));
    exit(1);
}

if (socket_connect($sockfd, $sockFile)) {
    $request = "i am client";

    // Derive the length from the payload instead of hard-coding it.
    echo socket_write($sockfd, $request, strlen($request));

    echo socket_read($sockfd, 1024);
} else {
    fprintf(STDERR, "socket connect fail %s\n", socket_strerror(socket_last_error($sockfd)));
}
socket_close($sockfd);