public function __construct()
{
$this->redis = new redis();
}
public function fire(Job $job, $data)
{
if ($job->attempts() > 2) {
$this->redis->setHear($data);
$job->delete();
} else {
$this->send($data);
$job->delete();
}
}
/**
* 根据消息中的数据进行实际的业务处理
* @param array|mixed $data 发布任务时自定义的数据
* @return boolean 任务执行的结果
*/
private function send($data)
{
try {
$result = CURLRequest($data['weburl'], $data, 'POST', $this->header);
} catch (\Exception $e) {
echo $e->getMessage();
return false;
}
if (empty($result)) {
echo json_encode(['errcode' => 1, 'msg' => '服务器异常:' . $data['SN'], 'data' => $result, 'url' => $data['weburl']], JSON_UNESCAPED_SLASHES);
return false; //请求异常尝试重试
} else {
echo json_encode(['errcode' => 1, 'msg' => $data['SN'] . ' ' . date("Y-m-d H:i:s") . '' . "心跳 ok", 'data' => $result, 'url' => $weburl], JSON_UNESCAPED_SLASHES);
return true; //请求成功退出
}
}
function CURLRequest($url, $params = [], $http_method = 'GET', $Header = [])
{
$SSL = substr($url, 0, 8) == "https://" ? true : false; //判断是否 https 连接
$httpInfo = array();
$ch = curl_init(); //初始化 CURL 会话
//设置 CURL 传输选项
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 1);
curl_setopt($ch, CURLOPT_TIMEOUT, 1);
//设置响应头头文件的信息作为数据流输出
curl_setopt($ch, CURLOPT_HEADER, 1); //返回 response 头部信息
curl_setopt($ch, CURLINFO_HEADER_OUT, true); //TRUE 时追踪句柄的请求字符串,从 PHP 5.1.3 开始可用。这个很关键,就是允许你查看请求 header
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
// 返回 response_header, 该选项非常重要,如果不为 true, 只会获得响应的正文
curl_setopt($ch, CURLOPT_HEADER, true);
if ($http_method == 'POST') {
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, $params);
} else {
if (count($params) >= 1) {
$pstr = '?';
foreach ($params as $pkey=>$pv) {
$pstr .= $pstr == '?' ? $pkey.'='.$pv : '&'.$pkey.'='.$pv;
}
$url .= $pstr;
}
}
curl_setopt($ch, CURLOPT_URL, $url);
if (!empty($Header)) {
curl_setopt($ch, CURLOPT_HTTPHEADER, $Header);//设置请求头信息
}
if ($SSL) {
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false); // 信任任何证书
}
$response = curl_exec($ch); //执行 CURL 会话
if (curl_getinfo($ch, CURLINFO_HTTP_CODE) == '200') {
$headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
$header = substr($response, 0, $headerSize);
$body = substr($response, $headerSize);
}
curl_close($ch); //关闭会话
list($header, $body) = explode("\r\n\r\n", $response, 2);
$headers = explode("\r\n", $header);
$headList = array();
foreach ($headers as $head) {
$value = explode(':', $head);
if(isset($value[1])) $headList[$value[0]] = $value[1];
}
$result = json_decode($body, true);//返回解析后的数据
if ($response === FALSE OR empty($response)) { //错误返回 false
$result = false;
}
return ['header' => $headList, 'result' => $result, 'raw_result' => $body];//header 响应头数据,格式化数组,原始数据
}
4G 8 核 5M
物联网设备会通过 Http 方式回调心跳 30 秒一次,我搭建了中转服务器承担转发心跳,那么每次中转服务器同时间收到的请求可能是 1000 次,或者 500 次不等,这时又同时需要转发 1000 次或者 500 次不等
1.为何服务器重启后,CPU 会长时间占用率 100%,而且有队列不停的死循环,几百万次,我删除队列后,它依然不停增长到百万次,是不是队列中有异常,我没有捕获到
2.如果我分批转发心跳,它服务器能正常运行,比如 A B C D E F G 客户物联设备都关机,然后让客户再依次按顺序开机,中转服务器能正常运行
3.php 如何应对这种瞬间请求多的方式
1
encro 211 天前 1
1 ,fpm 同一个进程只能一次处理一个请求,所以如果并发 1000 ,一个进程大约 20M ,那么你需要内存 20G ,而且因为是瞬时的,所以你这个配置明显不太符合。
2 ,curl 要并发请求,需要用到 curl_multi_init 。 |
2
zhwq 211 天前
又看到这个问题,我记得是 ssl 连接会消耗系统的随机数导致等待服务器 load 超高,某些版本的 libcurl 即使是 http 请求,启动的时候就会自动消耗随机数。所以,可以 1 ,使用 curl 长链接。2 ,修改 curl 的随机数来源。3 ,换 libcurl 版本试试。最好是 4 ,换掉 PHP 。
|
3
InDom 211 天前
每秒 3000 ,优先考虑换掉 PHP , 可以单独写一个 go 服务来解决这个,如果只是转发,甚至可以直接 nginx 层面处理。
|
4
patrickyoung 211 天前 via Android
物联网设备 http? 为啥不用 mqtt?
|
5
zjsxwc 211 天前
正常操作:
- 搞个 mqtt broker 比如最通用的 c 语言蚊子: https://mosquitto.org/ - 搞个 mqtt client 来订阅处理设备心跳等消息,比如 php workerman 的: https://github.com/walkor/mqtt 不正常操作: 设备发送 http 请求到 php-fpm 服务器,php 处理设备消息。 |
7
yKXSkKoR8I1RcxaS 211 天前
Workman 了解一下
|
8
cs5117155 OP @patrickyoung 工厂那边已经固定 http 模式了,除非我们定制几万台,他就能改协议
|
12
cs5117155 OP @encro 需要 20GB ,太吓人了,记得那天老板问我,加服务器配置能不能解决,现在客户电话都打爆了。我说可以,然后他马上阿里云充值,加到内容 12GB ,CPU 10 核。结果问题依然存在,被重重打脸,而且老板就在背后坐着看你解决问题,弄好才能走。当时老板想法就是,限定你 30 分钟内,马上解决问题,不然就把你解决了
|
14
encro 211 天前
简单的解决办法:
你需要一个 worker 在背后干这种事,不要在页面里面转发,而是用 worker 去转发。 你页面接收消息后塞入队列,用 worker 去队列读消息,然后完成转发。因为 work 可以用多线程或者携程去处理并发转发,而不是多进程。 |
15
cs5117155 OP @encro 谢谢提醒,我刚刚还想直接 worker 启动一个服务端,php-fpm 转发给 worker http 服务器,workman 再用 async-tcp-connection,忘记用 worker 读队列了
|
16
yc8332 210 天前
简单 redis 搞队列。。然后 cli 启个 10-20 个进程去做你这个动作,很快的
|
17
cs5117155 OP @yc8332 不行吧,我最开始就是用 tp+redis 队列,然后 hp think queue:work,有时候某队列还死循环百万次,而且请求的时候大多数卡再请求超时
|
18
lianxiaoyi 210 天前
fpm 单机问题很大,会受文件打开数限制等,反正那玩意的高并发我一直没调明白,各种乱七八糟的问题。用 webman 吧,我现在做秒级几千并发轻轻松松
https://www.workerman.net/webman |
19
cs5117155 OP @lianxiaoyi 现在正在研究了
|
21
cs5117155 OP @yc8332 下午我写代码的时候也发现这个问题,第一次心跳全部都是设备发起 http 请求到 fpm ,我再加入队列的,再使用 workman 转发,问题是可能服务器受到瞬间请求,就已经卡死在 fpm,队列都无加入,那岂不是连 http 请求入口都要改为 workman 了?
|
22
encro 206 天前
每秒 3000 请求,确实比较高了,用 fpm 解决不了,除非你 fpm 相应比较快。
假设每个请求不超过 20ms ,那么每个 fpm 一秒钟能完成 50 个请求。3000 个请求需要 60 个 fpm ,每个 fpm 的占用内存 30M ,不考虑其他消耗那么需要大于 2G 内存。理论上 4G 内存可以处理完,如果机器上跑数据库,要处理请求,那么可能存在 cpu 不足。 如果是阿里云,那么会存在打开文件句柄过多(默认好像是 4096 ),需要增加打开文件数。 |
23
coderzhangsan 202 天前
fpm 不适合做这种高并发请求业务,可以使用 workman/swoole 来代替。
如果非要使用 fpm 来实现,需要调整 fpm 配置以及尽可能的使用长连接: 1. max_requests 数值调高,延长 fpm 进程生命周期,在周期内处理的更多的请求,避免进程频繁创建销魂带来的开销。 2.使用 curl_multi_init 多句柄实现并发请求,并设置 curl_multi_setopt 选项, 开启 CURLMOPT_PIPELINING , 提高 CURLMOPT_MAX_HOST_CONNECTIONS 缓存链接数 3.或者使用并发的异步包来实现,例如 amphp 或 reactphp ,可以参考其文档示例。 最后,机器配置也很重要,提高机器内存(由于提高了 max_requests 数,使得的单个 fpm 整个生命周期进程占用内存提升),并发处理与机器 cpu 核数相关,提高 cpu 核心数。 |
24
Jeyfang 202 天前
期待作者后续实践后的答复
|
25
Jeyfang 202 天前
仔细看了下代码,这个默认一进来就是同步去 request ,这个策略得换一下
|
26
cs5117155 OP @encro 这个很早之前就设置文件句柄 65536 ,而且也无法保证每个请求 20ms ,因为之前试过有一个客户服务器崩了,后台一直转发给他,搞到自己的服务器也崩了,虽然我设置 curl 超时 1s,实际也没用。我现在目前的解决思路,就是保证 30ms 内,物联网设备请求进来,我直接丢队列,让 workman 取队列,再转发,但需要保证 30ms 内接收完一个请求,是我需要考虑的
|
27
cs5117155 OP @coderzhangsan 使用异步包这个方案也不错,赞一个,之前我还不知道有这种包
|
29
jy28520 186 天前
走 nginx 转发或是看看 workman 有个 TCP 转发的功能,都比较简单
|
30
fengshils 156 天前
好奇问题解决了吗
|