Workerman 原理解析概述

首先说明一下Workerman究竟是什么东西:

WebSocket 是 HTML5 提供的一种网络通讯协议,用于服务端与客户端实时数据传输。广泛用于浏览器与服务器的实时通讯,APP与服务器的实时通讯等场景,相比传统HTTP协议请求响应式通讯,WebSocket协议可以做到实时的双向通讯,服务端可以在任何时候向客户端推送数据(HTTP协议需要客户端发起请求后才能推送),就是传统说的长链接(指在一个连接上可以连续发送多个数据包)

workman特点:

1.Workerman类似一个PHP版本的nginx

2.核心也是多进程+Epoll+非阻塞IO

3.Workerman每个进程能维持上万并发连接

4.由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能

5.同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。

6.拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。

7.主进程为了保持稳定性,只负责监控子进程,不负责接收数据也不做任何业务逻辑

8.纯PHP开发

9.支持高并发:WorkerMan支持Libevent事件轮询库(需要安装event扩展),如果没有安装Event扩展则使用PHP内置的Select相关系统调用

相关命令:

启动命令:php php文件名 start

停止命令:php php文件名 stop

重启命令:PHP php文件名 restart

查看状态:PHP php文件名 status

链接状态:PHP php文件名 connections

注意点:

心跳是必须的:一定要有(非常重要):否则连接可能由于长时间不活跃而被路由节点防火墙断开 客户端和服务端协议一定要对应才能通讯,那么心跳断了该怎么办?这里就需要一个机制:断线重连。

心跳断开的原因:

1.浏览器最小化js被暂停

2.浏览器切换到其它tab页面js被暂停

3.电脑进入睡眠等等

4.移动端切换网络

5.信号变弱、手机黑屏、手机应用切换到后台、路由故障、业务主动断开

解决办法:

断线重连只能客户端做,服务端无法实现

例如浏览器websocket需要监听onclose事件,当发生onclose时建立新的连接(为避免需崩可延建立连接)。更严格一点,服务端也应该定时发起心跳数据,并且客户端需要定时监测服务端的心跳数据是否超时,超过规定时间未收到服务端心跳数据应该认定连接已经断开,需要执行close关闭连接,并重新建立新的连接

支持协议的示例:

websocket

$websocket_worker = new Worker('websocket://0.0.0.0:2345');

text

$text_worker = new Worker('text://0.0.0.0:2346');

frame

$frame_worker = new Worker('frame://0.0.0.0:2347');

tcp

$tcp_worker = new Worker('tcp://0.0.0.0:2348');

udp

$udp_worker = new Worker('udp://0.0.0.0:2349');

unix

$unix_worker = new Worker('unix:///tmp/wm.sock');

获取包:

composer config -g --unset repos.packagist

composer create-project workerman/webmanw

目录结构:

├── app 应用目录

│ ├── controller 控制器目录

│ ├── model 模型目录

│ ├── view 视图目录

│ └── middleware 中间件目录

│ └── StaticFile.php 自带静态文件中间件

| |—— functions.php 自定义函数

├── config 配置目录

│ ├── app.php 应用配置

│ ├── autoload.php 这里配置的文件会被自动加载

│ ├── bootstrap.php 进程启动时onWorkerStart时运行的回调配置

│ ├── container.php 容器配置

│ ├── dependence.php 容器依赖配置

│ ├── database.php 数据库配置

│ ├── exception.php 异常配置

│ ├── log.php 日志配置

│ ├── middleware.php 中间件配置

│ ├── process.php 自定义进程配置

│ ├── redis.php redis配置

│ ├── route.php 路由配置

│ ├── server.php 端口、进程数等服务器配置

│ ├── view.php 视图配置

│ ├── static.php 静态文件开关及静态文件中间件配置

│ ├── translation.php 多语言配置

│ └── session.php session配置

├── public 静态资源目录

├── process 自定义进程目录

├── runtime 应用的运行时目录,需要可写权限

├── start.php 服务启动文件

├── vendor composer安装的第三方类库目录

└── support 类库适配(包括第三方类库)

├── Request.php 请求类

├── Response.php 响应类

├── Plugin.php 插件安装卸载脚本

├── helpers.php 助手函数

└── bootstrap.php 进程启动后初始化脚本

原理(这里是精髓,请逐行阅读,这里面的几个函数一定要对照手册仔细研究):

class worker

