mp_rs_query.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. /*-
  2. * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions
  7. * are met:
  8. * 1. Redistributions of source code must retain the above copyright
  9. * notice, this list of conditions and the following disclaimer.
  10. * 2. Redistributions in binary form must reproduce the above copyright
  11. * notice, this list of conditions and the following disclaimer in the
  12. * documentation and/or other materials provided with the distribution.
  13. *
  14. * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
  15. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  16. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  17. * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  18. * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  19. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  20. * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  21. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  22. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  23. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  24. * SUCH DAMAGE.
  25. *
  26. */
  27. #include <sys/types.h>
  28. #include <sys/event.h>
  29. #include <sys/socket.h>
  30. #include <sys/time.h>
  31. #include <assert.h>
  32. #include <errno.h>
  33. #include <nsswitch.h>
  34. #include <stdio.h>
  35. #include <stdlib.h>
  36. #include <string.h>
  37. #include "cachelib.h"
  38. #include "config.h"
  39. #include "debug.h"
  40. #include "log.h"
  41. #include "query.h"
  42. #include "mp_rs_query.h"
  43. #include "mp_ws_query.h"
  44. #include "singletons.h"
  45. static int on_mp_read_session_close_notification(struct query_state *);
  46. static void on_mp_read_session_destroy(struct query_state *);
  47. static int on_mp_read_session_mapper(struct query_state *);
  48. /* int on_mp_read_session_request_read1(struct query_state *); */
  49. static int on_mp_read_session_request_read2(struct query_state *);
  50. static int on_mp_read_session_request_process(struct query_state *);
  51. static int on_mp_read_session_response_write1(struct query_state *);
  52. static int on_mp_read_session_read_request_process(struct query_state *);
  53. static int on_mp_read_session_read_response_write1(struct query_state *);
  54. static int on_mp_read_session_read_response_write2(struct query_state *);
  55. /*
  56. * This function is used as the query_state's destroy_func to make the
  57. * proper cleanup in case of errors.
  58. */
  59. static void
  60. on_mp_read_session_destroy(struct query_state *qstate)
  61. {
  62. TRACE_IN(on_mp_read_session_destroy);
  63. finalize_comm_element(&qstate->request);
  64. finalize_comm_element(&qstate->response);
  65. if (qstate->mdata != NULL) {
  66. configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
  67. close_cache_mp_read_session(
  68. (cache_mp_read_session)qstate->mdata);
  69. configuration_unlock_entry(qstate->config_entry,
  70. CELT_MULTIPART);
  71. }
  72. TRACE_OUT(on_mp_read_session_destroy);
  73. }
  74. /*
  75. * The functions below are used to process multipart read session initiation
  76. * requests.
  77. * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
  78. * the request itself
  79. * - on_mp_read_session_request_process processes it
  80. * - on_mp_read_session_response_write1 sends the response
  81. */
  82. int
  83. on_mp_read_session_request_read1(struct query_state *qstate)
  84. {
  85. struct cache_mp_read_session_request *c_mp_rs_request;
  86. ssize_t result;
  87. TRACE_IN(on_mp_read_session_request_read1);
  88. if (qstate->kevent_watermark == 0)
  89. qstate->kevent_watermark = sizeof(size_t);
  90. else {
  91. init_comm_element(&qstate->request,
  92. CET_MP_READ_SESSION_REQUEST);
  93. c_mp_rs_request = get_cache_mp_read_session_request(
  94. &qstate->request);
  95. result = qstate->read_func(qstate,
  96. &c_mp_rs_request->entry_length, sizeof(size_t));
  97. if (result != sizeof(size_t)) {
  98. TRACE_OUT(on_mp_read_session_request_read1);
  99. return (-1);
  100. }
  101. if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
  102. TRACE_OUT(on_mp_read_session_request_read1);
  103. return (-1);
  104. }
  105. c_mp_rs_request->entry = calloc(1,
  106. c_mp_rs_request->entry_length + 1);
  107. assert(c_mp_rs_request->entry != NULL);
  108. qstate->kevent_watermark = c_mp_rs_request->entry_length;
  109. qstate->process_func = on_mp_read_session_request_read2;
  110. }
  111. TRACE_OUT(on_mp_read_session_request_read1);
  112. return (0);
  113. }
  114. static int
  115. on_mp_read_session_request_read2(struct query_state *qstate)
  116. {
  117. struct cache_mp_read_session_request *c_mp_rs_request;
  118. ssize_t result;
  119. TRACE_IN(on_mp_read_session_request_read2);
  120. c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
  121. result = qstate->read_func(qstate, c_mp_rs_request->entry,
  122. c_mp_rs_request->entry_length);
  123. if (result < 0 || (size_t)result != qstate->kevent_watermark) {
  124. LOG_ERR_3("on_mp_read_session_request_read2",
  125. "read failed");
  126. TRACE_OUT(on_mp_read_session_request_read2);
  127. return (-1);
  128. }
  129. qstate->kevent_watermark = 0;
  130. qstate->process_func = on_mp_read_session_request_process;
  131. TRACE_OUT(on_mp_read_session_request_read2);
  132. return (0);
  133. }
  134. static int
  135. on_mp_read_session_request_process(struct query_state *qstate)
  136. {
  137. struct cache_mp_read_session_request *c_mp_rs_request;
  138. struct cache_mp_read_session_response *c_mp_rs_response;
  139. cache_mp_read_session rs;
  140. cache_entry c_entry;
  141. char *dec_cache_entry_name;
  142. char *buffer;
  143. size_t buffer_size;
  144. cache_mp_write_session ws;
  145. struct agent *lookup_agent;
  146. struct multipart_agent *mp_agent;
  147. void *mdata;
  148. int res;
  149. TRACE_IN(on_mp_read_session_request_process);
  150. init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
  151. c_mp_rs_response = get_cache_mp_read_session_response(
  152. &qstate->response);
  153. c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
  154. qstate->config_entry = configuration_find_entry(
  155. s_configuration, c_mp_rs_request->entry);
  156. if (qstate->config_entry == NULL) {
  157. c_mp_rs_response->error_code = ENOENT;
  158. LOG_ERR_2("read_session_request",
  159. "can't find configuration entry '%s'."
  160. " aborting request", c_mp_rs_request->entry);
  161. goto fin;
  162. }
  163. if (qstate->config_entry->enabled == 0) {
  164. c_mp_rs_response->error_code = EACCES;
  165. LOG_ERR_2("read_session_request",
  166. "configuration entry '%s' is disabled",
  167. c_mp_rs_request->entry);
  168. goto fin;
  169. }
  170. if (qstate->config_entry->perform_actual_lookups != 0)
  171. dec_cache_entry_name = strdup(
  172. qstate->config_entry->mp_cache_params.cep.entry_name);
  173. else {
  174. #ifdef NS_NSCD_EID_CHECKING
  175. if (check_query_eids(qstate) != 0) {
  176. c_mp_rs_response->error_code = EPERM;
  177. goto fin;
  178. }
  179. #endif
  180. asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
  181. qstate->config_entry->mp_cache_params.cep.entry_name);
  182. }
  183. assert(dec_cache_entry_name != NULL);
  184. configuration_lock_rdlock(s_configuration);
  185. c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
  186. configuration_unlock(s_configuration);
  187. if ((c_entry == INVALID_CACHE) &&
  188. (qstate->config_entry->perform_actual_lookups != 0))
  189. c_entry = register_new_mp_cache_entry(qstate,
  190. dec_cache_entry_name);
  191. free(dec_cache_entry_name);
  192. if (c_entry != INVALID_CACHE_ENTRY) {
  193. configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
  194. rs = open_cache_mp_read_session(c_entry);
  195. configuration_unlock_entry(qstate->config_entry,
  196. CELT_MULTIPART);
  197. if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
  198. (qstate->config_entry->perform_actual_lookups != 0)) {
  199. lookup_agent = find_agent(s_agent_table,
  200. c_mp_rs_request->entry, MULTIPART_AGENT);
  201. if ((lookup_agent != NULL) &&
  202. (lookup_agent->type == MULTIPART_AGENT)) {
  203. mp_agent = (struct multipart_agent *)
  204. lookup_agent;
  205. mdata = mp_agent->mp_init_func();
  206. /*
  207. * Multipart agents read the whole snapshot
  208. * of the data at one time.
  209. */
  210. configuration_lock_entry(qstate->config_entry,
  211. CELT_MULTIPART);
  212. ws = open_cache_mp_write_session(c_entry);
  213. configuration_unlock_entry(qstate->config_entry,
  214. CELT_MULTIPART);
  215. if (ws != NULL) {
  216. do {
  217. buffer = NULL;
  218. res = mp_agent->mp_lookup_func(&buffer,
  219. &buffer_size,
  220. mdata);
  221. if ((res & NS_TERMINATE) &&
  222. (buffer != NULL)) {
  223. configuration_lock_entry(
  224. qstate->config_entry,
  225. CELT_MULTIPART);
  226. if (cache_mp_write(ws, buffer,
  227. buffer_size) != 0) {
  228. abandon_cache_mp_write_session(ws);
  229. ws = NULL;
  230. }
  231. configuration_unlock_entry(
  232. qstate->config_entry,
  233. CELT_MULTIPART);
  234. free(buffer);
  235. buffer = NULL;
  236. } else {
  237. configuration_lock_entry(
  238. qstate->config_entry,
  239. CELT_MULTIPART);
  240. close_cache_mp_write_session(ws);
  241. configuration_unlock_entry(
  242. qstate->config_entry,
  243. CELT_MULTIPART);
  244. free(buffer);
  245. buffer = NULL;
  246. }
  247. } while ((res & NS_TERMINATE) &&
  248. (ws != NULL));
  249. }
  250. configuration_lock_entry(qstate->config_entry,
  251. CELT_MULTIPART);
  252. rs = open_cache_mp_read_session(c_entry);
  253. configuration_unlock_entry(qstate->config_entry,
  254. CELT_MULTIPART);
  255. }
  256. }
  257. if (rs == INVALID_CACHE_MP_READ_SESSION)
  258. c_mp_rs_response->error_code = -1;
  259. else {
  260. qstate->mdata = rs;
  261. qstate->destroy_func = on_mp_read_session_destroy;
  262. configuration_lock_entry(qstate->config_entry,
  263. CELT_MULTIPART);
  264. if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
  265. (qstate->config_entry->mp_query_timeout.tv_usec != 0))
  266. memcpy(&qstate->timeout,
  267. &qstate->config_entry->mp_query_timeout,
  268. sizeof(struct timeval));
  269. configuration_unlock_entry(qstate->config_entry,
  270. CELT_MULTIPART);
  271. }
  272. } else
  273. c_mp_rs_response->error_code = -1;
  274. fin:
  275. qstate->process_func = on_mp_read_session_response_write1;
  276. qstate->kevent_watermark = sizeof(int);
  277. qstate->kevent_filter = EVFILT_WRITE;
  278. TRACE_OUT(on_mp_read_session_request_process);
  279. return (0);
  280. }
  281. static int
  282. on_mp_read_session_response_write1(struct query_state *qstate)
  283. {
  284. struct cache_mp_read_session_response *c_mp_rs_response;
  285. ssize_t result;
  286. TRACE_IN(on_mp_read_session_response_write1);
  287. c_mp_rs_response = get_cache_mp_read_session_response(
  288. &qstate->response);
  289. result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
  290. sizeof(int));
  291. if (result != sizeof(int)) {
  292. LOG_ERR_3("on_mp_read_session_response_write1",
  293. "write failed");
  294. TRACE_OUT(on_mp_read_session_response_write1);
  295. return (-1);
  296. }
  297. if (c_mp_rs_response->error_code == 0) {
  298. qstate->kevent_watermark = sizeof(int);
  299. qstate->process_func = on_mp_read_session_mapper;
  300. qstate->kevent_filter = EVFILT_READ;
  301. } else {
  302. qstate->kevent_watermark = 0;
  303. qstate->process_func = NULL;
  304. }
  305. TRACE_OUT(on_mp_read_session_response_write1);
  306. return (0);
  307. }
  308. /*
  309. * Mapper function is used to avoid multiple connections for each session
  310. * write or read requests. After processing the request, it does not close
  311. * the connection, but waits for the next request.
  312. */
  313. static int
  314. on_mp_read_session_mapper(struct query_state *qstate)
  315. {
  316. ssize_t result;
  317. int elem_type;
  318. TRACE_IN(on_mp_read_session_mapper);
  319. if (qstate->kevent_watermark == 0) {
  320. qstate->kevent_watermark = sizeof(int);
  321. } else {
  322. result = qstate->read_func(qstate, &elem_type, sizeof(int));
  323. if (result != sizeof(int)) {
  324. LOG_ERR_3("on_mp_read_session_mapper",
  325. "read failed");
  326. TRACE_OUT(on_mp_read_session_mapper);
  327. return (-1);
  328. }
  329. switch (elem_type) {
  330. case CET_MP_READ_SESSION_READ_REQUEST:
  331. qstate->kevent_watermark = 0;
  332. qstate->process_func =
  333. on_mp_read_session_read_request_process;
  334. break;
  335. case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
  336. qstate->kevent_watermark = 0;
  337. qstate->process_func =
  338. on_mp_read_session_close_notification;
  339. break;
  340. default:
  341. qstate->kevent_watermark = 0;
  342. qstate->process_func = NULL;
  343. LOG_ERR_3("on_mp_read_session_mapper",
  344. "unknown element type");
  345. TRACE_OUT(on_mp_read_session_mapper);
  346. return (-1);
  347. }
  348. }
  349. TRACE_OUT(on_mp_read_session_mapper);
  350. return (0);
  351. }
  352. /*
  353. * The functions below are used to process multipart read sessions read
  354. * requests. User doesn't have to pass any kind of data, besides the
  355. * request identificator itself. So we don't need any XXX_read functions and
  356. * start with the XXX_process function.
  357. * - on_mp_read_session_read_request_process processes it
  358. * - on_mp_read_session_read_response_write1 and
  359. * on_mp_read_session_read_response_write2 sends the response
  360. */
  361. static int
  362. on_mp_read_session_read_request_process(struct query_state *qstate)
  363. {
  364. struct cache_mp_read_session_read_response *read_response;
  365. TRACE_IN(on_mp_read_session_response_process);
  366. init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
  367. read_response = get_cache_mp_read_session_read_response(
  368. &qstate->response);
  369. configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
  370. read_response->error_code = cache_mp_read(
  371. (cache_mp_read_session)qstate->mdata, NULL,
  372. &read_response->data_size);
  373. if (read_response->error_code == 0) {
  374. read_response->data = malloc(read_response->data_size);
  375. assert(read_response != NULL);
  376. read_response->error_code = cache_mp_read(
  377. (cache_mp_read_session)qstate->mdata,
  378. read_response->data,
  379. &read_response->data_size);
  380. }
  381. configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
  382. if (read_response->error_code == 0)
  383. qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
  384. else
  385. qstate->kevent_watermark = sizeof(int);
  386. qstate->process_func = on_mp_read_session_read_response_write1;
  387. qstate->kevent_filter = EVFILT_WRITE;
  388. TRACE_OUT(on_mp_read_session_response_process);
  389. return (0);
  390. }
  391. static int
  392. on_mp_read_session_read_response_write1(struct query_state *qstate)
  393. {
  394. struct cache_mp_read_session_read_response *read_response;
  395. ssize_t result;
  396. TRACE_IN(on_mp_read_session_read_response_write1);
  397. read_response = get_cache_mp_read_session_read_response(
  398. &qstate->response);
  399. result = qstate->write_func(qstate, &read_response->error_code,
  400. sizeof(int));
  401. if (read_response->error_code == 0) {
  402. result += qstate->write_func(qstate, &read_response->data_size,
  403. sizeof(size_t));
  404. if (result < 0 || (size_t)result != qstate->kevent_watermark) {
  405. TRACE_OUT(on_mp_read_session_read_response_write1);
  406. LOG_ERR_3("on_mp_read_session_read_response_write1",
  407. "write failed");
  408. return (-1);
  409. }
  410. qstate->kevent_watermark = read_response->data_size;
  411. qstate->process_func = on_mp_read_session_read_response_write2;
  412. } else {
  413. if (result < 0 || (size_t)result != qstate->kevent_watermark) {
  414. LOG_ERR_3("on_mp_read_session_read_response_write1",
  415. "write failed");
  416. TRACE_OUT(on_mp_read_session_read_response_write1);
  417. return (-1);
  418. }
  419. qstate->kevent_watermark = 0;
  420. qstate->process_func = NULL;
  421. }
  422. TRACE_OUT(on_mp_read_session_read_response_write1);
  423. return (0);
  424. }
  425. static int
  426. on_mp_read_session_read_response_write2(struct query_state *qstate)
  427. {
  428. struct cache_mp_read_session_read_response *read_response;
  429. ssize_t result;
  430. TRACE_IN(on_mp_read_session_read_response_write2);
  431. read_response = get_cache_mp_read_session_read_response(
  432. &qstate->response);
  433. result = qstate->write_func(qstate, read_response->data,
  434. read_response->data_size);
  435. if (result < 0 || (size_t)result != qstate->kevent_watermark) {
  436. LOG_ERR_3("on_mp_read_session_read_response_write2",
  437. "write failed");
  438. TRACE_OUT(on_mp_read_session_read_response_write2);
  439. return (-1);
  440. }
  441. finalize_comm_element(&qstate->request);
  442. finalize_comm_element(&qstate->response);
  443. qstate->kevent_watermark = sizeof(int);
  444. qstate->process_func = on_mp_read_session_mapper;
  445. qstate->kevent_filter = EVFILT_READ;
  446. TRACE_OUT(on_mp_read_session_read_response_write2);
  447. return (0);
  448. }
  449. /*
  450. * Handles session close notification by calling close_cache_mp_read_session
  451. * function.
  452. */
  453. static int
  454. on_mp_read_session_close_notification(struct query_state *qstate)
  455. {
  456. TRACE_IN(on_mp_read_session_close_notification);
  457. configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
  458. close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
  459. configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
  460. qstate->mdata = NULL;
  461. qstate->kevent_watermark = 0;
  462. qstate->process_func = NULL;
  463. TRACE_OUT(on_mp_read_session_close_notification);
  464. return (0);
  465. }