Gateway.php 48 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381
  1. <?php
  2. namespace GatewayClient;
  3. use \Exception;
  4. /**
  5. * This file is part of workerman.
  6. *
  7. * Licensed under The MIT License
  8. * For full copyright and license information, please see the MIT-LICENSE.txt
  9. * Redistributions of files must retain the above copyright notice.
  10. *
  11. * @author walkor<walkor@workerman.net>
  12. * @copyright walkor<walkor@workerman.net>
  13. * @link http://www.workerman.net/
  14. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  15. */
  16. if (! class_exists(\Composer\Autoload\ClassLoader::class)) {
  17. require_once __DIR__ . '/Context.php';
  18. require_once __DIR__ . '/GatewayProtocol.php';
  19. }
  20. /**
  21. * 数据发送相关
  22. */
  23. class Gateway
  24. {
  25. /**
  26. * gateway 实例
  27. *
  28. * @var object
  29. */
  30. protected static $businessWorker = null;
  31. /**
  32. * 注册中心地址
  33. *
  34. * @var string|array
  35. */
  36. public static $registerAddress = '127.0.0.1:1236';
  37. /**
  38. * 秘钥
  39. * @var string
  40. */
  41. public static $secretKey = '';
  42. /**
  43. * 链接超时时间
  44. * @var int
  45. */
  46. public static $connectTimeout = 3;
  47. /**
  48. * 与Gateway是否是长链接
  49. * @var bool
  50. */
  51. public static $persistentConnection = false;
  52. /**
  53. * 是否清除注册地址缓存
  54. * @var bool
  55. */
  56. public static $addressesCacheDisable = false;
  57. /**
  58. * 向所有客户端连接(或者 client_id_array 指定的客户端连接)广播消息
  59. *
  60. * @param string $message 向客户端发送的消息
  61. * @param array $client_id_array 客户端 id 数组
  62. * @param array $exclude_client_id 不给这些client_id发
  63. * @param bool $raw 是否发送原始数据(即不调用gateway的协议的encode方法)
  64. * @return void
  65. * @throws Exception
  66. */
  67. public static function sendToAll($message, $client_id_array = null, $exclude_client_id = null, $raw = false)
  68. {
  69. $gateway_data = GatewayProtocol::$empty;
  70. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
  71. $gateway_data['body'] = $message;
  72. if ($raw) {
  73. $gateway_data['flag'] |= GatewayProtocol::FLAG_NOT_CALL_ENCODE;
  74. }
  75. if ($exclude_client_id) {
  76. if (!is_array($exclude_client_id)) {
  77. $exclude_client_id = array($exclude_client_id);
  78. }
  79. if ($client_id_array) {
  80. $exclude_client_id = array_flip($exclude_client_id);
  81. }
  82. }
  83. if ($client_id_array) {
  84. if (!is_array($client_id_array)) {
  85. echo new \Exception('bad $client_id_array:'.var_export($client_id_array, true));
  86. return;
  87. }
  88. $data_array = array();
  89. foreach ($client_id_array as $client_id) {
  90. if (isset($exclude_client_id[$client_id])) {
  91. continue;
  92. }
  93. $address = Context::clientIdToAddress($client_id);
  94. if ($address) {
  95. $key = long2ip($address['local_ip']) . ":{$address['local_port']}";
  96. $data_array[$key][$address['connection_id']] = $address['connection_id'];
  97. }
  98. }
  99. foreach ($data_array as $addr => $connection_id_list) {
  100. $the_gateway_data = $gateway_data;
  101. $the_gateway_data['ext_data'] = json_encode(array('connections' => $connection_id_list));
  102. static::sendToGateway($addr, $the_gateway_data);
  103. }
  104. return;
  105. } elseif (empty($client_id_array) && is_array($client_id_array)) {
  106. return;
  107. }
  108. if (!$exclude_client_id) {
  109. return static::sendToAllGateway($gateway_data);
  110. }
  111. $address_connection_array = static::clientIdArrayToAddressArray($exclude_client_id);
  112. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  113. if (static::$businessWorker) {
  114. foreach (static::$businessWorker->gatewayConnections as $address => $gateway_connection) {
  115. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  116. json_encode(array('exclude'=> $address_connection_array[$address])) : '';
  117. /** @var TcpConnection $gateway_connection */
  118. $gateway_connection->send($gateway_data);
  119. }
  120. } // 运行在其它环境中,通过注册中心得到gateway地址
  121. else {
  122. $all_addresses = static::getAllGatewayAddressesFromRegister();
  123. foreach ($all_addresses as $address) {
  124. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  125. json_encode(array('exclude'=> $address_connection_array[$address])) : '';
  126. static::sendToGateway($address, $gateway_data);
  127. }
  128. }
  129. }
  130. /**
  131. * 向某个client_id对应的连接发消息
  132. *
  133. * @param string $client_id
  134. * @param string $message
  135. * @return void
  136. */
  137. public static function sendToClient($client_id, $message)
  138. {
  139. return static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SEND_TO_ONE, $message);
  140. }
  141. /**
  142. * 判断某个uid是否在线
  143. *
  144. * @param string $uid
  145. * @return int 0|1
  146. */
  147. public static function isUidOnline($uid)
  148. {
  149. return (int)static::getClientIdByUid($uid);
  150. }
  151. public static function isUidsOnline($uids)
  152. {
  153. return array_map(function ($item) {
  154. return (int) $item;
  155. }, static::batchGetClientIdByUid($uids));
  156. }
  157. /**
  158. * 判断client_id对应的连接是否在线
  159. *
  160. * @param string $client_id
  161. * @return int 0|1
  162. */
  163. public static function isOnline($client_id)
  164. {
  165. $address_data = Context::clientIdToAddress($client_id);
  166. if (!$address_data) {
  167. return 0;
  168. }
  169. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  170. if (isset(static::$businessWorker)) {
  171. if (!isset(static::$businessWorker->gatewayConnections[$address])) {
  172. return 0;
  173. }
  174. }
  175. $gateway_data = GatewayProtocol::$empty;
  176. $gateway_data['cmd'] = GatewayProtocol::CMD_IS_ONLINE;
  177. $gateway_data['connection_id'] = $address_data['connection_id'];
  178. return (int)static::sendAndRecv($address, $gateway_data);
  179. }
  180. /**
  181. * 获取所有在线用户的session,client_id为 key(弃用,请用getAllClientSessions代替)
  182. *
  183. * @param string $group
  184. * @return array
  185. */
  186. public static function getAllClientInfo($group = '')
  187. {
  188. echo "Warning: Gateway::getAllClientInfo is deprecated and will be removed in a future, please use Gateway::getAllClientSessions instead.";
  189. return static::getAllClientSessions($group);
  190. }
  191. /**
  192. * 获取所有在线client_id的session,client_id为 key
  193. *
  194. * @param string $group
  195. * @return array
  196. */
  197. public static function getAllClientSessions($group = '')
  198. {
  199. $gateway_data = GatewayProtocol::$empty;
  200. if (!$group) {
  201. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS;
  202. } else {
  203. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP;
  204. $gateway_data['ext_data'] = $group;
  205. }
  206. $status_data = array();
  207. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  208. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  209. foreach ($buffer_array as $local_port => $data) {
  210. if ($data) {
  211. foreach ($data as $connection_id => $session_buffer) {
  212. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  213. if ($client_id === Context::$client_id) {
  214. $status_data[$client_id] = (array)$_SESSION;
  215. } else {
  216. $status_data[$client_id] = $session_buffer ? Context::sessionDecode($session_buffer) : array();
  217. }
  218. }
  219. }
  220. }
  221. }
  222. return $status_data;
  223. }
  224. /**
  225. * 获取某个组的连接信息(弃用,请用getClientSessionsByGroup代替)
  226. *
  227. * @param string $group
  228. * @return array
  229. */
  230. public static function getClientInfoByGroup($group)
  231. {
  232. echo "Warning: Gateway::getClientInfoByGroup is deprecated and will be removed in a future, please use Gateway::getClientSessionsByGroup instead.";
  233. return static::getAllClientSessions($group);
  234. }
  235. /**
  236. * 获取某个组的所有client_id的session信息
  237. *
  238. * @param string $group
  239. *
  240. * @return array
  241. */
  242. public static function getClientSessionsByGroup($group)
  243. {
  244. if (static::isValidGroupId($group)) {
  245. return static::getAllClientSessions($group);
  246. }
  247. return array();
  248. }
  249. /**
  250. * 获取所有在线client_id数
  251. *
  252. * @return int
  253. */
  254. public static function getAllClientIdCount()
  255. {
  256. return static::getClientCountByGroup();
  257. }
  258. /**
  259. * 获取所有在线client_id数(getAllClientIdCount的别名)
  260. *
  261. * @return int
  262. */
  263. public static function getAllClientCount()
  264. {
  265. return static::getAllClientIdCount();
  266. }
  267. /**
  268. * 获取某个组的在线client_id数
  269. *
  270. * @param string $group
  271. * @return int
  272. */
  273. public static function getClientIdCountByGroup($group = '')
  274. {
  275. $gateway_data = GatewayProtocol::$empty;
  276. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP;
  277. $gateway_data['ext_data'] = $group;
  278. $total_count = 0;
  279. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  280. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  281. foreach ($buffer_array as $local_port => $count) {
  282. if ($count) {
  283. $total_count += $count;
  284. }
  285. }
  286. }
  287. return $total_count;
  288. }
  289. /**
  290. * getClientIdCountByGroup 函数的别名
  291. *
  292. * @param string $group
  293. * @return int
  294. */
  295. public static function getClientCountByGroup($group = '')
  296. {
  297. return static::getClientIdCountByGroup($group);
  298. }
  299. /**
  300. * 获取某个群组在线client_id列表
  301. *
  302. * @param string $group
  303. * @return array
  304. */
  305. public static function getClientIdListByGroup($group)
  306. {
  307. if (!static::isValidGroupId($group)) {
  308. return array();
  309. }
  310. $data = static::select(array('uid'), array('groups' => is_array($group) ? $group : array($group)));
  311. $client_id_map = array();
  312. foreach ($data as $local_ip => $buffer_array) {
  313. foreach ($buffer_array as $local_port => $items) {
  314. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  315. foreach ($items as $connection_id => $info) {
  316. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  317. $client_id_map[$client_id] = $client_id;
  318. }
  319. }
  320. }
  321. return $client_id_map;
  322. }
  323. /**
  324. * 获取集群所有在线client_id列表
  325. *
  326. * @return array
  327. */
  328. public static function getAllClientIdList()
  329. {
  330. return static::formatClientIdFromGatewayBuffer(static::select(array('uid')));
  331. }
  332. /**
  333. * 格式化client_id
  334. *
  335. * @param $data
  336. * @return array
  337. */
  338. protected static function formatClientIdFromGatewayBuffer($data)
  339. {
  340. $client_id_list = array();
  341. foreach ($data as $local_ip => $buffer_array) {
  342. foreach ($buffer_array as $local_port => $items) {
  343. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  344. foreach ($items as $connection_id => $info) {
  345. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  346. $client_id_list[$client_id] = $client_id;
  347. }
  348. }
  349. }
  350. return $client_id_list;
  351. }
  352. /**
  353. * 获取与 uid 绑定的 client_id 列表
  354. *
  355. * @param string $uid
  356. * @return array
  357. */
  358. public static function getClientIdByUid($uid)
  359. {
  360. $gateway_data = GatewayProtocol::$empty;
  361. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID;
  362. $gateway_data['ext_data'] = $uid;
  363. $client_list = array();
  364. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  365. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  366. foreach ($buffer_array as $local_port => $connection_id_array) {
  367. if ($connection_id_array) {
  368. foreach ($connection_id_array as $connection_id) {
  369. $client_list[] = Context::addressToClientId($local_ip, $local_port, $connection_id);
  370. }
  371. }
  372. }
  373. }
  374. return $client_list;
  375. }
  376. /**
  377. * 批量获取与 uid 绑定的 client_id 列表
  378. *
  379. * @param array $uids
  380. * @return array
  381. */
  382. public static function batchGetClientIdByUid($uids)
  383. {
  384. $gateway_data = GatewayProtocol::$empty;
  385. $gateway_data['cmd'] = GatewayProtocol::CMD_BATCH_GET_CLIENT_ID_BY_UID;
  386. $gateway_data['ext_data'] = json_encode($uids);
  387. $client_list = [];
  388. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  389. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  390. foreach ($buffer_array as $local_port => $uid_connection_id_array) {
  391. if ($uid_connection_id_array) {
  392. foreach ($uid_connection_id_array as $uid => $connection_ids) {
  393. if (! isset($client_list[$uid])) {
  394. $client_list[$uid] = [];
  395. }
  396. foreach ($connection_ids as $connection_id) {
  397. $client_list[$uid][] = Context::addressToClientId($local_ip, $local_port, $connection_id);
  398. }
  399. }
  400. }
  401. }
  402. }
  403. return $client_list;
  404. }
  405. /**
  406. * 获取某个群组在线uid列表
  407. *
  408. * @param string $group
  409. * @return array
  410. */
  411. public static function getUidListByGroup($group)
  412. {
  413. if (!static::isValidGroupId($group)) {
  414. return array();
  415. }
  416. $group = is_array($group) ? $group : array($group);
  417. $data = static::select(array('uid'), array('groups' => $group));
  418. $uid_map = array();
  419. foreach ($data as $local_ip => $buffer_array) {
  420. foreach ($buffer_array as $local_port => $items) {
  421. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  422. foreach ($items as $connection_id => $info) {
  423. if (!empty($info['uid'])) {
  424. $uid_map[$info['uid']] = $info['uid'];
  425. }
  426. }
  427. }
  428. }
  429. return $uid_map;
  430. }
  431. /**
  432. * 获取某个群组在线uid数
  433. *
  434. * @param string $group
  435. * @return int
  436. */
  437. public static function getUidCountByGroup($group)
  438. {
  439. if (static::isValidGroupId($group)) {
  440. return count(static::getUidListByGroup($group));
  441. }
  442. return 0;
  443. }
  444. /**
  445. * 获取全局在线uid列表
  446. *
  447. * @return array
  448. */
  449. public static function getAllUidList()
  450. {
  451. $data = static::select(array('uid'));
  452. $uid_map = array();
  453. foreach ($data as $local_ip => $buffer_array) {
  454. foreach ($buffer_array as $local_port => $items) {
  455. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  456. foreach ($items as $connection_id => $info) {
  457. if (!empty($info['uid'])) {
  458. $uid_map[$info['uid']] = $info['uid'];
  459. }
  460. }
  461. }
  462. }
  463. return $uid_map;
  464. }
  465. /**
  466. * 获取全局在线uid数
  467. * @return int
  468. */
  469. public static function getAllUidCount()
  470. {
  471. return count(static::getAllUidList());
  472. }
  473. /**
  474. * 通过client_id获取uid
  475. *
  476. * @param $client_id
  477. * @return mixed
  478. */
  479. public static function getUidByClientId($client_id)
  480. {
  481. $data = static::select(array('uid'), array('client_id'=>array($client_id)));
  482. foreach ($data as $local_ip => $buffer_array) {
  483. foreach ($buffer_array as $local_port => $items) {
  484. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  485. foreach ($items as $info) {
  486. return $info['uid'];
  487. }
  488. }
  489. }
  490. }
  491. /**
  492. * 获取所有在线的群组id
  493. *
  494. * @return array
  495. */
  496. public static function getAllGroupIdList()
  497. {
  498. $gateway_data = GatewayProtocol::$empty;
  499. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_GROUP_ID_LIST;
  500. $group_id_list = array();
  501. $all_buffer_array = static::getBufferFromAllGateway($gateway_data);
  502. foreach ($all_buffer_array as $local_ip => $buffer_array) {
  503. foreach ($buffer_array as $local_port => $group_id_array) {
  504. if (is_array($group_id_array)) {
  505. foreach ($group_id_array as $group_id) {
  506. if (!isset($group_id_list[$group_id])) {
  507. $group_id_list[$group_id] = $group_id;
  508. }
  509. }
  510. }
  511. }
  512. }
  513. return $group_id_list;
  514. }
  515. /**
  516. * 获取所有在线分组的uid数量,也就是每个分组的在线用户数
  517. *
  518. * @return array
  519. */
  520. public static function getAllGroupUidCount()
  521. {
  522. $group_uid_map = static::getAllGroupUidList();
  523. $group_uid_count_map = array();
  524. foreach ($group_uid_map as $group_id => $uid_list) {
  525. $group_uid_count_map[$group_id] = count($uid_list);
  526. }
  527. return $group_uid_count_map;
  528. }
  529. /**
  530. * 获取所有分组uid在线列表
  531. *
  532. * @return array
  533. */
  534. public static function getAllGroupUidList()
  535. {
  536. $data = static::select(array('uid','groups'));
  537. $group_uid_map = array();
  538. foreach ($data as $local_ip => $buffer_array) {
  539. foreach ($buffer_array as $local_port => $items) {
  540. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  541. foreach ($items as $connection_id => $info) {
  542. if (empty($info['uid']) || empty($info['groups'])) {
  543. break;
  544. }
  545. $uid = $info['uid'];
  546. foreach ($info['groups'] as $group_id) {
  547. if(!isset($group_uid_map[$group_id])) {
  548. $group_uid_map[$group_id] = array();
  549. }
  550. $group_uid_map[$group_id][$uid] = $uid;
  551. }
  552. }
  553. }
  554. }
  555. return $group_uid_map;
  556. }
  557. /**
  558. * 获取所有群组在线client_id列表
  559. *
  560. * @return array
  561. */
  562. public static function getAllGroupClientIdList()
  563. {
  564. $data = static::select(array('groups'));
  565. $group_client_id_map = array();
  566. foreach ($data as $local_ip => $buffer_array) {
  567. foreach ($buffer_array as $local_port => $items) {
  568. //$items = ['connection_id'=>['uid'=>x, 'group'=>[x,x..], 'session'=>[..]], 'client_id'=>[..], ..];
  569. foreach ($items as $connection_id => $info) {
  570. if (empty($info['groups'])) {
  571. break;
  572. }
  573. $client_id = Context::addressToClientId($local_ip, $local_port, $connection_id);
  574. foreach ($info['groups'] as $group_id) {
  575. if(!isset($group_client_id_map[$group_id])) {
  576. $group_client_id_map[$group_id] = array();
  577. }
  578. $group_client_id_map[$group_id][$client_id] = $client_id;
  579. }
  580. }
  581. }
  582. }
  583. return $group_client_id_map;
  584. }
  585. /**
  586. * 获取所有群组在线client_id数量,也就是获取每个群组在线连接数
  587. *
  588. * @return array
  589. */
  590. public static function getAllGroupClientIdCount()
  591. {
  592. $group_client_map = static::getAllGroupClientIdList();
  593. $group_client_count_map = array();
  594. foreach ($group_client_map as $group_id => $client_id_list) {
  595. $group_client_count_map[$group_id] = count($client_id_list);
  596. }
  597. return $group_client_count_map;
  598. }
  599. /**
  600. * 根据条件到gateway搜索数据
  601. *
  602. * @param array $fields
  603. * @param array $where
  604. * @return array
  605. */
  606. protected static function select($fields = array('session','uid','groups'), $where = array())
  607. {
  608. $t = microtime(true);
  609. $gateway_data = GatewayProtocol::$empty;
  610. $gateway_data['cmd'] = GatewayProtocol::CMD_SELECT;
  611. $gateway_data['ext_data'] = array('fields' => $fields, 'where' => $where);
  612. $gateway_data_list = array();
  613. // 有client_id,能计算出需要和哪些gateway通讯,只和必要的gateway通讯能降低系统负载
  614. if (isset($where['client_id'])) {
  615. $client_id_list = $where['client_id'];
  616. unset($gateway_data['ext_data']['where']['client_id']);
  617. $gateway_data['ext_data']['where']['connection_id'] = array();
  618. foreach ($client_id_list as $client_id) {
  619. $address_data = Context::clientIdToAddress($client_id);
  620. if (!$address_data) {
  621. continue;
  622. }
  623. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  624. if (!isset($gateway_data_list[$address])) {
  625. $gateway_data_list[$address] = $gateway_data;
  626. }
  627. $gateway_data_list[$address]['ext_data']['where']['connection_id'][$address_data['connection_id']] = $address_data['connection_id'];
  628. }
  629. foreach ($gateway_data_list as $address => $item) {
  630. $gateway_data_list[$address]['ext_data'] = json_encode($item['ext_data']);
  631. }
  632. // 有其它条件,则还是需要向所有gateway发送
  633. if (count($where) !== 1) {
  634. $gateway_data['ext_data'] = json_encode($gateway_data['ext_data']);
  635. foreach (static::getAllGatewayAddress() as $address) {
  636. if (!isset($gateway_data_list[$address])) {
  637. $gateway_data_list[$address] = $gateway_data;
  638. }
  639. }
  640. }
  641. $data = static::getBufferFromSomeGateway($gateway_data_list);
  642. } else {
  643. $gateway_data['ext_data'] = json_encode($gateway_data['ext_data']);
  644. $data = static::getBufferFromAllGateway($gateway_data);
  645. }
  646. return $data;
  647. }
  648. /**
  649. * 生成验证包,用于验证此客户端的合法性
  650. *
  651. * @return string
  652. */
  653. protected static function generateAuthBuffer()
  654. {
  655. $gateway_data = GatewayProtocol::$empty;
  656. $gateway_data['cmd'] = GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT;
  657. $gateway_data['body'] = json_encode(array(
  658. 'secret_key' => static::$secretKey,
  659. ));
  660. return GatewayProtocol::encode($gateway_data);
  661. }
  662. /**
  663. * 批量向某些gateway发包,并得到返回数组
  664. *
  665. * @param array $gateway_data_array
  666. * @return array
  667. * @throws Exception
  668. */
  669. protected static function getBufferFromSomeGateway($gateway_data_array)
  670. {
  671. $gateway_buffer_array = array();
  672. $auth_buffer = static::$secretKey ? static::generateAuthBuffer() : '';
  673. foreach ($gateway_data_array as $address => $gateway_data) {
  674. if ($auth_buffer) {
  675. $gateway_buffer_array[$address] = $auth_buffer.GatewayProtocol::encode($gateway_data);
  676. } else {
  677. $gateway_buffer_array[$address] = GatewayProtocol::encode($gateway_data);
  678. }
  679. }
  680. return static::getBufferFromGateway($gateway_buffer_array);
  681. }
  682. /**
  683. * 批量向所有 gateway 发包,并得到返回数组
  684. *
  685. * @param string|array $gateway_data
  686. * @return array
  687. * @throws Exception
  688. */
  689. protected static function getBufferFromAllGateway($gateway_data)
  690. {
  691. $addresses = static::getAllGatewayAddress();
  692. $gateway_buffer_array = array();
  693. $gateway_buffer = GatewayProtocol::encode($gateway_data);
  694. $gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
  695. foreach ($addresses as $address) {
  696. $gateway_buffer_array[$address] = $gateway_buffer;
  697. }
  698. return static::getBufferFromGateway($gateway_buffer_array);
  699. }
  700. /**
  701. * 获取所有gateway内部通讯地址
  702. *
  703. * @return array
  704. * @throws Exception
  705. */
  706. protected static function getAllGatewayAddress()
  707. {
  708. if (isset(static::$businessWorker)) {
  709. $addresses = static::$businessWorker->getAllGatewayAddresses();
  710. if (empty($addresses)) {
  711. throw new Exception('businessWorker::getAllGatewayAddresses return empty');
  712. }
  713. } else {
  714. $addresses = static::getAllGatewayAddressesFromRegister();
  715. if (empty($addresses)) {
  716. return array();
  717. }
  718. }
  719. return $addresses;
  720. }
  721. /**
  722. * 批量向gateway发送并获取数据
  723. * @param $gateway_buffer_array
  724. * @return array
  725. */
  726. protected static function getBufferFromGateway($gateway_buffer_array)
  727. {
  728. $client_array = $status_data = $client_address_map = $receive_buffer_array = $recv_length_array = array();
  729. // 批量向所有gateway进程发送请求数据
  730. foreach ($gateway_buffer_array as $address => $gateway_buffer) {
  731. $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
  732. if ($client && strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
  733. $socket_id = (int)$client;
  734. $client_array[$socket_id] = $client;
  735. $client_address_map[$socket_id] = explode(':', $address);
  736. $receive_buffer_array[$socket_id] = '';
  737. }
  738. }
  739. // 超时5秒
  740. $timeout = 5;
  741. $time_start = microtime(true);
  742. // 批量接收请求
  743. while (count($client_array) > 0) {
  744. $write = $except = array();
  745. $read = $client_array;
  746. if (@stream_select($read, $write, $except, $timeout)) {
  747. foreach ($read as $client) {
  748. $socket_id = (int)$client;
  749. $buffer = stream_socket_recvfrom($client, 65535);
  750. if ($buffer !== '' && $buffer !== false) {
  751. $receive_buffer_array[$socket_id] .= $buffer;
  752. $receive_length = strlen($receive_buffer_array[$socket_id]);
  753. if (empty($recv_length_array[$socket_id]) && $receive_length >= 4) {
  754. $recv_length_array[$socket_id] = current(unpack('N', $receive_buffer_array[$socket_id]));
  755. }
  756. if (!empty($recv_length_array[$socket_id]) && $receive_length >= $recv_length_array[$socket_id] + 4) {
  757. unset($client_array[$socket_id]);
  758. }
  759. } elseif (feof($client)) {
  760. unset($client_array[$socket_id]);
  761. }
  762. }
  763. }
  764. if (microtime(true) - $time_start > $timeout) {
  765. break;
  766. }
  767. }
  768. $format_buffer_array = array();
  769. foreach ($receive_buffer_array as $socket_id => $buffer) {
  770. $local_ip = ip2long($client_address_map[$socket_id][0]);
  771. $local_port = $client_address_map[$socket_id][1];
  772. $format_buffer_array[$local_ip][$local_port] = unserialize(substr($buffer, 4));
  773. }
  774. return $format_buffer_array;
  775. }
  776. /**
  777. * 踢掉某个客户端,并以$message通知被踢掉客户端
  778. *
  779. * @param string $client_id
  780. * @param string $message
  781. * @return void
  782. */
  783. public static function closeClient($client_id, $message = null)
  784. {
  785. if ($client_id === Context::$client_id) {
  786. return static::closeCurrentClient($message);
  787. } // 不是发给当前用户则使用存储中的地址
  788. else {
  789. $address_data = Context::clientIdToAddress($client_id);
  790. if (!$address_data) {
  791. return false;
  792. }
  793. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  794. return static::kickAddress($address, $address_data['connection_id'], $message);
  795. }
  796. }
  797. /**
  798. * 踢掉某个客户端并直接立即销毁相关连接
  799. *
  800. * @param string $client_id
  801. * @return bool
  802. */
  803. public static function destoryClient($client_id)
  804. {
  805. if ($client_id === Context::$client_id) {
  806. return static::destoryCurrentClient();
  807. } // 不是发给当前用户则使用存储中的地址
  808. else {
  809. $address_data = Context::clientIdToAddress($client_id);
  810. if (!$address_data) {
  811. return false;
  812. }
  813. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  814. return static::destroyAddress($address, $address_data['connection_id']);
  815. }
  816. }
  817. /**
  818. * 踢掉当前客户端并直接立即销毁相关连接
  819. *
  820. * @return bool
  821. * @throws Exception
  822. */
  823. public static function destoryCurrentClient()
  824. {
  825. if (!Context::$connection_id) {
  826. throw new Exception('destoryCurrentClient can not be called in async context');
  827. }
  828. $address = long2ip(Context::$local_ip) . ':' . Context::$local_port;
  829. return static::destroyAddress($address, Context::$connection_id);
  830. }
  831. /**
  832. * 将 client_id 与 uid 绑定
  833. *
  834. * @param string $client_id
  835. * @param int|string $uid
  836. * @return void
  837. */
  838. public static function bindUid($client_id, $uid)
  839. {
  840. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_BIND_UID, '', $uid);
  841. }
  842. /**
  843. * 将 client_id 与 uid 解除绑定
  844. *
  845. * @param string $client_id
  846. * @param int|string $uid
  847. * @return void
  848. */
  849. public static function unbindUid($client_id, $uid)
  850. {
  851. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_UNBIND_UID, '', $uid);
  852. }
  853. /**
  854. * 将 client_id 加入组
  855. *
  856. * @param string $client_id
  857. * @param int|string $group
  858. * @return void
  859. */
  860. public static function joinGroup($client_id, $group)
  861. {
  862. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_JOIN_GROUP, '', $group);
  863. }
  864. /**
  865. * 将 client_id 离开组
  866. *
  867. * @param string $client_id
  868. * @param int|string $group
  869. *
  870. * @return void
  871. */
  872. public static function leaveGroup($client_id, $group)
  873. {
  874. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_LEAVE_GROUP, '', $group);
  875. }
  876. /**
  877. * 取消分组
  878. *
  879. * @param int|string $group
  880. *
  881. * @return void
  882. */
  883. public static function ungroup($group)
  884. {
  885. if (!static::isValidGroupId($group)) {
  886. return false;
  887. }
  888. $gateway_data = GatewayProtocol::$empty;
  889. $gateway_data['cmd'] = GatewayProtocol::CMD_UNGROUP;
  890. $gateway_data['ext_data'] = $group;
  891. return static::sendToAllGateway($gateway_data);
  892. }
  893. /**
  894. * 向所有 uid 发送
  895. *
  896. * @param int|string|array $uid
  897. * @param string $message
  898. *
  899. * @return void
  900. */
  901. public static function sendToUid($uid, $message)
  902. {
  903. $gateway_data = GatewayProtocol::$empty;
  904. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_UID;
  905. $gateway_data['body'] = $message;
  906. if (!is_array($uid)) {
  907. $uid = array($uid);
  908. }
  909. $gateway_data['ext_data'] = json_encode($uid);
  910. static::sendToAllGateway($gateway_data);
  911. }
  912. /**
  913. * 向 group 发送
  914. *
  915. * @param int|string|array $group 组(不允许是 0 '0' false null array()等为空的值)
  916. * @param string $message 消息
  917. * @param array $exclude_client_id 不给这些client_id发
  918. * @param bool $raw 发送原始数据(即不调用gateway的协议的encode方法)
  919. *
  920. * @return void
  921. */
  922. public static function sendToGroup($group, $message, $exclude_client_id = null, $raw = false)
  923. {
  924. if (!static::isValidGroupId($group)) {
  925. return false;
  926. }
  927. $gateway_data = GatewayProtocol::$empty;
  928. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_GROUP;
  929. $gateway_data['body'] = $message;
  930. if ($raw) {
  931. $gateway_data['flag'] |= GatewayProtocol::FLAG_NOT_CALL_ENCODE;
  932. }
  933. if (!is_array($group)) {
  934. $group = array($group);
  935. }
  936. // 分组发送,没有排除的client_id,直接发送
  937. $default_ext_data_buffer = json_encode(array('group'=> $group, 'exclude'=> null));
  938. if (empty($exclude_client_id)) {
  939. $gateway_data['ext_data'] = $default_ext_data_buffer;
  940. return static::sendToAllGateway($gateway_data);
  941. }
  942. // 分组发送,有排除的client_id,需要将client_id转换成对应gateway进程内的connectionId
  943. if (!is_array($exclude_client_id)) {
  944. $exclude_client_id = array($exclude_client_id);
  945. }
  946. $address_connection_array = static::clientIdArrayToAddressArray($exclude_client_id);
  947. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  948. if (static::$businessWorker) {
  949. foreach (static::$businessWorker->gatewayConnections as $address => $gateway_connection) {
  950. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  951. json_encode(array('group'=> $group, 'exclude'=> $address_connection_array[$address])) :
  952. $default_ext_data_buffer;
  953. /** @var TcpConnection $gateway_connection */
  954. $gateway_connection->send($gateway_data);
  955. }
  956. } // 运行在其它环境中,通过注册中心得到gateway地址
  957. else {
  958. $addresses = static::getAllGatewayAddressesFromRegister();
  959. foreach ($addresses as $address) {
  960. $gateway_data['ext_data'] = isset($address_connection_array[$address]) ?
  961. json_encode(array('group'=> $group, 'exclude'=> $address_connection_array[$address])) :
  962. $default_ext_data_buffer;
  963. static::sendToGateway($address, $gateway_data);
  964. }
  965. }
  966. }
  967. /**
  968. * 更新 session,框架自动调用,开发者不要调用
  969. *
  970. * @param string $client_id
  971. * @param string $session_str
  972. * @return bool
  973. */
  974. public static function setSocketSession($client_id, $session_str)
  975. {
  976. return static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SET_SESSION, '', $session_str);
  977. }
  978. /**
  979. * 设置 session,原session值会被覆盖
  980. *
  981. * @param string $client_id
  982. * @param array $session
  983. *
  984. * @return void
  985. */
  986. public static function setSession($client_id, array $session)
  987. {
  988. if (Context::$client_id === $client_id) {
  989. $_SESSION = $session;
  990. Context::$old_session = $_SESSION;
  991. }
  992. static::setSocketSession($client_id, Context::sessionEncode($session));
  993. }
  994. /**
  995. * 更新 session,实际上是与老的session合并
  996. *
  997. * @param string $client_id
  998. * @param array $session
  999. *
  1000. * @return void
  1001. */
  1002. public static function updateSession($client_id, array $session)
  1003. {
  1004. if (Context::$client_id === $client_id) {
  1005. $_SESSION = array_replace_recursive((array)$_SESSION, $session);
  1006. Context::$old_session = $_SESSION;
  1007. }
  1008. static::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_UPDATE_SESSION, '', Context::sessionEncode($session));
  1009. }
  1010. /**
  1011. * 获取某个client_id的session
  1012. *
  1013. * @param string $client_id
  1014. * @return mixed false表示出错、null表示用户不存在、array表示具体的session信息
  1015. */
  1016. public static function getSession($client_id)
  1017. {
  1018. $address_data = Context::clientIdToAddress($client_id);
  1019. if (!$address_data) {
  1020. return false;
  1021. }
  1022. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  1023. if (isset(static::$businessWorker)) {
  1024. if (!isset(static::$businessWorker->gatewayConnections[$address])) {
  1025. return null;
  1026. }
  1027. }
  1028. $gateway_data = GatewayProtocol::$empty;
  1029. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID;
  1030. $gateway_data['connection_id'] = $address_data['connection_id'];
  1031. return static::sendAndRecv($address, $gateway_data);
  1032. }
  1033. /**
  1034. * 向某个用户网关发送命令和消息
  1035. *
  1036. * @param string $client_id
  1037. * @param int $cmd
  1038. * @param string $message
  1039. * @param string $ext_data
  1040. * @return boolean
  1041. */
  1042. protected static function sendCmdAndMessageToClient($client_id, $cmd, $message, $ext_data = '')
  1043. {
  1044. // 如果是发给当前用户则直接获取上下文中的地址
  1045. if ($client_id === Context::$client_id || $client_id === null) {
  1046. $address = long2ip(Context::$local_ip) . ':' . Context::$local_port;
  1047. $connection_id = Context::$connection_id;
  1048. } else {
  1049. $address_data = Context::clientIdToAddress($client_id);
  1050. if (!$address_data) {
  1051. return false;
  1052. }
  1053. $address = long2ip($address_data['local_ip']) . ":{$address_data['local_port']}";
  1054. $connection_id = $address_data['connection_id'];
  1055. }
  1056. $gateway_data = GatewayProtocol::$empty;
  1057. $gateway_data['cmd'] = $cmd;
  1058. $gateway_data['connection_id'] = $connection_id;
  1059. $gateway_data['body'] = $message;
  1060. if (!empty($ext_data)) {
  1061. $gateway_data['ext_data'] = $ext_data;
  1062. }
  1063. return static::sendToGateway($address, $gateway_data);
  1064. }
  1065. /**
  1066. * 发送数据并返回
  1067. *
  1068. * @param int $address
  1069. * @param mixed $data
  1070. * @return bool
  1071. * @throws Exception
  1072. */
  1073. protected static function sendAndRecv($address, $data)
  1074. {
  1075. $buffer = GatewayProtocol::encode($data);
  1076. $buffer = static::$secretKey ? static::generateAuthBuffer() . $buffer : $buffer;
  1077. $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
  1078. if (!$client) {
  1079. throw new Exception("can not connect to tcp://$address $errmsg");
  1080. }
  1081. if (strlen($buffer) === stream_socket_sendto($client, $buffer)) {
  1082. $timeout = 5;
  1083. // 阻塞读
  1084. stream_set_blocking($client, 1);
  1085. // 1秒超时
  1086. stream_set_timeout($client, 1);
  1087. $all_buffer = '';
  1088. $time_start = microtime(true);
  1089. $pack_len = 0;
  1090. while (1) {
  1091. $buf = stream_socket_recvfrom($client, 655350);
  1092. if ($buf !== '' && $buf !== false) {
  1093. $all_buffer .= $buf;
  1094. } else {
  1095. if (feof($client)) {
  1096. throw new Exception("connection close tcp://$address");
  1097. } elseif (microtime(true) - $time_start > $timeout) {
  1098. break;
  1099. }
  1100. continue;
  1101. }
  1102. $recv_len = strlen($all_buffer);
  1103. if (!$pack_len && $recv_len >= 4) {
  1104. $pack_len= current(unpack('N', $all_buffer));
  1105. }
  1106. // 回复的数据都是以\n结尾
  1107. if (($pack_len && $recv_len >= $pack_len + 4) || microtime(true) - $time_start > $timeout) {
  1108. break;
  1109. }
  1110. }
  1111. // 返回结果
  1112. return unserialize(substr($all_buffer, 4));
  1113. } else {
  1114. throw new Exception("sendAndRecv($address, \$bufer) fail ! Can not send data!", 502);
  1115. }
  1116. }
  1117. /**
  1118. * 发送数据到网关
  1119. *
  1120. * @param string $address
  1121. * @param array $gateway_data
  1122. * @return bool
  1123. */
  1124. protected static function sendToGateway($address, $gateway_data)
  1125. {
  1126. return static::sendBufferToGateway($address, GatewayProtocol::encode($gateway_data));
  1127. }
  1128. /**
  1129. * 发送buffer数据到网关
  1130. * @param string $address
  1131. * @param string $gateway_buffer
  1132. * @return bool
  1133. */
  1134. protected static function sendBufferToGateway($address, $gateway_buffer)
  1135. {
  1136. // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
  1137. if (static::$businessWorker) {
  1138. if (!isset(static::$businessWorker->gatewayConnections[$address])) {
  1139. return false;
  1140. }
  1141. return static::$businessWorker->gatewayConnections[$address]->send($gateway_buffer, true);
  1142. }
  1143. // 非workerman环境
  1144. $gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
  1145. $flag = static::$persistentConnection ? STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT : STREAM_CLIENT_CONNECT;
  1146. $client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout, $flag);
  1147. return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
  1148. }
  1149. /**
  1150. * 向所有 gateway 发送数据
  1151. *
  1152. * @param string $gateway_data
  1153. * @throws Exception
  1154. *
  1155. * @return void
  1156. */
  1157. protected static function sendToAllGateway($gateway_data)
  1158. {
  1159. $buffer = GatewayProtocol::encode($gateway_data);
  1160. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  1161. if (static::$businessWorker) {
  1162. foreach (static::$businessWorker->gatewayConnections as $gateway_connection) {
  1163. /** @var TcpConnection $gateway_connection */
  1164. $gateway_connection->send($buffer, true);
  1165. }
  1166. } // 运行在其它环境中,通过注册中心得到gateway地址
  1167. else {
  1168. $all_addresses = static::getAllGatewayAddressesFromRegister();
  1169. foreach ($all_addresses as $address) {
  1170. static::sendBufferToGateway($address, $buffer);
  1171. }
  1172. }
  1173. }
  1174. /**
  1175. * 踢掉某个网关的 socket
  1176. *
  1177. * @param string $address
  1178. * @param int $connection_id
  1179. * @return bool
  1180. */
  1181. protected static function kickAddress($address, $connection_id, $message)
  1182. {
  1183. $gateway_data = GatewayProtocol::$empty;
  1184. $gateway_data['cmd'] = GatewayProtocol::CMD_KICK;
  1185. $gateway_data['connection_id'] = $connection_id;
  1186. $gateway_data['body'] = $message;
  1187. return static::sendToGateway($address, $gateway_data);
  1188. }
  1189. /**
  1190. * 销毁某个网关的 socket
  1191. *
  1192. * @param string $address
  1193. * @param int $connection_id
  1194. * @return bool
  1195. */
  1196. protected static function destroyAddress($address, $connection_id)
  1197. {
  1198. $gateway_data = GatewayProtocol::$empty;
  1199. $gateway_data['cmd'] = GatewayProtocol::CMD_DESTROY;
  1200. $gateway_data['connection_id'] = $connection_id;
  1201. return static::sendToGateway($address, $gateway_data);
  1202. }
  1203. /**
  1204. * 将clientid数组转换成address数组
  1205. *
  1206. * @param array $client_id_array
  1207. * @return array
  1208. */
  1209. protected static function clientIdArrayToAddressArray(array $client_id_array)
  1210. {
  1211. $address_connection_array = array();
  1212. foreach ($client_id_array as $client_id) {
  1213. $address_data = Context::clientIdToAddress($client_id);
  1214. if ($address_data) {
  1215. $address = long2ip($address_data['local_ip']) .
  1216. ":{$address_data['local_port']}";
  1217. $address_connection_array[$address][$address_data['connection_id']] = $address_data['connection_id'];
  1218. }
  1219. }
  1220. return $address_connection_array;
  1221. }
  1222. /**
  1223. * 设置 gateway 实例
  1224. *
  1225. * @param \GatewayWorker\BusinessWorker $business_worker_instance
  1226. */
  1227. public static function setBusinessWorker($business_worker_instance)
  1228. {
  1229. static::$businessWorker = $business_worker_instance;
  1230. }
  1231. /**
  1232. * 获取通过注册中心获取所有 gateway 通讯地址
  1233. *
  1234. * @return array
  1235. * @throws Exception
  1236. */
  1237. protected static function getAllGatewayAddressesFromRegister()
  1238. {
  1239. static $addresses_cache, $last_update;
  1240. if (static::$addressesCacheDisable) {
  1241. $addresses_cache = null;
  1242. }
  1243. $time_now = time();
  1244. $expiration_time = 1;
  1245. $register_addresses = (array)static::$registerAddress;
  1246. $client = null;
  1247. if(empty($addresses_cache) || $time_now - $last_update > $expiration_time) {
  1248. foreach ($register_addresses as $register_address) {
  1249. set_error_handler(function(){});
  1250. $client = stream_socket_client('tcp://' . $register_address, $errno, $errmsg, static::$connectTimeout);
  1251. restore_error_handler();
  1252. if ($client) {
  1253. break;
  1254. }
  1255. }
  1256. if (!$client) {
  1257. throw new Exception('Can not connect to tcp://' . $register_address . ' ' . $errmsg);
  1258. }
  1259. fwrite($client, '{"event":"worker_connect","secret_key":"' . static::$secretKey . '"}' . "\n");
  1260. stream_set_timeout($client, 5);
  1261. $ret = fgets($client, 655350);
  1262. if (!$ret || !$data = json_decode(trim($ret), true)) {
  1263. throw new Exception('getAllGatewayAddressesFromRegister fail. tcp://' .
  1264. $register_address . ' return ' . var_export($ret, true));
  1265. }
  1266. $last_update = $time_now;
  1267. $addresses_cache = $data['addresses'];
  1268. }
  1269. if (!$addresses_cache) {
  1270. throw new Exception('Gateway::getAllGatewayAddressesFromRegister() with registerAddress:' .
  1271. json_encode(static::$registerAddress) . ' return ' . var_export($addresses_cache, true));
  1272. }
  1273. return $addresses_cache;
  1274. }
  1275. /**
  1276. * 检查群组id是否合法
  1277. *
  1278. * @param $group
  1279. * @return bool
  1280. */
  1281. protected static function isValidGroupId($group)
  1282. {
  1283. if (empty($group)) {
  1284. echo new \Exception('group('.var_export($group, true).') empty');
  1285. return false;
  1286. }
  1287. return true;
  1288. }
  1289. }