123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- /*-
- * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- */
- #include <sys/types.h>
- #include <sys/event.h>
- #include <sys/socket.h>
- #include <sys/time.h>
- #include <assert.h>
- #include <errno.h>
- #include <nsswitch.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include "cachelib.h"
- #include "config.h"
- #include "debug.h"
- #include "log.h"
- #include "query.h"
- #include "mp_rs_query.h"
- #include "mp_ws_query.h"
- #include "singletons.h"
/*
 * Prototypes for the file-local handlers of the multipart read session
 * state machine, grouped by the stage they belong to.
 *
 * on_mp_read_session_request_read1 is the only handler defined without
 * "static" in this file, so it is not declared here (presumably its
 * prototype lives in mp_rs_query.h -- confirm).
 */

/* Cleanup hook. */
static void	on_mp_read_session_destroy(struct query_state *qstate);

/* Session initiation. */
static int	on_mp_read_session_request_read2(struct query_state *qstate);
static int	on_mp_read_session_request_process(struct query_state *qstate);
static int	on_mp_read_session_response_write1(struct query_state *qstate);

/* Per-session request dispatch. */
static int	on_mp_read_session_mapper(struct query_state *qstate);

/* Session read requests. */
static int	on_mp_read_session_read_request_process(
		    struct query_state *qstate);
static int	on_mp_read_session_read_response_write1(
		    struct query_state *qstate);
static int	on_mp_read_session_read_response_write2(
		    struct query_state *qstate);

/* Session close notification. */
static int	on_mp_read_session_close_notification(
		    struct query_state *qstate);
