Overview
std::cout
<< "(0.5,1)->(0,0.5)->(1,0.5)->(0.5,1)->(0.5,0.25)->(0.25,0.25)";
print '(0.5,1)->(0,0.5)->(1,0.5)->(0.5,1)->(0.5,0.25)->(0.25,0.25)'
echo "(0.5,1)->(0,0.5)->(1,0.5)->(0.5,1)->(0.5,0.25)->(0.25,0.25)"
{
"logo": "(0.5,1)->(0,0.5)->(1,0.5)->(0.5,1)->(0.5,0.25)->(0.25,0.25)"
}
Welcome to the Paracel API Reference page!
You can use the interfaces listed here to build your own distributed algorithms or applications following Paracel’s paradigm. So far, Paracel only provides a C++ interface, for reasons of computational efficiency. You can view code examples in the area on the right.
Data Representation
Paracel uses graph and matrix types to represent training data. We define four types of graphs: bigraph, bigraph_continuous, digraph and undirected_graph. We import the Eigen3 library for matrix/vector operations; Eigen3 supports two types of matrices: sparse (Eigen::SparseMatrix) and dense (Eigen::MatrixXd).
Bigraph
#include "graph.hpp" // paracel::bigraph
#include "paracel_types.hpp"
/*
* G:
* a, A, 3
* a, B, 4
* a, D, 2
* b, C, 5
* b, D, 4
* b, E, 5
* c, D, 3
* c, E, 1
* c, F, 2
* d, C, 3
*/
void foo(paracel::bigraph<std::string> & G) {
auto data = G.get_data();
auto print_lambda = [] (std::string u,
std::string v,
double wgt) {
std::cout << u << "|" << v << "|" << wgt << std::endl;
};
G.traverse(print_lambda);
G.add_edge("c", "G", 3.6);
G.traverse("c", print_lambda);
auto ubag = G.left_vertex_bag(); // a, b, c, d
auto uset = G.left_vertex_set(); // a, b, c, d
std::vector<std::tuple<std::string,
std::string, double> > dump_tpls;
G.dump2triples(dump_tpls);
std::unordered_map<std::string,
std::unordered_map<std::string, double> > dump_dict;
G.dump2dict(dump_dict);
auto v_sz = G.v(); // 4
auto e_sz = G.e(); // 11
auto od = G.outdegree("a"); // 3
auto id = G.indegree("E"); // 2
auto adj_info = G.adjacent("c");
}
In the mathematical field of graph theory, a bipartite graph (or bigraph) is a graph whose vertices can be divided into two disjoint sets U and V (that is, U and V are each independent sets) such that every edge connects a vertex in U to one in V.
template <class T = paracel::default_id_type>
class bigraph {
public:
bigraph();
bigraph(std::unordered_map<T, std::unordered_map<T, double> > edge_info);
bigraph(std::vector<std::tuple<T, T> > tpls);
bigraph(std::vector<std::tuple<T, T, double> > tpls);
void add_edge(const T & v, const T & w);
void add_edge(const T & v, const T & w, double wgt);
// return bigraph data
std::unordered_map<T, std::unordered_map<T, double> > get_data();
// traverse bigraph edge using functor func
template <class F>
void traverse(F & func);
// traverse vertex v's related edges using functor func
template <class F>
void traverse(const T & v, F & func);
// return U bag
std::vector<T> left_vertex_bag();
// return U set
std::unordered_set<T> left_vertex_set();
// out: tpls
void dump2triples(std::vector<std::tuple<T, T, double> > & tpls);
// out: dict
void dump2dict(std::unordered_map<T, std::unordered_map<T, double> > & dict);
// return number of vertexes in U
inline size_t v();
// return number of edges in bigraph
inline size_t e();
// return adjacent info of vertex v
std::unordered_map<T, double> adjacent(const T & v);
// return outdegree of vertex u in U
inline size_t outdegree(const T & u);
// return indegree of vertex v in V
inline size_t indegree(const T & v);
};
Bigraph_continuous
#include "graph.hpp" // paracel::bigraph_continuous
#include "paracel_types.hpp"
/*
* G:
* 0, 1, 3
* 0, 2, 4
* 0, 4, 2
* 1, 3, 5
* 1, 4, 4
* 1, 5, 5
* 2, 4, 3
* 2, 5, 1
* 2, 6, 2
* 3, 3, 3
*/
void foo() {
paracel::bigraph_continuous G;
G.add_edge(0, 1, 3.);
G.add_edge(0, 2, 4.);
G.add_edge(0, 4, 2.);
G.add_edge(1, 3, 5.);
G.add_edge(1, 4, 4.);
G.add_edge(1, 5, 5.);
G.add_edge(2, 4, 3.);
G.add_edge(2, 5, 1.);
G.add_edge(2, 6, 2.);
G.add_edge(3, 3, 3.);
auto print_lambda = [] (paracel::default_id_type u,
paracel::default_id_type v,
double wgt) {
std::cout << u << "|" << v << "|" << wgt << std::endl;
};
G.traverse(print_lambda);
G.traverse(0, print_lambda);
auto v_sz = G.v(); // 4
auto e_sz = G.e(); // 10
auto od = G.outdegree(0); // 3
auto id = G.indegree(5); // 2
auto adj_info = G.adjacent(2);
}
paracel::bigraph_continuous can also be used to represent a bipartite graph. Vertexes of a paracel::bigraph_continuous must be indexed from 0 to N-1 (N is the total number of vertexes in U). It costs less memory compared to paracel::bigraph<paracel::default_id_type> and is more efficient since the indexes are continuous; the interface below is similar.
class bigraph_continuous {
public:
bigraph_continuous();
bigraph_continuous(paracel::default_id_type n);
// sequential interface, the first row of file filename is the size of U
bigraph_continuous(const std::string & filename);
void add_edge(paracel::default_id_type src,
paracel::default_id_type dst);
void add_edge(paracel::default_id_type src,
paracel::default_id_type dst,
double rating);
// return number of vertexes in U
paracel::default_id_type v();
// return number of edges in bigraph_continuous
paracel::default_id_type e();
// return adjacent info of vertex v
paracel::bag_type<std::pair<paracel::default_id_type, double> > adjacent(paracel::default_id_type v);
// return outdegree of vertex u in U
inline size_t outdegree(paracel::default_id_type u);
// return indegree of vertex v in V
inline size_t indegree(paracel::default_id_type v);
// traverse bigraph edge using functor func
template <class F>
void traverse(F & func);
// traverse vertex v's related edges using functor func
template <class F>
void traverse(paracel::default_id_type v, F & func);
};
Digraph
#include "graph.hpp" // paracel::digraph
#include "paracel_types.hpp"
/*
* G:
* 0, 0, 3.0
* 0, 2, 5.0
* 1, 0, 4.0
* 1, 1, 3.0
* 1, 2, 1.0
* 2, 0, 2.0
* 2, 3, 1.0
* 3, 1, 3.0
* 3, 3, 1.0
*/
void foo(paracel::digraph<size_t> & G) {
G.add_edge(3, 4, 5.);
auto data = G.get_data();
auto vbag = G.vertex_bag(); // std::vector<size_t>({0, 1, 2, 3, 4})
auto vset = G.vertex_set(); // unordered_set<size_t>({0, 1, 2, 3, 4})
auto adj = G.adjacent(0); // {0 : 3.0; 2 : 5.0}
auto print_lambda = [] (size_t a, size_t b, double c) {
std::cout << a << " | " << b << " | " << c << std::endl;
};
G.traverse(print_lambda);
G.traverse(1, print_lambda); // 1|0|4.0\n1|1|3.0\n1|2|1.0, order does not matter
std::vector<std::tuple<size_t, size_t, double> > dump_tpls;
G.dump2triples(dump_tpls);
std::unordered_map<size_t,
std::unordered_map<size_t, double> > dump_dict;
G.dump2dict(dump_dict);
auto v_sz = G.v(); // 5
auto e_sz = G.e(); // 10
auto od = G.outdegree(0); // 2
auto id = G.indegree(0); // 3
auto ad = G.avg_degree(); // 2
auto sl = G.selfloops(); // 3
G = G.reverse();
v_sz = G.v(); // 5
e_sz = G.e(); // 10
od = G.outdegree(0); // 3
id = G.indegree(0); // 2
ad = G.avg_degree(); // 2
sl = G.selfloops(); // 3
}
paracel::digraph can be used to represent a weighted directed graph. You can use it to develop graph algorithms such as PageRank. Interfaces of digraph are quite similar to those of paracel::undirected_graph.
template <class T = paracel::default_id_type>
class digraph {
public:
digraph();
digraph(std::unordered_map<T, std::unordered_map<T, double> > edge_info);
digraph(std::vector<std::tuple<T, T> > tpls);
digraph(std::vector<std::tuple<T, T, double> > tpls);
void add_edge(const T & v, const T & w);
void add_edge(const T & v, const T & w, double wgt);
// return digraph data
std::unordered_map<T, std::unordered_map<T, double> > get_data();
// traverse digraph edges using functor func
template <class F>
void traverse(F & func);
// traverse vertex v's related edges using functor func
template <class F>
void traverse(const T & v, F & func);
template <class F>
void traverse_by_vertex(F & func);
// return vertexes of digraph with std::vector
std::vector<T> vertex_bag();
// return vertexes of digraph with std::unordered_set
std::unordered_set<T> vertex_set();
// out: tpls
void dump2triples(std::vector<std::tuple<T, T, double> > & tpls);
// out: dict
void dump2dict(std::unordered_map<T, std::unordered_map<T, double> > & dict);
// return a digraph with edge directions reversed
digraph reverse();
// return total number of vertexes
inline size_t v();
// return total number of edges
inline size_t e();
// return adjacent info of vertex v
std::unordered_map<T, double>
adjacent(const T & v);
unordered_map<T, double>
reverse_adjacent(const T & v);
// return outdegree of vertex v
inline size_t outdegree(const T & v);
// return indegree of vertex v
inline size_t indegree(const T & v);
// return average degree of digraph
inline double avg_degree();
// return total number of self-loops of digraph
inline int selfloops();
};
Undirected_graph
#include "graph.hpp" // paracel::undirected_graph
#include "paracel_types.hpp"
/*
* G:
* 0, 1
* 0, 2
* 0, 5
* 0, 6
* 3, 4
* 3, 5
* 4, 5
* 4, 6
* 9, 10
* 9, 11
* 9, 12
* 11, 12
*/
void foo(paracel::undirected_graph<size_t> & G) {
auto v_sz = G.v(); // 11
auto e_sz = G.e(); // 12
auto adj = G.adjacent(5); // {0: 1.0, 3: 1.0, 4: 1.0}
auto ad = G.avg_degree(); // 24. / 11.
auto md = G.max_degree(); // 4
auto sl = G.selfloops(); // 0
auto print_lambda = [] (size_t a, size_t b, double c) {
std::cout << a << " | " << b << " | " << c << std::endl;
};
G.traverse(print_lambda);
G.traverse(0, print_lambda); // 0|1|1.0\n0|2|1.0\n0|5|1.0\n0|6|1.0, order does not matter
}
paracel::undirected_graph can be used to represent a weighted undirected graph. You can use it to develop graph algorithms. Interfaces of undirected_graph are quite similar to those of paracel::digraph.
template <class T = size_t>
class undirected_graph {
public:
undirected_graph();
undirected_graph(std::unordered_map<T, std::unordered_map<T, double> > edge_info);
undirected_graph(std::vector<std::tuple<T, T> > tpls);
undirected_graph(std::vector<std::tuple<T, T, double> > tpls);
void add_edge(const T & v, const T & w);
void add_edge(const T & v, const T & w, double wgt);
// return undirected_graph data
std::unordered_map<T, std::unordered_map<T, double> > get_data();
// traverse undirected_graph edge using functor func
template <class F>
void traverse(F & func);
// traverse vertex v's related edges using functor func
template <class F>
oid traverse(const T & v, F & func);
template <class F>
void traverse_by_vertex(F & func);
// return vertexes of undirected_graph with std::vector
std::vector<T> vertex_bag();
// return vertexes of undirected_graph with std::unordered_set
std::unordered_set<T> vertex_set();
// return total number of vertexes
inline size_t v();
// return total number of edges
inline size_t e();
// return adjacent info of vertex v
std::unordered_map<T, double>
adjacent(const T & v);
// return outdegree of vertex v
inline size_t outdegree(const T & v);
// return indegree of vertex v
inline size_t indegree(const T & v);
// return average degree of undirected_graph
inline double avg_degree();
// return total number of self-loops of undirected_graph
inline int selfloops();
};
More Graph Operations
SparseMatrix
#include <vector>
#include <eigen3/Eigen/Dense>
#include <eigen3/Eigen/Sparse>
#include <eigen3/Eigen/QR>
#include <eigen3/Eigen/Cholesky>
#include "utils.hpp"
int main(int argc, char *argv[])
{
Eigen::MatrixXd mtx(4, 3);
mtx << 1., 2., 3.,
4., 5., 6.,
7., 8., 9.,
10., 11., 12.;
std::cout << mtx << std::endl;
Eigen::MatrixXd clusters_mtx(2, 3);
std::vector<double> c1 = {2.5, 3.5, 4.5}, c2 = {8.5, 9.5, 10.5};
clusters_mtx.row(0) = paracel::vec2evec(c1);
clusters_mtx.row(1) = paracel::vec2evec(c2);
for(size_t i = 0; i < (size_t)mtx.rows(); ++i) {
Eigen::MatrixXd::Index indx;
(clusters_mtx.rowwise() - mtx.row(i))
.rowwise()
.squaredNorm()
.minCoeff(&indx);
std::cout << "| " << indx << " |" << std::endl;
}
typedef Eigen::Triplet<double> eigen_triple;
std::vector<eigen_triple> tpls;
tpls.push_back(eigen_triple(0, 0, 0.6));
tpls.push_back(eigen_triple(0, 2, 0.7));
tpls.push_back(eigen_triple(0, 4, 0.4));
tpls.push_back(eigen_triple(1, 2, 0.6));
tpls.push_back(eigen_triple(1, 3, 0.5));
tpls.push_back(eigen_triple(1, 4, 0.3));
tpls.push_back(eigen_triple(2, 0, 0.3));
tpls.push_back(eigen_triple(2, 1, 0.1));
tpls.push_back(eigen_triple(3, 3, 0.1));
tpls.push_back(eigen_triple(3, 4, 0.7));
tpls.push_back(eigen_triple(4, 1, 0.3));
tpls.push_back(eigen_triple(5, 0, 0.1));
tpls.push_back(eigen_triple(5, 4, 0.7));
tpls.push_back(eigen_triple(6, 0, 0.2));
tpls.push_back(eigen_triple(6, 2, 0.8));
tpls.push_back(eigen_triple(7, 0, 0.3));
tpls.push_back(eigen_triple(8, 1, 0.1));
tpls.push_back(eigen_triple(8, 2, 0.2));
tpls.push_back(eigen_triple(8, 3, 0.3));
tpls.push_back(eigen_triple(8, 4, 0.4));
tpls.push_back(eigen_triple(9, 0, 0.9));
tpls.push_back(eigen_triple(9, 3, 0.1));
tpls.push_back(eigen_triple(9, 4, 0.2));
Eigen::SparseMatrix<double, Eigen::RowMajor> A;
A.resize(10, 5);
A.setFromTriplets(tpls.begin(), tpls.end());
Eigen::MatrixXd H(5, 3); // 5 * 3
H << 1., 2., 3.,
4., 5., 6.,
7., 8., 9.,
10., 11., 12.,
13., 14., 15.;
Eigen::MatrixXd W(1,1);
W.resize(10, 3);
W = A * H;
std::cout << W << std::endl;
return 0;
}
Paracel imports Eigen::SparseMatrix as its sparse matrix representation. Check out more details here. You can directly use it to develop your sparse-matrix related applications. In the code area, we show a brief usage of Eigen::SparseMatrix. Eigen::SparseVector has a similar usage.
// std::vector to Eigen::VectorXd
Eigen::VectorXd vec2evec(const std::vector<double> & v);
// Eigen::VectorXd to std::vector
std::vector<double> evec2vec(const Eigen::VectorXd & ev);
// traverse Eigen::SparseVector v with functor func
template <class F>
void traverse_vector(Eigen::SparseVector<double> & v, F & func);
// traverse Eigen::SparseMatrix m with functor func
template <class F>
void traverse_matrix(Eigen::SparseMatrix<double, Eigen::RowMajor> & m, F & func);
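The two traverse helpers above are not exercised in the example. A minimal sketch of calling traverse_matrix follows; the (row index, column index, value) functor signature and the paracel namespace qualification are assumptions based on the graph traverse interfaces, not confirmed by this reference.
#include <iostream>
#include <eigen3/Eigen/Sparse>
#include "utils.hpp"
#include "paracel_types.hpp"
void traverse_demo(Eigen::SparseMatrix<double, Eigen::RowMajor> & m) {
  // assumed functor signature: one call per stored entry with (row, column, value)
  auto dump_lambda = [] (paracel::default_id_type i,
                         paracel::default_id_type j,
                         double val) {
    std::cout << i << "," << j << "," << val << std::endl;
  };
  paracel::traverse_matrix(m, dump_lambda);
}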
DenseMatrix
#include <eigen3/Eigen/Dense>
#include <iostream>
#include "utils.hpp"
void foo() {
std::cout.precision(20);
Eigen::MatrixXd H_blk(4, 3);
Eigen::MatrixXd stone = Eigen::MatrixXd::Random(2, 3);
H_blk.row(0) = stone.row(0);
H_blk.row(1) = stone.row(1);
H_blk.row(2) = stone.row(0);
H_blk.row(3) = stone.row(1);
std::cout << stone.transpose() * stone << std::endl;
auto kk = H_blk.block(0, 0, 2, 3).transpose() * H_blk.block(0,0,2,3);
auto kk2 = H_blk.block(2, 0, 2, 3).transpose() * H_blk.block(2,0,2,3);
Eigen::MatrixXd tt(3, 3);
tt <<
kk.row(0)[0] + kk2.row(0)[0],
kk.row(0)[1] + kk2.row(0)[1],
kk.row(0)[2] + kk2.row(0)[2],
kk.row(1)[0] + kk2.row(1)[0],
kk.row(1)[1] + kk2.row(1)[1],
kk.row(1)[2] + kk2.row(1)[2],
kk.row(2)[0] + kk2.row(2)[0],
kk.row(2)[1] + kk2.row(2)[1],
kk.row(2)[2] + kk2.row(2)[2];
std::cout << tt << std::endl;
std::cout << tt.inverse() << std::endl;
std::cout << (H_blk.transpose() * H_blk).inverse() << std::endl;
}
void goo() {
Eigen::MatrixXd m(2, 3);
m << 1., 2., 3.,
4., 5., 6.;
auto v = paracel::mat2vec(m);
for(auto & vv : v) std::cout << vv << std::endl; // 1\n4\n2\n5\n3\n6
Eigen::MatrixXd mat = paracel::vec2mat(v, 3); // 3 is columns
// 1 2 3
// 4 5 6
std::cout << mat << std::endl;
}
int main(int argc, char *argv[])
{
foo();
goo();
return 0;
}
Paracel imports Eigen::MatrixXd as its dense matrix representation. Check out more details here. You can directly use it to develop your matrix related applications. In the code area on the right, we show a brief usage of Eigen::MatrixXd.
// Eigen::MatrixXd is column-major; return the column-major (col) sequence
std::vector<double> mat2vec(const Eigen::MatrixXd & m);
// return row seq
Eigen::MatrixXd vec2mat(const std::vector<double> & v, size_t rows);
Paralg
Paralg is the basic class providing the majority of Paracel's functionality. Writing a Paracel program involves subclassing the paralg baseclass and overriding the virtual solve method. Some of the methods below are SPMD interfaces; we call them parallel interfaces in the following.
Initialize
#include <google/gflags.h>
#include "ps.hpp"
#include "utils.hpp"
class foo : public paracel::paralg {
public:
foo(paracel::Comm comm,
std::string hosts_dct_str,
std::string _output,
int k) : paracel::paralg(hosts_dct_str, comm, _output), topk(k) {}
virtual void solve() { /* ... */ }
private:
int topk;
};
DEFINE_string(server_info,
"host1:7777PARACELhost2:8888",
"hosts name string of paracel-servers.\n");
int main(int argc, char *argv[])
{
paracel::main_env comm_main_env(argc, argv);
paracel::Comm comm(MPI_COMM_WORLD);
google::SetUsageMessage("[options]\n\t--server_info\n\t--cfg_file\n");
google::ParseCommandLineFlags(&argc, &argv, true);
std::string output = "/nfs/tmp/demo/";
{
paracel::paralg *pt = new paracel::paralg(comm);
delete pt;
}
{
foo solver(comm, FLAGS_server_info, output, 10);
}
return 0;
}
paralg(paracel::Comm comm, std::string output = "", int rounds = 1);
Constructor for direct use. comm refers to the worker communicator, output refers to the output folder for dumping results, and rounds refers to the number of passes over the total dataset.
paralg(std::string hosts_dct_str,
paracel::Comm comm,
std::string output = "",
int rounds = 1,
int limit_s = 0,
bool ssp_switch = false);
Constructor for subclassing paralg. hosts_dct_str is the internal connection information you get from the main function, comm refers to the worker communicator, output refers to the output folder for dumping results, rounds refers to the number of passes over the total dataset, and limit_s and ssp_switch are used for Asynchronous Controlling in iterative tasks. output, rounds, limit_s and ssp_switch have default values, so you do not need to specify them.
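For iterative tasks you pass limit_s and ssp_switch explicitly. A minimal sketch is shown below; the class name bar and the concrete values are illustrative assumptions, and the full usage appears in the Asynchronous Controlling section.
// a minimal sketch: enable asynchronous controlling in a subclass constructor
class bar : public paracel::paralg {
 public:
  bar(paracel::Comm comm,
      std::string hosts_dct_str,
      std::string _output)
      : paracel::paralg(hosts_dct_str, comm, _output,
                        10,    // rounds: traverse the dataset 10 times (illustrative)
                        3,     // limit_s: fastest worker may lead by at most 3 iterations (illustrative)
                        true)  // ssp_switch: turn asynchronous controlling on
  {}
  virtual void solve() { /* ... */ }
};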
Load
#include <google/gflags.h>
#include "ps.hpp"
#include "utils.hpp"
#include "graph.hpp"
class foo : public paracel::paralg {
public:
foo(paracel::Comm comm,
std::string hosts_dct_str) : paracel::paralg(hosts_dct_str, comm) {}
void load_demo1() {
size_t total_sz = 0;
auto line_list = paracel_loadall("./data/*.csv");
for(auto & line : line_list) {
total_sz += line.size();
}
}
void load_demo2() {
size_t total_sz = 0;
auto handle_lambda = [&] (std::vector<std::string> & lines) {
for(auto & line : lines) {
total_sz += line.size();
}
};
paracel_sequential_loadall("./data/*.csv", handle_lambda);
}
void load_demo3() {
std::vector<std::string> files = {data1.csv, data2.csv};
size_t local_sz = 0;
auto local_line_list = paracel_load(files);
for(auto & line : local_line_list) {
local_sz += line.size();
}
}
void load_demo4() {
std::vector<std::string> files = {data1.csv, data2.csv};
size_t local_sz = 0;
auto handler = [&] (const std::string & line) { local_sz += line.size(); };
paracel_load_handle(files, handler);
}
void load_demo5() {
paracel::digraph<std::string> G1;
paracel::bigraph<paracel::default_id_type> G2;
paracel::bigraph_continuous G3;
/*
* graph1.txt graph2.txt
* 100,1,1 a,b
* 100,3,2 a,c
* 101,4,3 b,d
* 102,1,2 c,a
* ... ...
* ... ...
*/
auto local_parser = [] (const std::string & line) {
return paracel::str_split(line, ',');
};
// you have to wrap your pre-defined functor with paracel::gen_parser
auto f_parser = paracel::gen_parser(local_parser);
paracel_load_as_graph(G1, "graph2.txt", f_parser, "fmap", false);
paracel_load_as_graph(G2, "graph1.txt", f_parser, "smap", false);
std::unordered_map<paracel::default_id_type,
paracel::default_id_type> row_map, col_map;
paracel_load_as_graph(G3, row_map, col_map, "graph1.txt", f_parser);
}
void load_demo6() {
Eigen::SparseMatrix<double, Eigen::RowMajor> blk_mtx1, blk_mtx2, blk_mtx3;
std::unordered_map<paracel::default_id_type,
std::string> row_map, col_map;
/*
* matrix1.txt matrix2.txt
* a,b,1 a 1.1,2.1,3.1,4.1
* a,c,2 b 1.2,2.2,3.2,4.2
* b,d,3 c 1.3,2.3,3.3,4.3
* c,a,2 d 1.4,2.4,3.4,4.4
* ...
* ...
*/
auto local_parser = [] (const std::string & line) {
return paracel::str_split(line, ',');
};
// you have to wrap your pre-defined functor with paracel::gen_parser
auto f_parser = paracel::gen_parser(local_parser);
paracel_load_as_matrix(blk_mtx1, row_map, col_map,
"matrix1.txt", f_parser, "fmap", false);
paracel_load_as_matrix(blk_mtx2, row_map, "matrix1.txt", f_parser, "fmap");
paracel_load_as_matrix(blk_mtx3, "matrix1.txt", f_parser, "fmap");
Eigen::MatrixXd blk_dense_mtx1, blk_dense_mtx2;
paracel_load_as_matrix(blk_dense_mtx1, row_map,
"matrix2.txt", f_parser);
paracel_load_as_matrix(blk_dense_mtx2, "matrix2.txt", f_parser);
}
virtual void solve() {
// paracel_loadall interface usage
load_demo1();
// paracel_sequential_loadall interface usage
load_demo2();
// paracel_load interface usage
load_demo3();
// paracel_load_handle interface usage
load_demo4();
// paracel_load_as_graph interface usage
load_demo5();
// paracel_load_as_matrix interface usage
load_demo6();
}
};
DEFINE_string(server_info,
"host1:7777PARACELhost2:8888",
"hosts name string of paracel-servers.\n");
int main(int argc, char *argv[])
{
paracel::main_env comm_main_env(argc, argv);
paracel::Comm comm(MPI_COMM_WORLD);
google::SetUsageMessage("[options]\n\t--server_info\n\t--cfg_file\n");
google::ParseCommandLineFlags(&argc, &argv, true);
foo solver(comm, FLAGS_server_info);
solver.solve();
return 0;
}
Paracel provides various interfaces for loading input files. In this version, all load-related interfaces only support text-format files, so loading costs a little more memory. You can either read a partition of lines in parallel and then construct a customized data structure, or directly load the input data as Paracel's graph or matrix types. In the latter case, you have to use the pattern and mix_flag variables to describe the structure of the input files. pattern also decides the partition method for the input data.
Paracel defines the following patterns for the variable pattern:
pattern | structure | line example |
---|---|---|
linesplit (default) | partition with lines | all structures |
fmap | first-second case (value set to 1.0) or first-second-value case; partition with the first field | a,b or a,b,0.2 |
smap | second-first case (value set to 1.0) or second-first-value case; partition with the second field | a,b or a,b,0.2 |
fsmap | supports the same structures as fmap and smap; 2D partition | a,b or a,b,0.2 |
fvec | id,feature1,…,featurek case; partition with id | 1001 0.1|0.2|0.3|0.4 |
fset | attr1,attr2,attr3,… or attr1,attr2|value2,attr3|value3,… case; partition with the first field | a,b,c or a,b|0.2,c|0.4 |
The variable mix_flag indicates whether all the link relations of a graph/matrix node are defined in a single line. As you can see in the example below, when mix_flag is set to false, all the link relations of node 'a' are expanded into three separate lines. If pattern equals fvec or fset, mix_flag is always true.
mix_flag | example |
---|---|
true | a,b,c,d b,c,d … |
true | a,b a,c,d b,c b,d … |
false (default) | a,b a,c a,d b,c b,d … |
As you can see above, pattern decides not only the data format but also the partition strategy, while mix_flag tells Paracel whether a node's link relations are mixed into a single line.
template<class T>
vector<string> paracel_loadall(const T & fn);
Each worker loads all lines of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T, class F>
void paracel_sequential_loadall(const T & fn, F & func);
Each worker loads all lines of fn and handles them with functor func. To avoid exceeding memory, we recommend this interface: Paracel loads part of the lines of fn, calls func to handle them, and repeats until the end of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T>
vector paracel_load(const T & fn);
SPMD interface: each worker load fn
’s lines parallelly. T
can be either string type to represent one file or vector<string> to represent a set of files. See more details on the right.
template<class T, class F>
void paracel_load_handle(const T & fn,
F & func,
const string & pattern = “linesplit”,
bool mix_flag = false);
SPMD interface: load fn in parallel and handle it with func line by line. func takes a string argument, and Paracel passes each line of fn to it. You can also specify pattern and mix_flag to describe the structure of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T, class G>
void paracel_load_as_graph(paracel::digraph<G> & grp,
const T & fn,
parser_type & parser,
const std::string pattern = "fmap",
bool mix_flag = false);
SPMD interface: load fn in parallel using parser and generate a local digraph grp for each worker. The input argument of parser must be a string and it must return a vector<string> holding the fields of the digraph. You can also specify pattern and mix_flag to describe the structure of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T, class G>
void paracel_load_as_graph(paracel::bigraph<G> & grp,
const T & fn,
parser_type & parser,
const std::string pattern = "fmap",
bool mix_flag = false);
SPMD interface: load fn in parallel using parser and generate a local bigraph grp for each worker. The input argument of parser must be a string and it must return a vector<string> holding the fields of the bigraph. You can also specify pattern and mix_flag to describe the structure of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T>
void paracel_load_as_graph(paracel::bigraph_continuous & grp,
unordered_map<default_id_type, default_id_type> & row_map,
unordered_map<default_id_type, default_id_type> & col_map,
const T & fn,
parser_type & parser,
const string & pattern = “fmap”,
bool mix_flag = false);
SPMD interface: load fn in parallel using parser and generate a local bigraph_continuous grp for each worker. Since Paracel maps the original ids into continuous indexes, this interface has two extra output arguments that record the mapping information: row_map and col_map. They store the correspondence between the continuous indexes and the original vertex ids (both of type paracel::default_id_type here). The input argument of parser must be a string and it must return a vector<string> holding the fields of the bigraph_continuous. You can also specify pattern and mix_flag to describe the structure of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T, class G>
void paracel_load_as_matrix(Eigen::SparseMatrix<double, Eigen::RowMajor> & blk_mtx,
std::unordered_map<default_id_type, G> & row_map,
std::unordered_map<default_id_type, G> & col_map,
const T & fn,
parser_type & parser,
const std::string & pattern = “fsmap”,
bool mix_flag = false);
SPMD interface: load fn in parallel using parser and generate a block Eigen::SparseMatrix blk_mtx for each worker. Since the indexes of a matrix must be continuous, Paracel maps the original ids into continuous indexes. Two extra output arguments store the mapping information: row_map and col_map. They store the correspondence between paracel::default_id_type and the generic type G of the original ids. The input argument of parser must be a string and it must return a vector<string>. You can also specify pattern and mix_flag to describe the structure of fn. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T, class G>
void paracel_load_as_matrix(Eigen::SparseMatrix<double, Eigen::RowMajor> & blk_mtx,
std::unordered_map<default_id_type, G> & row_map,
const T & fn,
parser_type & parser,
const std::string & pattern = “fsmap”,
bool mix_flag = false);
Same as the previous interface but without col_map. See more details on the right.
template<class T, class G>
void paracel_load_as_matrix(Eigen::SparseMatrix<double, Eigen::RowMajor> & blk_mtx,
const T & fn,
parser_type & parser,
const std::string & pattern = “fsmap”,
bool mix_flag = false);
Same as the previous interface but without row_map and col_map. See more details on the right.
template<class T, class G>
void paracel_load_as_matrix(Eigen::MatrixXd & blk_dense_mtx,
std::unordered_map<default_id_type, G> & row_map,
const T & fn,
parser_type & parser);
SPMD interface: load fn in parallel using parser and generate a block Eigen::MatrixXd blk_dense_mtx for each worker. Since the indexes of a matrix must be continuous, Paracel maps the original ids into continuous indexes. The output argument row_map stores the correspondence between paracel::default_id_type and the generic type G of the original ids. The input argument of parser must be a string and it must return a vector<string>. The file structure in this case can only be fvec. T can be either a string, to represent one file, or a vector<string>, to represent a set of files. See more details on the right.
template<class T, class G>
void paracel_load_as_matrix(Eigen::MatrixXd & blk_dense_mtx,
const T & fn,
parser_type & parser);
Same as the previous interface but without row_map. See more details on the right.
Communication
#include <string>
#include <unordered_map>
#include <google/gflags.h>
#include "ps.hpp"
#include "utils.hpp"
using std::string;
using std::unordered_map;
using std::vector;
class foo : public paracel::paralg {
public:
foo(paracel::Comm comm, string hosts_dct_str)
: paracel::paralg(hosts_dct_str, comm) {}
void test_register() {
paracel_register_update(
"/nfs/lib/paracel/libupdate.so",
"sum_updater");
paracel_register_bupdate(
"/nfs/lib/paracel/libupdate.so",
"sum_updater");
paracel_register_read_special(
"/nfs/lib/paracel/lib/libfilter.so",
"key_filter");
}
void test_push() {
if(get_worker_id() == 0) {
paracel_write("key1", "value");
unordered_map<std::string, double> d;
d["key2"] = 3.; d["key3"] = 4.;
d["doc_key4"] = 5.;
d["key5"] = 6.; d["key6"] = 7.;
paracel_write_multi(d);
}
}
void test_pull() {
string r1 = paracel_read<string>("key1"); // "value"
double r2 = paracel_read<double>("key2"); // 3.
double r3;
bool f1 = paracel_read<double>("key3", r3); // r3 = 4., f1 = true
bool f2 = paracel_read<double>("key10", r3); // f2 = false
vector<string> keys_list = {"key5", "key6"};
// { 6., 7. }
auto val_list = paracel_read_multi<double>(keys_list);
// error since the store also contains a string-type value ("key1")
auto rr = paracel_readall<double>();
double result = 0.;
auto special_read_handle_lambda =
[&] (unordered_map<string, double> & d) {
for(auto & kv : d) {
result += kv.second;
}
};
// error since the store also contains a string-type value
paracel_readall_handle<double>(special_read_handle_lambda);
// error since the store contains a string-type value
auto rrr = paracel_read_special<double>(
"/nfs/lib/libfilter.so",
"key_filter");
// result = 5.
paracel_read_special_handle<double>(
"/nfs/lib/libfilter.so",
"key_filter",
special_read_handle_lambda);
}
void test_update() {
paracel::async_functor_type future1, future2, future3;
paracel_update("key5", 4., future1); // update with registered "sum_updater"
paracel_update("key5", 3., future2);
// key doesn't exist, following code equals to paracel_write
paracel_update("new_key1", 1.,future3);
// do some computation here, parallel with sum_updater
if(paracel::wait(future1)) {
std::cout << "33.3%" << std::endl;
}
if(paracel::wait(future2)) {
std::cout << "66.6%" << std::endl;
}
if(paracel::wait(future3)) {
std::cout << "100%" << std::endl;
}
}
void test_bupdate() {
if(get_worker_id() == 0) {
paracel_bupdate("key2", 7.); // bupdate with registered "sum_updater"
// specify another update function instead of the registered one
paracel_bupdate("key3", 6.,
"/nfs/lib/paracel/libupdate.so",
"sum_updater2");
// key doesn't exist, following code equals to paracel_write
paracel_bupdate("new_key2", 1.);
vector<double> keys = {"key2", "key3", "key5"};
paracel_bupdate_multi(keys, deltas,
"/nfs/lib/paracel/libupdate.so",
"sum_updater2");
unordered_map<string, double> d;
d["key2"] = 1.; d["key3"] = 1.; d["key5"] = 1.;
paracel_bupdate_multi(d,
"/nfs/lib/paracel/libupdate.so",
"sum_updater2");
}
}
virtual void solve() {
test_register();
paracel_sync();
test_push();
paracel_sync();
test_pull();
test_update();
test_bupdate();
paracel_sync();
}
};
DEFINE_string(server_info,
"host1:7777PARACELhost2:8888",
"hosts name string of paracel-servers.\n");
int main(int argc, char *argv[])
{
paracel::main_env comm_main_env(argc, argv);
paracel::Comm comm(MPI_COMM_WORLD);
google::SetUsageMessage("[options]\n\t--server_info\n\t--cfg_file\n");
google::ParseCommandLineFlags(&argc, &argv, true);
foo solver(comm, FLAGS_server_info);
solver.solve();
return 0;
}
Paracel provides a distributed, global key-value store called the parameter server. The parameter server is a novel paradigm which makes information exchange much easier. Messages in Paracel (also called parameters) must have a key-value structure. When communicating, workers only need to interact with servers to read/write/update messages. The communication interfaces in Paracel are simple and flexible.
bool paracel_register_update(const std::string & file_name,
const std::string & func_name);
Register an update function with Paracel, specified by file_name and func_name. It is the counterpart of paracel_update and related interfaces.
bool paracel_register_bupdate(const std::string & file_name,
const std::string & func_name);
Register an update function with Paracel, specified by file_name and func_name. It is the counterpart of paracel_bupdate and related interfaces.
bool paracel_register_read_special(const std::string & file_name,
const std::string & func_name);
Register a filter function with Paracel, specified by file_name and func_name. It is the counterpart of paracel_read_special and related interfaces.
template<class V>
bool paracel_read(const std::string & key, V & val);
Pull val from the parameter server with the specified key. If key does not exist, return false; otherwise return true.
template<class V>
V paracel_read(const std::string & key);
Pull the value from the parameter server with the specified key. If key does not exist, ERROR_ABORT will be invoked.
template<class V>
std::vector<V>
paracel_read_multi(const std::vector<std::string> & keys);
Pull multiple values from the parameter server in one call. The input argument is a list of keys and the return value is the list of corresponding values. Every key in keys must exist.
template<class V>
unordered_map<string, V> paracel_readall();
Pull all key-value pairs from parameter server into an unordered_map.
template<class V, class F>
void paracel_readall_handle(F & func);
Pull all key-value pairs from the parameter server in batches and handle them with functor func. It is similar to paracel_readall, but avoids exceeding memory when there are many parameters.
template<class V>
unordered_map<string, V>
paracel_read_special(const std::string & file_name,
const std::string & func_name);
Pull key-value pairs from the parameter server into an unordered_map, filtered by a filter function. The filter function is specified by file_name and func_name.
template<class V, class F>
void paracel_read_special_handle(const std::string & file_name,
const std::string & func_name,
F & func);
Pull key-value pairs from the parameter server in batches, filtered by a filter function and handled with functor func. The filter function is specified by file_name and func_name.
template<class T>
void paracel_read_topk(int k, vector<pair<string, T> > & result);
Read the top-k key-value pairs (compared by value) from the parameter server into the result list.
template<class T>
void paracel_read_topk_with_key_filter(int k,
vector<pair<string, T> > & result,
const string & file_name,
const string & func_name);
Read the top-k key-value pairs (compared by value) that pass a filter function from the parameter server into the result list. The filter function is specified by file_name and func_name.
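The two top-k interfaces are not covered by the Communication example. A minimal sketch of calling them inside a paralg subclass follows; the value of k, the library path and the filter name are illustrative assumptions.
// a minimal sketch: read the 5 largest values, then the 5 largest values whose keys pass key_filter
std::vector<std::pair<std::string, double> > top5, top5_doc;
paracel_read_topk(5, top5);
paracel_read_topk_with_key_filter(5, top5_doc,
                                  "/nfs/lib/paracel/libfilter.so",
                                  "key_filter");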
template<class V>
bool paracel_write(const std::string & key, const V & val);
Push a key-value pair into the parameter server. paracel_write is idempotent.
template<class V>
bool paracel_write_multi(const unordered_map & dct);
Push key-value pairs into parameter server. paracel_write_multi
is idempotent.
template<class V>
void paracel_update(const std::string & key, const V & delta,
paracel::async_functor_type & future);
Advanced interface! Update key's value with delta on the parameter server. The update function registered via paracel_register_update is invoked with delta. This is a non-blocking interface: after it returns, the update on the server end may or may not be finished. You can use paracel::wait(future) to wait until the update has happened on the server end, and you can do some computation in the meantime before invoking paracel::wait(future). If you are not pursuing performance, we strongly recommend the paracel_bupdate interface in most cases to ensure the correctness of your algorithms.
template<class V>
void paracel_update(const std::string & key, const V & delta,
const std::string & file_name,
const std::string & func_name,
paracel::async_functor_type & future);
Advanced interface! Update key's value with delta on the parameter server. The update function specified by file_name and func_name is invoked with delta. This is a non-blocking interface: after it returns, the update on the server end may or may not be finished. You can use paracel::wait(future) to wait until the update has happened on the server end, and you can do some computation in the meantime before invoking paracel::wait(future). If you are not pursuing performance, we strongly recommend the paracel_bupdate interface in most cases to ensure the correctness of your algorithms.
template<class V>
bool paracel_bupdate(const std::string & key, const V & delta);
Blocking update of key's value with delta on the parameter server. The update function registered via paracel_register_bupdate is invoked with delta. It is a blocking interface: when it returns, the update on the server end is guaranteed to have finished.
template <class V>
bool paracel_bupdate(const std::string & key, const V & delta,
const std::string & file_name,
const std::string & func_name);
Blocking update of key's value with delta on the parameter server. The update function specified by file_name and func_name is invoked with delta. It is a blocking interface: when it returns, the update on the server end is guaranteed to have finished.
template <class V>
bool paracel_bupdate_multi(const std::vector<std::string> & keys,
const std::vector<V> & deltas
const std::string & file_name,
const std::string & func_name);
Blocking update of key-value pairs in batches. You must specify the update function with file_name and func_name. The size of deltas must equal the size of keys.
template <class V>
bool paracel_bupdate_multi(const unordered_map & dct,
const std::string & file_name,
const std::string & func_name);
const std::string & file_name,
const std::string & func_name);
Blocking update of key-value pairs in batches. You must specify the update function with file_name and func_name. dct maps each key to its delta.
bool paracel_contains(const std::string & key);
Check whether key exists in the parameter server.
bool paracel_remove(const std::string & key);
Remove the key-value pair with the given key from the parameter server.
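Neither interface appears in the Communication example. A minimal sketch, reusing a key name from that example:
// a minimal sketch: delete "key1" only if it exists on the parameter server
if(paracel_contains("key1")) {
  paracel_remove("key1");
}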
void paracel_sync();
Block until all workers have reached this line of code.
void set_decomp_info(const std::string & pattern);
Set decomposition information according to pattern (fmap, smap, fsmap, fvec…).
void get_decomp_info(int & x, int & y);
Get decomposition information into x and y: x is the number of partitions in the horizontal direction and y is the number of partitions in the vertical direction.
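A minimal sketch of querying the partition layout inside a paralg subclass; calling set_decomp_info yourself with the pattern used at load time is shown here as an assumption.
// a minimal sketch: query how the loaded data was partitioned among workers
set_decomp_info("fsmap"); // assumed: the same pattern passed to the load interface
int npx = 0, npy = 0;
get_decomp_info(npx, npy);
// npx: partitions in the horizontal direction, npy: partitions in the vertical direction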
size_t get_worker_id();
Get the worker id.
size_t get_worker_size();
Get the number of workers.
size_t get_server_size();
Get the number of servers.
paracel::Comm get_comm();
Get the workers' communication channel; you can use the returned channel to do low-level communication between workers. See more interfaces in the Communicator section.
Dump
#include <string>
#include <unordered_map>
#include <google/gflags.h>
#include "ps.hpp"
#include "utils.hpp"
using std::string;
using std::unordered_map;
using std::vector;
using std::pair;
class foo : public paracel::paralg {
public:
foo(paracel::Comm comm, string hosts_dct_str)
: paracel::paralg(hosts_dct_str, comm, "/nfs/tmp/paracel_output/") {}
void save() {
vector<double> data1 = {1., 2., 3.};
unordered_map<string, int> data2;
data2["a"] = 1; data2["b"] = 2;
unordered_map<string, double> data3;
data3["c"] = 3.; data3["d"] = 4.;
unordered_map<string, vector<double> > data4;
data4["e"] = {1.23, 2.34, 3.45};
data4["f"] = {5.43, 4.32, 3.21};
unordered_map<string,
vector<pair<string, double> > > data5;
data5["g"] = { make_pair("i", 3.1415926),
make_pair("j", 3.14) };
data5["h"] = { make_pair("x", 3.1415),
make_pair("y", 3.141) };
paracel_dump_vector(data1, "data1_");
paracel_dump_dict(data2, "data2_");
paracel_dump_dict(data3, "data3_");
paracel_dump_dict(data4, "data4_");
paracel_dump_dict(data5, "data4_", true);
}
};
DEFINE_string(server_info,
"host1:7777PARACELhost2:8888",
"hosts name string of paracel-servers.\n");
int main(int argc, char *argv[])
{
paracel::main_env comm_main_env(argc, argv);
paracel::Comm comm(MPI_COMM_WORLD);
google::SetUsageMessage("[options]\n\t--server_info\n\t--cfg_file\n");
google::ParseCommandLineFlags(&argc, &argv, true);
foo solver(comm, FLAGS_server_info);
solver.save();
return 0;
}
Paracel implements the dumping interfaces below. They are all defined in the paralg baseclass. Since only a limited set of data types is supported, in most cases you must write your own dumper.
template<class V>
void paracel_dump_vector(const std::vector<V> & data,
const string & file_prefix = “result_”,
const string & sep = “,”,
bool append_flag = false,
bool merge = false);
void paracel_dump_dict(const unordered_map<string, int>& data,
const string & file_prefix = “result_”,
bool append_flag = false,
bool merge = false);
void paracel_dump_dict(const unordered_map<string, double>& data,
const string & file_prefix = “result_”,
bool append_flag = false,
bool merge = false);
template<class T, class P>
void paracel_dump_dict(
const unordered_map<T, vector<P> > & data,
const string & file_prefix = “result_”,
bool append_flag = false,
bool merge = false);
void paracel_dump_dict(
const unordered_map<string, vector<pair<string, double> > > & data,
const string & file_prefix = “result_”,
bool append_flag = false,
bool merge = false);
Asynchronous Controlling
class logistic_regression: public paracel::paralg {
public:
logistic_regression(paracel::Comm comm,
std::string hosts_dct_str,
std::string _output,
int _rounds,
int _limit_s,
bool _ssp_switch) :
paracel::paralg(hosts_dct_str,
comm,
_output,
_rounds,
_limit_s,
_ssp_switch) {}
void training() {
theta = paracel::random_double_list(data_dim);
paracel_write("theta", theta); // init push
for(int iter = 0; iter < rounds; ++iter) {
for(int i = 0; i < data_dim; ++i) {
delta[i] = 0.;
}
random_shuffle(idx.begin(), idx.end());
// pull theta
theta = paracel_read<vector<double> >("theta");
for(auto sample_id : idx) {
for(int i = 0; i < data_dim; ++i) {
delta[i] += coff1 *
samples[sample_id][i] - coff2 * theta[i];
}
} // traverse
// update theta with delta
paracel_bupdate("theta",
delta,
"update.so",
"lg_theta_update");
// commit to server at the end of each iteration
iter_commit();
}
// last pull
theta = paracel_read<vector<double> >("theta");
}
void solve() {
// init training data
auto parser = [] (const std::vector<std::string> & lines) {
/* ... */
};
auto lines = paracel_load(input);
parser(lines);
paracel_sync();
// set total iterations of your training process
set_total_iters(rounds);
// training
training();
}
}; // class logistic regression
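The example above registers "lg_theta_update" from update.so but does not show it. A sketch of how such an update function could be written, following the Registry Function section below, is given here; the element-wise addition body is an assumption for illustration.
/* update.cpp -- a hypothetical sketch of the registered update function */
#include <vector>
#include "proxy.hpp"
#include "paracel_types.hpp"
extern "C" {
  extern paracel::update_result lg_theta_update;
}
// assumed behavior: add the accumulated delta to theta, element by element
std::vector<double> local_update(std::vector<double> theta,
                                 std::vector<double> delta) {
  for(size_t i = 0; i < theta.size(); ++i) {
    theta[i] += delta[i];
  }
  return theta;
}
paracel::update_result lg_theta_update = paracel::update_proxy(local_update);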
Many machine learning problems can be converted into an iterative task. The traditional way to run such a task is the BSP model, which means synchronizing at the end of every iteration. This leads to the last-reducer problem: straggling workers appear because of hardware-related reasons such as network congestion, CPU interrupts and machines procured in different years, and software-related reasons such as garbage collection, virtualization and so on.
There are two ways to solve this problem. First, write some tricky code to deliberately imbalance the load so that a fast worker trains on more data. Second, use asynchronous controlling to relax the synchronization condition.
Paracel uses the second solution: we relax the synchronization condition under the assumption that the fastest worker leads the slowest worker by no more than a bounded number of iterations, which is a trade-off between the convergence of every iteration and the total time to convergence. The idea is similar to the SSP approach. The attractive point is that you only have to add a few lines of code to transform a BSP process into an asynchronous process. The asynchronous interfaces are listed below and a very simple example is shown on the right.
void iter_commit();
Commit to parameter servers at the end of each iteration.
void set_total_iters(int n);
Set total number of iterations. You have to use this function to tell Paracel the total number of iterations beforehand.
template<class V>
V get_cache(const std::string & key);
Get the locally cached value of the specified key.
bool is_cached(const std::string & key);
Check whether the value of the specified key is cached locally.
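A minimal sketch of using the local cache, reusing the "theta" key from the logistic regression example; whether a previous pull populates the cache is an assumption here.
// a minimal sketch: reuse a locally cached copy of "theta" when available
std::vector<double> theta;
if(is_cached("theta")) {
  theta = get_cache<std::vector<double> >("theta");
} else {
  theta = paracel_read<std::vector<double> >("theta");
}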
Besides the functions above, there are two important parameters you have to use: ssp_switch and limit_s. ssp_switch is the switch and limit_s is the bound parameter; both must be set in the constructor of the paralg baseclass.
The logistic regression example on the right describes the usage in detail. As you can see, you only need to add four lines of code to the original version.
Registry Function
A registry function is a user-defined function that interacts with the Paracel framework.
You must follow the predefined interfaces below, and you can do anything you want inside a registry function. Then you need to register your function with Paracel. After compiling and installing your code, you can specify the registry function with the shared library file name and the function name you defined.
// substitute name with your update function name
template <class T>
T name(T & a, T & b);
// substitute name with your filter function name
bool name(const std::string & key);
Update Function
/* update.cpp */
#include "proxy.hpp"
#include "paracel_types.hpp"
// wrap c++ function here
extern "C" {
extern paracel::update_result sum_updater;
}
// define your update function here
// return type must be the same with parameter a and parameter b
double foo(double a, double b) { return a + b; }
// register into Paracel framework
paracel::update_result sum_updater = paracel::update_proxy(foo);
An update registry function can be used together with paracel_update, paracel_bupdate and so on. It is something like the reduce function in the map-reduce paradigm and is executed dynamically on the server side. For example, in the code area we define a simple sum updater in the file update.cpp, and in the corresponding CMakeLists.txt file we add:
add_library(update SHARED update.cpp)
target_link_libraries(update ${CMAKE_DL_LIBS})
install(TARGETS update LIBRARY DESTINATION lib)
After compiling and installing the code, you can specify your update function with the shared library file name xxx/lib/libupdate.so and the function name inside it: sum_updater (here xxx refers to your Paracel installation path).
Filter Function
/* filter.cpp */
#include "proxy.hpp"
#include "paracel_types.hpp"
// wrap c++ function here
extern "C" {
extern paracel::filter_with_key_result key_filter;
}
// define your filter function here
// return bool, parameter must be std::string
bool foo(const std::string & key) {
if(paracel::startswith(key, "doc")) return true;
return false;
}
// register into Paracel framework
paracel::filter_with_key_result key_filter = paracel::filter_with_key_proxy(foo);
A filter registry function can be used together with paracel_read_special and paracel_read_special_handle, which read selected key-value pairs from all parameter servers. Workers only pull key-value pairs for which the filter function returns true.
add_library(filter SHARED filter.cpp)
target_link_libraries(filter ${CMAKE_DL_LIBS})
install(TARGETS filter LIBRARY DESTINATION lib)
Similarly, you must add the compile information to the corresponding CMakeLists.txt. After compiling and installing the code, you can specify your filter function with the shared library file name xxx/lib/libfilter.so and the function name inside it: key_filter (here xxx refers to your Paracel installation path).
In the example on the right, say we have {"doc_key1" : 1.23, "key2" : "world"} stored in the parameter server; invoking paracel_read_special("xxx/lib/libfilter.so", "key_filter") will only return {"doc_key1" : 1.23}.
Utility
Paracel provides some scattered utilities for common functionality; we plan to encapsulate them with higher-level abstractions in the next release.
Communicator
// worker number = 2
#include <vector>
#include <tuple>
#include <string>
#include <set>
#include <unordered_map>
#include <mpi.h>
#include "utils/comm.hpp"
int main(int argc, char *argv[])
{
paracel::main_env comm_main_env(argc, argv);
paracel::Comm comm(MPI_COMM_WORLD);
int rk = comm.get_rank();
int sz = comm.get_size();
comm.get_rank();
{ // builtin send + recv
if(rk == 0) {
int a = 7;
comm.send(a, 1, 2014);
} else if(rk == 1){
int b = 0;
comm.recv(b, 0, 2014);
}
}
{ // isend + recv
if(rk == 0) {
int a = 7;
paracel::vrequest req;
req = comm.isend(a, 1, 2014);
} else if(rk == 1){
int b = 0;
comm.recv(b, 0, 2014);
}
}
{ // container send + recv
if(rk == 0) {
std::vector<int> aa {77, 88};
comm.send(aa, 1, 2014);
} else if(rk == 1) {
std::vector<int> bb;
comm.recv(bb, 0, 2014);
}
}
{ // container isend + recv
if(rk == 0) {
std::vector<int> aa {77, 88};
paracel::vrequest req;
req = comm.isend(aa, 1, 2014);
} else if(rk == 1) {
std::vector<int> bb;
comm.recv(bb, 0, 2014);
}
}
{ // paracel triple send + recv
if(rk == 0) {
std::tuple<std::string, std::string, double> aa;
std::get<0>(aa) = "abc";
std::get<1>(aa) = "def";
std::get<2>(aa) = 3.14;
paracel::vrequest req;
req = comm.isend(aa, 1, 2014);
} else if(rk == 1) {
std::tuple<std::string, std::string, double> bb;
comm.recv(bb, 0, 2014);
}
}
{ // paracel list of triple send + recv
if(rk == 0) {
std::vector<std::tuple<std::string,
std::string, double> > aa;
std::tuple<std::string, std::string, double> tmp1;
std::get<0>(tmp1) = "abc"; std::get<1>(tmp1) = "def";
std::get<2>(tmp1) = 4.15; aa.push_back(tmp1);
std::tuple<std::string, std::string, double> tmp2;
std::get<0>(tmp2) = "cba"; std::get<1>(tmp2) = "fed";
std::get<2>(tmp2) = 5.16; aa.push_back(tmp2);
paracel::vrequest req;
req = comm.isend(aa, 1, 2014);
} else if(rk == 1) {
std::vector<std::tuple<std::string,
std::string, double> > bb;
comm.recv(bb, 0, 2014);
}
}
{ // another paracel list of triple send + recv
if(rk == 0) {
std::vector<std::tuple<std::string,
std::string, double> > aa;
std::tuple<std::string, std::string, double> tmp1;
std::get<0>(tmp1) = "abc"; std::get<1>(tmp1) = "def";
std::get<2>(tmp1) = 4.15; aa.push_back(tmp1);
std::tuple<std::string, std::string, double> tmp2;
aa.push_back(tmp2);
paracel::vrequest req;
req = comm.isend(aa, 1, 2014);
} else if(rk == 1) {
std::vector<std::tuple<std::string,
std::string, double> > bb;
comm.recv(bb, 0, 2014);
}
}
{ // builtin sendrecv
int a = 8;
int b;
int left, right;
right = (rk + 1) % sz;
left = rk - 1;
if(left < 0) left = sz - 1;
comm.sendrecv(a, b, left, 2014, right, 2014);
}
{ // container sendrecv
std::vector<int> aaa{1,2,3};
std::vector<int> bbb;
int left, right;
right = (rk + 1) % sz;
left = rk - 1;
if(left < 0) left = sz - 1;
comm.sendrecv(aaa, bbb, left, 2014, right, 2014);
}
{ // paracel triple sendrecv
std::tuple<std::string, std::string, double> aa;
std::tuple<std::string, std::string, double> bb;
std::get<0>(aa) = "abc"; std::get<1>(aa) = "def";
std::get<2>(aa) = 3.14; int left, right;
right = (rk + 1) % sz;
left = rk - 1;
if(left < 0) left = sz - 1;
comm.sendrecv(aa, bb, left, 2014, right, 2014);
}
{ // paracel list of triple sendrecv
std::vector<std::tuple<std::string,
std::string, double> > bb;
std::vector<std::tuple<std::string,
std::string, double> > aa;
std::tuple<std::string, std::string, double> tmp1;
std::get<0>(tmp1) = "abc"; std::get<1>(tmp1) = "def";
std::get<2>(tmp1) = 4.15; aa.push_back(tmp1);
std::tuple<std::string, std::string, double> tmp2;
std::get<0>(tmp2) = "cba"; std::get<1>(tmp2) = "fed";
std::get<2>(tmp2) = 5.16; aa.push_back(tmp2);
int left, right;
right = (rk + 1) % sz;
left = rk - 1;
if(left < 0) left = sz - 1;
comm.sendrecv(aa, bb, left, 2014, right, 2014);
}
{ // another paracel list of triple sendrecv
std::vector<std::tuple<std::string,
std::string, double> > bb;
std::vector<std::tuple<std::string,
std::string, double> > aa;
std::tuple<std::string, std::string, double> tmp1;
std::get<0>(tmp1) = "abc"; std::get<1>(tmp1) = "def";
std::get<2>(tmp1) = 4.15;
std::tuple<std::string, std::string, double> tmp2;
aa.push_back(tmp2); aa.push_back(tmp1);
int left, right;
right = (rk + 1) % sz;
left = rk - 1;
if(left < 0) left = sz - 1;
comm.sendrecv(aa, bb, left, 2014, right, 2014);
}
{ // debug for list of triple sendrecv
std::vector<std::vector<std::tuple<std::string,
std::string, double> > > aa(2);
std::vector<std::tuple<std::string,
std::string, double> > bb;
int t = -1, f = -1;
if(rk == 0) {
std::vector<std::tuple<std::string,
std::string, double> > aaa;
std::tuple<std::string, std::string, double> tmp1;
std::get<0>(tmp1) = "a"; std::get<1>(tmp1) = "b";
std::get<2>(tmp1) = 1.; aaa.push_back(tmp1);
std::get<0>(tmp1) = "a"; std::get<1>(tmp1) = "c";
std::get<2>(tmp1) = 1.; aaa.push_back(tmp1);
std::get<0>(tmp1) = "a"; std::get<1>(tmp1) = "d";
std::get<2>(tmp1) = 1.; aaa.push_back(tmp1);
std::get<0>(tmp1) = "b"; std::get<1>(tmp1) = "a";
std::get<2>(tmp1) = 1.; aaa.push_back(tmp1);
aa[1] = aaa; t = 1; f = 1;
} else if(rk == 1) {
std::vector<std::tuple<std::string,
std::string, double> > aaa;
std::tuple<std::string, std::string, double> tmp1;
std::get<0>(tmp1) = "e"; std::get<1>(tmp1) = "a";
std::get<2>(tmp1) = 1.; aaa.push_back(tmp1);
std::get<0>(tmp1) = "e"; std::get<1>(tmp1) = "d";
std::get<2>(tmp1) = 1.; aaa.push_back(tmp1);
aa[0] = aaa;
std::vector<std::tuple<std::string,
std::string, double> > aaaa;
std::tuple<std::string, std::string, double> tmp2;
std::get<0>(tmp2) = "b"; std::get<1>(tmp2) = "d";
std::get<2>(tmp2) = 1.; aaaa.push_back(tmp2);
std::get<0>(tmp2) = "d"; std::get<1>(tmp2) = "c";
std::get<2>(tmp2) = 1.; aaaa.push_back(tmp2);
aa[1] = aaaa; f = 0; t = 0;
}
comm.sendrecv(aa[t], bb, t, 2014, f, 2014);
}
{ // builtin bcast
int a;
if(rk == 0) a = 7;
comm.bcast(a, 0);
}
{ // container bcast
std::vector<int> aa(2);
if(rk == 0) {
aa[0] = 3; aa[1] = 4;
}
comm.bcast(aa, 0);
}
{ // builtin alltoall
std::vector<int> a(2), b(2);
if(rk == 0) {
a[0] = 1; a[1] = 3;
}
if(rk == 1) {
a[0] = 2; a[1] = 4;
}
comm.alltoall(a, b);
}
{ // container alltoall
std::vector< std::vector<int> > a(2), b;
if(rk == 0) {
std::vector<int> tmp1{1, 5};
std::vector<int> tmp2{3};
a[0] = tmp1; a[1] = tmp2;
}
if(rk == 1) {
std::vector<int> tmp1{2};
std::vector<int> tmp2{4, 7};
a[0] = tmp1; a[1] = tmp2;
}
comm.alltoall(a, b);
}
{ // builtin allreduce
int aaa;
if(rk == 0) { aaa = 1; }
if(rk == 1) { aaa = 2; }
comm.allreduce(aaa);
}
{ // container allreduce
std::vector<int> aaa(3);
if(rk == 0) {
aaa[0] = 1; aaa[1] = 2; aaa[2] = 3;
}
if(rk == 1) {
aaa[0] = 3; aaa[1] = 2; aaa[2] = 1;
}
comm.allreduce(aaa);
}
{ // bcastring
std::vector<int> a;
if(rk == 0) {
a.push_back(6); a.push_back(42);
}
if(rk == 1) {
a.push_back(28); a.push_back(42);
a.push_back(42); a.push_back(28); a.push_back(6);
}
std::set<int> result;
auto func = [&](std::vector<int> tmp){
for(auto & stf : tmp)
result.insert(stf);
};
comm.bcastring(a, func);
}
{ // dict_type<size_t, int> isend
if(rk == 0) {
std::unordered_map<size_t, int> aa;
aa[0] = 1; aa[1] = 2;
paracel::vrequest req;
req = comm.isend(aa, 1, 2014);
} else if(rk == 1) {
std::unordered_map<size_t, int> bb;
comm.recv(bb, 0, 2014);
}
}
return 0;
}
A communicator stands for a collection of workers; Paracel's communicator has the same meaning as an MPI communicator. We provide a C++ interface, but we still suggest communicating through the parameter servers except in very special cases. Do not use the low-level interfaces given in this section unless you know what you are doing.
class Comm {
public:
Comm(MPI_Comm comm = MPI_COMM_WORLD);
Constructor
inline size_t get_size() const;
Get the number of workers.
inline size_t get_rank() const;
Get the id of worker.
inline MPI_Comm get_comm() const;
Get communicator field.
inline void synchronize();
Block until all workers in this communicator have reached this routine.
int get_source(MPI_Status & stat);
Get source id.
Comm split(int color);
Split the communicator into sub-communicators using color
.
void wait(MPI_Request & req);
Wait for an MPI request to complete.
void wait(vrequest & v_req);
Wait for an MPI request to complete
template<class T>
void send(const T & data, int dest, int tag);
Blocking send of messages.
template<class T>
vrequest isend(const T & data, int dest, int tag);
Non-blocking send of messages; the returned vrequest can be passed to wait.
template<class T>
void recv(T & data, int src, int tag);
Receive messages.
template<class T>
void sendrecv(const T & sdata, T & rdata,
int sto, int stag,
int rfrom, int rtag);
Receive rdata from rfrom, and send sdata to sto.
template<class T>
void bcast(T & data, int master);
Broadcast master’s data.
template<class T, class F>
void bcastring(const T & data, F & func);
Broadcast every worker’s data and handle it with functor func.
template<class T>
void alltoall(const T & sbuf, T & rbuf);
Send data from all to all workers.
template<class T, class F>
void alltoallring(const T & sbuf, T & rbuf, F & func);
Send data from all to all workers and handle the received data with functor func.
template<class T>
void allreduce(T & data);
Combine values from all workers and distribute the result back to all workers. Only summation is supported in the current version.
template<class T, class F>
T treereduce(T & data, F & func, int rank = 0);
Treereduce using functor func.
}; // class Comm
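split is not covered in the communication example above; here is a minimal sketch of it, assuming comm and rk are set up as in that example:
{ // split the workers into two sub-communicators by rank parity
  auto sub_comm = comm.split(rk % 2);
  auto sub_size = sub_comm.get_size(); // size of this worker's sub-communicator
  auto sub_rank = sub_comm.get_rank(); // rank of this worker inside the sub-communicator
}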
Json_parser
#include "utils.hpp"
void json_parser_usage() {
paracel::json_parser pt("demo.json");
std::string r1 = pt.parse<std::string>("wu"); // "hong"
int r2 = pt.parse<int>("hong"); // 7
bool r3 = pt.parse<bool>("changsheng"); // true
double r4 = pt.parse<double>("jiang"); // 3.141592653
// {"hong", "xun", "zhang"}
std::vector<std::string> r5 = pt.parse_v<std::string>("wul");
// {1, 2, 3, 4, 5, 6, 7}
std::vector<int> r6 = pt.parse_v<int>("hongl");
// {true, false, false, true, true}
std::vector<bool> r7 = pt.parse_v<bool>("changshengl");
// {1.23, 2.34, 3.45, 4.56, 5.67, 6.78, 7.89}
std::vector<double> r8 = pt.parse_v<double>("jiangl");
}
{
"wu" : "hong",
"hong" : 7,
"changsheng" : true,
"jiang" : 3.141592653,
"wul" : ["hong", "xun", "zhang"],
"hongl" : [1, 2, 3, 4, 5, 6, 7],
"changshengl" : [true, false, false, true, true],
"jiangl" : [1.23, 2.34, 3.45, 4.56, 5.67, 6.78, 7.89]
}
For some design reasons, the configuration information of algorithms/applications built upon Paracel must be read from a config file instead of command line arguments. We highly recommend JSON. Paracel provides a rough JSON parser which does not support comments and must be parsed in sequence. The simple parser may not be that flexible, but it avoids unpredictable mistakes. The check_parse
and check_parse_v
interfaces below will additionally check whether the value is a file or a directory.
struct json_parser {
public:
json_parser(paracel::str_type fn);
template <class T>
T parse(const paracel::str_type & key);
template <class T>
T check_parse(const paracel::str_type & key);
template <class T>
std::vector<T> parse_v(const paracel::str_type & key);
template <class T>
std::vector<T> check_parse_v(const paracel::str_type & key);
};
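A minimal sketch of the check interfaces, reusing demo.json from the example above; the keys "input" and "output_list" are hypothetical and assumed to hold path values:
#include "utils.hpp"
void check_parse_usage() {
  paracel::json_parser pt("demo.json");
  // behaves like parse, but also checks that the value is an existing file or directory
  std::string input = pt.check_parse<std::string>("input");
  // behaves like parse_v, with the same check applied to every element
  std::vector<std::string> outputs = pt.check_parse_v<std::string>("output_list");
}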
Random
#include "utils.hpp"
double pi() {
int incycle_cnt = 0;
for(int i = 0; i < 10000000; ++i) {
double x = paracel::random_double();
double y = paracel::random_double();
if(x * x + y * y < 1.) incycle_cnt += 1;
}
return 4 * static_cast<double>(incycle_cnt) / 10000000.;
}
Randomness is a basic building block of many families of algorithms. Below we provide two simple interfaces that generate random double value(s), typically used to initialize parameters of machine learning algorithms. More details and interfaces can be found here.
// return a uniform random double value in range(0, 1.)
double random_double();
// return a list of random double value with upper bound upper_bnd
std::vector<double> random_double_list(size_t len, double upper_bnd = 1.);
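For example, a weight vector can be initialized with small random values; a minimal sketch, where the length 100 and the upper bound 0.1 are arbitrary:
#include "utils.hpp"
std::vector<double> init_weights() {
  double r = paracel::random_double();                            // a single value in (0, 1.)
  std::vector<double> W = paracel::random_double_list(100, 0.1);  // 100 values below 0.1
  return W;
}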
String
#include "utils.hpp"
void foo() {
{
std::vector<std::string> init_lst
= {"hello", "world", "happy", "new", "year", "2015"};
std::string seps = "orz";
// helloorzworldorzhappyorzneworzyearorz2015
auto together = paracel::str_join(init_lst, seps);
auto res1 = paracel::str_split_by_word(together, seps); // init_lst
std::string tmp = together;
// init_lst
auto res2 = paracel::str_split_by_word(std::move(tmp), seps);
paracel::startswith(together, "hello"); // true
paracel::startswith(together, "helo"); // false
paracel::startswith(together, ""); // true
paracel::endswith(together, "2015"); // true
paracel::endswith(together, "2014"); // false
paracel::endswith(together, ""); // true
}
{
std::string tmp = "a|b|c|d|e";
std::vector<std::string> r = {"a", "b", "c", "d", "e"};
auto r1 = paracel::str_split(tmp, '|'); // r
auto r2 = paracel::str_split(tmp, "|"); // r
auto r3 = paracel::str_split(tmp, "?|?"); // r
}
}
Paracel provides several extra string operations which are not included in std::string.
std::vector<std::string>
str_split(const std::string & str, const char sep);
std::vector<std::string>
str_split(const std::string & str, const std::string & seps);
std::vector<std::string>
str_split_by_word(const std::string & str, const std::string & seps);
std::vector<std::string>
str_split_by_word(std::string && str, const std::string & seps);
std::string str_join(const std::vector<std::string> & strlst, const std::string & seps);
bool startswith(const std::string & str, const std::string & key);
bool endswith(const std::string & str, const std::string & key);
Misc
#include "ps.hpp"
#include "utils.hpp"
#include "paracel_types.hpp"
void foo(paracel::Comm comm) {
  paracel::paralg *pt = new paracel::paralg(comm);
  paracel::default_id_type id = 7;
  pt->paracel_write(cvt(id), 3.14);
  double val = pt->paracel_read<double>(cvt(id));
  delete pt;
}
void goo() {
paracel::hash_type<paracel::default_id_type> hfunc;
paracel::default_id_type a = 0, b = 1, c = 2, d = 3;
hfunc(a); // 0
hfunc(b); // 1
hfunc(c); // 2
hfunc(d); // 3
paracel::hash_type<std::string> hfunc2;
std::string x = "0", y = "1", z = "2", t = "3";
a = 2297668033614959926ULL;
b = 10159970873491820195ULL;
c = 4551451650890805270ULL;
d = 8248777770799913213ULL;
hfunc2(x); // a
hfunc2(y); // b
hfunc2(z); // c
hfunc2(t); // d
}
template<class T> struct hash { size_t operator()(const T &t) const; };
double dot_product(const std::vector<double> & a, const std::vector<double> & b);
paracel::default_id_type cvt(std::string id);
std::string cvt(paracel::default_id_type id);
bool wait(paracel::async_functor_type & future);
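A short sketch of dot_product and cvt (the values are arbitrary; misc_usage is a hypothetical function name — see foo and goo above for the key-value and hashing usage):
#include "utils.hpp"
#include "paracel_types.hpp"
void misc_usage() {
  std::vector<double> a = {1., 2., 3.};
  std::vector<double> b = {4., 5., 6.};
  double d = dot_product(a, b); // 1*4 + 2*5 + 3*6 = 32
  paracel::default_id_type id = 7;
  std::string key = cvt(id);                 // convert a numeric id into a string key
  paracel::default_id_type back = cvt(key);  // and back
}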
DataGen
python ./tool/datagen.py -m wc -o data.txt
python datagen.py -o sample1.dat -m classification -n 100000 -k 800
python datagen.py -o sample2.dat -m regression -n 100000 -k 100
python ./tool/datagen.py -m pagerank -o pr.dat
python datagen.py -o sample3.dat -m similarity -n 100000 -k 800
python datagen.py -o sample4.dat -m kmeans -n 1000 --ncenters 20 -k 80
# for lda, n refers to the number of documents while k refers to the number of topics
python datagen.py -o sample5.dat -m lda -n 100000 -k 800 -s '|'
python ./tool/datagen.py -m svd -o svd.dat
To generate training data for machine learning algorithms and to ensure repeatability, we provide a Python script named datagen.py in the tool folder. Click the shell tab on the right to see some examples.
| Options | Description |
|---|---|
| -h, --help | show this help message and exit |
| -m METHOD, --method=METHOD | wc, classification, regression, pagerank, similarity, kmeans, lda, svd… |
| -o OUTPUT, --out=OUTPUT | output file name |
| -s SEP, --sep=SEP | separator, default: , |
| -n SIZE, --datasize=SIZE | number of training samples |
| -k K, --nfeatures=K | number of features |
| --ncenters=NCENTERS | number of centers for the kmeans method |
Driver
PARACEL_INSTALL_PREFIX/prun.py -p 1 -w 16 -m local -c demo_config.json ./a.out
PARACEL_INSTALL_PREFIX/prun.py -p 10 -w 100 -m mesos --ppn 10 -c word_count_config.json ./local/bin/wc
PARACEL_INSTALL_PREFIX/prun.py -p 10 --m_server mesos --ppn_server 2 --mem_limit_server 1000 -w 100 -m mesos --ppn 10 --mem_limit 500 -c pagerank_config.json ./local/bin/pagerank
We provide a Python script named prun.py to run Paracel programs. prun.py resides at PARACEL_INSTALL_PREFIX, which you specified during installation. Click the shell tab in the right area to see some examples.
| Options | Description |
|---|---|
| -h, --help | show this help message and exit |
| -p PARASRV_NUM, --snum=PARASRV_NUM | number of parameter servers |
| --m_server=local \| mesos \| mpi | running method for parameter servers. If not given, set to the same value as -m/--method |
| --ppn_server=PPN_SERVER | mesos case: number of processes per node for parameter servers. If not given, set to the same value as --ppn |
| --mem_limit_server=MEM_LIMIT_SERVER | mesos case: memory size of each task of the parameter servers. If not given, set to the same value as --mem_limit |
| --hostfile_server=HOSTFILE_SERVER | mpi case: hostfile for mpirun of the parameter servers. If not given, set to the same value as --hostfile |
| -w WORKER_NUM, --wnum=WORKER_NUM | number of workers for learning |
| -m local \| mesos \| mpi, --method=local \| mesos \| mpi | running method for workers |
| --ppn=PPN | mesos case: number of processes per node for workers |
| --mem_limit=MEM_LIMIT | mesos case: memory size of each task of workers |
| --hostfile=HOSTFILE | mpi case: hostfile for mpirun of workers |
| -c CONFIG, --cfg_file=CONFIG | config file in JSON format, for algorithm usage |
Toolkits
In the current version, the Paracel toolkit contains the following algorithms:
- alg/regression/ridge
- alg/classification/logistic_regression
- alg/clustering/kmeans
- alg/graph_alg/pagerank
- alg/recommendation/matrix_factorization
- alg/recommendation/similarity_sparse
- alg/recommendation/similarity_dense
- alg/recommendation/decision_tree_rec
- alg/recommendation/als
- alg/topic_model/gLDA
- tool/lasso_serial
- tool/lr_l1_serial
- tool/svd_serial
Refer to the link for more description and usage examples.
Deployment
Paracel needs some libraries pre-installed in the environment of your cluster.
Just follow the instructions below step by step.
Prerequisites
There are a few prerequisites which must be satisfied manually:
- g++ (>= 4.7) [Required]
  - Required for compiling Paracel; clang++ is OK too.
- CMake (>= 2.8.9) [Required]
  - Should come with most Mac/Linux systems by default. Recent Ubuntu versions will require installing the build-essential package.
- Any version of MPI (Open MPI or MPICH2) [Required]
  - Required for running Paracel code in a distributed manner.
Dependencies
We give different installation guides for different platforms:
Gentoo
We provide the needed ebuild files in the ebuild directory; you can install all of these libraries in any order.
Ubuntu (12.04 LTS) / Debian (Wheezy)
sudo apt-get install libboost-dev
sudo apt-get install libzmq-dev
Make sure the version is greater than 3.2.4 by typing apt-cache show libzmq-dev; if no suitable version is found, you must manually download and install it from here.
sudo apt-get install libeigen3-dev
sudo apt-get install libgoogle-glog-dev
- Install a modified version of Msgpack-C which supports some C++11 types. Here you must manually clone and install the library:
git clone https://github.com/xunzhang/msgpack-c.git
cd msgpack-c;
./bootstrap; ./configure; make; sudo make install
sudo apt-get install libgflags-dev
Make sure the version of gflags is greater than 2.0; if no suitable version can be found, install it manually.
OpenSUSE (13.2)
sudo zypper in boost-devel
sudo zypper in zeromq-devel
Make sure the version is greater than 3.2.4 by typing zypper info zeromq-devel; if no suitable version is found, you must manually download and install it from here.
sudo zypper in eigen3-devel
- Install a modified version of Msgpack-C which supports some C++11 types. Here you must manually clone and install the library:
git clone https://github.com/xunzhang/msgpack-c.git
cd msgpack-c;
./bootstrap; ./configure; make; sudo make install
Install gflags (for example via zypper) and make sure its version is greater than 2.0; if no suitable version can be found, install it manually.
CentOS (7)
sudo yum install boost-devel
sudo yum install zeromq-devel
Make sure the version is greater than 3.2.4 by typing yum info zeromq-devel; if no suitable version is found, you must manually download and install it from here.
sudo yum install eigen3-devel
- Install a modified version of Msgpack-C which supports some C++11 types. Here you must manually clone and install the library:
git clone https://github.com/xunzhang/msgpack-c.git
cd msgpack-c;
./bootstrap; ./configure; make; sudo make install
Install gflags (for example via yum) and make sure its version is greater than 2.0; if no suitable version can be found, install it manually.
ArchLinux
sudo pacman -S boost
sudo pacman -S zeromq
Make sure the version is greater than 3.2.4.
sudo pacman -S eigen3
- Install a modified version of Msgpack-C which supports some C++11 types. Here you must manually clone and install the library:
git clone https://github.com/xunzhang/msgpack-c.git
cd msgpack-c;
./bootstrap; ./configure; make; sudo make install
Install gflags (for example via pacman) and make sure its version is greater than 2.0; if no suitable version can be found, install it manually.
Mac OS X
brew install open-mpi
brew install boost
brew install zmq
brew install gflags
brew install glog
- Install msgpack-C
wget -O "/tmp/msgpack-c-1.4.2.tar.gz" "https://github.com/msgpack/msgpack-c/archive/cpp-1.4.2.tar.gz"
tar -zxvf "/tmp/msgpack-c-1.4.2.tar.gz" "msgpack-c-cpp-1.4.2"
cd "/tmp/msgpack-c-cpp-1.4.2"
./bootstrap; ./configure; make; sudo make install
- Install Eigen3
wget -O "3.1.0.tar.gz" "http://bitbucket.org/eigen/eigen/get/3.1.0.tar.gz"
tar -zxvf "3.1.0.tar.gz" "eigen-eigen-ca142d0540d3"
mkdir -p eigen-eigen-ca142d0540d3/build
cd eigen-eigen-ca142d0540d3/build
cmake ..; make; make install;
Downloading Paracel
You can download Paracel directly from the GitHub repository. The git command line for cloning the repository is:
git clone https://github.com/douban/paracel.git
cd paracel
Compiling Paracel
mkdir build; cd build
cmake -DCMAKE_BUILD_TYPE=Release ..
You can also specify -DCMAKE_INSTALL_PREFIX with the path where you want Paracel to be installed. For example, type cmake -DCMAKE_INSTALL_PREFIX=your_paracel_install_path .. instead.
make -j 4
The above command will perform up to 4 build tasks in parallel; you can adjust the number of tasks according to your machine.
make test
make install
Running Paracel
Firstly, you must set up the library linking path before running Paracel programs. There are two typical ways to do that:
1. Add your library path (PARACEL_INSTALL_PREFIX/lib) to /etc/ld.so.conf (recommended).
2. Set the LD_LIBRARY_PATH (Linux) or DYLD_LIBRARY_PATH (OS X) variable to PARACEL_INSTALL_PREFIX/lib. For example, suppose we have installed Paracel into the /usr/local folder; then we should export LD_LIBRARY_PATH=/usr/local/lib.
After that, we can run Paracel programs. Here we take a word count example on a Mac OS X system:
1. cd /tmp
2. python /usr/local/bin/tool/datagen.py -m wc -o data.txt
3. Get a sample configuration file for our word count example: wget "paracel.io/data/travis_demo_osx.json"
4. Run the word count example on a single node with multiple processors: /usr/local/prun.py -p 4 -w 4 -c travis_demo_osx.json -m local /usr/local/bin/wc
There are lots of programs in the current version of Paracel; check out this link for more examples (each one contains a README.md for you to begin with).
If you want to contribute to the Paracel project or to write distributed machine learning or graph algorithms upon Paracel, here is a good place to start.
Build Algorithm Outside paracel Folder
A recommended way to build Paracel algorithms is to write a CMakeLists.txt file inside the paracel folder.
But in many cases, particularly for industry users, algorithms developed upon the Paracel framework need to be built externally.
The dependencies are not that complicated, and we give a real example of doing that.
Paracel FAQ
How does Paracel relate to MPI?
Paracel uses MPI to create the worker processes and then loads the input data in parallel (some communication happens in this step). Communication between workers and servers has nothing to do with MPI. We also provide a C++ wrapper for MPI communication, so you can directly do message passing between workers in some cases. See the Communicator section for more details.
What is parameter server?
A parameter server is a global, distributed key-value store that brings a novel way of communication. In this paradigm, if worker W1 needs to talk to worker W2, W1 has to push its words to server S for W2 to pull. It is somewhat indirect, but you get more flexibility and simplicity in return.
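A minimal sketch of this push/pull style, written like the common-mistake examples at the end of this page (it assumes the code runs inside an algorithm class derived from paralg, so paracel_write, paracel_read, paracel_sync and get_worker_id are available; "w1_words" is a hypothetical key):
void push_pull_sketch() {
  if(get_worker_id() == 0) {
    paracel_write("w1_words", 3.14);                   // W1 pushes a value to the servers
  }
  paracel_sync();                                      // every worker reaches the barrier
  if(get_worker_id() == 1) {
    double words = paracel_read<double>("w1_words");   // W2 pulls it back
  }
}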
What is the difference between Paracel and Spark/GraphLab framework?
The question is hard to answer briefly. Spark and GraphLab are both outstanding distributed computation frameworks with many advantages, but they also have some limitations:
The Spark project follows the MapReduce paradigm, which is not that straightforward or flexible for algorithms in some specialized domains such as machine learning and graph processing. If you think of RDD transformations as a set of basis functions, they do not fit all applications. For example, Spark implements Bagel for graph processing, which follows Google’s Pregel model (not MapReduce at all). Spark is more suitable and more easily used for data statistics problems and problems with narrow dependencies.
The GraphLab framework is designed for graph algorithms and focuses more on efficiency. On the other hand, your application code becomes so tricky that it looks very different from its original logic. You also have to transform your machine learning problems into GraphLab’s model, which may be too low-level for application developers.
Paracel is a framework in between: it is more high-level and easier to use than GraphLab, and more low-level and flexible than Spark, since users can write communication code in Paracel. Developers can focus more on their application logic to build distributed algorithms. Paracel was originally designed for machine learning problems.
What programming languages does Paracel support?
C++ only, for reasons of computational efficiency.
What is the largest data size Paracel can scale to?
It depends on the number of processors you have. At Douban, the usual data size is about 100GB.
Is Paracel only fit for machine learning problems?
Not exactly. Paracel is designed for many machine learning problems as well as graph algorithms and scientific computing problems. For example, the pagerank algorithm in Paracel is really straightforward compared to a two-phase MapReduce implementation.
How can I write a distributed algorithm upon Paracel framework?
Follow the third section of the quick tutorial page step by step.
How large a cluster can Paracel scale to?
In the current version, we have tested clusters of 100+ nodes with 5000+ processors.
How can I run Paracel on a cluster?
Firstly, set up a cluster environment with either MPI or Mesos. Secondly, make sure Paracel is successfully installed on your cluster; more details can be found here. Then use the -m option with mpi or mesos in your prun.py command.
Can I run Paracel programs with docker?
How can I contribute to Paracel?
Just fork the repository and send a pull request on github.
What new features will Paracel add in the future?
- More data source formats such as gzip and bz2.
- Streaming Paracel: fault tolerance to keep service-like applications reliable.
- Data flow interfaces with which you can process data like a pipeline.
- ParacelSQL.
1. Deadlock mistake
void deadlock_mistake() {
if(get_worker_id() == 0) {
paracel_sync();
}
}
2. Wrong type mistake
void wrong_type_mistake() {
paracel_write("key1", 1.2);
paracel_write("key2", 3.4);
  std::vector<std::string> keys = {"key1", "key2"};
  // wrong: double should be used instead of vector<double>
  auto vals = paracel_read_multi<std::vector<double> >(keys);
}
This document is terrible and I have a lot of questions; where can I get more help?
You can start a discussion at this group or write an email to xunzhangthu@gmail.com or algorithm@douban.com.
Another good and quick way is to open an issue on GitHub.
Can I write a Paracel application outside the Paracel directory?
Yes, of course. In this case you must modify the cmake files and take responsibility for the linking of your code. If you are not experienced with this, we strongly suggest that you do not do it.
Common mistakes.
1. Deadlock mistake: in the example on the right, only worker 0 enters the if clause, while paracel_sync requires all workers to execute this line.
2. Wrong type mistake: see the paracel_read_multi interface in the Paralg section; the return type is std::vector<V>, so in the example on the right you only need to specify double.
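For reference, a minimal sketch of the corrected versions of both mistakes (assuming the same paralg setup as the examples on the right; the function names are hypothetical):
void deadlock_fixed() {
  // every worker executes the barrier, so nobody is left waiting
  paracel_sync();
}
void wrong_type_fixed() {
  paracel_write("key1", 1.2);
  paracel_write("key2", 3.4);
  std::vector<std::string> keys = {"key1", "key2"};
  // specify the value type V = double; the call itself returns std::vector<double>
  auto vals = paracel_read_multi<double>(keys);
}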