EasySwoole RPC 自定義注冊中心
EasySwoole 默認為通過 UDP 廣播 + 自定義進程定時刷新自身節點信息的方式來實現無主化/注冊中心的服務發現。在服務正常關閉的時候,自定義定時進程的 onShutdown 方法會執行 deleteServiceNode 方法來實現節點下線。在非正常關閉的時候,心跳超時的節點也會被節點管理器踢出。
有些情況,比如服務都不在一個網段上,由于 UDP 協議的設置,將會廣播不到,只能點對點的進行廣播數據,就不是很方便。那么 EasySwoole 支持你自定義一個節點管理器,來變更服務注冊及發現方式。
下面實現的 Redis 節點管理器示例是基于 easyswoole/redis-pool 組件實現,所以請先執行 composer require easyswoole/redis-pool 安裝 redis-pool 組件。關于 easyswoole/redis-pool 組件具體用法請查看 easyswoole/redis-pool 章節。
例如使用 Redis 來實現:
<?php
namespace App\RpcServices\NodeManager;
use EasySwoole\Redis\Redis;
use EasySwoole\RedisPool\Pool;
use EasySwoole\RedisPool\RedisPool;
use EasySwoole\Rpc\NodeManager\NodeManagerInterface;
use EasySwoole\Rpc\Server\ServiceNode;
/**
 * Redis-backed node manager for EasySwoole RPC.
 *
 * Every service owns one Redis hash named "<redisKey>_<serviceName>". Each
 * hash field is a node id and each value is a JSON document with the keys
 * service, ip, port, version, lastHeartbeat and lastFailTime.
 */
class RedisManager implements NodeManagerInterface
{
    /** Seconds to wait when borrowing a connection from the pool. */
    private const POOL_TIMEOUT = 15;

    /**
     * Length (in seconds) of the penalty window after a reported failure.
     * It doubles as the maximum (healthy) selection weight of a node.
     */
    private const FAIL_RECOVERY_SECONDS = 10;

    /** @var string Prefix of the per-service Redis hash key. */
    protected $redisKey;

    /** @var int Seconds without a heartbeat after which a node counts as dead. */
    protected $ttl;

    /**
     * @var Pool $pool
     */
    protected $pool;

    /**
     * @param Pool   $pool    Redis connection pool used for every operation.
     * @param string $hashKey Prefix of the per-service hash key.
     * @param int    $ttl     Heartbeat time-to-live in seconds.
     */
    public function __construct(Pool $pool, string $hashKey = 'rpc', int $ttl = 30)
    {
        $this->pool = $pool;
        $this->redisKey = $hashKey;
        $this->ttl = $ttl;
    }

    /**
     * Lists the live nodes of a service and evicts the expired ones.
     *
     * When $version is given, only nodes registered with exactly that version
     * are returned (keyed by node id). Without a version the result is a
     * plain list of all live nodes.
     *
     * @param string   $serviceName
     * @param int|null $version
     * @return ServiceNode[] Empty when nothing matches or Redis is unavailable.
     */
    public function getNodes(string $serviceName, ?int $version = null): array
    {
        $ttl = $this->ttl;
        $scan = $this->withRedis(function ($redis) use ($serviceName, $version, $ttl) {
            $now = time();
            $hits = [];
            $expired = [];
            $nodes = $redis->hGetAll($this->hashKey($serviceName)) ?: [];
            foreach ($nodes as $nodeId => $value) {
                $node = json_decode((string) $value, true);
                // Unreadable records and records whose heartbeat is too old are evicted.
                if (!is_array($node) || $now - (int) ($node['lastHeartbeat'] ?? 0) > $ttl) {
                    $expired[] = $nodeId;
                    continue;
                }
                if (($node['service'] ?? null) !== $serviceName) {
                    continue;
                }
                // A requested version must match exactly; other versions are skipped.
                if ($version !== null && ($node['version'] ?? null) !== $version) {
                    continue;
                }
                $serviceNode = new ServiceNode($node);
                $serviceNode->setNodeId(strval($nodeId));
                if ($version !== null) {
                    $hits[$nodeId] = $serviceNode;
                } else {
                    $hits[] = $serviceNode;
                }
            }
            return [$hits, $expired];
        }, null);

        if ($scan === null) {
            return [];
        }
        [$hits, $expired] = $scan;
        // Evict only after the scan connection went back to the pool, so two
        // pooled connections are never held at the same time.
        foreach ($expired as $nodeId) {
            $this->deleteServiceNode($serviceName, $nodeId);
        }
        return $hits;
    }