- /*
- * This function is used as the query_state's destroy_func to make the
- * proper cleanup in case of errors.
- */
- static void
- on_mp_read_session_destroy(struct query_state *qstate)
- {
- TRACE_IN(on_mp_read_session_destroy);
- finalize_comm_element(&qstate->request);
- finalize_comm_element(&qstate->response);
- if (qstate->mdata != NULL) {
- configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
- close_cache_mp_read_session(
- (cache_mp_read_session)qstate->mdata);
- configuration_unlock_entry(qstate->config_entry,
- CELT_MULTIPART);
- }
- TRACE_OUT(on_mp_read_session_destroy);
- }
- /*
- * The functions below are used to process multipart read session initiation
- * requests.
- * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
- * the request itself
- * - on_mp_read_session_request_process processes it
- * - on_mp_read_session_response_write1 sends the response
- */
- int
- on_mp_read_session_request_read1(struct query_state *qstate)
- {
- struct cache_mp_read_session_request *c_mp_rs_request;
- ssize_t result;
- TRACE_IN(on_mp_read_session_request_read1);
- if (qstate->kevent_watermark == 0)
- qstate->kevent_watermark = sizeof(size_t);
- else {
- init_comm_element(&qstate->request,
- CET_MP_READ_SESSION_REQUEST);
- c_mp_rs_request = get_cache_mp_read_session_request(
- &qstate->request);
- result = qstate->read_func(qstate,
- &c_mp_rs_request->entry_length, sizeof(size_t));
- if (result != sizeof(size_t)) {
- TRACE_OUT(on_mp_read_session_request_read1);
- return (-1);
- }
- if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
- TRACE_OUT(on_mp_read_session_request_read1);
- return (-1);
- }
- c_mp_rs_request->entry = calloc(1,
- c_mp_rs_request->entry_length + 1);
- assert(c_mp_rs_request->entry != NULL);
- qstate->kevent_watermark = c_mp_rs_request->entry_length;
- qstate->process_func = on_mp_read_session_request_read2;
- }
- TRACE_OUT(on_mp_read_session_request_read1);
- return (0);
- }
- static int
- on_mp_read_session_request_read2(struct query_state *qstate)
- {
- struct cache_mp_read_session_request *c_mp_rs_request;
- ssize_t result;
- TRACE_IN(on_mp_read_session_request_read2);
- c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
- result = qstate->read_func(qstate, c_mp_rs_request->entry,
- c_mp_rs_request->entry_length);
- if (result < 0 || (size_t)result != qstate->kevent_watermark) {
- LOG_ERR_3("on_mp_read_session_request_read2",
- "read failed");
- TRACE_OUT(on_mp_read_session_request_read2);
- return (-1);
- }
- qstate->kevent_watermark = 0;
- qstate->process_func = on_mp_read_session_request_process;
- TRACE_OUT(on_mp_read_session_request_read2);
- return (0);
- }
- static int
- on_mp_read_session_request_process(struct query_state *qstate)
- {
- struct cache_mp_read_session_request *c_mp_rs_request;
- struct cache_mp_read_session_response *c_mp_rs_response;
- cache_mp_read_session rs;
- cache_entry c_entry;
- char *dec_cache_entry_name;
- char *buffer;
- size_t buffer_size;
- cache_mp_write_session ws;
- struct agent *lookup_agent;
- struct multipart_agent *mp_agent;
- void *mdata;
- int res;
- TRACE_IN(on_mp_read_session_request_process);
- init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
- c_mp_rs_response = get_cache_mp_read_session_response(
- &qstate->response);
- c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
- qstate->config_entry = configuration_find_entry(
- s_configuration, c_mp_rs_request->entry);
- if (qstate->config_entry == NULL) {
- c_mp_rs_response->error_code = ENOENT;
- LOG_ERR_2("read_session_request",
- "can't find configuration entry '%s'."
- " aborting request", c_mp_rs_request->entry);
- goto fin;
- }
- if (qstate->config_entry->enabled == 0) {
- c_mp_rs_response->error_code = EACCES;
- LOG_ERR_2("read_session_request",
- "configuration entry '%s' is disabled",
- c_mp_rs_request->entry);
- goto fin;
- }
- if (qstate->config_entry->perform_actual_lookups != 0)
- dec_cache_entry_name = strdup(
- qstate->config_entry->mp_cache_params.cep.entry_name);
- else {
- #ifdef NS_NSCD_EID_CHECKING
- if (check_query_eids(qstate) != 0) {
- c_mp_rs_response->error_code = EPERM;
- goto fin;
- }
- #endif
- asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
- qstate->config_entry->mp_cache_params.cep.entry_name);
- }
- assert(dec_cache_entry_name != NULL);
- configuration_lock_rdlock(s_configuration);
- c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
- configuration_unlock(s_configuration);
- if ((c_entry == INVALID_CACHE) &&
- (qstate->config_entry->perform_actual_lookups != 0))
- c_entry = register_new_mp_cache_entry(qstate,
- dec_cache_entry_name);
- free(dec_cache_entry_name);
- if (c_entry != INVALID_CACHE_ENTRY) {
- configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
- rs = open_cache_mp_read_session(c_entry);
- configuration_unlock_entry(qstate->config_entry,
- CELT_MULTIPART);
- if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
- (qstate->config_entry->perform_actual_lookups != 0)) {
- lookup_agent = find_agent(s_agent_table,
- c_mp_rs_request->entry, MULTIPART_AGENT);
- if ((lookup_agent != NULL) &&
- (lookup_agent->type == MULTIPART_AGENT)) {
- mp_agent = (struct multipart_agent *)
- lookup_agent;
- mdata = mp_agent->mp_init_func();
- /*
- * Multipart agents read the whole snapshot
- * of the data at one time.
- */
- configuration_lock_entry(qstate->config_entry,
- CELT_MULTIPART);
- ws = open_cache_mp_write_session(c_entry);
- configuration_unlock_entry(qstate->config_entry,
- CELT_MULTIPART);
- if (ws != NULL) {
- do {
- buffer = NULL;
- res = mp_agent->mp_lookup_func(&buffer,
- &buffer_size,
- mdata);
- if ((res & NS_TERMINATE) &&
- (buffer != NULL)) {
- configuration_lock_entry(
- qstate->config_entry,
- CELT_MULTIPART);
- if (cache_mp_write(ws, buffer,
- buffer_size) != 0) {
- abandon_cache_mp_write_session(ws);
- ws = NULL;
- }
- configuration_unlock_entry(
- qstate->config_entry,
- CELT_MULTIPART);
- free(buffer);
- buffer = NULL;
- } else {
- configuration_lock_entry(
- qstate->config_entry,
- CELT_MULTIPART);
- close_cache_mp_write_session(ws);
- configuration_unlock_entry(
- qstate->config_entry,
- CELT_MULTIPART);
- free(buffer);
- buffer = NULL;
- }
- } while ((res & NS_TERMINATE) &&
- (ws != NULL));
- }
- configuration_lock_entry(qstate->config_entry,
- CELT_MULTIPART);
- rs = open_cache_mp_read_session(c_entry);
- configuration_unlock_entry(qstate->config_entry,
- CELT_MULTIPART);
- }
- }
- if (rs == INVALID_CACHE_MP_READ_SESSION)
- c_mp_rs_response->error_code = -1;
- else {
- qstate->mdata = rs;
- qstate->destroy_func = on_mp_read_session_destroy;
- configuration_lock_entry(qstate->config_entry,
- CELT_MULTIPART);
- if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
- (qstate->config_entry->mp_query_timeout.tv_usec != 0))
- memcpy(&qstate->timeout,
- &qstate->config_entry->mp_query_timeout,
- sizeof(struct timeval));
- configuration_unlock_entry(qstate->config_entry,
- CELT_MULTIPART);
- }
- } else
- c_mp_rs_response->error_code = -1;
- fin:
- qstate->process_func = on_mp_read_session_response_write1;
- qstate->kevent_watermark = sizeof(int);
- qstate->kevent_filter = EVFILT_WRITE;
- TRACE_OUT(on_mp_read_session_request_process);
- return (0);
- }
- static int
- on_mp_read_session_response_write1(struct query_state *qstate)
- {
- struct cache_mp_read_session_response *c_mp_rs_response;
- ssize_t result;
- TRACE_IN(on_mp_read_session_response_write1);
- c_mp_rs_response = get_cache_mp_read_session_response(
- &qstate->response);
- result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
- sizeof(int));
- if (result != sizeof(int)) {
- LOG_ERR_3("on_mp_read_session_response_write1",
- "write failed");
- TRACE_OUT(on_mp_read_session_response_write1);
- return (-1);
- }
- if (c_mp_rs_response->error_code == 0) {
- qstate->kevent_watermark = sizeof(int);
- qstate->process_func = on_mp_read_session_mapper;
- qstate->kevent_filter = EVFILT_READ;
- } else {
- qstate->kevent_watermark = 0;
- qstate->process_func = NULL;
- }
- TRACE_OUT(on_mp_read_session_response_write1);
- return (0);
- }
- /*
- * Mapper function is used to avoid multiple connections for each session
- * write or read requests. After processing the request, it does not close
- * the connection, but waits for the next request.
- */
- static int
- on_mp_read_session_mapper(struct query_state *qstate)
- {
- ssize_t result;
- int elem_type;
- TRACE_IN(on_mp_read_session_mapper);
- if (qstate->kevent_watermark == 0) {
- qstate->kevent_watermark = sizeof(int);
- } else {
- result = qstate->read_func(qstate, &elem_type, sizeof(int));
- if (result != sizeof(int)) {
- LOG_ERR_3("on_mp_read_session_mapper",
- "read failed");
- TRACE_OUT(on_mp_read_session_mapper);
- return (-1);
- }
- switch (elem_type) {
- case CET_MP_READ_SESSION_READ_REQUEST:
- qstate->kevent_watermark = 0;
- qstate->process_func =
- on_mp_read_session_read_request_process;
- break;
- case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
- qstate->kevent_watermark = 0;
- qstate->process_func =
- on_mp_read_session_close_notification;
- break;
- default:
- qstate->kevent_watermark = 0;
- qstate->process_func = NULL;
- LOG_ERR_3("on_mp_read_session_mapper",
- "unknown element type");
- TRACE_OUT(on_mp_read_session_mapper);
- return (-1);
- }
- }
- TRACE_OUT(on_mp_read_session_mapper);
- return (0);
- }
- /*
- * The functions below are used to process multipart read sessions read
- * requests. User doesn't have to pass any kind of data, besides the
- * request identificator itself. So we don't need any XXX_read functions and
- * start with the XXX_process function.
- * - on_mp_read_session_read_request_process processes it
- * - on_mp_read_session_read_response_write1 and
- * on_mp_read_session_read_response_write2 sends the response
- */
- static int
- on_mp_read_session_read_request_process(struct query_state *qstate)
- {
- struct cache_mp_read_session_read_response *read_response;
- TRACE_IN(on_mp_read_session_response_process);
- init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
- read_response = get_cache_mp_read_session_read_response(
- &qstate->response);
- configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
- read_response->error_code = cache_mp_read(
- (cache_mp_read_session)qstate->mdata, NULL,
- &read_response->data_size);
- if (read_response->error_code == 0) {
- read_response->data = malloc(read_response->data_size);
- assert(read_response != NULL);
- read_response->error_code = cache_mp_read(
- (cache_mp_read_session)qstate->mdata,
- read_response->data,
- &read_response->data_size);
- }
- configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
- if (read_response->error_code == 0)
- qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
- else
- qstate->kevent_watermark = sizeof(int);
- qstate->process_func = on_mp_read_session_read_response_write1;
- qstate->kevent_filter = EVFILT_WRITE;
- TRACE_OUT(on_mp_read_session_response_process);
- return (0);
- }
- static int
- on_mp_read_session_read_response_write1(struct query_state *qstate)
- {
- struct cache_mp_read_session_read_response *read_response;
- ssize_t result;
- TRACE_IN(on_mp_read_session_read_response_write1);
- read_response = get_cache_mp_read_session_read_response(
- &qstate->response);
- result = qstate->write_func(qstate, &read_response->error_code,
- sizeof(int));
- if (read_response->error_code == 0) {
- result += qstate->write_func(qstate, &read_response->data_size,
- sizeof(size_t));
- if (result < 0 || (size_t)result != qstate->kevent_watermark) {
- TRACE_OUT(on_mp_read_session_read_response_write1);
- LOG_ERR_3("on_mp_read_session_read_response_write1",
- "write failed");
- return (-1);
- }
- qstate->kevent_watermark = read_response->data_size;
- qstate->process_func = on_mp_read_session_read_response_write2;
- } else {
- if (result < 0 || (size_t)result != qstate->kevent_watermark) {
- LOG_ERR_3("on_mp_read_session_read_response_write1",
- "write failed");
- TRACE_OUT(on_mp_read_session_read_response_write1);
- return (-1);
- }
- qstate->kevent_watermark = 0;
- qstate->process_func = NULL;
- }
- TRACE_OUT(on_mp_read_session_read_response_write1);
- return (0);
- }
- static int
- on_mp_read_session_read_response_write2(struct query_state *qstate)
- {
- struct cache_mp_read_session_read_response *read_response;
- ssize_t result;
- TRACE_IN(on_mp_read_session_read_response_write2);
- read_response = get_cache_mp_read_session_read_response(
- &qstate->response);
- result = qstate->write_func(qstate, read_response->data,
- read_response->data_size);
- if (result < 0 || (size_t)result != qstate->kevent_watermark) {
- LOG_ERR_3("on_mp_read_session_read_response_write2",
- "write failed");
- TRACE_OUT(on_mp_read_session_read_response_write2);
- return (-1);
- }
- finalize_comm_element(&qstate->request);
- finalize_comm_element(&qstate->response);
- qstate->kevent_watermark = sizeof(int);
- qstate->process_func = on_mp_read_session_mapper;
- qstate->kevent_filter = EVFILT_READ;
- TRACE_OUT(on_mp_read_session_read_response_write2);
- return (0);
- }
- /*
- * Handles session close notification by calling close_cache_mp_read_session
- * function.
- */
- static int
- on_mp_read_session_close_notification(struct query_state *qstate)
- {
- TRACE_IN(on_mp_read_session_close_notification);
- configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
- close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
- configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
- qstate->mdata = NULL;
- qstate->kevent_watermark = 0;
- qstate->process_func = NULL;
- TRACE_OUT(on_mp_read_session_close_notification);
- return (0);
- }
|