- 本文地址: https://www.laruence.com/2020/04/01/5726.html
- 转载请注明出处
Yar支持HTTP和TCP俩种Transporter, HTTP的是基于CURL,PHP中的Yar默认就是走的HTTP Transporter, 这个大家应该都不陌生, 但是基于TCP的, 可能大家会用的少一些。
事实上,我6年前也写过一个C的Yar server框架,叫做Yar-c, 代码地址在Yar-C at Github, 它提供了服务启动,worker进程管理,Yar打包协议等。当时我们用这个框架,实现了高性能的微博白名单等服务,以供PHP端使用Yar Client来调用。
只不过,Yar C需要用C来写Handle, 可能对于不少PHPer来说,会稍微有点陌生,那今天我们尝试用PHP来写一个TCP的Server,来介绍下如何实现对Yar RPC协议的处理, 这个例子可以方便的结合Swoole等异步PHP框架,实现一个高性能的Yar TCP Server。 这个过程中, 会让大家了解Yar的RPC通信协议,以及捎带了解下Socket编程。
我们今天还是用“白名单”服务作为例子,我们提供一个接口,接受RPC客户端的请求,参数是一个用户ID,返回bool,表示是否在白名单:
function query(int $id) : bool;
首先,我们建立一个文件yar_server, 为了方便的直接执行,我们在文件写下:
#!/bin/env php7 <?php class WhiteList { }
然后,通过chmod a+x 给这个文件增加可执行的权限。
第一步我们需要处理服务的启动参数处理, 接受一个参数S表示要监听的IP和端口,值的格式是host:port, 我们使用PHP的getopt函数来处理命令行参数:
class WhiteList { protected $host; public function __construct() { $options = getOpt("S:"); if (!isset($options["S"])) { $this->usage(); } } protected function usage() { exit("Usage: yar_server -S hostname:port\n"); } }
这样,当用户启动yar_server的时候,没有指定S参数,我们就退出,并提示Usage。 我们还需要另外一个配置,就是指向一个词表文件,词表文件中每一行是一个在白名单中的用户ID, 我们用F表示:
class WhiteList { protected $host; protected $dicts; public function __construct() { $options = getOpt("S:F:"); if (!isset($options["S"]) || !isset($options["F"])) { $this->usage(); } $this->host = $options["S"]; $this->dicts = $options["F"]; } protected function usage() { exit("Usage: yar_server -F path_to_dict -S hostname:port\n"); } }
好了, 现在启动参数处理完成, 当然为了简单,我省去了对输入参数的有效性检查。
接下来, 我们需要完成俩个函数, 第一个是读取-F指定的词表文件,把所有的用户ID读入到一个数组中,因为我们的这个服务会是常驻进行, 所以不用担心性能, 它只会在启动阶段处理这个词表文件:
protected function loadDict() { $this->ids = array(); $fp = fopen($this->dicts, "r"); while (!feof($fp)) { $line = trim(fgets($fp)); if ($line) { $this->ids[$line] = true; } } fclose($fp); echo "Loading dict successfully, ", count($this->ids), " loaded\n"; return $this; }
因为用户ID是整型,所以我们把它当作Hashtable的key,这样在将来查找的时候,使用isset会非常高效。 需要注意的是因为文件处理不是我们今天要讲的重点,也就省去了对文件存在行,可读性,合法性的检查。
好了, 接下来是重点了, 我们要启动一个IPV4 TCP Socket服务,监听在$host指定的地方, 为了方便大家了解Socket API,我们不采用PHP的Stream系列函数,而是采用PHP直接包装的Socket系列API, 首先我们用socket_create创建一个Socket套接字:
protected function listen() { $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket == false) { throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error())); } }
然后,我们需要使用socket_bind绑定这个Socket到我们需要监听的地址, 并且使用socket_listen来监听请求:
protected function listen() { $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket == false) { throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error())); } list($hostname, $port) = explode(":", $this->host); if (socket_bind($socket, $hostname, $port) == false) { throw new Exception("socket_bind() failed: reason: " . socket_strerror(socket_last_error())); } if (socket_listen($socket, 64) === false) { throw new Exception("socket_listen() failed: reason: " . socket_strerror(socket_last_error())); } echo "Starting Yar_Server at {$this->host}\nPresss Ctrl + C to quit\n"; $this->socket = $socket; return $this; }
好了, 如果一切没问题,接下来我们就可以socket_accept来监听请求了, 默认的socket是阻塞模式,如果没有请求,进程会一直阻塞等待, 对于高性能的服务来说, 最好采用非阻塞+select或者epoll的模式来同时处理多个请求, 但是我们的这个例子主要是为了介绍Yar的协议, 所以还是采用简单的阻塞模式。
接下来,我们来编写真正的RPC处理部分,首先我们通过accept接受一个请求, 然后读取请求的的内容,分析请求头中的Yar RPC Header信息, Yar RPC的协议头定义如下:
typedef struct _yar_header { uint32_t id; // transaction id uint16_t version; // protocl version uint32_t magic_num; // default is: 0x80DFEC60 uint32_t reserved; unsigned char provider[32]; // reqeust from who unsigned char token[32]; // request token, used for authentication uint32_t body_len; // request body len }
其中, magic_num是用来验证请求有效性的一个特殊值, 合法的Yar RPC请求都会设置这个值为0x80DFEC60(我很想告诉你为啥是这个值,但我真不记得当时我为啥用这个数字了),这个头部是82个字节,可能有同学会问,不对啊一看这个Struct不应该是82啊,那是因为头部申明的时候采用pack模式,也就是不对齐, 所以确实是82个字节.
pack("H*", "80DFEC60");
provider是一个字符串,标明了客户端的名字, 比如对于Yar扩展的Yar_Client就是"Yar PHP Cient-x.x.x"
token在设计的最初是为了做API key验证的,但是后来没用上,因为大部分都是内网应用,可以有多种办法来保证请求来源的合法性。
id是一个唯一请求id,这个是为了排查请求问题的, version默认为0,或者1,目前我没有升级过协议头,所以这个暂时我们也不用关心,reserved可以用来传递一些请求参数, 比如客户端可以说明是否保持连接。
body_len是我们需要关心的, 这个字段表明了这次请求,请求体一共多大(不包括Yar协议头部)。
所有的这些数字, 都是以网络字节序传递的, 我们采用PHP处理二进制流的unpack函数来解析读取进来的二进制流:
protected function parseHeader($header) { return unpack("Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len", $header); }
这个函数会返回一个上面说到的头部结构体的数组。
对应的我们也需要使用pack来实现生成Yar Header的方法:
const YAR_MAGIC_NUM = 0x80DFEC60; protected function genHeader($id, $len) { $bin = pack("NnNNA32A32N", $id, 0, self::YAR_MAGIC_NUM, 0, "Yar PHP TCP Server", "", $len); return $bin; }
如刚才说的,我们需要在接受一个请求以前, 验证请求的合法性:
protected function validRequest($header) { if ($header["magic_num"] != self::YAR_MAGIC_NUM) { return false; } return true; }
所以大概请求的处理整个逻辑框架是:
protected function accept() { while (($conn = socket_accept($this->socket))) { $buf = socket_read($conn, self::HEADER_SIZE, PHP_BINARY_READ); if ($buf === false) { socket_shutdown($conn); continue; } if (!$this->validHeader($header = $this->parseHeader($buf))) { $output = $this->response(1, "illegal Yar RPC request"); goto response; } $buf = socket_read($conn, $header["body_len"], PHP_BINARY_READ); if ($buf === false) { $output = $this->response(1, "insufficient request body"); goto response; } if (!$this->validPackager($buf)) { $output = $this->response(1, "unsupported packager"); goto response; } $buf = substr($buf, 8); /* 跳过打包信息的8个字节 */ $request = $this->parseRequest($buf); if ($request == false) { $this->response(1, "malformed request body"); goto response; } $status = $this->handle($request, $ret); $output = $this->response($status, $ret); response: socket_write($conn, $output, strlen($output)); socket_shutdown($conn); /* 关闭写 */ } }
现在整体的框架就算完成了,我们需要完成handle,response方法就可以了,handle是要根据用户的请求中的m, 来调用指定的方法
protected function handle($request, &$ret) { if ($request["m"] == "query") { $ret = $this->query(...$request["p"]); } else { $ret = "unsupported method '" . $request["m"]. "'"; return 1; } return 0; }
现在来实现query方法本身, 这个会很简单,就检查下id是不是在白名单数组:
protected function query($id) { return isset($this->ids[$id]); }
好了,接下来我们要完成response方法,这个方法是打包一个符合Yar协议的返回体,包括82个字节的头部,8个字节的打包信息,以及序列化后的响应体, 我们需要根据status不同,来选择设置响应体中的r还是e字段:
protected function response($status, $ret) { $body = array(); $body["i"] = 0; $body["s"] = $status; if ($status == 0) { $body["r"] = $ret; } else { $body["e"] = $ret; } $packed = serialize($body); $header = $this->genHeader(0, strlen($packed) + 8); return $header . str_pad("PHP", 8, "\0") . $packed; }
好了, 马上就要大功告成了,我们最后完成启动方法和析构函数(关闭socket):
public function run() { $this->loadDict()->listen()->accept(); } public function __destruct() { if ($this->socket) { socket_close($this->socket); } }
现在一切就绪, 我们最后在文件末尾加入:
(new Whitelist)->run();
在测试之前,我们先准备一个测试词表,比如1到1000的id:
seq 1 1 10000 > user_id.dict
然后启动服务, 监听在本机的9000端口:
$ ./yar_server -F user_id.dict -S127.0.0.1:9000 Loading dict successfully, 1000 loaded Starting Yar_Server at 127.0.0.1:9000 Presss Ctrl + C to quit
不错,服务启动成功,然后我们使用Yar扩展来编写客户端(你需要首先安装好Yar扩展), 测试下用户id 999和99999的调用效果:
<?php $yar = new Yar_Client("tcp://127.0.0.1:9000"); var_dump($yar->query("999")); var_dump($yar->query("99999")); ?>
和调用HTTP的Yar服务不同,此处我们应该使用tcp://做地址头,表示这是一个TCP的服务。
来,运行一下看看:
php7 client.php bool(true) bool(false)
看起来不错, 符合预期!
你也可以尝试故意构造一些错误的可能,比如调用不存在的方法之类的,来看看服务器的反应, 这个例子的代码你可以在这里找到.
到这里我就算介绍完了如何采用PHP来编写Yar的TCP服务, 大家应该可以很方便的把这个例子修改完善成自己希望的格式,或者嵌入Swoole(可以参考Swoole作者写的:这里)。
还是要再次说明,因为本文的主要目的是为了介绍Yar RPC通信协议,所以在服务管理这块并没有做的很完善,比如socket_accept, socket_read/write等都默认采用了阻塞模式,也没有加入超时设计,服务进程也只有一个,这个如果真的想用做实际服务的话,还是需要一些功课的,不过我相信你有兴趣的话,都是可以搞定的。:)
当然,最简单的是,你可以直接使用Yar-C服务框架来编写C Yar TCP服务。
在这里也有一个Yar-C Server的例子yar_server in C.
enjoy!
Your all code are very helpful. Thanks
great hanks for this code
Bird brother thanks for this code
The combination of socket and yar
Welcome to Tattvam on the Beach, One of the top Yoga Resorts in Goa , your gateway to a blissful seaside retreat. Located in the picturesque coastal town, invites you to explore a world where luxury, tranquility, and rejuvenation converge. With its pristine beaches, stunning ocean views, and a serene atmosphere, Tattvam on the Beach is a sanctuary designed to awaken your senses and nourish your soul.
Welcome to Gudlu Resort, a hidden gem nestled amidst the picturesque hills of Mudigere in Chikmagalur. If you are looking for a Chikmagalur resorts , Gudlu Resort is the place to be. Surrounded by acres of lush greenery, Gudlu Resort offers a range of luxurious accommodations that are perfect for couples, families, and groups. Each room is well-appointed with modern amenities and offers breathtaking views of the surrounding hills and valleys.
Welcome to Tattvamretreat Resort, Bangalore – a serene and tranquil Retreat near Bangalore that offers a holistic approach to wellness through Ayurveda and Yoga. Tucked away amidst lush greenery and rolling hills, Tattvamretreat Resort is a perfect getaway for those seeking a peaceful and rejuvenating experience.
Welcome to Gari Resort, Bangalore – one of the best resorts for stay in Bangalore that offers a unique blend of luxury, comfort, and relaxation. If you are looking for the perfect escape from the hustle and bustle of city life, Gari Resort is the place for you.
其实蛮希望原生的yar可以用tcp连接代替CURL,CURL在网络层面还是比较慢
鸟哥的这篇文章是目前用php实现超大规模应用的最佳实践!
Your article is very useful, the content is great, I have read a lot of articles, but for your article, it left me a deep impression, thank you for sharing.
我想问一下,这个是不是通用的rpc服务,比如客户端无论是不是使用yar_client,都可以和这个yar_server正常通信完成rpc功能呢?
从这里例子看,_yar_header的例子,好像rpc消息格式是自定义的吧?那这应该不通用。
也就是说:yar是自定义了一套rpc框架,yar_client必须和yar_server配对使用,
yar_client不能和其他的rcp服务进行连接沟通。
所以,鸟哥强调的是【Yar rpc】,而不是【rpc】,
按照GitHub 的两个 demo 试了一下,服务器端,body 解析不出来数据。
yar 版本:
yar support => enabled
Version => 2.1.2
$header = unpack(“Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len”, substr($data, 0,HEADER_SIZE));
var_dump($header); //header 数据正常
//var_dump(substr($data, HEADER_SIZE,8)); //output: MSGPACK
$buf = substr($data, HEADER_SIZE + 8);
var_dump($buf);
$request = unserialize($buf); // notice: unserialize(): Error at offset 0 of 23 bytes
var_dump($request); //false
可以了,需要修改配置文件,yar.packager=php
MSGPACK, 使用 msgpack_unpack 来解析body.
鸟哥,self::HEADER_SIZE 这个常量未找到,其值是否是 82?
还有几个方法的实现没有找到具体的示例:
accept() 中的 validHeader() 方法是 validRequest() 吗?
validPackager()、parseRequest() 均未找到实现,鸟哥能指导一下咩
你直接下载github上的那个示例代码就行吧?
哦哦哦,谢谢,没仔细看链接,谢谢
你的可以吗? 两个例子我都试了下,都解析不出body的数据。
您的文章中有一些有用的信息。感谢你的分享!
不错的文章,谢谢鸟哥
这里:
seq 1, 1, 10000 > user_id.dict
似乎不需要逗号分隔:
seq 1 1 10000 > user_id.dict
对,不需要逗号,写顺手了,我修正下
能不能整点有技术含量的东西, 能不能来点有创新的东西,难怪php越来越不行…
你是不是有毒?
你行的话超越鸟哥,做出自己的贡献,别在这阴阳怪气的。
这种人就是现实失意 来网上找存在感的 越搭理他越来劲
所以忽略就好了
作者是谁啊,就这?
相当高深 得细品
还是 HTTP Transporter 适合我-_-!
socket和yar的结合
看起来不错