2014年11月3日 星期一

使用C++11編寫一可供PHP呼叫並產生多執行緒,並將所有執行緒的結果combine於一json格式內的PHP proxy class及PHP Extension(.so)

前言:

我們都知道PHP是直譯式語言,script內的程式碼是一行一行依序執行的。如果一支php script裡執行了三段查詢或寫入DB的code,其中第一個查詢花了一秒,那麼這支script完成總共所要花的時間就是一秒以上起跳。

目標:

用C++寫出一支class可供PHP宣告後,將各script及argument一一代入,最後產生對應的執行緒將各個script的return值集合起來,combine在一個json格式的字串並return。

PHP ex:
$mtq = new multiThreadQuery();

$response = $mtq->attachPHP('/www/abc.php 123')
                             ->attachPHP('/www/def.php 0 456')
                             ->attachPHP('/www/ghi.php 1 m')
                             ->exe();

$arr = json_decode($response, true);

JsonCpp的下載點& for Visual Studio 2012的編譯方法可參考此篇:
http://xyz.cinc.biz/2013/05/c-json-jsoncpp.html

以下的code都是for linux的...
除了jsoncpp路徑需修改外,popen()/pclose()這類Linux專用的呼叫可以先註解(mark)起來,其餘應該都可以在Visual Studio 2012上編譯&執行。

以下會用到C++11的struct, pointer, reference, dynamic memory allocation for array & struct, thread, 有不熟的再自己去查資料吧~P.S. 如何在linux上安裝jsoncpp, swig,就也自己上網找吧~

multiThreadQuery.h:


#ifndef MULTITHREADQUERY_H
#define MULTITHREADQUERY_H

#include <string>
#include "YOUR LINUX PATH/json/json.h"

// Collects PHP command lines and runs each of them on its own thread,
// combining what the scripts print into one JSON-formatted string.
class multiThreadQuery{
public:
    // Data exchanged between exe() and one worker thread.
    struct dataInterchange{
        std::string scriptPath;  // path of the PHP script to run
        std::string argv;        // argument string placed after the script path
        std::string response;    // JSON fragment written by the worker
    };

    explicit multiThreadQuery();

    ~multiThreadQuery();


    // SWIG does not support returning a reference, so these return a pointer instead.
    multiThreadQuery* clearAllScript();

    // Registers one "<script path> <arguments>" command line; returns this for chaining.
    multiThreadQuery* attachPHP(std::string cmd);

    // Runs every registered command in parallel and returns the combined JSON string.
    std::string exe();
protected:
    int _phpThreadNum;        // number of registered commands (one thread each)
    Json::Value _phpPathArr;  // script paths, keyed by decimal index
    Json::Value _phpArgvArr;  // argument strings, keyed by decimal index

    // Thread entry point: executes one script and fills di.response.
    void _exePHP(dataInterchange& di);
};

// The guard must close after the declarations; closing it right after the
// #define leaves the class unprotected against double inclusion.
#endif




multiThreadQuery.cpp:


#include "multiThreadQuery.h"
#include <iostream>
#include <thread>
#include <cstdlib>
#include <sstream>
#include <chrono>

using namespace std;

// A new instance starts with zero registered scripts.
multiThreadQuery::multiThreadQuery()
    : _phpThreadNum(0)
{}

// No explicit cleanup is performed here.
multiThreadQuery::~multiThreadQuery() {}



// Drops every registered script and resets the counter.
// Returns this so further calls can be chained.
multiThreadQuery* multiThreadQuery::clearAllScript(){
    // Overwrite both tables with fresh default-constructed values.
    _phpPathArr = Json::Value();
    _phpArgvArr = Json::Value();
    _phpThreadNum = 0;

    return this;
}
// Registers one command line of the form "<script path> <arguments>".
// The text before the first space is stored as the script path and everything
// after it as the argument string; a command without a space has no arguments.
// Returns this so calls can be chained.
multiThreadQuery* multiThreadQuery::attachPHP(std::string cmd){
    // Integer to string: entries are keyed by their decimal index.
    const std::string key = std::to_string(_phpThreadNum);

    const std::string::size_type sep = cmd.find(' ');

    _phpPathArr[key] = cmd.substr(0, sep);
    // When no space exists, npos + 1 wraps around to 0 and substr() would
    // return the whole command again, so store an empty argument string.
    _phpArgvArr[key] = (sep == std::string::npos) ? std::string() : cmd.substr(sep + 1);
    ++_phpThreadNum;

    return this;
}

