您现在的位置是:网站首页>文章内容文章内容
基于ThinkPHP + Workman 的 简易 RPC 解决方案
李鹏2024-04-30【PHP】753人已围观
RPC是指远程过程调用,当一台服务器上的某个应用现需要调用另一台服务器上的某个应用提供的函数/方法时,由于两个应用各自运行在不同的服务器上,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。这种通过网络,来远程调用其他服务的技术,称之为RPC。
一:服务端
1:创建服务端启动脚本
php think make:command Rpc
2: 修改脚本内容如下;
<?php declare (strict_types = 1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use think\Exception;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
class Rpc extends Command
{
protected function configure()
{
// 指令配置
$this->setName('rpc')
->setDescription('the rpc command');
}
/*签名校验*/
protected function checkSign(array $params) : bool
{
$access_key = $params['access_key'] ?? '';
$sign = $params['sign'] ?? '';
if (!$access_key || !$sign) {
return false;
}
$access_list = config('rpc.user');
if (!isset($access_list['aox2056'])) {
return false;
}
$access_secret = $access_list[$access_key] ?? '';
unset($params['sign']);
ksort($params);
$params = array_filter($params, function($v) {
return ($v !== '' && $v !== null);
});
$_sign = md5(urldecode(http_build_query($params)).$access_secret);
return $_sign === $sign;
}
protected function execute(Input $input, Output $output)
{
// 指令输出
$output->writeln('rpc 开启中'.PHP_EOL);
$port = config('rpc.port');
//创建一个TCP连接
$tcp_worker = new Worker("tcp://0.0.0.0:$port");
// 设置开启多少个进程
$tcp_worker->count = 4;
//当收到客户端发送的数据时触发
$tcp_worker->onMessage = function (TcpConnection $connection, $data){
$params = @json_decode($data,true);
if ($params && is_array($params)){
if (!$this->checkSign($params)){
$data = [
'code' => -2,
'msg' => '签名校验失败',
'data' => [],
'time' => time()
];
$connection->send(json_encode($data,JSON_UNESCAPED_UNICODE));
}
$module = isset($params['module']) ? (string)$params['module'] : '';
$service = isset($params['service']) ? (string)$params['service'] : '';
$action = isset($params['action']) ? (string)$params['action'] : '';
$data = isset($params['params']) ? $params['params']:[];
$class = '\app\rpc\\'.$module . '\\' . $service;
if (class_exists($class) && method_exists($class,$action)){
$res = new $class;
$result = $res->$action($data);
$connection->send(json_encode($result,JSON_UNESCAPED_UNICODE));
}else{
$data = [
'code' => -1,
'msg' => 'service not found ~~',
'data' => ['class'=>$class,'action'=>$action],
'time' => time()
];
$connection->send(json_encode($data,JSON_UNESCAPED_UNICODE));
}
}else{
//如果不给返回,客户端无法再发送消息
$data = [
'code' => -1,
'msg' => 'service not found ~~',
'data' => [],
'time' => time()
];
$connection->send(json_encode($data,JSON_UNESCAPED_UNICODE));
}
};
// 运行worker
Worker::runAll();
}
}
2: 配置文件 rpc.php 如下
<?php use think\facade\Env;
return [
'host' => '127.0.0.1',
'port' => Env::get('rpc.port', 3000),
'user'=>[
'aox2056' => '1420282a70842131135dfbf5ad8fc853',
'cus9618' => 'a9e324h6e02eg39fqe5b6zn5ms4kx197',
]
];
3: 注册服务端启动脚本命令,修改 config 目录下的 console.php 文件,在 commands 项 中,添加一列如下;
<?php // +----------------------------------------------------------------------
// | 控制台配置
// +----------------------------------------------------------------------
return [
// 指令定义
'commands' => [
'rpc' => 'app\command\rpc',
],
];
4:添加 RPC 服务代码目录,并添加示例模块与服务。在app 目录下,创建 Rpc 目录,并在 Rpc 目录下创建 User 目录,在 User 目录中添加 User.php 文件,即: app/Rpc/User/User.php , 修改 User.php 文件如下:
<?php namespace app\Rpc\User;
class User
{
public function index(array $params)
{
$data = [
'code' => 0,
'msg' => 'success',
'data' => $params,
'time' => time()
];
return $data;
}
}
注:User 目录 表示 User 模块,User.php 表示该模块下的 User 服务。在这个例子中,提供了index 方法,它接受一个数组,并返回一个数组。
二:客户端
1:在 app/service 目录下创建 RpcService.php 文件,并修改文件内容如下:
<?php namespace app\service;
use think\Exception;
final class RpcService
{
private static $instance = null;
private static $access_key = 'aox2056';
private static $access_secret = '1420282a70842131135dfbf5ad8fc853';
private static $module = '';
private static $service = '';
private static $action = '';
private function __construct()
{
//
}
private function __clone()
{
//
}
public static function getInstance() : RpcService
{
if (self::$instance === null) {
self::$instance = new RpcService();
}
return self::$instance;
}
private static function makeSign(array $params) : string
{
$params['access_key'] = self::$access_key;
ksort($params);
$params = array_filter($params, function($v) {
return ($v !== '' && $v !== null);
});
return md5(urldecode(http_build_query($params)).self::$access_secret);
}
public function module(string $module) : RpcService
{
self::$module = $module;
return $this;
}
public function service(string $service) : RpcService
{
$service_item = explode('.',$service);
self::$service = $service_item[0];
self::$action = $service_item[1] ?? '';
return $this;
}
public function send(array $params)
{
if (!self::$module || !self::$service || !self::$action) {
throw new Exception('module或service参数错误!');
}
$data = [
'module' => self::$module,
'service' => self::$service,
'action' => self::$action,
'params' => $params
];
$data['sign'] = self::makeSign($data);
$data['access_key'] = self::$access_key;
$data = json_encode($data, JSON_UNESCAPED_UNICODE);
$host = config('rpc.host') ?? "0.0.0.0";
$port = config('rpc.port');
/*两种方法均可*/
/*方法1*/
/*
$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$result = socket_connect($socket, $host, $port);
if ($result === false) {
echo "连接服务器失败:" . socket_strerror(socket_last_error($socket));
exit;
}
socket_write($socket, $data, strlen($data));
$bufferSize = 4096;
$receivedData = socket_read($socket, $bufferSize);
socket_close($socket);
return $receivedData;
*/
/*方法2*/
$bufferSize = 4096;
$client = stream_socket_client("tcp://127.0.0.1:$port", $errno, $errmsg, 1);//注意:IP地址不能是0.0.0.0或者localhost ;
if (!$client) {
echo "ERROR: $errno - $errmsg<br>\n";
} else {
fwrite($client, $data);
$receivedData = fread($client, $bufferSize);
fclose($client);
return $receivedData;
}
}
}
2:服务请求,在控制器中调用方式如下:
<?php namespace app\controller;
use app\BaseController;
use app\service\RpcService;
class Index extends BaseController
{
public function index()
{
return '';
}
public function rpc()
{
$data = ['user'=>'lipeng','age'=>'31','sex'=>'1'];
return RpcService::getInstance()->module('User')->service('User.index')->send($data);
}
}
3:启动服务端脚本
php think rpc
4:浏览器请求 http://你的域名/index/rpc 响应如下:
{"code":0,"msg":"success","data":{"user":"lipeng","age":"31","sex":"1"},"time":1714456483}
0