Paracel is a distributed optimization framework designed for many machine learning problems such as logistic regression, SVD, matrix factorization, LDA, and so on. It provides a distributed, global key-value space called the parameter server. Messages in Paracel (also referred to as parameters) must have a key-value structure. To communicate, workers read, write, and update messages on the parameter servers. This is a quick tutorial that should take no more than 20 minutes to complete.
Download the Paracel project from GitHub: https://github.com/douban/paracel. Build Paracel and all its dependencies on your cluster using cmake, following the deployment section in the API reference document. Before going any further, make sure that you have successfully installed Paracel; to verify this, follow the instructions here to test it out. In the following, we assume that you have installed Paracel in the /opt/paracel folder; modify the paths if you installed it somewhere else.
Generate a dataset; here we use word count for simplicity.
python /opt/paracel/tool/datagen.py -m wc -o data.txt
Create a JSON file named demo_cfg.json as below:
{ "input" : "data.txt", "output" : "./wc_output/", "topk" : 10, "handle_file" : "/opt/paracel/lib/libwc_update.so", "update_function" : "wc_updater", "filter_function" : "wc_filter" }
Use prun.py to start the program (4 workers, 2 parameter servers, local mode in this example).
/opt/paracel/prun.py -w 4 -p 2 -c demo_cfg.json -m local /opt/paracel/bin/wc
You will get the top 10 word list:
the : 62075
and : 38850
of : 34434
to : 13384
And : 12846
that : 12577
in : 12334
shall : 9760
he : 9666
unto : 8940
There are two strong reasons to choose Paracel for developing your applications. First, the parameter server paradigm requires little effort to transform a model into, compared with the MapReduce and GraphLab frameworks: you can add new features to your application without refactoring the whole implementation. Second, a Paracel program stays very close to its sequential version; developers usually write a sequential version and then parallelize it with a few extra lines of code. For machine learning problems in Paracel, the usual process is as follows.
Let's start with a simple word count example. First, write a subclass that inherits from the paralg base class:
// wc.hpp
#include "ps.hpp" // paralg

class word_count : public paralg {
 public:
  word_count(paracel::Comm comm,
             string hosts_dct_str,
             string in,
             string out,
             int topk)
      : paralg(hosts_dct_str, comm, out),
        input(in), output(out), ktop(topk) {
    /* init local member vars here */
  }

 private:
  string input, output;
  int ktop;
}; // class word_count
Second, write an update function in update.cpp (similar to the reduce function in MapReduce) and compile it to update.so using cmake.
// update.cpp
#include "proxy.hpp"         // update_proxy
#include "paracel_types.hpp" // update_result

int local_reduce(int value, int delta) {
  return value + delta;
}

// register updater to the paracel framework
update_result updater = update_proxy(local_reduce);
Third, implement the virtual method solve to work out the problem, for example:
// wc.hpp
private:
  void local_learning(const vector<string> & lines) {
    // register the update function in the subclass
    paracel_register_bupdate("update.so", "updater");
    for(auto & line : lines) {
      auto word_lst = local_parser(line); // parse line into a word list
      for(auto & word : word_lst) {
        paracel_update_default(word, 1);  // something like `setdefault` in Python
      }
    }
    paracel_sync();                        // wait for every worker to finish
    auto result = paracel_read_topk(ktop); // get the ktop words
    // dump result into output
  }

public:
  virtual void solve() {
    auto lines = paracel_load(input); // load input in parallel
    local_learning(lines);
  }
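The helper local_parser is not spelled out in this tutorial. Purely as an illustration, a minimal sketch that simply splits a line into words on whitespace could look like the following (the shipped example may use Paracel's own string utilities instead):

// local_parser sketch: split a line into words on whitespace
#include <sstream>
#include <string>
#include <vector>

std::vector<std::string> local_parser(const std::string & line) {
  std::vector<std::string> word_lst;
  std::istringstream iss(line);
  std::string word;
  while(iss >> word) {
    word_lst.push_back(word);
  }
  return word_lst;
}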
Finally, write the main function in main.cpp:
// main.cpp
#include <google/gflags.h>
#include "utils.hpp"
#include "wc.hpp"

DEFINE_string(server_info, "", "parameter server info");
DEFINE_string(cfg_file, "", "config file");

int main(int argc, char *argv[]) {
  // init worker environment
  paracel::main_env comm_main_env(argc, argv);
  paracel::Comm comm(MPI_COMM_WORLD);

  // init parameter server environment
  google::SetUsageMessage("[options]\n\t--server_info\n");
  google::ParseCommandLineFlags(&argc, &argv, true);

  // you can also init local parameters from other formats (we recommend json)
  string input = "wc.hpp";
  string output = "result.txt";
  int ktop = 10;

  // create an instance
  word_count solver(comm, FLAGS_server_info, input, output, ktop);
  // solve the problem
  solver.solve();
  return 0;
}
You are almost done! All that remains is to compile the code above (see the CMakeLists.txt of the other algorithms in their directories for examples) and run it with the prun.py script.
In the word count example above, the lines on each worker may contain many occurrences of the same words, so we can combine them locally first.
// wc.hpp
private:
  void local_learning(const vector<string> & lines) {
    std::unordered_map<std::string, int> tmp;
    for(auto & line : lines) {
      auto word_lst = local_parser(line);
      for(auto & word : word_lst) {
        tmp[word] += 1;
      }
    }
    for(auto & wc : tmp) {
      paracel_bupdate(wc.first, wc.second, "update.so", "updater");
    }
    paracel_sync();
    auto result = paracel_read_topk(ktop);
  }
If you run this second version of word count, you will find that the program runs very slowly. This is a very common problem users encounter when writing distributed algorithms for the first time: communication time dominates computing time, which, by Amdahl's law, severely limits the parallel speedup. There is no free lunch, so you must optimize the communication. A commonly used strategy is to pack small messages together to save network latency. In the following code, we create local_vd: a set of local word dictionaries, hashed into a fixed number of buckets, so each worker exchanges a few large messages with the servers instead of many tiny ones. In this case, you have to write a new update function that merges the local word dictionaries into a global word count mapping. We omit its implementation here; you can reference the wc.hpp source code for more details. Don't worry about that, you are already able to use Paracel in action. Congratulations!
// wc.hpp
private:
  void local_learning(const vector<string> & lines) {
    std::unordered_map<std::string, int> tmp;
    for(auto & line : lines) {
      auto word_lst = local_parser(line);
      for(auto & word : word_lst) {
        tmp[word] += 1;
      }
    }
    // pack local counts into dct_sz buckets, one server key per bucket
    int dct_sz = 100;
    std::vector<std::unordered_map<std::string, int> > local_vd(dct_sz);
    paracel::hash_type<std::string> hfunc;
    for(auto & kv : tmp) {
      auto indx = hfunc(kv.first) % dct_sz;
      local_vd[indx][kv.first] = kv.second;
    }
    paracel_sync();
    for(int k = 0; k < dct_sz; ++k) {
      paracel_bupdate("key_" + std::to_string(k),
                      local_vd[k],
                      "update.so",
                      "optimize_updater");
    }
    paracel_sync();
    // get topk result ...
  }
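The optimize_updater referenced above has to merge a pushed local word dictionary into the dictionary already stored under the same key on the servers. Its real implementation ships with Paracel's word count example; purely as an illustration, a minimal sketch, assuming the stored value is an std::unordered_map<std::string, int> and following the same update_proxy registration pattern as before, might look like this:

// update.cpp (sketch only; see Paracel's word count sources for the real optimize_updater)
#include <string>
#include <unordered_map>
#include "proxy.hpp"         // update_proxy
#include "paracel_types.hpp" // update_result

using dict_type = std::unordered_map<std::string, int>;

// merge the freshly pushed local dictionary into the one stored on the server
dict_type local_merge(dict_type stored, dict_type pushed) {
  for(auto & kv : pushed) {
    stored[kv.first] += kv.second;
  }
  return stored;
}

update_result optimize_updater = update_proxy(local_merge);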
In this part, with a simple logistic regression example, we will show you how to use asynchronous control to speed up your application in Paracel. The key point is that the distributed version stays very close to the sequential version, and you only need to add a few lines of code to make it asynchronous. The sequential version is really straightforward; we use the SGD optimization method on the likelihood objective function:
void logistic_regression::solve() {
  // init data
  auto lines = paracel_load(input);
  local_parser(lines);
  serial_learning();
}

void logistic_regression::serial_learning() {
  theta = paracel::random_double_list(data_dim);
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) {
      delta[i] = 0.;
    }
    random_shuffle(idx.begin(), idx.end());
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
      for(int i = 0; i < data_dim; ++i) {
        theta[i] += delta[i];
      }
    } // traverse
  }
}
To parallelize it, you have to follow the training model in Paracel: pull parameters before training and push local updates after training. The code is very close to the previous version:
void logistic_regression::solve() {
  // init data
  auto lines = paracel_load(input);
  local_parser(lines);
  paracel_sync();
  parallel_learning();
}

void logistic_regression::parallel_learning() {
  theta = paracel::random_double_list(data_dim);
  paracel_write("theta", theta); // init push
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) {
      delta[i] = 0.;
    }
    random_shuffle(idx.begin(), idx.end());
    theta = paracel_read<vector<double> >("theta"); // pull theta
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
    } // traverse
    // update theta with delta on the parameter servers
    paracel_bupdate("theta", delta, "update.so", "lg_theta_update");
  }
  theta = paracel_read<vector<double> >("theta"); // last pull
}
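The lg_theta_update function referenced in paracel_bupdate belongs to Paracel's logistic regression example and is not shown in this tutorial. A minimal sketch, assuming it simply adds the pushed delta to the stored theta vector and is registered through the same update_proxy mechanism as the word count updater, could be:

// update.cpp (sketch only; see Paracel's logistic regression sources for the real lg_theta_update)
#include <vector>
#include "proxy.hpp"         // update_proxy
#include "paracel_types.hpp" // update_result

// add the pushed delta to the theta vector stored on the server
std::vector<double> local_add(std::vector<double> theta, std::vector<double> delta) {
  for(size_t i = 0; i < theta.size(); ++i) {
    theta[i] += delta[i];
  }
  return theta;
}

update_result lg_theta_update = update_proxy(local_add);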
Asynchronous learning is a very important feature in Paracel for speeding up convergence. You can add four lines of code to the logistic regression example to make it work: in the constructor of your subclass, turn on the ssp_switch and set a limit_s parameter, which means the fastest worker can run ahead of the slowest worker by no more than limit_s iterations; set the total number of iterations during initialization; and commit to the parameter servers at the end of each iteration.
logistic_regression::logistic_regression(paracel::Comm comm,
                                         std::string hosts_dct_str,
                                         std::string _output,
                                         int _rounds)
    : paracel::paralg(hosts_dct_str, comm, _output, _rounds,
                      3, // set limit_s
                      true /* open ssp switch */) {}

void logistic_regression::solve() {
  // init data
  auto lines = paracel_load(input);
  local_parser(lines);
  set_total_iters(rounds); // set total iterations
  paracel_sync();
  dgd_learning();
}

void logistic_regression::dgd_learning() {
  theta = paracel::random_double_list(data_dim);
  paracel_write("theta", theta); // init push
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) {
      delta[i] = 0.;
    }
    random_shuffle(idx.begin(), idx.end());
    theta = paracel_read<vector<double> >("theta"); // pull theta
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
    } // traverse
    // update theta with delta on the parameter servers
    paracel_bupdate("theta", delta, "update.so", "lg_theta_update");
    iter_commit(); // commit to parameter server at the end of each iteration
  }
  theta = paracel_read<vector<double> >("theta"); // last pull
}
That's it for the quick tour of Paracel. We hope you enjoy using it.