std::string multiThreadQuery::exe(){
    int i;
    // 動態設定array大小
    thread* threadArrPtr = new thread[_phpThreadNum];
    dataInterchange* diPtr = new dataInterchange[_phpThreadNum];

    for(i = 0; i < _phpThreadNum; ++i){
        // 整數轉字串
        std::string str;
        stringstream ss(str);
        ss << i;
       
        (*(diPtr + i)).scriptPath = _phpPathArr[ss.str()].asString();
        (*(diPtr + i)).argv = _phpArgvArr[ss.str()].asString();
        (*(diPtr + i)).response = "";
       
        *(threadArrPtr + i) = thread(&multiThreadQuery::_exePHP, this, ref((*(diPtr + i))));
    }

    string json = "{";

    for(i = 0; i < _phpThreadNum; ++i){
        // 不得放在產生thread的迴圈內,多執行緒會失效,變成跑一般迴圈
        (*(threadArrPtr + i)).join();

        json += (*(diPtr + i)).response;
        if(i != (_phpThreadNum - 1)) json += ",";
    }

    json += "}";

    delete [] threadArrPtr;
    delete [] diPtr;
    threadArrPtr = nullptr;
    diPtr = nullptr;
   
    return json;
}

void multiThreadQuery::_exePHP(dataInterchange& diRef){
    try{
        if(fopen(diRef.scriptPath.c_str(), "r") == NULL) throw -999;

        FILE* fpipe;
        char* command;
        char line[256];
        std::string response;
        std::string cmd;

        cmd = "/usr/local/bin/php " + diRef.scriptPath + " " + diRef.argv;
        command = (char*)cmd.c_str();

        if (!(fpipe = (FILE*)popen(command, "r"))) throw -998;
   
        while(fgets(line, sizeof(line), fpipe)){
            std::string str;
            str.assign(line);
            response += str;
        }
   
        pclose(fpipe);
   
        diRef.response = "\"" + diRef.scriptPath + " " + diRef.argv + "\":" + response;
    }catch(int e){
        std::string desc;

        switch(e){
            case -999:
                desc = "File not exist.";
                break;
            case -998:
                desc = "Pipe failed.";
                break;
        }

        // 整數轉字串
        std::string str;
        stringstream ss(str);
        ss << e;

        diRef.response = "\"" + diRef.scriptPath + " " + diRef.argv + "\":{\"s\":" + ss.str() + ",\"desc\":\"" + desc + "\"}";
    }
}

int main(){
    const auto start = std::chrono::system_clock::now();

    multiThreadQuery mtq;
    cout << mtq.attachPHP("/www/abc.php 1")->attachPHP("/www/dev.php 2")->exe() << endl;

    const auto end = std::chrono::system_clock::now();
    auto ts = end - start;
    cout << "spent: " << std::chrono::duration_cast(ts).count() << "ms" << endl;

    return 0;
}


以上在linux裡使用g++編譯並執行,若沒出任何錯誤,就可以將main()整個函式刪掉了,因為待會使用SWIG包成PHP proxy class及extension(.so)檔時都不會用到。P.S. PHP自己寫幾支吐些json_encode()的字串回來,每支可以設sleep()不同秒數,看看執行總共花的秒數是不是趨近於最花時間的script執行時間。

因為有用到jsoncpp函式庫及C++11的thread,所以編譯時需指定-lpthread及-ljsoncpp參數(函式庫參數請放在原始檔之後,連結器才能正確解析符號),如:
g++ -std=c++11 multiThreadQuery.cpp -o multiThreadQuery.out -ljsoncpp -lpthread

砍掉main()方法後,#cp multiThreadQuery.cpp multiThreadQuery.i。這支.i檔就是SWIG要把C++編譯出來的binary檔wrap成.so檔及php proxy class用的。


multiThreadQuery.i:


