cslmpi.c 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817
  1. /* cslmpi.c */
  2. /*
  3. * Interfaces for mpi from CSL. The bulk of this code was written by
  4. * M O Seymour (199/98) who has released it for inclusion as part of
  5. * this Lisp system.
  6. */
  7. /* Signature: 6962d5a4 28-Jan-1999 */
  8. #include <stdarg.h>
  9. #include <string.h>
  10. #include <ctype.h>
  11. #include "machine.h"
  12. #include "tags.h"
  13. #include "cslerror.h"
  14. #include "externs.h"
  15. #include "arith.h"
  16. #include "entries.h"
  17. #ifdef TIMEOUT
  18. #include "timeout.h"
  19. #endif
  20. #ifdef USE_MPI
  21. #include "read.h"
  22. #include "mpipack.c"
  /*
   * Argument helpers shared by the Lisp entry points in this file.
   *
   * check_fix(v)    - if v is not a fixnum, return aerror1(fun_name, v)
   *                   from the enclosing function.  Requires a variable
   *                   called fun_name to be in scope.
   * get_arg(v)      - fetch the next Lisp_Object from the va_list called a.
   * get_fix_arg(v)  - fetch the next argument from a, check that it is a
   *                   fixnum (as check_fix does) and store its integer
   *                   value in v.
   *
   * check_fix and get_fix_arg are wrapped in do { } while (0) so that each
   * behaves as a single statement, e.g. under an unbraced if/else.  The
   * fetched value is kept in a Lisp_Object temporary until after the
   * fixnum test, so it is never squeezed through a narrower int first.
   */
  #define check_fix(v) \
      do { if (!is_fixnum(v)) return aerror1(fun_name, v); } while (0)
  #define get_arg(v) ((v) = va_arg(a, Lisp_Object))
  #define get_fix_arg(v) \
      do { \
          Lisp_Object mpi_fix_arg_ = va_arg(a, Lisp_Object); \
          check_fix(mpi_fix_arg_); \
          (v) = int_of_fixnum(mpi_fix_arg_); \
      } while (0)
  26. /************************ Environmental functions *******************/
  27. /* Returns process rank
  28. * (mpi_comm_rank comm)
  29. */
  30. /* For now, I assume that comm will fit into a fixnum.
  31. * This appears to be the case with MPICH (values in the hundreds),
  32. * but assumptions like this should not be made.
  33. */
  34. static Lisp_Object Lmpi_comm_rank(Lisp_Object nil, Lisp_Object comm)
  35. {
  36. int rank;
  37. static char fun_name[] = "mpi_comm_rank";
  38. CSL_IGNORE(nil);
  39. check_fix(comm);
  40. MPI_Comm_rank(int_of_fixnum(comm),&rank);
  41. return onevalue(fixnum_of_int(rank));
  42. }
  43. /* returns size of communicator
  44. * (mpi_comm_size comm)
  45. */
  46. /* Same assumption about comm. */
  47. static Lisp_Object Lmpi_comm_size(Lisp_Object nil, Lisp_Object comm)
  48. {
  49. int size;
  50. static char fun_name[] = "mpi_comm_size";
  51. CSL_IGNORE(nil);
  52. check_fix(comm);
  53. MPI_Comm_size(int_of_fixnum(comm),&size);
  54. return onevalue(fixnum_of_int(size));
  55. }
  56. /********************** Blocking point-to-point functions *************/
  57. /* Standard blocking send
  58. * (mpi_send message dest tag comm)
  59. * returns nil.
  60. */
  61. /* Same assumption about comm. */
  62. static Lisp_Object MS_CDECL Lmpi_send(Lisp_Object nil, int nargs, ...)
  63. {
  64. static char fun_name[] = "mpi_send";
  65. Lisp_Object message;
  66. int dest,tag,comm;
  67. va_list a;
  68. argcheck(nargs,4,fun_name);
  69. va_start(a,nargs);
  70. get_arg(message);
  71. get_fix_arg(dest); get_fix_arg(tag); get_fix_arg(comm);
  72. pack_object(message);
  73. MPI_Send(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
  74. dest, tag, comm);
  75. free(mpi_pack_buffer);
  76. return onevalue(nil);
  77. }
  78. /* Standard blocking receive
  79. * (mpi_recv source tag comm)
  80. * returns (message (source tag error)).
  81. */
  82. static Lisp_Object MS_CDECL Lmpi_recv(Lisp_Object nil, int nargs, ...)
  83. {
  84. static char fun_name[] = "mpi_recv";
  85. MPI_Status status;
  86. int source,tag,comm;
  87. Lisp_Object Lstatus;
  88. va_list a;
  89. CSL_IGNORE(nil);
  90. argcheck(nargs,3,fun_name);
  91. va_start(a,nargs);
  92. get_fix_arg(source); get_fix_arg(tag); get_fix_arg(comm);
  93. MPI_Probe(source, tag, comm, &status);
  94. MPI_Get_count(&status, MPI_PACKED, &mpi_pack_size);
  95. mpi_pack_buffer = (char*)malloc(mpi_pack_size);
  96. MPI_Recv(mpi_pack_buffer, mpi_pack_size, MPI_PACKED,
  97. source, tag, comm, &status);
  98. /* The only relevant status things are the 3 public fields, so I'll
  99. * stick them in a list and return them as the 2nd value
  100. */
  101. push(unpack_object());
  102. free(mpi_pack_buffer);
  103. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  104. fixnum_of_int(status.MPI_TAG),
  105. fixnum_of_int(status.MPI_ERROR));
  106. return onevalue(list2(my_pop(),Lstatus));
  107. }
  108. /* Standard blocking simultaneous send and receive
  109. * (mpi_sendrecv send_message dest send_tag source recv_tag comm)
  110. * returns (recv_message (source recv_tag error))
  111. */
  112. /* THERE IS A LIMIT OF 1024 BYTES FOR THE RECEIVE BUFFER (sorry.)
  113. * THIS WILL BE REMOVED ASAP.
  114. */
  115. static Lisp_Object MS_CDECL Lmpi_sendrecv(Lisp_Object nil, int nargs, ...)
  116. {
  117. static char fun_name[] = "mpi_sendrecv";
  118. MPI_Status status;
  119. Lisp_Object Lstatus;
  120. Lisp_Object s_mess;
  121. int s_tag, r_tag, dest, source, comm;
  122. char r_buffer[1024];
  123. va_list a;
  124. CSL_IGNORE(nil);
  125. argcheck(nargs,6,fun_name);
  126. va_start(a,nargs);
  127. get_arg(s_mess);
  128. get_fix_arg(dest); get_fix_arg(s_tag);
  129. get_fix_arg(source); get_fix_arg(r_tag); get_fix_arg(comm);
  130. pack_object(s_mess);
  131. MPI_Sendrecv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
  132. dest, s_tag,
  133. r_buffer, 1024, MPI_PACKED,
  134. source, r_tag, comm, &status);
  135. free(mpi_pack_buffer);
  136. mpi_pack_buffer = r_buffer;
  137. push(unpack_object());
  138. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  139. fixnum_of_int(status.MPI_TAG),
  140. fixnum_of_int(status.MPI_ERROR));
  141. return onevalue(list2(my_pop(),Lstatus));
  142. }
  143. /************** Non-Blocking point-to-point functions ***********/
  144. /* Standard non-blocking send post
  145. * (mpi_isend message dest tag comm)
  146. * returns request handle
  147. */
  148. static Lisp_Object MS_CDECL Lmpi_isend(Lisp_Object nil, int nargs, ...)
  149. {
  150. static char fun_name[] = "mpi_isend";
  151. Lisp_Object message, request;
  152. int dest, tag, comm;
  153. va_list a;
  154. CSL_IGNORE(nil);
  155. /* For now, we assume type MPI_Request to be 32 bits. */
  156. request = Lmkvect32(nil,fixnum_of_int(2));
  157. argcheck(nargs,4,fun_name);
  158. va_start(a,nargs);
  159. get_arg(message);
  160. get_fix_arg(dest); get_fix_arg(tag); get_fix_arg(comm);
  161. pack_object(message);
  162. MPI_Isend(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
  163. dest, tag, comm, (MPI_Request*)&elt(request,0));
  164. elt(request,1) = (int)mpi_pack_buffer;
  165. return onevalue(request);
  166. }
  167. /* Standard non-blocking receive post
  168. * (mpi_irecv source tag comm)
  169. * returns request handle
  170. */
  171. /* I actually cheat horribly by not posting the request at all (at least
  172. * not via MPI), but rather create my own "dummy" request structure.
  173. * Then, to complete the request, I MPI_(I)Probe for a matching message,
  174. * and receive it if it is there.
  175. * This is unsatisfactory since the operation is only non-blocking until the
  176. * first lump of the message arrives; for a long message, there may by
  177. * a lot of latency after this.
  178. */
  179. struct dummy_request{
  180. int source;
  181. int tag;
  182. int comm;
  183. };
  184. static Lisp_Object MS_CDECL Lmpi_irecv(Lisp_Object nil, int nargs, ...)
  185. {
  186. static char fun_name[] = "mpi_irecv";
  187. int source,tag,comm;
  188. Lisp_Object request;
  189. va_list a;
  190. char* buffer;
  191. CSL_IGNORE(nil);
  192. /* For now, we assume type MPI_Request to be 32 bits. */
  193. request = Lmkvect32(nil,fixnum_of_int(2));
  194. argcheck(nargs,3,fun_name);
  195. va_start(a,nargs);
  196. get_fix_arg(source); get_fix_arg(tag); get_fix_arg(comm);
  197. elt(request,1) = 0; /* There is no buffer yet */
  198. elt(request,0) = (int)malloc(sizeof(struct dummy_request));
  199. ((struct dummy_request*)elt(request,0))->source = source;
  200. ((struct dummy_request*)elt(request,0))->tag = tag;
  201. ((struct dummy_request*)elt(request,0))->comm = comm;
  202. return onevalue(request);
  203. }
  204. /* Wait to complete operation, and deallocate buffer.
  205. * (mpi_wait request)
  206. * for send, returns nil
  207. * for recv, returns (message (source tag error))
  208. */
  209. static Lisp_Object Lmpi_wait(Lisp_Object nil, Lisp_Object request)
  210. {
  211. MPI_Status status;
  212. Lisp_Object message, Lstatus;
  213. if ( !(is_vector(request) && type_of_header(vechdr(request)) == TYPE_VEC32 &&
  214. length_of_header(vechdr(request)) == 3*CELL) )
  215. return aerror1("mpi_wait",request);
  216. if ( elt(request,1)){
  217. status.MPI_ERROR = MPI_UNDEFINED;
  218. mpi_pack_buffer = (void*)elt(request,1);
  219. MPI_Wait( (MPI_Request*)&elt(request,0), &status);
  220. if (status.MPI_ERROR == MPI_UNDEFINED){ /* i.e. send request */
  221. free(mpi_pack_buffer);
  222. return onevalue(nil);
  223. }
  224. else { /* old-style receive */
  225. push(unpack_object());
  226. free(mpi_pack_buffer);
  227. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  228. fixnum_of_int(status.MPI_TAG),
  229. fixnum_of_int(status.MPI_ERROR));
  230. return onevalue(list2(my_pop(),Lstatus));
  231. }
  232. }
  233. else{ /* new-style receive */
  234. int source = ((struct dummy_request*)elt(request,0))->source,
  235. tag = ((struct dummy_request*)elt(request,0))->tag,
  236. comm = ((struct dummy_request*)elt(request,0))->comm;
  237. MPI_Probe(source, tag, comm, &status);
  238. free((struct dummy_request*)elt(request,0));
  239. MPI_Get_count(&status, MPI_PACKED, &mpi_pack_size);
  240. mpi_pack_buffer = (char*)malloc(mpi_pack_size);
  241. MPI_Recv(mpi_pack_buffer, mpi_pack_size, MPI_PACKED,
  242. source, tag, comm, &status);
  243. /* The only relevant status things are the 3 public fields, so I'll
  244. * stick them in a list and return them as the 2nd value
  245. */
  246. push(unpack_object());
  247. free(mpi_pack_buffer);
  248. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  249. fixnum_of_int(status.MPI_TAG),
  250. fixnum_of_int(status.MPI_ERROR));
  251. return onevalue(list2(my_pop(),Lstatus));
  252. }
  253. }
  254. /* Test for completion, deallocate buffer if so
  255. * (mpi_test request)
  256. * for send, returns flag
  257. * for recv, returns nil or (message (source tag error))
  258. */
  259. static Lisp_Object Lmpi_test(Lisp_Object nil, Lisp_Object request)
  260. {
  261. MPI_Status status;
  262. Lisp_Object message, Lstatus;
  263. int flag;
  264. if ( !(is_vector(request) && type_of_header(vechdr(request)) == TYPE_VEC32 &&
  265. length_of_header(vechdr(request)) == 3*CELL) )
  266. return aerror1("mpi_wait",request);
  267. if (elt(request,1)){
  268. status.MPI_ERROR = MPI_UNDEFINED;
  269. mpi_pack_buffer = (void*)elt(request,1);
  270. MPI_Test( (MPI_Request*)&elt(request,0), &flag, &status);
  271. if (!flag) return onevalue(nil);
  272. if (status.MPI_ERROR == MPI_UNDEFINED){ /* send request */
  273. free(mpi_pack_buffer);
  274. return onevalue(Lispify_predicate(YES));
  275. }
  276. else{ /* old-style receive */
  277. push(unpack_object());
  278. free(mpi_pack_buffer);
  279. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  280. fixnum_of_int(status.MPI_TAG),
  281. fixnum_of_int(status.MPI_ERROR));
  282. return onevalue(list2(my_pop(),Lstatus));
  283. }
  284. }
  285. else { /* new-style receive */
  286. int source = ((struct dummy_request*)elt(request,0))->source,
  287. tag = ((struct dummy_request*)elt(request,0))->tag,
  288. comm = ((struct dummy_request*)elt(request,0))->comm, flag;
  289. MPI_Iprobe(source, tag, comm, &flag, &status);
  290. if (!flag) return onevalue(nil);
  291. free((struct dummy_request*)elt(request,0));
  292. MPI_Get_count(&status, MPI_PACKED, &mpi_pack_size);
  293. mpi_pack_buffer = (char*)malloc(mpi_pack_size);
  294. MPI_Recv(mpi_pack_buffer, mpi_pack_size, MPI_PACKED,
  295. source, tag, comm, &status);
  296. /* The only relevant status things are the 3 public fields, so I'll
  297. * stick them in a list and return them as the 2nd value
  298. */
  299. push(unpack_object());
  300. free(mpi_pack_buffer);
  301. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  302. fixnum_of_int(status.MPI_TAG),
  303. fixnum_of_int(status.MPI_ERROR));
  304. return onevalue(list2(my_pop(),Lstatus));
  305. }
  306. }
  307. /************** Probe functions *******************/
  308. /* Non-blocking probe
  309. * (mpi_iprobe source tag comm)
  310. * returns (flag (source tag error))
  311. */
  312. static Lisp_Object MS_CDECL Lmpi_iprobe(Lisp_Object nil, int nargs, ...)
  313. {
  314. static char fun_name[] = "impi_probe";
  315. MPI_Status status;
  316. int source, tag, comm, flag;
  317. Lisp_Object Lstatus;
  318. va_list a;
  319. CSL_IGNORE(nil);
  320. argcheck(nargs,3,fun_name);
  321. va_start(a,nargs);
  322. get_fix_arg(source); get_fix_arg(tag); get_fix_arg(comm);
  323. MPI_Iprobe(source, tag, comm, &flag, &status);
  324. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  325. fixnum_of_int(status.MPI_TAG),
  326. fixnum_of_int(status.MPI_ERROR));
  327. return onevalue(list2(Lispify_predicate(flag), Lstatus));
  328. }
  329. /* Blocking probe
  330. * (mpi_probe source tag comm)
  331. * returns (source tag error)
  332. */
  333. static Lisp_Object MS_CDECL Lmpi_probe(Lisp_Object nil, int nargs, ...)
  334. {
  335. static char fun_name[] = "mpi_probe";
  336. MPI_Status status;
  337. int source, tag, comm;
  338. Lisp_Object Lstatus;
  339. va_list a;
  340. CSL_IGNORE(nil);
  341. argcheck(nargs,3,fun_name);
  342. va_start(a,nargs);
  343. get_fix_arg(source); get_fix_arg(tag); get_fix_arg(comm);
  344. MPI_Probe(source, tag, comm, &status);
  345. Lstatus = list3(fixnum_of_int(status.MPI_SOURCE),
  346. fixnum_of_int(status.MPI_TAG),
  347. fixnum_of_int(status.MPI_ERROR));
  348. return onevalue(Lstatus);
  349. }
  350. /************** Collective Communications *********/
  351. /* Barrier; blocks until all processes have called
  352. * (mpi_barrier comm)
  353. * returns nil
  354. */
  355. static Lisp_Object Lmpi_barrier(Lisp_Object nil, Lisp_Object comm)
  356. {
  357. int rank;
  358. static char fun_name[] = "mpi_barrier";
  359. check_fix(comm);
  360. MPI_Barrier(int_of_fixnum(comm));
  361. return onevalue(nil);
  362. }
  363. /* Broadcast; sends buffer of root to buffers of others.
  364. * (mpi_bcast message root comm) [message ignored if not root]
  365. * returns message
  366. */
  367. static Lisp_Object MS_CDECL Lmpi_bcast(Lisp_Object nil, int nargs, ...)
  368. {
  369. static char fun_name[] = "mpi_bcast";
  370. Lisp_Object message;
  371. int root,comm,rank;
  372. va_list a;
  373. CSL_IGNORE(nil);
  374. argcheck(nargs,3,fun_name);
  375. va_start(a,nargs);
  376. get_arg(message); get_fix_arg(root); get_fix_arg(comm);
  377. MPI_Comm_rank(comm,&rank);
  378. if (rank == root){
  379. pack_object(message);
  380. MPI_Bcast(&mpi_pack_position, 1, MPI_LONG, root, comm);
  381. MPI_Bcast(mpi_pack_buffer, mpi_pack_position, MPI_PACKED, root, comm);
  382. free(mpi_pack_buffer);
  383. }
  384. else {
  385. MPI_Bcast(&mpi_pack_size, 1, MPI_LONG, root, comm);
  386. mpi_pack_buffer = (char*)malloc(mpi_pack_size);
  387. MPI_Bcast(mpi_pack_buffer, mpi_pack_size, MPI_PACKED, root, comm);
  388. message = unpack_object();
  389. free(mpi_pack_buffer);
  390. }
  391. return onevalue(message);
  392. }
  393. /* Gather: root receives messages from others.
  394. * (mpi_gather message root comm)
  395. * returns vector of messages if root, else nil.
  396. */
  397. static Lisp_Object MS_CDECL Lmpi_gather(Lisp_Object nil, int nargs, ...)
  398. {
  399. static char fun_name[] = "mpi_gather";
  400. Lisp_Object message;
  401. int root,comm,rank;
  402. va_list a;
  403. CSL_IGNORE(nil);
  404. argcheck(nargs,3,fun_name);
  405. va_start(a,nargs);
  406. get_arg(message); get_fix_arg(root); get_fix_arg(comm);
  407. MPI_Comm_rank(comm,&rank);
  408. pack_object(message);
  409. if (rank == root){
  410. int commsize, count;
  411. int *recvcounts, *displs;
  412. char *recvbuffer;
  413. MPI_Comm_size(comm,&commsize);
  414. recvcounts = (int*)calloc(commsize, sizeof(int));
  415. displs = (int*)calloc(commsize+1, sizeof(int));
  416. MPI_Gather(&mpi_pack_position, 1, MPI_LONG,
  417. recvcounts, 1, MPI_LONG, root, comm);
  418. displs[0] = 0;
  419. for (count = 0; count < commsize; ++count)
  420. displs[count+1] = displs[count] + recvcounts[count];
  421. recvbuffer = (char*)malloc(displs[commsize]);
  422. MPI_Gatherv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
  423. recvbuffer, recvcounts, displs, MPI_PACKED, root, comm);
  424. free(mpi_pack_buffer);
  425. message = Lmkvect(nil, fixnum_of_int(commsize-1));
  426. for (count = 0; count < commsize; ++count){
  427. mpi_pack_buffer = recvbuffer + displs[count];
  428. mpi_pack_size = recvcounts[count];
  429. elt(message, count) = unpack_object();
  430. }
  431. free(recvbuffer); free(recvcounts); free(displs);
  432. }
  433. else {
  434. MPI_Gather(&mpi_pack_position, 1, MPI_LONG, 0, 0, MPI_LONG, root, comm);
  435. MPI_Gatherv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
  436. 0,0,0,MPI_PACKED, root, comm);
  437. free(mpi_pack_buffer);
  438. message = nil;
  439. }
  440. return onevalue(message);
  441. }
  442. /* Scatter: inverse of gather.
  443. * (mpi_scatter vector_of_messages root comm) [messages ignored if not root]
  444. * returns message
  445. */
  446. static Lisp_Object MS_CDECL Lmpi_scatter(Lisp_Object nil, int nargs, ...)
  447. {
  448. static char fun_name[] = "mpi_scatter";
  449. Lisp_Object messages, message;
  450. int root, comm, rank;
  451. va_list a;
  452. CSL_IGNORE(nil);
  453. argcheck(nargs,3,fun_name);
  454. va_start(a,nargs);
  455. get_arg(messages); get_fix_arg(root); get_fix_arg(comm);
  456. MPI_Comm_rank(comm,&rank);
  457. if (rank == root){
  458. int commsize, count, *sendcounts, *displs, recvcount;
  459. char* recvbuffer;
  460. MPI_Comm_size(comm,&commsize);
  461. sendcounts = (int*)calloc(commsize, sizeof(int));
  462. displs = (int*)calloc(commsize+1, sizeof(int));
  463. displs[0] = 0;
  464. /* Call private functions in mpi_packing for consecutive packs */
  465. check_buffer = scatter_check_buffer;
  466. mpi_pack_offset = 0;
  467. mpi_pack_position = 0;
  468. mpi_pack_size = 0;
  469. mpi_buffer_bottom = 0;
  470. mpi_real_size = 0;
  471. for (count = 0; count < commsize; ++count){
  472. pack_cell(elt(messages,count));
  473. sendcounts[count] = mpi_pack_position;
  474. mpi_pack_size -= mpi_pack_position;
  475. mpi_pack_offset += mpi_pack_position;
  476. mpi_pack_buffer += mpi_pack_position;
  477. displs[count+1] = mpi_pack_offset;
  478. mpi_pack_position = 0;
  479. }
  480. check_buffer = default_check_buffer;
  481. MPI_Scatter(sendcounts, 1, MPI_LONG, &recvcount, 1, MPI_LONG, root, comm);
  482. recvbuffer = (char*)malloc(recvcount);
  483. MPI_Scatterv(mpi_buffer_bottom, sendcounts, displs, MPI_PACKED,
  484. recvbuffer, recvcount, MPI_PACKED, root, comm);
  485. free(recvbuffer);
  486. free(sendcounts);
  487. free(displs);
  488. free(mpi_buffer_bottom);
  489. message = elt(messages, root);
  490. }
  491. else {
  492. MPI_Scatter(0,0,MPI_LONG,&mpi_pack_size,1,MPI_LONG,root,comm);
  493. mpi_pack_buffer = (char*)malloc(mpi_pack_size);
  494. MPI_Scatterv(0,0,0,MPI_PACKED,
  495. mpi_pack_buffer,mpi_pack_size,MPI_PACKED,root,comm);
  496. message = unpack_object();
  497. free(mpi_pack_buffer);
  498. }
  499. return onevalue(message);
  500. }
  501. /* Allgather: just like gather, only everyone gets the result.
  502. * (mpi_allgather message comm)
  503. * returns vector of messages
  504. */
  505. static Lisp_Object Lmpi_allgather(Lisp_Object nil,
  506. Lisp_Object message,
  507. Lisp_Object comm)
  508. {
  509. static char fun_name[] = "mpi_gather";
  510. int commsize, buffersize, count;
  511. int *recvcounts, *displs;
  512. char *recvbuffer;
  513. check_fix(comm);
  514. comm = int_of_fixnum(comm);
  515. CSL_IGNORE(nil);
  516. pack_object(message);
  517. MPI_Comm_size(comm,&commsize);
  518. recvcounts = (int*)calloc(commsize, sizeof(int));
  519. displs = (int*)calloc(commsize+1, sizeof(int));
  520. MPI_Allgather(&mpi_pack_position, 1, MPI_LONG, recvcounts, 1, MPI_LONG, comm);
  521. displs[0] = 0;
  522. for (count = 0; count < commsize; ++count)
  523. displs[count+1] = displs[count] + recvcounts[count];
  524. recvbuffer = (char*)malloc(displs[commsize]);
  525. MPI_Allgatherv(mpi_pack_buffer, mpi_pack_position, MPI_PACKED,
  526. recvbuffer, recvcounts, displs, MPI_PACKED, comm);
  527. free(mpi_pack_buffer); free(recvcounts); free(displs);
  528. message = Lmkvect(nil, fixnum_of_int(commsize-1));
  529. for (count = 0; count < commsize; ++count){
  530. mpi_pack_buffer = recvbuffer + displs[count];
  531. mpi_pack_size = recvcounts[count];
  532. elt(message, count) = unpack_object();
  533. }
  534. free(recvbuffer);
  535. return onevalue(message);
  536. }
  537. /* All to all scatter/gather.
  538. * (mpi_alltoall vector_of_messages comm)
  539. * returns vector of messages.
  540. */
  541. static Lisp_Object Lmpi_alltoall(Lisp_Object nil,
  542. Lisp_Object smessages, Lisp_Object Lcomm)
  543. {
  544. static char fun_name[] = "mpi_alltoall";
  545. Lisp_Object rmessages;
  546. int rank,comm, commsize, count;
  547. int *sendcounts, *recvcounts, *sdispls, *rdispls;
  548. char* recvbuffer;
  549. CSL_IGNORE(nil);
  550. check_fix(Lcomm);
  551. comm = int_of_fixnum(Lcomm);
  552. MPI_Comm_size(comm,&commsize);
  553. sendcounts = (int*)calloc(commsize, sizeof(int));
  554. recvcounts = (int*)calloc(commsize, sizeof(int));
  555. sdispls = (int*)calloc(commsize+1, sizeof(int));
  556. rdispls = (int*)calloc(commsize+1, sizeof(int));
  557. /* Call private functions in mpi_packing for consecutive packs */
  558. check_buffer = scatter_check_buffer;
  559. mpi_pack_offset = 0;
  560. mpi_pack_position = 0;
  561. mpi_pack_size = 0;
  562. mpi_buffer_bottom = 0;
  563. mpi_real_size = 0;
  564. for (count = 0; count < commsize; ++count){
  565. pack_cell(elt(smessages,count));
  566. sendcounts[count] = mpi_pack_position;
  567. mpi_pack_size -= mpi_pack_position;
  568. mpi_pack_offset += mpi_pack_position;
  569. mpi_pack_buffer += mpi_pack_position;
  570. sdispls[count+1] = mpi_pack_offset;
  571. mpi_pack_position = 0;
  572. }
  573. check_buffer = default_check_buffer;
  574. MPI_Comm_rank(comm,&rank);
  575. MPI_Alltoall(sendcounts, 1, MPI_LONG, recvcounts, 1, MPI_LONG, comm);
  576. rdispls[0] = 0;
  577. for (count = 0; count < commsize; ++count)
  578. rdispls[count+1] = rdispls[count] + recvcounts[count];
  579. recvbuffer = (char*)malloc(rdispls[commsize]);
  580. MPI_Alltoallv(mpi_buffer_bottom, sendcounts, sdispls, MPI_PACKED,
  581. recvbuffer, recvcounts, rdispls, MPI_PACKED, comm);
  582. free(mpi_buffer_bottom); free(sendcounts); free(sdispls);
  583. rmessages = Lmkvect(nil, fixnum_of_int(commsize-1));
  584. for (count = 0; count < commsize; ++count){
  585. mpi_pack_buffer = recvbuffer + rdispls[count];
  586. mpi_pack_size = recvcounts[count];
  587. elt(rmessages, count) = unpack_object();
  588. }
  589. free(recvbuffer); free(recvcounts); free(rdispls);
  590. return onevalue(rmessages);
  591. }
  592. #else /* USE_MPI */
  593. static Lisp_Object Lmpi_comm_rank(Lisp_Object nil, Lisp_Object comm)
  594. {
  595. return aerror0("mpi support not built into this version of CSL");
  596. }
  597. static Lisp_Object Lmpi_comm_size(Lisp_Object nil, Lisp_Object comm)
  598. {
  599. return aerror0("mpi support not built into this version of CSL");
  600. }
  601. static Lisp_Object MS_CDECL Lmpi_send(Lisp_Object nil, int nargs, ...)
  602. {
  603. return aerror0("mpi support not built into this version of CSL");
  604. }
  605. static Lisp_Object MS_CDECL Lmpi_recv(Lisp_Object nil, int nargs, ...)
  606. {
  607. return aerror0("mpi support not built into this version of CSL");
  608. }
  609. static Lisp_Object MS_CDECL Lmpi_sendrecv(Lisp_Object nil, int nargs, ...)
  610. {
  611. return aerror0("mpi support not built into this version of CSL");
  612. }
  613. static Lisp_Object MS_CDECL Lmpi_isend(Lisp_Object nil, int nargs, ...)
  614. {
  615. return aerror0("mpi support not built into this version of CSL");
  616. }
  617. static Lisp_Object MS_CDECL Lmpi_irecv(Lisp_Object nil, int nargs, ...)
  618. {
  619. return aerror0("mpi support not built into this version of CSL");
  620. }
  621. static Lisp_Object Lmpi_wait(Lisp_Object nil, Lisp_Object request)
  622. {
  623. return aerror0("mpi support not built into this version of CSL");
  624. }
  625. static Lisp_Object Lmpi_test(Lisp_Object nil, Lisp_Object request)
  626. {
  627. return aerror0("mpi support not built into this version of CSL");
  628. }
  629. static Lisp_Object MS_CDECL Lmpi_iprobe(Lisp_Object nil, int nargs, ...)
  630. {
  631. return aerror0("mpi support not built into this version of CSL");
  632. }
  633. static Lisp_Object MS_CDECL Lmpi_probe(Lisp_Object nil, int nargs, ...)
  634. {
  635. return aerror0("mpi support not built into this version of CSL");
  636. }
  637. static Lisp_Object Lmpi_barrier(Lisp_Object nil, Lisp_Object comm)
  638. {
  639. return aerror0("mpi support not built into this version of CSL");
  640. }
  641. static Lisp_Object MS_CDECL Lmpi_bcast(Lisp_Object nil, int nargs, ...)
  642. {
  643. return aerror0("mpi support not built into this version of CSL");
  644. }
  645. static Lisp_Object MS_CDECL Lmpi_gather(Lisp_Object nil, int nargs, ...)
  646. {
  647. return aerror0("mpi support not built into this version of CSL");
  648. }
  649. static Lisp_Object MS_CDECL Lmpi_scatter(Lisp_Object nil, int nargs, ...)
  650. {
  651. return aerror0("mpi support not built into this version of CSL");
  652. }
  653. static Lisp_Object Lmpi_allgather(Lisp_Object nil,
  654. Lisp_Object message,
  655. Lisp_Object comm)
  656. {
  657. return aerror0("mpi support not built into this version of CSL");
  658. }
  659. static Lisp_Object Lmpi_alltoall(Lisp_Object nil,
  660. Lisp_Object smessages, Lisp_Object Lcomm)
  661. {
  662. return aerror0("mpi support not built into this version of CSL");
  663. }
  664. #endif /* USE_MPI */
  665. setup_type const mpi_setup[] =
  666. {
  667. {"mpi_comm_rank", Lmpi_comm_rank, too_many_1, wrong_no_1},
  668. {"mpi_comm_size", Lmpi_comm_size, too_many_1, wrong_no_1},
  669. {"mpi_send", wrong_no_0a, wrong_no_0b, Lmpi_send},
  670. {"mpi_recv", wrong_no_0a, wrong_no_0b, Lmpi_recv},
  671. {"mpi_sendrecv", wrong_no_0a, wrong_no_0b, Lmpi_sendrecv},
  672. {"mpi_isend", wrong_no_0a, wrong_no_0b, Lmpi_isend},
  673. {"mpi_irecv", wrong_no_0a, wrong_no_0b, Lmpi_irecv},
  674. {"mpi_wait", Lmpi_wait, too_many_1, wrong_no_1},
  675. {"mpi_test", Lmpi_test, too_many_1, wrong_no_1},
  676. {"mpi_probe", wrong_no_0a, wrong_no_0b, Lmpi_probe},
  677. {"mpi_iprobe", wrong_no_0a, wrong_no_0b, Lmpi_iprobe},
  678. {"mpi_bcast", wrong_no_0a, wrong_no_0b, Lmpi_bcast},
  679. {"mpi_gather", wrong_no_0a, wrong_no_0b, Lmpi_gather},
  680. {"mpi_allgather", wrong_no_0a, Lmpi_allgather, wrong_no_2},
  681. {"mpi_scatter", wrong_no_0a, wrong_no_0b, Lmpi_scatter},
  682. {"mpi_alltoall", wrong_no_0a, Lmpi_alltoall, wrong_no_2},
  683. {NULL, 0, 0, 0}
  684. };
  685. /* end of cslmpi.c */