    /**
     * Picks one live node by weighted random selection.
     *
     * A node that was recently reported through failDown() starts with weight
     * 1 and regains one unit per elapsed second until it is back at the
     * healthy weight after FAIL_RECOVERY_SECONDS.
     *
     * @param string   $serviceName
     * @param int|null $version
     * @return ServiceNode|null Null when no node is available or Redis fails.
     */
    public function getNode(string $serviceName, ?int $version = null): ?ServiceNode
    {
        $candidates = array_values($this->getNodes($serviceName, $version));
        if (empty($candidates)) {
            return null;
        }

        return $this->withRedis(function ($redis) use ($candidates, $serviceName) {
            $hashKey = $this->hashKey($serviceName);
            $now = time();
            $weights = [];
            $totalWeight = 0;
            foreach ($candidates as $index => $node) {
                /** @var ServiceNode $node */
                $config = json_decode((string) $redis->hGet($hashKey, $node->getNodeId()), true);
                $lastFailTime = is_array($config) ? (int) ($config['lastFailTime'] ?? 0) : 0;
                // Weights live in a local array rather than on the node object.
                $weights[$index] = $this->weightFor($now - $lastFailTime);
                $totalWeight += $weights[$index];
            }

            // Draw a ticket in [1, total]; mt_rand() is seeded automatically,
            // so consecutive calls do not repeat the same pick.
            $ticket = mt_rand(1, $totalWeight);
            foreach ($candidates as $index => $node) {
                $ticket -= $weights[$index];
                if ($ticket <= 0) {
                    return $node;
                }
            }
            return null;
        }, null);
    }

    /**
     * Records that a call to the given node just failed.
     *
     * @param ServiceNode $serviceNode
     * @return bool False when the node is not registered or Redis fails.
     */
    public function failDown(ServiceNode $serviceNode): bool
    {
        return $this->withRedis(function ($redis) use ($serviceNode) {
            $hashKey = $this->hashKey((string) $serviceNode->getService());
            $nodeId = $serviceNode->getNodeId();
            $nodeConfig = json_decode((string) $redis->hGet($hashKey, $nodeId), true);
            if (!is_array($nodeConfig)) {
                // The node is gone; do not write back a partial record.
                return false;
            }
            $nodeConfig['lastFailTime'] = time();
            $redis->hSet($hashKey, $nodeId, json_encode($nodeConfig));
            return true;
        }, false);
    }

    /**
     * Removes the node from its service hash.
     *
     * @param ServiceNode $serviceNode
     * @return bool False when Redis fails.
     */
    public function offline(ServiceNode $serviceNode): bool
    {
        return $this->deleteServiceNode($serviceNode->getService(), $serviceNode->getNodeId());
    }

    /**
     * Writes or refreshes the heartbeat record of the node.
     *
     * An existing lastFailTime is carried over so that a heartbeat does not
     * cancel a failure penalty that is still running.
     *
     * @param ServiceNode $serviceNode
     * @return bool False when Redis fails.
     */
    public function alive(ServiceNode $serviceNode): bool
    {
        $info = [
            'service' => $serviceNode->getService(),
            'ip' => $serviceNode->getIp(),
            'port' => $serviceNode->getPort(),
            'version' => $serviceNode->getVersion(),
            'lastHeartbeat' => time(),
            'lastFailTime' => 0
        ];

        return $this->withRedis(function ($redis) use ($serviceNode, $info) {
            $hashKey = $this->hashKey((string) $serviceNode->getService());
            $nodeId = $serviceNode->getNodeId();
            $previous = json_decode((string) $redis->hGet($hashKey, $nodeId), true);
            if (is_array($previous)) {
                $info['lastFailTime'] = (int) ($previous['lastFailTime'] ?? 0);
            }
            $redis->hSet($hashKey, $nodeId, json_encode($info));
            return true;
        }, false);
    }

    /**
     * Deletes one field (node id) from the hash of the given service.
     *
     * @param string     $serviceName
     * @param string|int $failKey Node id to delete.
     * @return bool False when Redis fails.
     */
    private function deleteServiceNode($serviceName, $failKey): bool
    {
        return $this->withRedis(function ($redis) use ($serviceName, $failKey) {
            $redis->hDel($this->hashKey((string) $serviceName), $failKey);
            return true;
        }, false);
    }

    /**
     * Builds the Redis hash key that stores the nodes of a service.
     */
    private function hashKey(string $serviceName): string
    {
        return "{$this->redisKey}_{$serviceName}";
    }