%module multiThreadQuery
%include "std_string.i"
%{
#include "multiThreadQuery.h"
#include <iostream>
#include <thread>
#include <cstdlib>
#include <sstream>

using namespace std;

// A new instance starts with zero registered scripts.
multiThreadQuery::multiThreadQuery(void)
    : _phpThreadNum(0)
{}

// No explicit cleanup is performed here.
multiThreadQuery::~multiThreadQuery(void) {}


// Drops every registered script and resets the counter.
// Returns this so further calls can be chained.
// The name must match the member declared in multiThreadQuery.h (clearAllScript);
// defining an undeclared member does not compile.
multiThreadQuery* multiThreadQuery::clearAllScript(){
    _phpThreadNum = 0;
    // Overwrite both tables with fresh default-constructed values.
    _phpPathArr = Json::Value();
    _phpArgvArr = Json::Value();

    return this;
}

// Registers one command line of the form "<script path> <arguments>".
// The text before the first space is stored as the script path and everything
// after it as the argument string; a command without a space has no arguments.
// Returns this so calls can be chained.
multiThreadQuery* multiThreadQuery::attachPHP(std::string cmd){
    // Integer to string: entries are keyed by their decimal index.
    const std::string key = std::to_string(_phpThreadNum);

    const std::string::size_type sep = cmd.find(' ');

    _phpPathArr[key] = cmd.substr(0, sep);
    // When no space exists, npos + 1 wraps around to 0 and substr() would
    // return the whole command again, so store an empty argument string.
    _phpArgvArr[key] = (sep == std::string::npos) ? std::string() : cmd.substr(sep + 1);
    ++_phpThreadNum;

    return this;
}

std::string multiThreadQuery::exe(){
    int i;
    // 動態設定array大小
    thread* threadArrPtr = new thread[_phpThreadNum];
    dataInterchange* diPtr = new dataInterchange[_phpThreadNum];

    for(i = 0; i < _phpThreadNum; ++i){
        // 整數轉字串
        std::string str;
        stringstream ss(str);
        ss << i;
       
        (*(diPtr + i)).scriptPath = _phpPathArr[ss.str()].asString();
        (*(diPtr + i)).argv = _phpArgvArr[ss.str()].asString();
        (*(diPtr + i)).response = "";
       
        *(threadArrPtr + i) = thread(&multiThreadQuery::_exePHP, this, ref((*(diPtr + i))));
    }

    string json = "{";

    for(i = 0; i < _phpThreadNum; ++i){
        // 不得放在產生thread的迴圈內,多執行緒會失效,變成跑一般迴圈
        (*(threadArrPtr + i)).join();

        json += (*(diPtr + i)).response;
        if(i != (_phpThreadNum - 1)) json += ",";
    }

    json += "}";

    delete [] threadArrPtr;
    delete [] diPtr;
    threadArrPtr = nullptr;
    diPtr = nullptr;
   
    return json;
}

void multiThreadQuery::_exePHP(dataInterchange& diRef){
    try{
        if(fopen(diRef.scriptPath.c_str(), "r") == NULL) throw -999;

        FILE* fpipe;
        char* command;
        char line[256];
        std::string response;
        std::string cmd;

        cmd = "/usr/local/bin/php " + diRef.scriptPath + " " + diRef.argv;
        command = (char*)cmd.c_str();

        if (!(fpipe = (FILE*)popen(command, "r"))) throw -998;
   
        while(fgets(line, sizeof(line), fpipe)){
            std::string str;
            str.assign(line);
            response += str;
        }
   
        pclose(fpipe);
   
        diRef.response = "\"" + diRef.scriptPath + " " + diRef.argv + "\":" + response;
    }catch(int e){
        std::string desc;

        switch(e){
            case -999:
                desc = "File not exist.";
                break;
            case -998:
                desc = "Pipe failed.";
                break;
        }

        // 整數轉字串
        std::string str;
        stringstream ss(str);
        ss << e;

        diRef.response = "\"" + diRef.scriptPath + " " + diRef.argv + "\":{\"s\":" + ss.str() + ",\"desc\":\"" + desc + "\"}";
    }
}
%}

// Declarations parsed by SWIG to generate the PHP proxy class.
// The signatures have to agree with multiThreadQuery.h: the chaining methods
// return a pointer (not an object by value), and the reset method is named
// clearAllScript there.
class multiThreadQuery{
public:
    multiThreadQuery();
    ~multiThreadQuery();

    multiThreadQuery* clearAllScript();
    multiThreadQuery* attachPHP(std::string cmd);
    std::string exe();
};




%module到%}的用法請參考: http://www.swig.org/Doc3.0/SWIGDocumentation.html#SWIGPlus_nn6,最後一小段是為了產生一PHP proxy class,供PHP require後,即可宣告此class。PHP proxy class請參考: http://www.swig.org/Doc3.0/SWIGDocumentation.html#Php_nn2_6

#swig -c++ -php5 multiThreadQuery.i

#g++ `php-config --includes` -O2 -march=native -mtune=native -std=c++11 -ljsoncpp -lpthread -fPIC -c multiThreadQuery_wrap.cpp

#g++ -shared multiThreadQuery_wrap.o -o multiThreadQuery.so -ljsoncpp -lpthread

最後在php.ini裡載入so檔,PHP裡require multiThreadQuery.php檔,即可在PHP script裡下:

$mtq = new multiThreadQuery();

$response = $mtq->attachPHP('/www/abc.php 123')
                             ->attachPHP('/www/def.php 0 456')
                             ->attachPHP('/www/ghi.php 1 m')
                             ->exe();

$arr = json_decode($response, true);

沒有留言:

張貼留言