记住用户名密码
最近项目上线一个模块需要获取火币的K线数据,初期我用的Workerman定时任务每秒通过URL请求获取,做出来之后老板感觉数据实时性不强,要优化,没办法我只能继续研究,幸好在GitHub看到一个老哥写的通过WebSocket获取火币数据的,话不多说,下面直接开始上代码。第一次写博客,写的如果不好,还请大家见谅。
URL请求方法
发现这种方法实现起来也有坑,网上大部分都只是贴出接口文档和代码,但是实际操作会发现无法请求火币服务器,为啥,因为人家在国外,偶尔能请求概率也很低,所以代码只能放到外网服务器才能执行,这样一来开发调试就很麻烦。后面我就想了一个办法,找一台外网服务器,布置一个脚本代理请求(非火币的外网请求也可以),这样在国内也可以请求火币接口了,调试什么的方便多了。
下面贴出代理脚本代码:
$url = urldecode($_GET['url']); if ($url) { echo curl_get($url); die(); }else{ echo "How are you"; } function curl_get($url, $timeout = 5) { $ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE; $ch = curl_init(); $headers = array( "Content-Type: application/json charset=utf-8", 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36', ); $opt = array( CURLOPT_URL => $url, CURLOPT_HEADER => 0, CURLOPT_CUSTOMREQUEST => strtoupper('GET'), CURLOPT_RETURNTRANSFER => 1, CURLOPT_TIMEOUT => $timeout, CURLOPT_HTTPHEADER => $headers, ); if ($ssl) { $opt[CURLOPT_SSL_VERIFYHOST] = false; $opt[CURLOPT_SSL_VERIFYPEER] = FALSE; } curl_setopt_array($ch, $opt); $result = curl_exec($ch); curl_close($ch); return $result; }
获取数据的代码:
$now = time(); $diff = intval((strtotime(date('Y-m-d H:i:00'), $now) - $find['add_time']) / self::$time_list[$period]); $size = $diff + 1 > 2000 ? 2000 : $diff + 1; $url = "https://api.huobipro.com/market/history/kline?period={$period}&size={$size}&symbol={$symbol}"; $log .= ",url:{$url}"; //如果服务器在国内,需要把public目录下的post.php文件部署到外网服务器代理请求火币api $post_url = 'http://xxx.com/post.php?url='.urlencode($url); $log .= ",post_url:{$post_url}"; $res = self::curl_get($post_url, 5); if (!$res) throw new Exception(lang('火币请求失败')); $res = json_decode($res, true); if ($res['status'] != 'ok') throw new Exception("火币网返回错误,err-code:{$res['err-code']},err-msg:{$res['err-msg']}"); if (empty($res['data'])) throw new Exception("火币网返回数据为空"); $huobi = $res['data']; $ids = array_column($huobi,'id'); array_multisort($ids,SORT_ASC,$huobi); $add_list = []; $update_list = []; foreach ($huobi as $key1 => $value1) { $where1 = $where; $where1['add_time'] = $value1['id']; $find1 = (new self)->where($where1)->order('id', 'desc')->find(); if ($find1) {//记录已存在,更新已有记录 $update_list[] = [ 'id'=>$find1['id'], 'open_price'=>number_format($value1['open'],6,".",""), 'close_price'=>number_format($value1['close'],6,".",""), 'high_price'=>number_format($value1['high'],6,".",""), 'low_price'=>number_format($value1['low'],6,".",""), 'amount'=>number_format($value1['amount'],6,".",""), 'count'=>number_format($value1['count'],6,".",""), 'vol'=>number_format($value1['vol'],6,".",""), 'ch'=>$res['ch'], //'add_time'=>$value1['id'], 'update_time'=>time(), ]; } else { $add_list[] = [ 'period'=>$period, 'symbol'=>$symbol, 'open_price'=>number_format($value1['open'],6,".",""), 'close_price'=>number_format($value1['close'],6,".",""), 'high_price'=>number_format($value1['high'],6,".",""), 'low_price'=>number_format($value1['low'],6,".",""), 'amount'=>number_format($value1['amount'],6,".",""), 'count'=>number_format($value1['count'],6,".",""), 'vol'=>number_format($value1['vol'],6,".",""), 'ch'=>$res['ch'], 'add_time'=>$value1['id'], 'update_time'=>time(), ]; } } static function curl_get($url, $timeout = 30) { $ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE; $ch = curl_init(); $headers = array( "Content-Type: application/json charset=utf-8", 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36', ); $opt = array( CURLOPT_URL => $url, CURLOPT_HEADER => 0, CURLOPT_CUSTOMREQUEST => strtoupper('GET'), CURLOPT_RETURNTRANSFER => 1, CURLOPT_TIMEOUT => $timeout, CURLOPT_HTTPHEADER => $headers, ); if ($ssl) { $opt[CURLOPT_SSL_VERIFYHOST] = false; $opt[CURLOPT_SSL_VERIFYPEER] = FALSE; $opt[CURLOPT_SSLVERSION] = 3; } curl_setopt_array($ch, $opt); $result = curl_exec($ch); if (!$result) { $error = curl_error($ch); $errno = curl_errno($ch); Log::write("curl_get,url:{$url},error:{$error},error:{$errno}", 'INFO'); } curl_close($ch); return $result; }
WebSocket方法获取
这种方法主要是把服务器当成一个WebSocket Client连接到火币的WebSocket服务器,订阅交易对的不同时间粒度的K线数据,成功之后火币服务器就会在K线数据变化的时候主动推送消息给到服务器,接收到推送之后就可以进行存储等操作,同时服务器也把接收到的推送消息再推送给连接到服务端的WebSocket Client。
因为火币服务器在外网,所以采用WebSocket方法也需要部署一台国外服务器,这台服务器用于向火币订阅数据并接收火币推送,其他国内服务器就可以通过连接这台服务器获取火币K线数据。这种应该也可以分布式部署,大家可以试一下。结构图如下:
连接到火币服务器代码:
$info = "连接到服务器:{$this->host}"; echo "\r\n".$info; $this->saveLog("huobi", $info); // 异步建立一个到火币服务器的连接 $con = new AsyncTcpConnection($this->host); if ($this->flag) {//正式环境 $con->transport = 'ssl'; } $con->onConnect = function($con) { $this->onAsyncConnect($con); }; // 当服务器连接发来数据时,转发给对应客户端的连接 $con->onMessage = function($con, $message) use($worker) { $this->onAsyncMessage($con, $message, $worker); }; $con->onError = function($con, $err_code, $err_msg) { echo "$err_code, $err_msg"; $info = "Async onError err_code:{$err_code},err_msg:{$err_msg}"; echo "\r\n ".$info; $this->saveLog("huobi", $info); }; $con->onClose = function($con) { $info = "Async onClose"; echo "\r\n ".$info; $this->saveLog("huobi", $info); $this->reconnect_num++;//重连次数+1 //重连之前先更新K线数据 $info = "更新K线数据-重连之前-start:".date('Y-m-d H:i:s'); echo "\r\n ".$info; $this->saveLog("huobi", $info); foreach ($this->trade_list as $value) { $symbol = $value; foreach ($this->time_list as $k => $v) { $info = "create_kline:{$symbol}-{$k}"; echo "\r\n ".$info; $this->saveLog("huobi", $info); $r = \app\common\model\TradeKlineKline::create_kline($symbol, $k); if ($r['code'] == SUCCESS) { } } }; $info = "更新K线数据-重连之前-end:".date('Y-m-d H:i:s'); echo "\r\n ".$info; $this->saveLog("huobi", $info); // 如果连接断开,则在1秒后重连 $con->reConnect(1); }; // 执行异步连接 $con->connect(); //连接火币成功回调方法 function onAsyncConnect($con) { $this->async_message_time = time(); $info = "连接到服务器:{$this->host},成功"; echo "\r\n".$info; $this->saveLog("huobi", $info); $info = "开始订阅K线数据"; echo "\r\n".$info; $this->saveLog("huobi", $info); //$this->saveLog("huobi", 'onAsyncConnect:'.print_r($con, true)); $this->saveLog("huobi", 'onAsyncConnect,cid:'.$con->id.',reconnect_num:'.$this->reconnect_num); $make = explode(',', TradeConfig::get_value('trade_kline_symbols', 'btcusdt,ethusdt,eosusdt,ltcusdt,etcusdt')); $this->huobi_id = $con->id; foreach ($make as $key => $value) { $symbol = $value; foreach ($this->time_list as $k => $v) { $info = "sub:{$symbol}-{$k}"; echo "\r\n".$info; $this->saveLog("huobi", $info); $data = json_encode([ //行情 'sub' => "market." . $symbol . ".kline." . $k, 'id' => "id" . time(), 'freq-ms' => 5000 ]); $con->send($data); } } /*foreach ($this->trade_list as $key => $value) { $symbol = $key; foreach ($this->time_list as $k => $v) { echo "sub:{$symbol}-{$k}\r\n"; $data = json_encode([ //行情 'sub' => "market." . $symbol . ".kline." . $k, 'id' => "id" . time(), 'freq-ms' => 5000 ]); $con->send($data); } };*/ } //接收到火币推送回调方法 function onAsyncMessage($con, $message, $worker) { $data = json_decode($message, true); if (!$data) {//说明采用了GZIP压缩 $data = gzdecode($message); $this->saveLog("huobi", $data); $data = json_decode($data, true); } else { $this->saveLog("huobi", $message); } if(isset($data['ping'])) { $this->async_message_time = time(); $con->send(json_encode([ "pong" => $data['ping'] ])); // 给客户端心跳 foreach($this->all_cons as $kk=>$vv){ if (array_key_exists($vv["sid"], $worker->connections)) { $info = "\r\n sid ".$vv["sid"]." send ping"; echo $info; $this->saveLog("all", $info); $worker->connections[$vv["sid"]]->send(json_encode($data)); } else { unset($this->all_cons[$kk]); } } } else if (isset($data['ch'])) { $this->async_message_time = time(); $info = "接收到推送,ch:{$data['ch']}"; echo "\r\n".$info; $this->saveLog("huobi", $info); //Log::write(print_r($data, true), 'INFO'); $symbol = $data["ch"]; $info = "\r\n on mess size:".sizeof($this->all_cons)." conn-size: ".sizeof($worker->connections)." symbol:".$symbol; echo $info; $this->saveLog("all", $info); $pieces = explode(".", $data['ch']); switch ($pieces[2]) { case "kline": //行情图 $market = $pieces[1]; //火币对 if (in_array($market, $this->symbol_list)) { $period = $pieces[3]; $tick = $data['tick']; //tick 说明 //"tick": { // "id": K线id, // "amount": 成交量, // "count": 成交笔数, // "open": 开盘价, // "close": 收盘价,当K线为最晚的一根时,是最新成交价 // "low": 最低价, // "high": 最高价, // "vol": 成交额, 即 sum(每一笔成交价 * 该笔的成交量) //} $id = $tick['id']; $where = [ 'period'=>$period, 'symbol'=>$market, 'add_time'=>$id, ]; $find1 = \app\common\model\TradeKline::where($where)->order('id', 'desc')->find(); if ($find1) {//记录已存在,更新已有记录 if ($find1['open_price'] != $tick['open'] || $find1['close_price'] != $tick['close'] || $find1['high_price'] != $tick['high'] || $find1['low_price'] != $tick['low'] || $find1['amount'] != $tick['amount'] || $find1['count'] != $tick['count'] || $find1['vol'] != $tick['vol']) {//没有数据变化不做更新 $update_list[] = [ 'id'=>$find1['id'], 'open_price'=>number_format($tick['open'],6,".",""), 'close_price'=>number_format($tick['close'],6,".",""), 'high_price'=>number_format($tick['high'],6,".",""), 'low_price'=>number_format($tick['low'],6,".",""), 'amount'=>number_format($tick['amount'],6,".",""), 'count'=>number_format($tick['count'],6,".",""), 'vol'=>number_format($tick['vol'],6,".",""), 'update_time'=>time(), ]; $kline = new \app\common\model\TradeKline; $res2 = $kline->isUpdate()->saveAll($update_list); if (empty($res2)) { var_dump(lang('更新记录失败-2').'-in line:'.__LINE__); //throw new Exception(lang('更新记录失败-2').'-in line:'.__LINE__); } } } else { $add_list[] = [ 'period'=>$period, 'symbol'=>$market, 'open_price'=>number_format($tick['open'],6,".",""), 'close_price'=>number_format($tick['close'],6,".",""), 'high_price'=>number_format($tick['high'],6,".",""), 'low_price'=>number_format($tick['low'],6,".",""), 'amount'=>number_format($tick['amount'],6,".",""), 'count'=>number_format($tick['count'],6,".",""), 'vol'=>number_format($tick['vol'],6,".",""), 'ch'=>$data['ch'], 'add_time'=>$id, 'update_time'=>time(), ]; $kline = new \app\common\model\TradeKline; $res1 = $kline->saveAll($add_list); if (empty($res1)) { var_dump(lang('插入记录失败').'-in line:'.__LINE__); //throw new Exception(lang('插入记录失败').'-in line:'.__LINE__); } } } break; } $time_1 = microtime(true); if (array_key_exists($symbol, $this->all_symbols)) { foreach ($this->all_symbols[$symbol] as $key => $val) { $info = " symbol ".$symbol." | ch ".$data["ch"]." sid ".$val." send \r\n"; echo $info; $this->saveLog("all", $info); $worker->connections[$val]->send(json_encode($data)); } } $time_2 = microtime(true); $cost = $time_2 - $time_1; if ($cost > 1) { $info = " symbol ".$symbol." | ch ".$data["ch"]." cost {$cost} \r\n"; echo $info; $this->saveLog("all", $info); } } else { echo "undefind message\r\n"; var_dump($data); } }
目前有 0 条留言 其中:访客:0 条, 博主:0 条