    /**
     * Maps the seconds elapsed since the last failure to a selection weight.
     *
     * @param int $elapsed Seconds since the last reported failure.
     * @return int Weight between 1 and FAIL_RECOVERY_SECONDS.
     */
    private function weightFor(int $elapsed): int
    {
        if ($elapsed >= self::FAIL_RECOVERY_SECONDS) {
            return self::FAIL_RECOVERY_SECONDS;
        }
        // max() also covers a negative value caused by clock skew.
        return max(1, $elapsed);
    }

    /**
     * Runs $task with a pooled Redis connection and always hands it back.
     *
     * Any throwable raised by the task is swallowed on purpose (best effort):
     * the connection is treated as broken and removed from the pool, and
     * $fallback is returned instead.
     *
     * @param callable $task     Receives the Redis connection.
     * @param mixed    $fallback Returned when no connection is available or the task throws.
     * @return mixed
     */
    private function withRedis(callable $task, $fallback)
    {
        $redisPool = $this->pool;
        /** @var Redis|null $redis */
        $redis = $redisPool->getObj(self::POOL_TIMEOUT);
        if ($redis === null) {
            return $fallback;
        }
        try {
            return $task($redis);
        } catch (\Throwable $throwable) {
            // Destroy the connection in case it is disconnected.
            $redisPool->unsetObj($redis);
        } finally {
            $redisPool->recycleObj($redis);
        }
        return $fallback;
    }
}
/** @var \EasySwoole\Rpc\Config $config */
$assistConfig = $config->getAssist();
// Interval at which the service periodically refreshes itself into the node manager
// (5000 corresponds to the 5-second heartbeat described in the surrounding text).
$assistConfig->setAliveInterval(5000);
即使關閉了 UDP 定時廣播,EasySwoole Rpc 的 AssistWorker 進程依舊會每 5 秒執行一次 serviceAlive,用于更新自身的節點心跳信息。
注冊
<?php
namespace EasySwoole\EasySwoole;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;use EasySwoole\Redis\Config\RedisConfig;use EasySwoole\RedisPool\Pool;use EasySwoole\RedisPool\RedisPool;
/**
 * Global EasySwoole event hooks; registers the RPC services on the main
 * server using the Redis node manager.
 */
class EasySwooleEvent implements Event
{
    /**
     * Framework initialization hook: sets the default timezone.
     */
    public static function initialize()
    {
        date_default_timezone_set('Asia/Shanghai');
    }

    /**
     * Main-server creation hook: builds the RPC configuration, registers the
     * Goods and Common services and attaches RPC to the Swoole server.
     *
     * @param EventRegister $register
     */
    public static function mainServerCreate(EventRegister $register)
    {
        ###### Register the rpc services ######
        /** rpc server configuration */
        // With the Redis node manager in place the UDP broadcast is no longer
        // required (this snippet does not itself switch it off).
        // The class lives in another namespace, so it is referenced by its
        // fully qualified name.
        $redisPool = new Pool(new RedisConfig(['host' => '127.0.0.1']));
        $redisM = new \App\RpcServices\NodeManager\RedisManager($redisPool);

        $config = new \EasySwoole\Rpc\Config($redisM);
        $config->setNodeId('EasySwooleRpcNode1');
        $config->setServerName('EasySwoole'); // default: EasySwoole
        $config->setOnException(function (\Throwable $throwable) {
            // Placeholder: exceptions are ignored here; add logging as needed.
        });

        $serverConfig = $config->getServer();
        $serverConfig->setServerIp('127.0.0.1');

        // See the configuration chapter for the remaining rpc options.
        $rpc = new \EasySwoole\Rpc\Rpc($config);

        // Create the Goods service.
        $goodsService = new \App\RpcServices\Goods();
        // Add the GoodsModule module to the Goods service.
        $goodsService->addModule(new \App\RpcServices\GoodsModule());
        // Add the Goods service to the service manager.
        $rpc->serviceManager()->addService($goodsService);

        // Create the Common service.
        $commonService = new \App\RpcServices\Common();
        // Add the CommonModule module to the Common service.
        $commonService->addModule(new \App\RpcServices\CommonModule());
        // Add the Common service to the service manager.
        $rpc->serviceManager()->addService($commonService);

        // The rpc instance must be kept at this point: either register it
        // through a singleton that extends the Rpc class, or store it in Di.
        // Attach the rpc services to the Swoole server.
        $rpc->attachServer(ServerManager::getInstance()->getSwooleServer());
    }
}