• Linux
  • apache
  • centos
  • Git版本管理
  • Linux基本命令
  • linux配置与优化
  • Nginx
  • PHP
  • Redis
  • Supervisor
  • Swoole
  • windows
  • THINKPHP
  • 开发者手册
  • Chat GPT / Open Ai
  • PHP通过URL和WebSocket两种方法获取火币K线数据

    全屏阅读
  • 基本信息
  • 作者:
  • 作者已发布:925篇文章
  • 发布时间:2021年05月11日 23:44:53
  • 所属分类:PHP+MySql
  • 阅读次数:3773次阅读
  • 标签:
  • 最近项目上线一个模块需要获取火币的K线数据,初期我用的Workerman定时任务每秒通过URL请求获取,做出来之后老板感觉数据实时性不强,要优化,没办法我只能继续研究,幸好在GitHub看到一个老哥写的通过WebSocket获取火币数据的,话不多说,下面直接开始上代码。第一次写博客,写的如果不好,还请大家见谅。

    URL请求方法

    发现这种方法实现起来也有坑,网上大部分都只是贴出接口文档和代码,但是实际操作会发现无法请求火币服务器,为啥,因为人家在国外,偶尔能请求概率也很低,所以代码只能放到外网服务器才能执行,这样一来开发调试就很麻烦。后面我就想了一个办法,找一台外网服务器,布置一个脚本代理请求(非火币的外网请求也可以),这样在国内也可以请求火币接口了,调试什么的方便多了。

    下面贴出代理脚本代码:

    $url = urldecode($_GET['url']);
    if ($url) {
        echo curl_get($url);
        die();
    }else{
      echo "How are you";
    }
    function curl_get($url, $timeout = 5)
    {
        $ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE;
        $ch = curl_init();
        $headers = array(
            "Content-Type: application/json charset=utf-8",
            'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
        );
    
        $opt = array(
            CURLOPT_URL => $url,
            CURLOPT_HEADER => 0,
            CURLOPT_CUSTOMREQUEST => strtoupper('GET'),
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_TIMEOUT => $timeout,
            CURLOPT_HTTPHEADER => $headers,
        );
    
        if ($ssl) {
            $opt[CURLOPT_SSL_VERIFYHOST] = false;
            $opt[CURLOPT_SSL_VERIFYPEER] = FALSE;
        }
        curl_setopt_array($ch, $opt);
        $result = curl_exec($ch);
        curl_close($ch);
        return $result;
    }

    获取数据的代码:

    $now = time();
    $diff = intval((strtotime(date('Y-m-d H:i:00'), $now) - $find['add_time']) / self::$time_list[$period]);
    $size = $diff + 1 > 2000 ? 2000 : $diff + 1;
    $url = "https://api.huobipro.com/market/history/kline?period={$period}&size={$size}&symbol={$symbol}";
    $log .= ",url:{$url}";
    //如果服务器在国内,需要把public目录下的post.php文件部署到外网服务器代理请求火币api
    $post_url = 'http://xxx.com/post.php?url='.urlencode($url);
    $log .= ",post_url:{$post_url}";
    $res = self::curl_get($post_url, 5);
    if (!$res) throw new Exception(lang('火币请求失败'));
    $res = json_decode($res, true);
    if ($res['status'] != 'ok') throw new Exception("火币网返回错误,err-code:{$res['err-code']},err-msg:{$res['err-msg']}");
    if (empty($res['data'])) throw new Exception("火币网返回数据为空");
    $huobi = $res['data'];
    $ids = array_column($huobi,'id');
    array_multisort($ids,SORT_ASC,$huobi);
    $add_list = [];
    $update_list = [];
    foreach ($huobi as $key1 => $value1) {
        $where1 = $where;
        $where1['add_time'] = $value1['id'];
        $find1 = (new self)->where($where1)->order('id', 'desc')->find();
        if ($find1) {//记录已存在,更新已有记录
            $update_list[] = [
                'id'=>$find1['id'],
                'open_price'=>number_format($value1['open'],6,".",""),
                'close_price'=>number_format($value1['close'],6,".",""),
                'high_price'=>number_format($value1['high'],6,".",""),
                'low_price'=>number_format($value1['low'],6,".",""),
                'amount'=>number_format($value1['amount'],6,".",""),
                'count'=>number_format($value1['count'],6,".",""),
                'vol'=>number_format($value1['vol'],6,".",""),
                'ch'=>$res['ch'],
                //'add_time'=>$value1['id'],
                'update_time'=>time(),
            ];
        }
        else {
            $add_list[] = [
                'period'=>$period,
                'symbol'=>$symbol,
                'open_price'=>number_format($value1['open'],6,".",""),
                'close_price'=>number_format($value1['close'],6,".",""),
                'high_price'=>number_format($value1['high'],6,".",""),
                'low_price'=>number_format($value1['low'],6,".",""),
                'amount'=>number_format($value1['amount'],6,".",""),
                'count'=>number_format($value1['count'],6,".",""),
                'vol'=>number_format($value1['vol'],6,".",""),
                'ch'=>$res['ch'],
                'add_time'=>$value1['id'],
                'update_time'=>time(),
            ];
        }
    }
    
    
    static function curl_get($url, $timeout = 30)
    {
        $ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE;
        $ch = curl_init();
        $headers = array(
            "Content-Type: application/json charset=utf-8",
            'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
        );
    
        $opt = array(
            CURLOPT_URL => $url,
            CURLOPT_HEADER => 0,
            CURLOPT_CUSTOMREQUEST => strtoupper('GET'),
            CURLOPT_RETURNTRANSFER => 1,
            CURLOPT_TIMEOUT => $timeout,
            CURLOPT_HTTPHEADER => $headers,
        );
    
        if ($ssl) {
            $opt[CURLOPT_SSL_VERIFYHOST] = false;
            $opt[CURLOPT_SSL_VERIFYPEER] = FALSE;
            $opt[CURLOPT_SSLVERSION] = 3;
        }
        curl_setopt_array($ch, $opt);
        $result = curl_exec($ch);
        if (!$result) {
            $error = curl_error($ch);
            $errno = curl_errno($ch);
            Log::write("curl_get,url:{$url},error:{$error},error:{$errno}", 'INFO');
        }
        curl_close($ch);
        return $result;
    }

    WebSocket方法获取

    这种方法主要是把服务器当成一个WebSocket Client连接到火币的WebSocket服务器,订阅交易对的不同时间粒度的K线数据,成功之后火币服务器就会在K线数据变化的时候主动推送消息给到服务器,接收到推送之后就可以进行存储等操作,同时服务器也把接收到的推送消息再推送给连接到服务端的WebSocket Client。

    因为火币服务器在外网,所以采用WebSocket方法也需要部署一台国外服务器,这台服务器用于向火币订阅数据并接收火币推送,其他国内服务器就可以通过连接这台服务器获取火币K线数据。这种应该也可以分布式部署,大家可以试一下。结构图如下:

    image.png

    连接到火币服务器代码:

    $info = "连接到服务器:{$this->host}";
    echo "\r\n".$info;
    $this->saveLog("huobi", $info);
    // 异步建立一个到火币服务器的连接
    $con = new AsyncTcpConnection($this->host);
    if ($this->flag) {//正式环境
        $con->transport = 'ssl';
    }
    
    $con->onConnect = function($con)
    {
        $this->onAsyncConnect($con);
    };
    
    // 当服务器连接发来数据时,转发给对应客户端的连接
    $con->onMessage = function($con, $message) use($worker)
    {
        $this->onAsyncMessage($con, $message, $worker);
    };
    
    $con->onError = function($con, $err_code, $err_msg)
    {
        echo "$err_code, $err_msg";
        $info = "Async onError err_code:{$err_code},err_msg:{$err_msg}";
        echo "\r\n ".$info;
        $this->saveLog("huobi", $info);
    };
    
    $con->onClose = function($con)
    {
        $info = "Async onClose";
        echo "\r\n ".$info;
        $this->saveLog("huobi", $info);
        $this->reconnect_num++;//重连次数+1
    
        //重连之前先更新K线数据
        $info = "更新K线数据-重连之前-start:".date('Y-m-d H:i:s');
        echo "\r\n ".$info;
        $this->saveLog("huobi", $info);
        foreach ($this->trade_list as $value) {
            $symbol = $value;
            foreach ($this->time_list as $k => $v) {
                $info = "create_kline:{$symbol}-{$k}";
                echo "\r\n ".$info;
                $this->saveLog("huobi", $info);
                $r = \app\common\model\TradeKlineKline::create_kline($symbol, $k);
                if ($r['code'] == SUCCESS) {
    
                }
            }
        };
        $info = "更新K线数据-重连之前-end:".date('Y-m-d H:i:s');
        echo "\r\n ".$info;
        $this->saveLog("huobi", $info);
    
        // 如果连接断开,则在1秒后重连
        $con->reConnect(1);
    };
    
    // 执行异步连接
    $con->connect();
    
    //连接火币成功回调方法
    function onAsyncConnect($con)
     {
        $this->async_message_time = time();
        $info = "连接到服务器:{$this->host},成功";
        echo "\r\n".$info;
        $this->saveLog("huobi", $info);
    
        $info = "开始订阅K线数据";
        echo "\r\n".$info;
        $this->saveLog("huobi", $info);
        //$this->saveLog("huobi", 'onAsyncConnect:'.print_r($con, true));
        $this->saveLog("huobi", 'onAsyncConnect,cid:'.$con->id.',reconnect_num:'.$this->reconnect_num);
        $make = explode(',', TradeConfig::get_value('trade_kline_symbols', 'btcusdt,ethusdt,eosusdt,ltcusdt,etcusdt'));
        $this->huobi_id = $con->id;
        foreach ($make as $key => $value) {
            $symbol = $value;
            foreach ($this->time_list as $k => $v) {
                $info = "sub:{$symbol}-{$k}";
                echo "\r\n".$info;
                $this->saveLog("huobi", $info);
                $data = json_encode([                         //行情
                    'sub' => "market." . $symbol . ".kline." . $k,
                    'id' => "id" . time(),
                    'freq-ms' => 5000
                ]);
                $con->send($data);
            }
        }
        /*foreach ($this->trade_list as $key => $value) {
            $symbol = $key;
            foreach ($this->time_list as $k => $v) {
                echo "sub:{$symbol}-{$k}\r\n";
                $data = json_encode([                         //行情
                    'sub' => "market." . $symbol . ".kline." . $k,
                    'id' => "id" . time(),
                    'freq-ms' => 5000
                ]);
                $con->send($data);
            }
        };*/
    }
    
    //接收到火币推送回调方法
    function onAsyncMessage($con, $message, $worker)
    {
        $data = json_decode($message, true);
        if (!$data) {//说明采用了GZIP压缩
            $data = gzdecode($message);
            $this->saveLog("huobi", $data);
            $data = json_decode($data, true);
        }
        else {
            $this->saveLog("huobi", $message);
        }
        if(isset($data['ping'])) {
            $this->async_message_time = time();
            $con->send(json_encode([
                "pong" => $data['ping']
            ]));
    
            // 给客户端心跳
            foreach($this->all_cons as $kk=>$vv){
                if (array_key_exists($vv["sid"], $worker->connections)) {
                    $info = "\r\n sid ".$vv["sid"]." send ping";
                    echo $info;
                    $this->saveLog("all", $info);
    
                    $worker->connections[$vv["sid"]]->send(json_encode($data));
                }
                else {
                    unset($this->all_cons[$kk]);
                }
            }
        } else if (isset($data['ch'])) {
            $this->async_message_time = time();
            $info = "接收到推送,ch:{$data['ch']}";
            echo "\r\n".$info;
            $this->saveLog("huobi", $info);
            //Log::write(print_r($data, true), 'INFO');
    
            $symbol = $data["ch"];
            $info = "\r\n  on mess size:".sizeof($this->all_cons)." conn-size: ".sizeof($worker->connections)." symbol:".$symbol;
            echo $info;
            $this->saveLog("all", $info);
            $pieces = explode(".", $data['ch']);
            switch ($pieces[2]) {
                case "kline":              //行情图
                    $market = $pieces[1];  //火币对
                    if (in_array($market, $this->symbol_list)) {
                        $period = $pieces[3];
                        $tick = $data['tick'];
                        //tick 说明
                        //"tick": {
                        //  "id": K线id,
                        //  "amount": 成交量,
                        //  "count": 成交笔数,
                        //  "open": 开盘价,
                        //  "close": 收盘价,当K线为最晚的一根时,是最新成交价
                        //  "low": 最低价,
                        //  "high": 最高价,
                        //  "vol": 成交额, 即 sum(每一笔成交价 * 该笔的成交量)
                        //}
                        $id = $tick['id'];
                        $where = [
                            'period'=>$period,
                            'symbol'=>$market,
                            'add_time'=>$id,
                        ];
                        $find1 = \app\common\model\TradeKline::where($where)->order('id', 'desc')->find();
                        if ($find1) {//记录已存在,更新已有记录
                            if ($find1['open_price'] != $tick['open'] ||
                                $find1['close_price'] != $tick['close'] ||
                                $find1['high_price'] != $tick['high'] ||
                                $find1['low_price'] != $tick['low'] ||
                                $find1['amount'] != $tick['amount'] ||
                                $find1['count'] != $tick['count'] ||
                                $find1['vol'] != $tick['vol']) {//没有数据变化不做更新
                                $update_list[] = [
                                    'id'=>$find1['id'],
                                    'open_price'=>number_format($tick['open'],6,".",""),
                                    'close_price'=>number_format($tick['close'],6,".",""),
                                    'high_price'=>number_format($tick['high'],6,".",""),
                                    'low_price'=>number_format($tick['low'],6,".",""),
                                    'amount'=>number_format($tick['amount'],6,".",""),
                                    'count'=>number_format($tick['count'],6,".",""),
                                    'vol'=>number_format($tick['vol'],6,".",""),
                                    'update_time'=>time(),
                                ];
                                $kline = new \app\common\model\TradeKline;
                                $res2 = $kline->isUpdate()->saveAll($update_list);
                                if (empty($res2)) {
                                    var_dump(lang('更新记录失败-2').'-in line:'.__LINE__);
                                    //throw new Exception(lang('更新记录失败-2').'-in line:'.__LINE__);
                                }
                            }
                        }
                        else {
                            $add_list[] = [
                                'period'=>$period,
                                'symbol'=>$market,
                                'open_price'=>number_format($tick['open'],6,".",""),
                                'close_price'=>number_format($tick['close'],6,".",""),
                                'high_price'=>number_format($tick['high'],6,".",""),
                                'low_price'=>number_format($tick['low'],6,".",""),
                                'amount'=>number_format($tick['amount'],6,".",""),
                                'count'=>number_format($tick['count'],6,".",""),
                                'vol'=>number_format($tick['vol'],6,".",""),
                                'ch'=>$data['ch'],
                                'add_time'=>$id,
                                'update_time'=>time(),
                            ];
                            $kline = new \app\common\model\TradeKline;
                            $res1 = $kline->saveAll($add_list);
                            if (empty($res1)) {
                                var_dump(lang('插入记录失败').'-in line:'.__LINE__);
                                //throw new Exception(lang('插入记录失败').'-in line:'.__LINE__);
                            }
                        }
                    }
    
                    break;
            }
    
            $time_1 = microtime(true);
    
            if (array_key_exists($symbol, $this->all_symbols)) {
                foreach ($this->all_symbols[$symbol] as $key => $val) {
                    $info = " symbol ".$symbol." | ch ".$data["ch"]." sid ".$val." send \r\n";
                    echo $info;
                    $this->saveLog("all", $info);
    
                    $worker->connections[$val]->send(json_encode($data));
                }
            }
            $time_2 = microtime(true);
            $cost = $time_2 - $time_1;
            if ($cost > 1) {
                $info = " symbol ".$symbol." | ch ".$data["ch"]." cost {$cost} \r\n";
                echo $info;
                $this->saveLog("all", $info);
            }
    
        }
        else {
            echo "undefind message\r\n";
            var_dump($data);
        }
    }

    huobiwebsocket.zip

    顶一下
    (0)
    100%
    订阅 回复
    踩一下
    (0)
    100%
    » 郑重声明:本文由mpxq168发布,所有内容仅代表个人观点。版权归恒富网mpxq168共有,欢迎转载, 但未经作者同意必须保留此段声明,并给出文章连接,否则保留追究法律责任的权利! 如果本文侵犯了您的权益,请留言。
  • 【上一篇】
  • 【下一篇】
  • 目前有 0 条留言 其中:访客:0 条, 博主:0 条

    给我留言

    您必须 [ 登录 ] 才能发表留言!