bridge_multiplexed.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2008, Digium, Inc.
  5. *
  6. * Joshua Colp <jcolp@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Two channel bridging module which groups bridges into batches of threads
  21. *
  22. * \author Joshua Colp <jcolp@digium.com>
  23. *
  24. * \ingroup bridges
  25. */
  26. /*** MODULEINFO
  27. <support_level>core</support_level>
  28. ***/
  29. #include "asterisk.h"
  30. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  31. #include <stdio.h>
  32. #include <stdlib.h>
  33. #include <string.h>
  34. #include <sys/types.h>
  35. #include <sys/stat.h>
  36. #include <fcntl.h>
  37. #include "asterisk/module.h"
  38. #include "asterisk/channel.h"
  39. #include "asterisk/bridging.h"
  40. #include "asterisk/bridging_technology.h"
  41. #include "asterisk/frame.h"
  42. #include "asterisk/astobj2.h"
  /*! \brief Bucket count passed to ao2_container_alloc() for the multiplexed thread container */
  #define MULTIPLEXED_BUCKETS 53

  /*! \brief Size of the per-thread channel array, i.e. the most channels one thread services */
  #define MULTIPLEXED_MAX_CHANNELS 8
  47. /*! \brief Structure which represents a single thread handling multiple 2 channel bridges */
  48. struct multiplexed_thread {
  49. /*! Thread itself */
  50. pthread_t thread;
  51. /*! Pipe used to wake up the multiplexed thread */
  52. int pipe[2];
  53. /*! Channels in this thread */
  54. struct ast_channel *chans[MULTIPLEXED_MAX_CHANNELS];
  55. /*! Number of channels in this thread */
  56. unsigned int count;
  57. /*! Bit used to indicate that the thread is waiting on channels */
  58. unsigned int waiting:1;
  59. /*! Number of channels actually being serviced by this thread */
  60. unsigned int service_count;
  61. };
  /*! \brief astobj2 container holding every multiplexed thread structure currently linked in */
  static struct ao2_container *multiplexed_threads;
  64. /*! \brief Callback function for finding a free multiplexed thread */
  65. static int find_multiplexed_thread(void *obj, void *arg, int flags)
  66. {
  67. struct multiplexed_thread *multiplexed_thread = obj;
  68. return (multiplexed_thread->count <= (MULTIPLEXED_MAX_CHANNELS - 2)) ? CMP_MATCH | CMP_STOP : 0;
  69. }
  70. /*! \brief Destroy callback for a multiplexed thread structure */
  71. static void destroy_multiplexed_thread(void *obj)
  72. {
  73. struct multiplexed_thread *multiplexed_thread = obj;
  74. if (multiplexed_thread->pipe[0] > -1) {
  75. close(multiplexed_thread->pipe[0]);
  76. }
  77. if (multiplexed_thread->pipe[1] > -1) {
  78. close(multiplexed_thread->pipe[1]);
  79. }
  80. return;
  81. }
  82. /*! \brief Create function which finds/reserves/references a multiplexed thread structure */
  83. static int multiplexed_bridge_create(struct ast_bridge *bridge)
  84. {
  85. struct multiplexed_thread *multiplexed_thread;
  86. ao2_lock(multiplexed_threads);
  87. /* Try to find an existing thread to handle our additional channels */
  88. if (!(multiplexed_thread = ao2_callback(multiplexed_threads, 0, find_multiplexed_thread, NULL))) {
  89. int flags;
  90. /* If we failed we will have to create a new one from scratch */
  91. if (!(multiplexed_thread = ao2_alloc(sizeof(*multiplexed_thread), destroy_multiplexed_thread))) {
  92. ast_debug(1, "Failed to find or create a new multiplexed thread for bridge '%p'\n", bridge);
  93. ao2_unlock(multiplexed_threads);
  94. return -1;
  95. }
  96. multiplexed_thread->pipe[0] = multiplexed_thread->pipe[1] = -1;
  97. /* Setup a pipe so we can poke the thread itself when needed */
  98. if (pipe(multiplexed_thread->pipe)) {
  99. ast_debug(1, "Failed to create a pipe for poking a multiplexed thread for bridge '%p'\n", bridge);
  100. ao2_ref(multiplexed_thread, -1);
  101. ao2_unlock(multiplexed_threads);
  102. return -1;
  103. }
  104. /* Setup each pipe for non-blocking operation */
  105. flags = fcntl(multiplexed_thread->pipe[0], F_GETFL);
  106. if (fcntl(multiplexed_thread->pipe[0], F_SETFL, flags | O_NONBLOCK) < 0) {
  107. ast_log(LOG_WARNING, "Failed to setup first nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
  108. ao2_ref(multiplexed_thread, -1);
  109. ao2_unlock(multiplexed_threads);
  110. return -1;
  111. }
  112. flags = fcntl(multiplexed_thread->pipe[1], F_GETFL);
  113. if (fcntl(multiplexed_thread->pipe[1], F_SETFL, flags | O_NONBLOCK) < 0) {
  114. ast_log(LOG_WARNING, "Failed to setup second nudge pipe for non-blocking operation on %p (%d: %s)\n", bridge, errno, strerror(errno));
  115. ao2_ref(multiplexed_thread, -1);
  116. ao2_unlock(multiplexed_threads);
  117. return -1;
  118. }
  119. /* Set up default parameters */
  120. multiplexed_thread->thread = AST_PTHREADT_NULL;
  121. /* Finally link us into the container so others may find us */
  122. ao2_link(multiplexed_threads, multiplexed_thread);
  123. ast_debug(1, "Created multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
  124. } else {
  125. ast_debug(1, "Found multiplexed thread '%p' for bridge '%p'\n", multiplexed_thread, bridge);
  126. }
  127. /* Bump the count of the thread structure up by two since the channels for this bridge will be joining shortly */
  128. multiplexed_thread->count += 2;
  129. ao2_unlock(multiplexed_threads);
  130. bridge->bridge_pvt = multiplexed_thread;
  131. return 0;
  132. }
  133. /*! \brief Internal function which nudges the thread */
  134. static void multiplexed_nudge(struct multiplexed_thread *multiplexed_thread)
  135. {
  136. int nudge = 0;
  137. if (multiplexed_thread->thread == AST_PTHREADT_NULL) {
  138. return;
  139. }
  140. if (write(multiplexed_thread->pipe[1], &nudge, sizeof(nudge)) != sizeof(nudge)) {
  141. ast_log(LOG_ERROR, "We couldn't poke multiplexed thread '%p'... something is VERY wrong\n", multiplexed_thread);
  142. }
  143. while (multiplexed_thread->waiting) {
  144. sched_yield();
  145. }
  146. return;
  147. }
  148. /*! \brief Destroy function which unreserves/unreferences/removes a multiplexed thread structure */
  149. static int multiplexed_bridge_destroy(struct ast_bridge *bridge)
  150. {
  151. struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
  152. ao2_lock(multiplexed_threads);
  153. multiplexed_thread->count -= 2;
  154. if (!multiplexed_thread->count) {
  155. ast_debug(1, "Unlinking multiplexed thread '%p' since nobody is using it anymore\n", multiplexed_thread);
  156. ao2_unlink(multiplexed_threads, multiplexed_thread);
  157. }
  158. multiplexed_nudge(multiplexed_thread);
  159. ao2_unlock(multiplexed_threads);
  160. ao2_ref(multiplexed_thread, -1);
  161. return 0;
  162. }
  163. /*! \brief Thread function that executes for multiplexed threads */
  164. static void *multiplexed_thread_function(void *data)
  165. {
  166. struct multiplexed_thread *multiplexed_thread = data;
  167. int fds = multiplexed_thread->pipe[0];
  168. ao2_lock(multiplexed_thread);
  169. ast_debug(1, "Starting actual thread for multiplexed thread '%p'\n", multiplexed_thread);
  170. while (multiplexed_thread->thread != AST_PTHREADT_STOP) {
  171. struct ast_channel *winner = NULL, *first = multiplexed_thread->chans[0];
  172. int to = -1, outfd = -1;
  173. /* Move channels around so not just the first one gets priority */
  174. memmove(multiplexed_thread->chans, multiplexed_thread->chans + 1, sizeof(struct ast_channel *) * (multiplexed_thread->service_count - 1));
  175. multiplexed_thread->chans[multiplexed_thread->service_count - 1] = first;
  176. multiplexed_thread->waiting = 1;
  177. ao2_unlock(multiplexed_thread);
  178. winner = ast_waitfor_nandfds(multiplexed_thread->chans, multiplexed_thread->service_count, &fds, 1, NULL, &outfd, &to);
  179. multiplexed_thread->waiting = 0;
  180. ao2_lock(multiplexed_thread);
  181. if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
  182. break;
  183. }
  184. if (outfd > -1) {
  185. int nudge;
  186. if (read(multiplexed_thread->pipe[0], &nudge, sizeof(nudge)) < 0) {
  187. if (errno != EINTR && errno != EAGAIN) {
  188. ast_log(LOG_WARNING, "read() failed for pipe on multiplexed thread '%p': %s\n", multiplexed_thread, strerror(errno));
  189. }
  190. }
  191. }
  192. if (winner && ast_channel_internal_bridge(winner)) {
  193. struct ast_bridge *bridge = ast_channel_internal_bridge(winner);
  194. int stop = 0;
  195. ao2_unlock(multiplexed_thread);
  196. while ((bridge = ast_channel_internal_bridge(winner)) && ao2_trylock(bridge)) {
  197. sched_yield();
  198. if (multiplexed_thread->thread == AST_PTHREADT_STOP) {
  199. stop = 1;
  200. break;
  201. }
  202. }
  203. if (!stop && bridge) {
  204. ast_bridge_handle_trip(bridge, NULL, winner, -1);
  205. ao2_unlock(bridge);
  206. }
  207. ao2_lock(multiplexed_thread);
  208. }
  209. }
  210. multiplexed_thread->thread = AST_PTHREADT_NULL;
  211. ast_debug(1, "Stopping actual thread for multiplexed thread '%p'\n", multiplexed_thread);
  212. ao2_unlock(multiplexed_thread);
  213. ao2_ref(multiplexed_thread, -1);
  214. return NULL;
  215. }
  216. /*! \brief Helper function which adds or removes a channel and nudges the thread */
  217. static void multiplexed_add_or_remove(struct multiplexed_thread *multiplexed_thread, struct ast_channel *chan, int add)
  218. {
  219. int i, removed = 0;
  220. pthread_t thread = AST_PTHREADT_NULL;
  221. ao2_lock(multiplexed_thread);
  222. multiplexed_nudge(multiplexed_thread);
  223. for (i = 0; i < MULTIPLEXED_MAX_CHANNELS; i++) {
  224. if (multiplexed_thread->chans[i] == chan) {
  225. if (!add) {
  226. multiplexed_thread->chans[i] = NULL;
  227. multiplexed_thread->service_count--;
  228. removed = 1;
  229. }
  230. break;
  231. } else if (!multiplexed_thread->chans[i] && add) {
  232. multiplexed_thread->chans[i] = chan;
  233. multiplexed_thread->service_count++;
  234. break;
  235. }
  236. }
  237. if (multiplexed_thread->service_count && multiplexed_thread->thread == AST_PTHREADT_NULL) {
  238. ao2_ref(multiplexed_thread, +1);
  239. if (ast_pthread_create(&multiplexed_thread->thread, NULL, multiplexed_thread_function, multiplexed_thread)) {
  240. ao2_ref(multiplexed_thread, -1);
  241. ast_debug(1, "Failed to create an actual thread for multiplexed thread '%p', trying next time\n", multiplexed_thread);
  242. }
  243. } else if (!multiplexed_thread->service_count && multiplexed_thread->thread != AST_PTHREADT_NULL) {
  244. thread = multiplexed_thread->thread;
  245. multiplexed_thread->thread = AST_PTHREADT_STOP;
  246. } else if (!add && removed) {
  247. memmove(multiplexed_thread->chans + i, multiplexed_thread->chans + i + 1, sizeof(struct ast_channel *) * (MULTIPLEXED_MAX_CHANNELS - (i + 1)));
  248. }
  249. ao2_unlock(multiplexed_thread);
  250. if (thread != AST_PTHREADT_NULL) {
  251. pthread_join(thread, NULL);
  252. }
  253. return;
  254. }
  255. /*! \brief Join function which actually adds the channel into the array to be monitored */
  256. static int multiplexed_bridge_join(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
  257. {
  258. struct ast_channel *c0 = AST_LIST_FIRST(&bridge->channels)->chan, *c1 = AST_LIST_LAST(&bridge->channels)->chan;
  259. struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
  260. ast_debug(1, "Adding channel '%s' to multiplexed thread '%p' for monitoring\n", ast_channel_name(bridge_channel->chan), multiplexed_thread);
  261. multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
  262. /* If the second channel has not yet joined do not make things compatible */
  263. if (c0 == c1) {
  264. return 0;
  265. }
  266. if ((ast_format_cmp(ast_channel_writeformat(c0), ast_channel_readformat(c1)) == AST_FORMAT_CMP_EQUAL) &&
  267. (ast_format_cmp(ast_channel_readformat(c0), ast_channel_writeformat(c1)) == AST_FORMAT_CMP_EQUAL) &&
  268. (ast_format_cap_identical(ast_channel_nativeformats(c0), ast_channel_nativeformats(c1)))) {
  269. return 0;
  270. }
  271. return ast_channel_make_compatible(c0, c1);
  272. }
  273. /*! \brief Leave function which actually removes the channel from the array */
  274. static int multiplexed_bridge_leave(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
  275. {
  276. struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
  277. ast_debug(1, "Removing channel '%s' from multiplexed thread '%p'\n", ast_channel_name(bridge_channel->chan), multiplexed_thread);
  278. multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
  279. return 0;
  280. }
  281. /*! \brief Suspend function which means control of the channel is going elsewhere */
  282. static void multiplexed_bridge_suspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
  283. {
  284. struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
  285. ast_debug(1, "Suspending channel '%s' from multiplexed thread '%p'\n", ast_channel_name(bridge_channel->chan), multiplexed_thread);
  286. multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 0);
  287. return;
  288. }
  289. /*! \brief Unsuspend function which means control of the channel is coming back to us */
  290. static void multiplexed_bridge_unsuspend(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel)
  291. {
  292. struct multiplexed_thread *multiplexed_thread = bridge->bridge_pvt;
  293. ast_debug(1, "Unsuspending channel '%s' from multiplexed thread '%p'\n", ast_channel_name(bridge_channel->chan), multiplexed_thread);
  294. multiplexed_add_or_remove(multiplexed_thread, bridge_channel->chan, 1);
  295. return;
  296. }
  297. /*! \brief Write function for writing frames into the bridge */
  298. static enum ast_bridge_write_result multiplexed_bridge_write(struct ast_bridge *bridge, struct ast_bridge_channel *bridge_channel, struct ast_frame *frame)
  299. {
  300. struct ast_bridge_channel *other;
  301. if (AST_LIST_FIRST(&bridge->channels) == AST_LIST_LAST(&bridge->channels)) {
  302. return AST_BRIDGE_WRITE_FAILED;
  303. }
  304. if (!(other = (AST_LIST_FIRST(&bridge->channels) == bridge_channel ? AST_LIST_LAST(&bridge->channels) : AST_LIST_FIRST(&bridge->channels)))) {
  305. return AST_BRIDGE_WRITE_FAILED;
  306. }
  307. if (other->state == AST_BRIDGE_CHANNEL_STATE_WAIT) {
  308. ast_write(other->chan, frame);
  309. }
  310. return AST_BRIDGE_WRITE_SUCCESS;
  311. }
  312. static struct ast_bridge_technology multiplexed_bridge = {
  313. .name = "multiplexed_bridge",
  314. .capabilities = AST_BRIDGE_CAPABILITY_1TO1MIX,
  315. .preference = AST_BRIDGE_PREFERENCE_HIGH,
  316. .create = multiplexed_bridge_create,
  317. .destroy = multiplexed_bridge_destroy,
  318. .join = multiplexed_bridge_join,
  319. .leave = multiplexed_bridge_leave,
  320. .suspend = multiplexed_bridge_suspend,
  321. .unsuspend = multiplexed_bridge_unsuspend,
  322. .write = multiplexed_bridge_write,
  323. };
  324. static int unload_module(void)
  325. {
  326. int res = ast_bridge_technology_unregister(&multiplexed_bridge);
  327. ao2_ref(multiplexed_threads, -1);
  328. multiplexed_bridge.format_capabilities = ast_format_cap_destroy(multiplexed_bridge.format_capabilities);
  329. return res;
  330. }
  331. static int load_module(void)
  332. {
  333. if (!(multiplexed_threads = ao2_container_alloc(MULTIPLEXED_BUCKETS, NULL, NULL))) {
  334. return AST_MODULE_LOAD_DECLINE;
  335. }
  336. if (!(multiplexed_bridge.format_capabilities = ast_format_cap_alloc())) {
  337. return AST_MODULE_LOAD_DECLINE;
  338. }
  339. ast_format_cap_add_all_by_type(multiplexed_bridge.format_capabilities, AST_FORMAT_TYPE_AUDIO);
  340. ast_format_cap_add_all_by_type(multiplexed_bridge.format_capabilities, AST_FORMAT_TYPE_VIDEO);
  341. ast_format_cap_add_all_by_type(multiplexed_bridge.format_capabilities, AST_FORMAT_TYPE_TEXT);
  342. return ast_bridge_technology_register(&multiplexed_bridge);
  343. }
  344. AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Multiplexed two channel bridging module");