2014年11月11日 星期二

使用JsonCpp, Poco library製作多執行緒擷取多筆api回傳值,並combine為一json格式,再利用SWIG包成PHP Extension

延續 http://zf-php-oo.blogspot.tw/2014/11/c11phpcombinejsonphp-proxy-classphp.html 此篇:由於該篇是擷取system()的response值,光是開兩個thread去擷取response就花了90~200ms。因為太慢的關係,之前有嘗試調用libcurl裡的curl.h去擷取http連線的response,一樣開兩個連線,只加快到78~90ms。個人猜想libcurl是用C寫的,會是因為用C++調用C開發出來的libcurl,所以沒快到哪裡去嗎?後來用了Poco這個用C++編寫的library後,開兩個thread並擷取,個人遇到最快的反應時間是700us(微秒, 10的負6次方秒),若查詢的sql裡的where條件不包含任何key,大約在14ms以內。移植到gentoo with Xeon Ivy Bridge Based 2.2GHz(6核/12thread)上,開5個thread去擷取,大約在25~35ms以內。

multiThreadQuery.h:

#ifndef MULTITHREADQUERY_H
#define MULTITHREADQUERY_H

#include "/usr/include/jsoncpp/json/json.h"

#include <string>

/**
 * Fires several HTTP GET requests in parallel (one std::thread per URL) and
 * optionally merges the response bodies into a single JSON object string of
 * the form {"<alias>":<body>,...}.
 *
 * Typical usage chains the setters and finishes with exe():
 *   mtq.clearAllUrls()->fetchResponse(true)->attachUrl(url, alias)->exe();
 */
class multiThreadQuery{
public:
    /** Per-request record handed to the worker thread that serves it. */
    struct inputOutput{
        std::string url;      // absolute URL to request
        std::string alias;    // key under which the body appears in the combined JSON
        std::string response; // filled by the worker: "\"<alias>\":<body or error object>"
    };

    explicit multiThreadQuery();

    ~multiThreadQuery();

    // The chaining setters below return `this` by pointer rather than by
    // reference because SWIG does not support returning references.

    /** Drops every attached URL and resets the request counter to 0. */
    multiThreadQuery* clearAllUrls();

    /**
     * Queues one URL.
     * @param url   URL to request with HTTP GET.
     * @param alias key for this response in the combined JSON; when empty, the
     *              current time since the epoch in microseconds is used.
     */
    multiThreadQuery* attachUrl(std::string url, std::string alias = "");

    /** Chooses whether exe() reads the responses (true) or only sends the requests (false). */
    multiThreadQuery* fetchResponse(bool yesNo);

    /**
     * Runs all queued requests concurrently and waits for them to finish.
     * @return the combined JSON object string when response fetching is
     *         enabled, otherwise an empty string.
     */
    std::string exe();
protected:
    int _threadNum;        // number of attached URLs; also the next index key in _urlArr
    Json::Value _urlArr;   // maps "0", "1", ... to { "url": ..., "alias": ... }
    bool _fetchResponse;   // whether response bodies are read and combined

    /** Thread entry point: performs the request described by `io` and stores the result in io.response. */
    void _curl(inputOutput& io);
};

#endif // MULTITHREADQUERY_H


multiThreadQuery.cpp:

#include "multiThreadQuery.h"
#include "/usr/include/Poco/Net/HTTPClientSession.h"
#include "/usr/include/Poco/Net/HTTPRequest.h"
#include "/usr/include/Poco/Net/HTTPResponse.h"
#include "/usr/include/Poco/StreamCopier.h"
#include "/usr/include/Poco/Path.h"
#include "/usr/include/Poco/URI.h"
#include "/usr/include/Poco/Exception.h"

#include <chrono>
#include <cstdlib>
#include <exception>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

using namespace std;
using namespace Poco;
using namespace Poco::Net;

// Starts with no URLs attached and response fetching disabled.
multiThreadQuery::multiThreadQuery()
    : _threadNum(0),
      _fetchResponse(false)
{}

// Nothing to release explicitly: every member cleans up after itself.
multiThreadQuery::~multiThreadQuery() = default;

// Forgets every attached URL so the object can be reused for a new batch.
// Returns `this` to allow call chaining.
multiThreadQuery* multiThreadQuery::clearAllUrls(){
    // Replace the stored map with a fresh (null) JSON value.
    _urlArr = Json::Value();
    _threadNum = 0;

    return this;
}

// Queues one URL for the next exe() call.
//
// url   - URL to request.
// alias - key under which this response appears in the combined JSON. When
//         empty, the current time since the epoch in microseconds is used.
//
// Returns `this` to allow call chaining.
multiThreadQuery* multiThreadQuery::attachUrl(std::string url, std::string alias){
    // Entries are keyed by their decimal position: "0", "1", ...
    const std::string key = std::to_string(_threadNum);

    if(alias.empty()){
        // Only time_since_epoch() exposes the sub-second part of the clock,
        // so the duration is converted to microseconds explicitly.
        // NOTE(review): two unaliased URLs attached within the same
        // microsecond would receive the same alias.
        const std::chrono::microseconds us =
            std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::system_clock::now().time_since_epoch());

        // Format the full-width count directly; routing it through time_t
        // could truncate where time_t is 32 bits wide.
        alias = std::to_string(us.count());
    }

    Json::Value& entry = _urlArr[key];
    // Both parameters are by-value copies that are not used again, so they
    // are handed over to the JSON entry instead of being copied once more.
    entry["url"] = std::move(url);
    entry["alias"] = std::move(alias);

    ++_threadNum;

    return this;
}