{

//连接事件回调

public $onConnect =null;

//消息事件回调

public $onCMessage =null;

//连接关闭事件回调

public $onClose =null;

//所有socket,包括监听的所有socket

public $allsockets =array();

//构造函数

function __construct($address)

{

//创建监听socket

$this->socket = stream_socket_server($address,$errno,$errstr);

//设置为非阻塞

stream_set_blocking($this->socket,0);

//把监听的socet放入allsockets

$this->allsockets[(int)$this->socket]=$this->socket;

}

//运行

public function run(){

while (1){

//这里不监听socket可写事件和数据可读事件

$write = $except = null;

//监听所有的socket可读事件,包括客户端socket和监听端口的socket

$read = $this->allsockets;

//整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个可读参数

stream_select($read,$write,$except);

//$read被重新赋值,遍历所有状态为可读的socket

foreach ($read as $index=>$socket){

//如果监听的是可读的socket,说明有新的连接

if($socket ===$this->socket){

//通过stream_socket_accept获取新的连接

$new_connect_socket = stream_socket_accept($this->socket);

if(!$new_connect_socket);continue;

//如果有onConnect事件回调,则尝试触发

if($this->onConnect){

call_user_func($this->onConnect,$new_connect_socket);

}

//将新的客户端新连接的socket放到allsockets中,stream_select监听可读事件

$this->allsockets[(int)$new_connect_socket] = $new_connect_socket;

}else{

//读数据

$buffer = fread($socket,65535);

//数据为空,代表连接已断开

if($buffer === '' || $buffer ===false){

//尝试触发onClose回调

if($this->onClose){

call_user_func($this->onClose,$socket);

}

fclose($socket);

//从allsockets中删除对应的连接

unset($this->allsockets[(int)$socket]);

continue;

}

//尝试触发onMessage回调

call_user_func($this->onCMessage);

}

}

}

}

}

$server = new worker("tcp://0.0.0.0:12115");

$server->onConnect = function ($conn){

echo 'onConnect\n';

};

$server->onCMessage = function ($conn,$msg){

fwrite($conn,"HTTP/1.1 200 OK\r\n Connection:Keep-alive\r\nServer:workman\1.1.4\r\nContent-length:5\r\n\rhellow");

};

$server->onClose = function ($conn){

echo 'onCLose\n';

};

$server->run();

例子:

发送端:

WebSocket大屏

接收端:

WebSocket大屏

服务文件:

require 'autoload.php';

use Workerman\Worker;

use Workerman\Connection\TcpConnection;

// 使用websocket协议监听6161端口

$worker = new Worker('websocket://0.0.0.0:6161');

// 当浏览器(包括用户手机浏览器和电脑浏览器)发来消息时的处理逻辑

$worker->onMessage = function(TcpConnection $connection, $data) {

// 这个静态变量用来存储电脑浏览器的websocket连接,方便推送使用

static $daping_connection = null;

switch ($data) {

// 发送 daping 字符串的是电脑浏览器,将其连接保存到静态变量中

case 'daping':

$daping_connection = $connection;

break;

// ping 是心跳数据,用来维持连接,只返回 pong 字符串,无需做其它处理

case 'ping':

$connection->send('pong');

break;

// 用户手机浏览器发来的祝福语

default:

// 直接使用电脑浏览器的连接将祝福语推送给电脑

if ($daping_connection) {

$daping_connection->send($data);

}

}

};

Worker::runAll();

环境已经搭建完成,还需要了解一下后续的属性,回调事件,接口

属性:

1.Id:当前worker进程的id编号

2.Count:设置当前Worker实例启动多少个进程

3.Name:设置当前Worker实例的名称

4.protocol:设置当前Worker实例的协议类

5.transport:设置当前Worker实例所使用的传输层协议

6.reusePort:设置当前worker是否开启监听端口复用(socket的SO_REUSEPORT选项)。

7.connections:存储了当前进程的所有的客户端连接对象,其中id为connection的id编号

8.logFile:指定日志文件路径

9. user:设置当前Worker实例以哪个用户运行

10. reloadable:设置当前Worker实例是否可以reload,即收到reload信号后是否退出重启

11.daemonize:是否以daemon(守护进程)方式运行

回调属性:

1.onWorkerStart:设置Worker子进程启动时的回调函数,每个子进程启动时都会执行。

2.onWorkerReload:设置Worker收到reload信号后执行的回调

3.onConnect:当客户端与Workerman建立连接时(TCP三次握手完成后)触发的回调函数。每个连接只会触发一次onConnect回调

4.onMessage:当客户端通过连接发来数据时(Workerman收到数据时)触发的回调函数

5.onClose:当客户端连接与Workerman断开时触发的回调函数。不管连接是如何断开的,只要断开就会触发onClose。每个连接只会触发一次

6.onBufferFull:每个连接都有一个单独的应用层发送缓冲区,如果客户端接收速度小于服务端发送速度,数据会在应用层缓冲区暂存,如果缓冲区满则会触发onBufferFull回调

7.onBufferDrain:每个连接都有一个单独的应用层发送缓冲区,缓冲区大小由TcpConnection::$maxSendBufferSize决定,默认值为1MB,可以手动设置更改大小,更改后会对所有连接生效。

8.onError:当客户端的连接上发生错误时触发

接口:

1.Send:向客户端发送数据

2.getRemoteIp:获得该连接的客户端ip

3.getRemotePort:获得该连接的客户端端口

4.Close:调用close会等待发送缓冲区的数据发送完毕后才关闭连接,并触发连接的onClose回调

5.Destroy:调用destroy后即使该连接的发送缓冲区还有数据未发送到对端,连接也会立刻被关闭,并立刻触发该连接的onClose回调

6.pauseRecv:使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用

7.resumeRecv:使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用

8.Pipe:将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用

文章还未写完,后续会继续续写跟新