res_stasis_device_state.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * Kevin Harwell <kharwell@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*** MODULEINFO
  19. <depend type="module">res_stasis</depend>
  20. <support_level>core</support_level>
  21. ***/
  22. #include "asterisk.h"
  23. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  24. #include "asterisk/astdb.h"
  25. #include "asterisk/astobj2.h"
  26. #include "asterisk/module.h"
  27. #include "asterisk/stasis_app_impl.h"
  28. #include "asterisk/stasis_app_device_state.h"
  29. #define DEVICE_STATE_SIZE 64
  30. /*! astdb family name */
  31. #define DEVICE_STATE_FAMILY "StasisDeviceState"
  32. /*! Stasis device state provider */
  33. #define DEVICE_STATE_PROVIDER_STASIS "Stasis"
  34. /*! Scheme for custom device states */
  35. #define DEVICE_STATE_SCHEME_STASIS "Stasis:"
  36. /*! Scheme for device state subscriptions */
  37. #define DEVICE_STATE_SCHEME_SUB "deviceState:"
  38. /*! Number of hash buckets for device state subscriptions */
  39. #define DEVICE_STATE_BUCKETS 37
  40. /*! Container for subscribed device states */
  41. static struct ao2_container *device_state_subscriptions;
  42. /*!
  43. * \brief Device state subscription object.
  44. */
  45. struct device_state_subscription {
  46. AST_DECLARE_STRING_FIELDS(
  47. AST_STRING_FIELD(app_name);
  48. AST_STRING_FIELD(device_name);
  49. );
  50. /*! The subscription object */
  51. struct stasis_subscription *sub;
  52. };
  53. static int device_state_subscriptions_hash(const void *obj, const int flags)
  54. {
  55. const struct device_state_subscription *object;
  56. switch (flags & OBJ_SEARCH_MASK) {
  57. case OBJ_SEARCH_OBJECT:
  58. object = obj;
  59. return ast_str_hash(object->device_name);
  60. case OBJ_SEARCH_KEY:
  61. default:
  62. /* Hash can only work on something with a full key. */
  63. ast_assert(0);
  64. return 0;
  65. }
  66. }
  67. static int device_state_subscriptions_cmp(void *obj, void *arg, int flags)
  68. {
  69. const struct device_state_subscription *object_left = obj;
  70. const struct device_state_subscription *object_right = arg;
  71. int cmp;
  72. switch (flags & OBJ_SEARCH_MASK) {
  73. case OBJ_SEARCH_OBJECT:
  74. /* find objects matching both device and app names */
  75. if (strcmp(object_left->device_name,
  76. object_right->device_name)) {
  77. return 0;
  78. }
  79. cmp = strcmp(object_left->app_name, object_right->app_name);
  80. break;
  81. case OBJ_SEARCH_KEY:
  82. case OBJ_SEARCH_PARTIAL_KEY:
  83. ast_assert(0); /* not supported by container */
  84. /* fall through */
  85. default:
  86. cmp = 0;
  87. break;
  88. }
  89. return cmp ? 0 : CMP_MATCH | CMP_STOP;
  90. }
  91. static void device_state_subscription_destroy(void *obj)
  92. {
  93. struct device_state_subscription *sub = obj;
  94. sub->sub = stasis_unsubscribe(sub->sub);
  95. ast_string_field_free_memory(sub);
  96. }
  97. static struct device_state_subscription *device_state_subscription_create(
  98. const struct stasis_app *app, const char *device_name)
  99. {
  100. struct device_state_subscription *sub = ao2_alloc(
  101. sizeof(*sub), device_state_subscription_destroy);
  102. const char *app_name = stasis_app_name(app);
  103. size_t size = strlen(device_name) + strlen(app_name) + 2;
  104. if (!sub) {
  105. return NULL;
  106. }
  107. if (ast_string_field_init(sub, size)) {
  108. ao2_ref(sub, -1);
  109. return NULL;
  110. }
  111. ast_string_field_set(sub, app_name, app_name);
  112. ast_string_field_set(sub, device_name, device_name);
  113. return sub;
  114. }
  115. static struct device_state_subscription *find_device_state_subscription(
  116. struct stasis_app *app, const char *name)
  117. {
  118. struct device_state_subscription dummy_sub = {
  119. .app_name = stasis_app_name(app),
  120. .device_name = name
  121. };
  122. return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT);
  123. }
  124. static void remove_device_state_subscription(
  125. struct device_state_subscription *sub)
  126. {
  127. ao2_unlink(device_state_subscriptions, sub);
  128. }
  129. struct ast_json *stasis_app_device_state_to_json(
  130. const char *name, enum ast_device_state state)
  131. {
  132. return ast_json_pack("{s: s, s: s}",
  133. "name", name,
  134. "state", ast_devstate_str(state));
  135. }
  136. struct ast_json *stasis_app_device_states_to_json(void)
  137. {
  138. struct ast_json *array = ast_json_array_create();
  139. RAII_VAR(struct ast_db_entry *, tree,
  140. ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
  141. struct ast_db_entry *entry;
  142. for (entry = tree; entry; entry = entry->next) {
  143. const char *name = strrchr(entry->key, '/');
  144. if (!ast_strlen_zero(name)) {
  145. struct ast_str *device = ast_str_alloca(DEVICE_STATE_SIZE);
  146. ast_str_set(&device, 0, "%s%s",
  147. DEVICE_STATE_SCHEME_STASIS, ++name);
  148. ast_json_array_append(
  149. array, stasis_app_device_state_to_json(
  150. ast_str_buffer(device),
  151. ast_device_state(ast_str_buffer(device))));
  152. }
  153. }
  154. return array;
  155. }
  156. static void send_device_state(struct device_state_subscription *sub,
  157. const char *name, enum ast_device_state state)
  158. {
  159. RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
  160. json = ast_json_pack("{s:s, s:s, s:o, s:o}",
  161. "type", "DeviceStateChanged",
  162. "application", sub->app_name,
  163. "timestamp", ast_json_timeval(ast_tvnow(), NULL),
  164. "device_state", stasis_app_device_state_to_json(
  165. name, state));
  166. if (!json) {
  167. ast_log(LOG_ERROR, "Unable to create device state json object\n");
  168. return;
  169. }
  170. stasis_app_send(sub->app_name, json);
  171. }
  172. enum stasis_device_state_result stasis_app_device_state_update(
  173. const char *name, const char *value)
  174. {
  175. size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
  176. enum ast_device_state state;
  177. ast_debug(3, "Updating device name = %s, value = %s", name, value);
  178. if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
  179. ast_log(LOG_ERROR, "Update can only be used to set "
  180. "'%s' device state!\n", DEVICE_STATE_SCHEME_STASIS);
  181. return STASIS_DEVICE_STATE_NOT_CONTROLLED;
  182. }
  183. name += size;
  184. if (ast_strlen_zero(name)) {
  185. ast_log(LOG_ERROR, "Update requires custom device name!\n");
  186. return STASIS_DEVICE_STATE_MISSING;
  187. }
  188. if (!value || (state = ast_devstate_val(value)) == AST_DEVICE_UNKNOWN) {
  189. ast_log(LOG_ERROR, "Unknown device state "
  190. "value '%s'\n", value);
  191. return STASIS_DEVICE_STATE_UNKNOWN;
  192. }
  193. ast_db_put(DEVICE_STATE_FAMILY, name, value);
  194. ast_devstate_changed(state, AST_DEVSTATE_CACHABLE, "%s%s",
  195. DEVICE_STATE_SCHEME_STASIS, name);
  196. return STASIS_DEVICE_STATE_OK;
  197. }
  198. enum stasis_device_state_result stasis_app_device_state_delete(const char *name)
  199. {
  200. const char *full_name = name;
  201. size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
  202. if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
  203. ast_log(LOG_ERROR, "Can only delete '%s' device states!\n",
  204. DEVICE_STATE_SCHEME_STASIS);
  205. return STASIS_DEVICE_STATE_NOT_CONTROLLED;
  206. }
  207. name += size;
  208. if (ast_strlen_zero(name)) {
  209. ast_log(LOG_ERROR, "Delete requires a device name!\n");
  210. return STASIS_DEVICE_STATE_MISSING;
  211. }
  212. if (ast_device_state_clear_cache(full_name)) {
  213. return STASIS_DEVICE_STATE_UNKNOWN;
  214. }
  215. ast_db_del(DEVICE_STATE_FAMILY, name);
  216. /* send state change for delete */
  217. ast_devstate_changed(
  218. AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "%s%s",
  219. DEVICE_STATE_SCHEME_STASIS, name);
  220. return STASIS_DEVICE_STATE_OK;
  221. }
  222. static void populate_cache(void)
  223. {
  224. RAII_VAR(struct ast_db_entry *, tree,
  225. ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
  226. struct ast_db_entry *entry;
  227. for (entry = tree; entry; entry = entry->next) {
  228. const char *name = strrchr(entry->key, '/');
  229. if (!ast_strlen_zero(name)) {
  230. ast_devstate_changed(
  231. ast_devstate_val(entry->data),
  232. AST_DEVSTATE_CACHABLE, "%s%s\n",
  233. DEVICE_STATE_SCHEME_STASIS, name + 1);
  234. }
  235. }
  236. }
  237. static enum ast_device_state stasis_device_state_cb(const char *data)
  238. {
  239. char buf[DEVICE_STATE_SIZE] = "";
  240. ast_db_get(DEVICE_STATE_FAMILY, data, buf, sizeof(buf));
  241. return ast_devstate_val(buf);
  242. }
  243. static void device_state_cb(void *data, struct stasis_subscription *sub,
  244. struct stasis_message *msg)
  245. {
  246. struct ast_device_state_message *device_state;
  247. if (ast_device_state_message_type() != stasis_message_type(msg)) {
  248. return;
  249. }
  250. device_state = stasis_message_data(msg);
  251. if (device_state->eid) {
  252. /* ignore non-aggregate states */
  253. return;
  254. }
  255. send_device_state(data, device_state->device, device_state->state);
  256. }
  257. static void *find_device_state(const struct stasis_app *app, const char *name)
  258. {
  259. return device_state_subscription_create(app, name);
  260. }
  261. static int is_subscribed_device_state(struct stasis_app *app, const char *name)
  262. {
  263. RAII_VAR(struct device_state_subscription *, sub,
  264. find_device_state_subscription(app, name), ao2_cleanup);
  265. return sub != NULL;
  266. }
  267. static int subscribe_device_state(struct stasis_app *app, void *obj)
  268. {
  269. struct device_state_subscription *sub = obj;
  270. ast_debug(3, "Subscribing to device %s", sub->device_name);
  271. if (is_subscribed_device_state(app, sub->device_name)) {
  272. ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
  273. return 0;
  274. }
  275. if (!(sub->sub = stasis_subscribe(
  276. ast_device_state_topic(sub->device_name),
  277. device_state_cb, sub))) {
  278. ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
  279. sub->device_name);
  280. return -1;
  281. }
  282. ao2_link(device_state_subscriptions, sub);
  283. return 0;
  284. }
  285. static int unsubscribe_device_state(struct stasis_app *app, const char *name)
  286. {
  287. RAII_VAR(struct device_state_subscription *, sub,
  288. find_device_state_subscription(app, name), ao2_cleanup);
  289. remove_device_state_subscription(sub);
  290. return 0;
  291. }
  292. static int device_to_json_cb(void *obj, void *arg, void *data, int flags)
  293. {
  294. struct device_state_subscription *sub = obj;
  295. const char *app_name = arg;
  296. struct ast_json *array = data;
  297. if (strcmp(sub->app_name, app_name)) {
  298. return 0;
  299. }
  300. ast_json_array_append(
  301. array, ast_json_string_create(sub->device_name));
  302. return 0;
  303. }
  304. static void devices_to_json(const struct stasis_app *app, struct ast_json *json)
  305. {
  306. struct ast_json *array = ast_json_array_create();
  307. ao2_callback_data(device_state_subscriptions, OBJ_NODATA,
  308. device_to_json_cb, (void *)stasis_app_name(app), array);
  309. ast_json_object_set(json, "device_names", array);
  310. }
  311. struct stasis_app_event_source device_state_event_source = {
  312. .scheme = DEVICE_STATE_SCHEME_SUB,
  313. .find = find_device_state,
  314. .subscribe = subscribe_device_state,
  315. .unsubscribe = unsubscribe_device_state,
  316. .is_subscribed = is_subscribed_device_state,
  317. .to_json = devices_to_json
  318. };
  319. static int load_module(void)
  320. {
  321. populate_cache();
  322. if (ast_devstate_prov_add(DEVICE_STATE_PROVIDER_STASIS,
  323. stasis_device_state_cb)) {
  324. return AST_MODULE_LOAD_FAILURE;
  325. }
  326. if (!(device_state_subscriptions = ao2_container_alloc(
  327. DEVICE_STATE_BUCKETS, device_state_subscriptions_hash,
  328. device_state_subscriptions_cmp))) {
  329. return AST_MODULE_LOAD_FAILURE;
  330. }
  331. stasis_app_register_event_source(&device_state_event_source);
  332. return AST_MODULE_LOAD_SUCCESS;
  333. }
  334. static int unload_module(void)
  335. {
  336. ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
  337. stasis_app_unregister_event_source(&device_state_event_source);
  338. ao2_cleanup(device_state_subscriptions);
  339. device_state_subscriptions = NULL;
  340. return 0;
  341. }
  342. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application device state support",
  343. .support_level = AST_MODULE_SUPPORT_CORE,
  344. .load = load_module,
  345. .unload = unload_module,
  346. .nonoptreq = "res_stasis");