void multiThreadQuery::_curl(inputOutput& io){
    try{
        URI uri(io.url);
       
        HTTPClientSession session(uri.getHost(), uri.getPort());

        // prepare path
        string path(uri.getPathAndQuery());
        if(path.empty()) path = "/";
       
        // send request
        HTTPRequest req(HTTPRequest::HTTP_GET, path, HTTPMessage::HTTP_1_1);
        session.sendRequest(req);
       
        if(_fetchResponse){
            // get response
            HTTPResponse res;
           
            // print response
            istream &is = session.receiveResponse(res);
            std::string response;
            StreamCopier::copyToString(is, response);
           
            io.response = "\"" + io.alias + "\":" + response;
        }
    }catch(Poco::Exception &ex){
        io.response = "\"" + io.alias + "\":{\"s\":-5,\"desc\":\"Curl '" + io.url + "' failed.\"}";
    }
}

// Selects whether exe() reads and combines the responses (true) or only
// sends the requests (false). Returns `this` to allow call chaining.
multiThreadQuery* multiThreadQuery::fetchResponse(bool yesNo){
    this->_fetchResponse = yesNo;
    return this;
}

std::string multiThreadQuery::exe(){
    int i;
    std::string json = "";
    // 動態設定array大小
    thread* threadArrPtr = new thread[_threadNum];
    inputOutput* ioPtr = new inputOutput[_threadNum];

    for(i = 0; i < _threadNum; ++i){
        // 整數轉字串
        std::string url;
        stringstream urlStream(url);
        urlStream << i;

        (*(ioPtr + i)).alias =  _urlArr[urlStream.str()]["alias"].asString();
        (*(ioPtr + i)).url = _urlArr[urlStream.str()]["url"].asString();
        (*(ioPtr + i)).response = "";
       
        *(threadArrPtr + i) = thread(&multiThreadQuery::_curl, this, ref((*(ioPtr + i))));
    }

    if(_fetchResponse) json = "{";

    for(i = 0; i < _threadNum; ++i){
        // 不得放在產生thread的迴圈內,多執行緒會失效,變成跑一般迴圈
        (*(threadArrPtr + i)).join();
       
        if(_fetchResponse){
            json += (*(ioPtr + i)).response;
            if(i != _threadNum - 1) json += ",";
        }
    }

    if(_fetchResponse) json += "}";

    delete [] threadArrPtr;
    delete [] ioPtr;
    threadArrPtr = nullptr;
    ioPtr = nullptr;
   
    return json;
}

int main(){
    const auto start = chrono::system_clock::now();

    multiThreadQuery mtq;
    std::string response;

    response = mtq.clearAllUrls()->fetchResponse(true)
                                    ->attachUrl("http://127.0.0.1/test.php?s=1")
                                    ->attachUrl("http://127.0.0.1/test.php?s=2", "member")
                                    ->attachUrl("http://127.0.0.1/test.php?s=3", "login_log")
                                    ->attachUrl("http://127.0.0.1/test.php?s=4", "payment")
                                    ->attachUrl("http://127.0.0.1/test.php?s=5", "detail")
                                    ->exe();

    cout << response << endl;
   
    const auto end = chrono::system_clock::now();
    auto ts = end - start;
    cout << "spent: " << chrono::duration_cast(ts).count() << "us" << endl;

    return 0;
}


以上在linux上鍵入指令:
#g++ -ljsoncpp -lpthread -lPocoNet -lPocoFoundation -std=c++11 multiThreadQuery.cpp -o multiThreadQuery.out
產生binary檔後再執行:
#./multiThreadQuery.out
若執行時間無太大問題,則複製multiThreadQuery.cpp另存一份swig的interface檔: multiThreadQuery.i,並將main()全部移除。(記得.h, .cpp, .i裡的string都得寫成std::string)

multiThreadQuery.i:

%module multiThreadQuery
%include "std_string.i"
%{
multiThreadQuery.cpp裡的內容,除了main()以外!!!
%}

// Public interface exported to PHP through SWIG. Only the public members are
// listed; the chaining methods return a pointer to the same object.
class multiThreadQuery{
public:
    multiThreadQuery();
    ~multiThreadQuery();
    // Removes every attached URL.
    multiThreadQuery* clearAllUrls();
    // Queues a URL; `alias` is the key of its response in the combined JSON.
    multiThreadQuery* attachUrl(std::string url, std::string alias = "");
    // Enables or disables reading of the responses.
    multiThreadQuery* fetchResponse(bool yesNo);
    // Runs all queued requests in parallel and returns the combined JSON string.
    std::string exe();
};

包成PHP Extension指令:
#swig -c++ -php5 multiThreadQuery.i

#g++ `php-config --includes` -O2 -march=native -mtune=native -std=c++11 -ljsoncpp -lpthread -lPocoNet -lPocoFoundation -fPIC -c multiThreadQuery_wrap.cpp

#g++ -ljsoncpp -lpthread -lPocoNet -lPocoFoundation -shared multiThreadQuery_wrap.o -o multiThreadQuery.so

最後在php.ini檔裡載入multiThreadQuery.so且PHP script裡記得載入multiThreadQuery.php這支PHP Proxy class即可宣告並使用了。

Ex:

// Create the query object (proxy class generated by SWIG).
$mtq = new multiThreadQuery();

// Reset, enable response fetching, queue two URLs under the keys
// 'test1' and 'test2', then run them in parallel; exe() returns the combined JSON string.
$response = $mtq->clearAllUrls()
                             ->fetchResponse(true)
                             ->attachUrl('http://127.0.0.1/test1.php', 'test1')
                             ->attachUrl('http://127.0.0.1/test2.php', 'test2')
                             ->exe();

沒有留言:

張貼留言