res_corosync.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2007, Digium, Inc.
  5. * Copyright (C) 2012, Russell Bryant
  6. *
  7. * Russell Bryant <russell@russellbryant.net>
  8. *
  9. * See http://www.asterisk.org for more information about
  10. * the Asterisk project. Please do not directly contact
  11. * any of the maintainers of this project for assistance;
  12. * the project provides a web site, mailing lists and IRC
  13. * channels for your use.
  14. *
  15. * This program is free software, distributed under the terms of
  16. * the GNU General Public License Version 2. See the LICENSE file
  17. * at the top of the source tree.
  18. */
  19. /*!
  20. * \file
  21. * \author Russell Bryant <russell@russellbryant.net>
  22. *
  23. * This module is based on and replaces the previous res_ais module.
  24. */
  25. /*** MODULEINFO
  26. <depend>corosync</depend>
  27. <defaultenabled>no</defaultenabled>
  28. <support_level>extended</support_level>
  29. ***/
  30. #include "asterisk.h"
  31. ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
  32. #include <corosync/cpg.h>
  33. #include <corosync/cfg.h>
  34. #include "asterisk/module.h"
  35. #include "asterisk/logger.h"
  36. #include "asterisk/poll-compat.h"
  37. #include "asterisk/config.h"
  38. #include "asterisk/event.h"
  39. #include "asterisk/cli.h"
  40. #include "asterisk/devicestate.h"
  41. #include "asterisk/app.h"
  42. #include "asterisk/stasis.h"
  43. #include "asterisk/stasis_message_router.h"
  44. AST_RWLOCK_DEFINE_STATIC(event_types_lock);
  45. static void publish_mwi_to_stasis(struct ast_event *event);
  46. static void publish_device_state_to_stasis(struct ast_event *event);
  47. /*! \brief The internal topic used for message forwarding and pings */
  48. static struct stasis_topic *corosync_aggregate_topic;
  49. /*! \brief Our \ref stasis message router */
  50. static struct stasis_message_router *stasis_router;
  51. /*! \brief Internal accessor for our topic */
  52. static struct stasis_topic *corosync_topic(void)
  53. {
  54. return corosync_aggregate_topic;
  55. }
  56. /*! \brief A payload wrapper around a corosync ping event */
  57. struct corosync_ping_payload {
  58. /*! The corosync ping event being passed over \ref stasis */
  59. struct ast_event *event;
  60. };
  61. /*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
  62. static void corosync_ping_payload_dtor(void *obj)
  63. {
  64. struct corosync_ping_payload *payload = obj;
  65. ast_free(payload->event);
  66. }
  67. /*! \brief Convert a Corosync PING to a \ref ast_event */
  68. static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
  69. {
  70. struct corosync_ping_payload *payload;
  71. struct ast_event *event;
  72. struct ast_eid *event_eid;
  73. if (!message) {
  74. return NULL;
  75. }
  76. payload = stasis_message_data(message);
  77. if (!payload->event) {
  78. return NULL;
  79. }
  80. event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
  81. event = ast_event_new(AST_EVENT_PING,
  82. AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
  83. AST_EVENT_IE_END);
  84. return event;
  85. }
  86. STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
  87. .to_event = corosync_ping_to_event, );
  88. /*! \brief Publish a Corosync ping to \ref stasis */
  89. static void publish_corosync_ping_to_stasis(struct ast_event *event)
  90. {
  91. struct corosync_ping_payload *payload;
  92. struct stasis_message *message;
  93. ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
  94. ast_assert(event != NULL);
  95. if (!corosync_ping_message_type()) {
  96. return;
  97. }
  98. payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
  99. if (!payload) {
  100. return;
  101. }
  102. payload->event = event;
  103. message = stasis_message_create(corosync_ping_message_type(), payload);
  104. if (!message) {
  105. ao2_t_ref(payload, -1, "Destroy payload on off nominal");
  106. return;
  107. }
  108. stasis_publish(corosync_topic(), message);
  109. ao2_t_ref(payload, -1, "Hand ref to stasis");
  110. ao2_t_ref(message, -1, "Hand ref to stasis");
  111. }
  112. static struct {
  113. const char *name;
  114. struct stasis_forward *sub;
  115. unsigned char publish;
  116. unsigned char publish_default;
  117. unsigned char subscribe;
  118. unsigned char subscribe_default;
  119. struct stasis_topic *(* topic_fn)(void);
  120. struct stasis_cache *(* cache_fn)(void);
  121. struct stasis_message_type *(* message_type_fn)(void);
  122. void (* publish_to_stasis)(struct ast_event *);
  123. } event_types[] = {
  124. [AST_EVENT_MWI] = { .name = "mwi",
  125. .topic_fn = ast_mwi_topic_all,
  126. .cache_fn = ast_mwi_state_cache,
  127. .message_type_fn = ast_mwi_state_type,
  128. .publish_to_stasis = publish_mwi_to_stasis, },
  129. [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
  130. .topic_fn = ast_device_state_topic_all,
  131. .cache_fn = ast_device_state_cache,
  132. .message_type_fn = ast_device_state_message_type,
  133. .publish_to_stasis = publish_device_state_to_stasis, },
  134. [AST_EVENT_PING] = { .name = "ping",
  135. .publish_default = 1,
  136. .subscribe_default = 1,
  137. .topic_fn = corosync_topic,
  138. .message_type_fn = corosync_ping_message_type,
  139. .publish_to_stasis = publish_corosync_ping_to_stasis, },
  140. };
  141. static struct {
  142. pthread_t id;
  143. int alert_pipe[2];
  144. unsigned int stop:1;
  145. } dispatch_thread = {
  146. .id = AST_PTHREADT_NULL,
  147. .alert_pipe = { -1, -1 },
  148. };
  149. static cpg_handle_t cpg_handle;
  150. static corosync_cfg_handle_t cfg_handle;
  151. #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
  152. static void cfg_state_track_cb(
  153. corosync_cfg_state_notification_buffer_t *notification_buffer,
  154. cs_error_t error);
  155. #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
  156. static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
  157. corosync_cfg_shutdown_flags_t flags);
  158. static corosync_cfg_callbacks_t cfg_callbacks = {
  159. #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
  160. .corosync_cfg_state_track_callback = cfg_state_track_cb,
  161. #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
  162. .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
  163. };
  164. /*! \brief Publish a received MWI \ref ast_event to \ref stasis */
  165. static void publish_mwi_to_stasis(struct ast_event *event)
  166. {
  167. const char *mailbox;
  168. const char *context;
  169. unsigned int new_msgs;
  170. unsigned int old_msgs;
  171. struct ast_eid *event_eid;
  172. ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
  173. mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
  174. context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
  175. new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
  176. old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
  177. event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
  178. if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
  179. return;
  180. }
  181. if (new_msgs > INT_MAX) {
  182. new_msgs = INT_MAX;
  183. }
  184. if (old_msgs > INT_MAX) {
  185. old_msgs = INT_MAX;
  186. }
  187. if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
  188. (int)old_msgs, NULL, event_eid)) {
  189. char eid[16];
  190. ast_eid_to_str(eid, sizeof(eid), event_eid);
  191. ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
  192. mailbox, context, eid);
  193. }
  194. }
  195. /*! \brief Publish a received device state \ref ast_event to \ref stasis */
  196. static void publish_device_state_to_stasis(struct ast_event *event)
  197. {
  198. const char *device;
  199. enum ast_device_state state;
  200. unsigned int cachable;
  201. struct ast_eid *event_eid;
  202. ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
  203. device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
  204. state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
  205. cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
  206. event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
  207. if (ast_strlen_zero(device)) {
  208. return;
  209. }
  210. if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
  211. char eid[16];
  212. ast_eid_to_str(eid, sizeof(eid), event_eid);
  213. ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
  214. device, eid);
  215. }
  216. }
  217. static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
  218. uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
  219. static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
  220. const struct cpg_address *member_list, size_t member_list_entries,
  221. const struct cpg_address *left_list, size_t left_list_entries,
  222. const struct cpg_address *joined_list, size_t joined_list_entries);
  223. static cpg_callbacks_t cpg_callbacks = {
  224. .cpg_deliver_fn = cpg_deliver_cb,
  225. .cpg_confchg_fn = cpg_confchg_cb,
  226. };
  227. #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
  228. static void cfg_state_track_cb(
  229. corosync_cfg_state_notification_buffer_t *notification_buffer,
  230. cs_error_t error)
  231. {
  232. }
  233. #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
  234. static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
  235. corosync_cfg_shutdown_flags_t flags)
  236. {
  237. }
  238. static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
  239. uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
  240. {
  241. struct ast_event *event;
  242. void (*publish_handler)(struct ast_event *) = NULL;
  243. enum ast_event_type event_type;
  244. if (msg_len < ast_event_minimum_length()) {
  245. ast_debug(1, "Ignoring event that's too small. %u < %u\n",
  246. (unsigned int) msg_len,
  247. (unsigned int) ast_event_minimum_length());
  248. return;
  249. }
  250. if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
  251. /* Don't feed events back in that originated locally. */
  252. return;
  253. }
  254. event_type = ast_event_get_type(msg);
  255. if (event_type > AST_EVENT_TOTAL) {
  256. /* Egads, we don't support this */
  257. return;
  258. }
  259. ast_rwlock_rdlock(&event_types_lock);
  260. publish_handler = event_types[event_type].publish_to_stasis;
  261. if (!event_types[event_type].subscribe || !publish_handler) {
  262. /* We are not configured to subscribe to these events or
  263. we have no way to publish it internally. */
  264. ast_rwlock_unlock(&event_types_lock);
  265. return;
  266. }
  267. ast_rwlock_unlock(&event_types_lock);
  268. if (!(event = ast_malloc(msg_len))) {
  269. return;
  270. }
  271. memcpy(event, msg, msg_len);
  272. if (event_type == AST_EVENT_PING) {
  273. const struct ast_eid *eid;
  274. char buf[128] = "";
  275. eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
  276. ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
  277. ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
  278. }
  279. ast_debug(5, "Publishing event %s (%u) to stasis\n",
  280. ast_event_get_type_name(event), event_type);
  281. publish_handler(event);
  282. }
  283. static void publish_to_corosync(struct stasis_message *message)
  284. {
  285. cs_error_t cs_err;
  286. struct iovec iov;
  287. struct ast_event *event;
  288. event = stasis_message_to_event(message);
  289. if (!event) {
  290. return;
  291. }
  292. if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
  293. /* If the event didn't originate from this server, don't send it back out. */
  294. ast_event_destroy(event);
  295. return;
  296. }
  297. if (ast_event_get_type(event) == AST_EVENT_PING) {
  298. const struct ast_eid *eid;
  299. char buf[128] = "";
  300. eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
  301. ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
  302. ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
  303. }
  304. iov.iov_base = (void *)event;
  305. iov.iov_len = ast_event_get_size(event);
  306. ast_debug(5, "Publishing event %s (%u) to corosync\n",
  307. ast_event_get_type_name(event), ast_event_get_type(event));
  308. /* The stasis subscription will only exist if we are configured to publish
  309. * these events, so just send away. */
  310. if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
  311. ast_log(LOG_WARNING, "CPG mcast failed (%u)\n", cs_err);
  312. }
  313. }
  314. static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
  315. {
  316. if (!message) {
  317. return;
  318. }
  319. publish_to_corosync(message);
  320. }
  321. static int dump_cache_cb(void *obj, void *arg, int flags)
  322. {
  323. struct stasis_message *message = obj;
  324. if (!message) {
  325. return 0;
  326. }
  327. publish_to_corosync(message);
  328. return 0;
  329. }
  330. static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
  331. const struct cpg_address *member_list, size_t member_list_entries,
  332. const struct cpg_address *left_list, size_t left_list_entries,
  333. const struct cpg_address *joined_list, size_t joined_list_entries)
  334. {
  335. unsigned int i;
  336. /* If any new nodes have joined, dump our cache of events we are publishing
  337. * that originated from this server. */
  338. if (!joined_list_entries) {
  339. return;
  340. }
  341. for (i = 0; i < ARRAY_LEN(event_types); i++) {
  342. struct ao2_container *messages;
  343. ast_rwlock_rdlock(&event_types_lock);
  344. if (!event_types[i].publish) {
  345. ast_rwlock_unlock(&event_types_lock);
  346. continue;
  347. }
  348. if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
  349. ast_rwlock_unlock(&event_types_lock);
  350. continue;
  351. }
  352. messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
  353. event_types[i].message_type_fn(),
  354. &ast_eid_default);
  355. ast_rwlock_unlock(&event_types_lock);
  356. ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
  357. ao2_t_ref(messages, -1, "Dispose of dumped cache");
  358. }
  359. }
  360. static void *dispatch_thread_handler(void *data)
  361. {
  362. cs_error_t cs_err;
  363. struct pollfd pfd[3] = {
  364. { .events = POLLIN, },
  365. { .events = POLLIN, },
  366. { .events = POLLIN, },
  367. };
  368. if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
  369. ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
  370. return NULL;
  371. }
  372. if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
  373. ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
  374. return NULL;
  375. }
  376. pfd[2].fd = dispatch_thread.alert_pipe[0];
  377. while (!dispatch_thread.stop) {
  378. int res;
  379. cs_err = CS_OK;
  380. pfd[0].revents = 0;
  381. pfd[1].revents = 0;
  382. pfd[2].revents = 0;
  383. res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
  384. if (res == -1 && errno != EINTR && errno != EAGAIN) {
  385. ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
  386. continue;
  387. }
  388. if (pfd[0].revents & POLLIN) {
  389. if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
  390. ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
  391. }
  392. }
  393. if (pfd[1].revents & POLLIN) {
  394. if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
  395. ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
  396. }
  397. }
  398. if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
  399. struct cpg_name name;
  400. /* If corosync gets restarted out from under Asterisk, try to recover. */
  401. ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
  402. if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
  403. ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
  404. sleep(5);
  405. continue;
  406. }
  407. if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
  408. ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
  409. sleep(5);
  410. continue;
  411. }
  412. if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
  413. ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
  414. sleep(5);
  415. continue;
  416. }
  417. if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
  418. ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
  419. sleep(5);
  420. continue;
  421. }
  422. ast_copy_string(name.value, "asterisk", sizeof(name.value));
  423. name.length = strlen(name.value);
  424. if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
  425. ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
  426. sleep(5);
  427. continue;
  428. }
  429. ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
  430. }
  431. }
  432. return NULL;
  433. }
  434. static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  435. {
  436. cs_error_t cs_err;
  437. cpg_iteration_handle_t cpg_iter;
  438. struct cpg_iteration_description_t cpg_desc;
  439. unsigned int i;
  440. switch (cmd) {
  441. case CLI_INIT:
  442. e->command = "corosync show members";
  443. e->usage =
  444. "Usage: corosync show members\n"
  445. " Show corosync cluster members\n";
  446. return NULL;
  447. case CLI_GENERATE:
  448. return NULL; /* no completion */
  449. }
  450. if (a->argc != e->args) {
  451. return CLI_SHOWUSAGE;
  452. }
  453. cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
  454. if (cs_err != CS_OK) {
  455. ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
  456. return CLI_FAILURE;
  457. }
  458. ast_cli(a->fd, "\n"
  459. "=============================================================\n"
  460. "=== Cluster members =========================================\n"
  461. "=============================================================\n"
  462. "===\n");
  463. for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
  464. cs_err == CS_OK;
  465. cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
  466. corosync_cfg_node_address_t addrs[8];
  467. int num_addrs = 0;
  468. unsigned int j;
  469. cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
  470. ARRAY_LEN(addrs), &num_addrs, addrs);
  471. if (cs_err != CS_OK) {
  472. ast_log(LOG_WARNING, "Failed to get node addresses\n");
  473. continue;
  474. }
  475. ast_cli(a->fd, "=== Node %u\n", i);
  476. ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
  477. for (j = 0; j < num_addrs; j++) {
  478. struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
  479. size_t sa_len = (size_t) addrs[j].address_length;
  480. char buf[128];
  481. getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
  482. ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
  483. }
  484. }
  485. ast_cli(a->fd, "===\n"
  486. "=============================================================\n"
  487. "\n");
  488. cpg_iteration_finalize(cpg_iter);
  489. return CLI_SUCCESS;
  490. }
  491. static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  492. {
  493. struct ast_event *event;
  494. switch (cmd) {
  495. case CLI_INIT:
  496. e->command = "corosync ping";
  497. e->usage =
  498. "Usage: corosync ping\n"
  499. " Send a test ping to the cluster.\n"
  500. "A NOTICE will be in the log for every ping received\n"
  501. "on a server.\n If you send a ping, you should see a NOTICE\n"
  502. "in the log for every server in the cluster.\n";
  503. return NULL;
  504. case CLI_GENERATE:
  505. return NULL; /* no completion */
  506. }
  507. if (a->argc != e->args) {
  508. return CLI_SHOWUSAGE;
  509. }
  510. event = ast_event_new(AST_EVENT_PING, AST_EVENT_IE_END);
  511. if (!event) {
  512. return CLI_FAILURE;
  513. }
  514. ast_rwlock_rdlock(&event_types_lock);
  515. event_types[AST_EVENT_PING].publish_to_stasis(event);
  516. ast_rwlock_unlock(&event_types_lock);
  517. return CLI_SUCCESS;
  518. }
  519. static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  520. {
  521. unsigned int i;
  522. switch (cmd) {
  523. case CLI_INIT:
  524. e->command = "corosync show config";
  525. e->usage =
  526. "Usage: corosync show config\n"
  527. " Show configuration loaded from res_corosync.conf\n";
  528. return NULL;
  529. case CLI_GENERATE:
  530. return NULL; /* no completion */
  531. }
  532. if (a->argc != e->args) {
  533. return CLI_SHOWUSAGE;
  534. }
  535. ast_cli(a->fd, "\n"
  536. "=============================================================\n"
  537. "=== res_corosync config =====================================\n"
  538. "=============================================================\n"
  539. "===\n");
  540. ast_rwlock_rdlock(&event_types_lock);
  541. for (i = 0; i < ARRAY_LEN(event_types); i++) {
  542. if (event_types[i].publish) {
  543. ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
  544. event_types[i].name);
  545. }
  546. if (event_types[i].subscribe) {
  547. ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
  548. event_types[i].name);
  549. }
  550. }
  551. ast_rwlock_unlock(&event_types_lock);
  552. ast_cli(a->fd, "===\n"
  553. "=============================================================\n"
  554. "\n");
  555. return CLI_SUCCESS;
  556. }
  557. static struct ast_cli_entry corosync_cli[] = {
  558. AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
  559. AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
  560. AST_CLI_DEFINE(corosync_ping, "Send a test ping to the cluster"),
  561. };
  562. enum {
  563. PUBLISH,
  564. SUBSCRIBE,
  565. };
  566. static int set_event(const char *event_type, int pubsub)
  567. {
  568. unsigned int i;
  569. for (i = 0; i < ARRAY_LEN(event_types); i++) {
  570. if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
  571. continue;
  572. }
  573. switch (pubsub) {
  574. case PUBLISH:
  575. event_types[i].publish = 1;
  576. break;
  577. case SUBSCRIBE:
  578. event_types[i].subscribe = 1;
  579. break;
  580. }
  581. break;
  582. }
  583. return (i == ARRAY_LEN(event_types)) ? -1 : 0;
  584. }
  585. static int load_general_config(struct ast_config *cfg)
  586. {
  587. struct ast_variable *v;
  588. int res = 0;
  589. unsigned int i;
  590. ast_rwlock_wrlock(&event_types_lock);
  591. for (i = 0; i < ARRAY_LEN(event_types); i++) {
  592. event_types[i].publish = event_types[i].publish_default;
  593. event_types[i].subscribe = event_types[i].subscribe_default;
  594. }
  595. for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
  596. if (!strcasecmp(v->name, "publish_event")) {
  597. res = set_event(v->value, PUBLISH);
  598. } else if (!strcasecmp(v->name, "subscribe_event")) {
  599. res = set_event(v->value, SUBSCRIBE);
  600. } else {
  601. ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
  602. }
  603. }
  604. for (i = 0; i < ARRAY_LEN(event_types); i++) {
  605. if (event_types[i].publish && !event_types[i].sub) {
  606. event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
  607. corosync_topic());
  608. stasis_message_router_add(stasis_router,
  609. event_types[i].message_type_fn(),
  610. stasis_message_cb,
  611. NULL);
  612. } else if (!event_types[i].publish && event_types[i].sub) {
  613. event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
  614. stasis_message_router_remove(stasis_router,
  615. event_types[i].message_type_fn());
  616. }
  617. }
  618. ast_rwlock_unlock(&event_types_lock);
  619. return res;
  620. }
  621. static int load_config(unsigned int reload)
  622. {
  623. static const char filename[] = "res_corosync.conf";
  624. struct ast_config *cfg;
  625. const char *cat = NULL;
  626. struct ast_flags config_flags = { 0 };
  627. int res = 0;
  628. cfg = ast_config_load(filename, config_flags);
  629. if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
  630. return -1;
  631. }
  632. while ((cat = ast_category_browse(cfg, cat))) {
  633. if (!strcasecmp(cat, "general")) {
  634. res = load_general_config(cfg);
  635. } else {
  636. ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
  637. }
  638. }
  639. ast_config_destroy(cfg);
  640. return res;
  641. }
  642. static void cleanup_module(void)
  643. {
  644. cs_error_t cs_err;
  645. unsigned int i;
  646. if (stasis_router) {
  647. /* Unsubscribe all topic forwards and cancel all message routes */
  648. ast_rwlock_wrlock(&event_types_lock);
  649. for (i = 0; i < ARRAY_LEN(event_types); i++) {
  650. if (event_types[i].sub) {
  651. event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
  652. stasis_message_router_remove(stasis_router,
  653. event_types[i].message_type_fn());
  654. }
  655. event_types[i].publish = 0;
  656. event_types[i].subscribe = 0;
  657. }
  658. ast_rwlock_unlock(&event_types_lock);
  659. stasis_message_router_unsubscribe_and_join(stasis_router);
  660. stasis_router = NULL;
  661. }
  662. if (corosync_aggregate_topic) {
  663. ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
  664. corosync_aggregate_topic = NULL;
  665. }
  666. STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
  667. if (dispatch_thread.id != AST_PTHREADT_NULL) {
  668. char meepmeep = 'x';
  669. dispatch_thread.stop = 1;
  670. if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
  671. 5000) == -1) {
  672. ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
  673. strerror(errno), errno);
  674. }
  675. pthread_join(dispatch_thread.id, NULL);
  676. }
  677. if (dispatch_thread.alert_pipe[0] != -1) {
  678. close(dispatch_thread.alert_pipe[0]);
  679. dispatch_thread.alert_pipe[0] = -1;
  680. }
  681. if (dispatch_thread.alert_pipe[1] != -1) {
  682. close(dispatch_thread.alert_pipe[1]);
  683. dispatch_thread.alert_pipe[1] = -1;
  684. }
  685. if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
  686. ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
  687. }
  688. cpg_handle = 0;
  689. if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
  690. ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
  691. }
  692. cfg_handle = 0;
  693. }
  694. static int load_module(void)
  695. {
  696. cs_error_t cs_err;
  697. enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
  698. struct cpg_name name;
  699. corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
  700. if (!corosync_aggregate_topic) {
  701. ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
  702. goto failed;
  703. }
  704. stasis_router = stasis_message_router_create(corosync_aggregate_topic);
  705. if (!stasis_router) {
  706. ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
  707. goto failed;
  708. }
  709. if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
  710. ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
  711. goto failed;
  712. }
  713. if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
  714. ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
  715. goto failed;
  716. }
  717. if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
  718. ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
  719. goto failed;
  720. }
  721. ast_copy_string(name.value, "asterisk", sizeof(name.value));
  722. name.length = strlen(name.value);
  723. if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
  724. ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
  725. goto failed;
  726. }
  727. if (pipe(dispatch_thread.alert_pipe) == -1) {
  728. ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
  729. strerror(errno), errno);
  730. goto failed;
  731. }
  732. if (ast_pthread_create_background(&dispatch_thread.id, NULL,
  733. dispatch_thread_handler, NULL)) {
  734. ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
  735. goto failed;
  736. }
  737. if (load_config(0)) {
  738. /* simply not configured is not a fatal error */
  739. res = AST_MODULE_LOAD_DECLINE;
  740. goto failed;
  741. }
  742. ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
  743. return AST_MODULE_LOAD_SUCCESS;
  744. failed:
  745. cleanup_module();
  746. return res;
  747. }
  748. static int unload_module(void)
  749. {
  750. ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
  751. cleanup_module();
  752. return 0;
  753. }
  754. AST_MODULE_INFO_STANDARD_EXTENDED(ASTERISK_GPL_KEY, "Corosync");