Monday, 8 September 2014

Linux C++ : create a process and bind its stdin, stdout and stderr to C++ I/O streams

The following function creates a new process from command line and bind its stdin, stdout and stderr to C++ I/O streams.
It's more preferable to use Boost to do so since Boost is cross-platform. In fact, the code is simpler if you use Boost


// C code
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <vector>
#include <iostream>
#include <fstream>
#include <sstream>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <ext/stdio_filebuf.h>


int spawn(const char *cmd, int io_mask, ostream **stdin, istream **stdout, istream **stderr){
    // io_mask: bit0:stdin, bit1:stdout, bit2:stderr
    int pipes[3][2];
    for(int x=0; x<3; ++x)
        pipe(pipes[x]);
    int pid;
    if((pid=fork())!=0){    // parent
        if(io_mask&1){    // create stdin
            __gnu_cxx::stdio_filebuf<char> *filebuf = new __gnu_cxx::stdio_filebuf<char>(pipes[0][1], std::ios::out);
            *stdin = new ostream(filebuf);
        }
        if(io_mask&2){    // create stdout
            __gnu_cxx::stdio_filebuf<char> *filebuf = new __gnu_cxx::stdio_filebuf<char>(pipes[1][0], std::ios::in);
            *stdout = new istream(filebuf);
        }
        if(io_mask&4){    // create stderr
            __gnu_cxx::stdio_filebuf<char> *filebuf = new __gnu_cxx::stdio_filebuf<char>(pipes[2][0], std::ios::in);
            *stderr = new istream(filebuf);
        }
        return pid;
    } else {            // child
        if(io_mask&1)
            dup2(pipes[0][0], STDIN_FILENO);
        if(io_mask&2)
            dup2(pipes[1][1], STDOUT_FILENO);
        if(io_mask&4)
            dup2(pipes[2][1], STDERR_FILENO);
        int retval;
        do{
            cerr << "Creating process with command-line \"" << cmd << "\"" << endl;
            retval = system(cmd);
            cerr << "Decoder thread has exited with code " << retval << endl;
        }while(io_mask&0x80000000);
        _exit(retval);
    }
}





// C++ code
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <fstream>
#include <string>
#include <map>
#include <set>
#include <boost/thread.hpp>

using namespace std;


class Thread{
public:
int m_pipes[3][2]; // for stdin, stdout and stderr respectively
int m_mask; // mask: bit0:stdin, bit1:stdout, bit2:stderr, bit31:persistent launch
string m_cmdline;
ostream *m_stdin;
istream *m_stdout;
istream *m_stderr;
boost::thread *m_thread;
__gnu_cxx::stdio_filebuf<char> *m_filebuf[3]; // for stdin, stdout and stderr respectively

static void _thread_func(Thread *p){ // OOP connector
p->thread_func();
}

template <class A>
ostream& operator << (A &a){
return (*m_stdin) << a;
}

template <class A>
istream& operator >> (A &a){
return (*m_stdout) >> a;
}

void flush(){
m_stdin->flush();
}

istream& getline(string &line){
return std::getline(*m_stdout, line);
}

ostream& putline(string &line){
ostream& ret = (*m_stdin << line << endl);
m_stdin->flush();
return ret;
}

void thread_func(){
// child thread
if(m_mask&1)
dup2(m_pipes[0][0], STDIN_FILENO);
if(m_mask&2)
dup2(m_pipes[1][1], STDOUT_FILENO);
if(m_mask&4)
dup2(m_pipes[2][1], STDERR_FILENO);
int retval;
do{
cerr << "Creating process with command-line \"" << m_cmdline << "\"" << endl;
retval = system(m_cmdline.c_str());
cerr << "Decoder thread has exited with code " << retval << endl;
}while(m_mask&0x80000000);
_exit(retval);
}

Thread(const string &cmdline, int mask=7):m_cmdline(cmdline),m_mask(mask){
for(int x=0; x<3; ++x)
pipe(m_pipes[x]);
m_thread = new boost::thread(&_thread_func, this);

// parent thread
if(m_mask&1){ // bind stdin
m_filebuf[0] = new __gnu_cxx::stdio_filebuf<char>(m_pipes[0][1], std::ios::out);
m_stdin = new ostream(m_filebuf[0]);
}
if(m_mask&2){ // bind stdout
m_filebuf[1] = new __gnu_cxx::stdio_filebuf<char>(m_pipes[1][0], std::ios::in);
m_stdout = new istream(m_filebuf[1]);
}
if(m_mask&4){ // bind stderr
m_filebuf[2] = new __gnu_cxx::stdio_filebuf<char>(m_pipes[2][0], std::ios::in);
m_stderr = new istream(m_filebuf[2]);
}
}

~Thread(){
for(int x=0; x<3; ++x)
close(m_pipes[x][0]), close(m_pipes[x][1]);
delete m_thread;
}

};

Wednesday, 12 February 2014

Python multi-threading/multi-processing

Python multi-threading does not actually run on multiple CPUs due to Global Interpreter Lock, so it is useless. We can only use python multi-processing, which also has some bugs.

# 1. include package
from multiprocessing import Process, Manager

# 2. only variables created using manager can be shared across difference processes
manager=Manager()
arr=manager.list()

# 3. create process
p=Process(target=filterRuleTable, args=(arr, i, inputSentences, N, options,))

# 4. start running
p.start()

# 5. wait for termination
p.join()