pbx_spool.c 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 1999 - 2010, Digium, Inc.
  5. *
  6. * Mark Spencer <markster@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Full-featured outgoing call spool support
  21. *
  22. */
  23. /*** MODULEINFO
  24. <support_level>core</support_level>
  25. ***/
  26. #include "asterisk.h"
  27. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  28. #include <sys/stat.h>
  29. #include <time.h>
  30. #include <utime.h>
  31. #include <dirent.h>
  32. #ifdef HAVE_INOTIFY
  33. #include <sys/inotify.h>
  34. #elif defined(HAVE_KQUEUE)
  35. #include <sys/types.h>
  36. #include <sys/time.h>
  37. #include <sys/event.h>
  38. #include <fcntl.h>
  39. #endif
  40. #include "asterisk/paths.h" /* use ast_config_AST_SPOOL_DIR */
  41. #include "asterisk/lock.h"
  42. #include "asterisk/file.h"
  43. #include "asterisk/logger.h"
  44. #include "asterisk/channel.h"
  45. #include "asterisk/callerid.h"
  46. #include "asterisk/pbx.h"
  47. #include "asterisk/module.h"
  48. #include "asterisk/utils.h"
  49. #include "asterisk/options.h"
  50. /*
  51. * pbx_spool is similar in spirit to qcall, but with substantially enhanced functionality...
  52. * The spool file contains a header
  53. */
  54. enum {
  55. /*! Always delete the call file after a call succeeds or the
  56. * maximum number of retries is exceeded, even if the
  57. * modification time of the call file is in the future.
  58. */
  59. SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
  60. /* Don't unlink the call file after processing, move in qdonedir */
  61. SPOOL_FLAG_ARCHIVE = (1 << 1),
  62. };
  63. static char qdir[255];
  64. static char qdonedir[255];
  65. struct outgoing {
  66. int retries; /*!< Current number of retries */
  67. int maxretries; /*!< Maximum number of retries permitted */
  68. int retrytime; /*!< How long to wait between retries (in seconds) */
  69. int waittime; /*!< How long to wait for an answer */
  70. long callingpid; /*!< PID which is currently calling */
  71. struct ast_format_cap *capabilities; /*!< Formats (codecs) for this call */
  72. AST_DECLARE_STRING_FIELDS (
  73. AST_STRING_FIELD(fn); /*!< File name of call file */
  74. AST_STRING_FIELD(tech); /*!< Which channel technology to use for outgoing call */
  75. AST_STRING_FIELD(dest); /*!< Which device/line to use for outgoing call */
  76. AST_STRING_FIELD(app); /*!< If application: Application name */
  77. AST_STRING_FIELD(data); /*!< If application: Application data */
  78. AST_STRING_FIELD(exten); /*!< If extension/context/priority: Extension in dialplan */
  79. AST_STRING_FIELD(context); /*!< If extension/context/priority: Dialplan context */
  80. AST_STRING_FIELD(cid_num); /*!< CallerID Information: Number/extension */
  81. AST_STRING_FIELD(cid_name); /*!< CallerID Information: Name */
  82. AST_STRING_FIELD(account); /*!< account code */
  83. );
  84. int priority; /*!< If extension/context/priority: Dialplan priority */
  85. struct ast_variable *vars; /*!< Variables and Functions */
  86. int maxlen; /*!< Maximum length of call */
  87. struct ast_flags options; /*!< options */
  88. };
  89. #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
  90. static void queue_file(const char *filename, time_t when);
  91. #endif
  92. static void free_outgoing(struct outgoing *o)
  93. {
  94. if (o->vars) {
  95. ast_variables_destroy(o->vars);
  96. }
  97. o->capabilities = ast_format_cap_destroy(o->capabilities);
  98. ast_string_field_free_memory(o);
  99. ast_free(o);
  100. }
  101. static struct outgoing *new_outgoing(const char *fn)
  102. {
  103. struct outgoing *o;
  104. struct ast_format tmpfmt;
  105. o = ast_calloc(1, sizeof(*o));
  106. if (!o) {
  107. return NULL;
  108. }
  109. /* Initialize the new object. */
  110. o->priority = 1;
  111. o->retrytime = 300;
  112. o->waittime = 45;
  113. ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE);
  114. if (ast_string_field_init(o, 128)) {
  115. /*
  116. * No need to call free_outgoing here since the failure was to
  117. * allocate string fields and no variables have been allocated
  118. * yet.
  119. */
  120. ast_free(o);
  121. return NULL;
  122. }
  123. ast_string_field_set(o, fn, fn);
  124. if (ast_strlen_zero(o->fn)) {
  125. /* String field set failed. Since this string is important we must fail. */
  126. free_outgoing(o);
  127. return NULL;
  128. }
  129. o->capabilities = ast_format_cap_alloc_nolock();
  130. if (!o->capabilities) {
  131. free_outgoing(o);
  132. return NULL;
  133. }
  134. ast_format_cap_add(o->capabilities, ast_format_set(&tmpfmt, AST_FORMAT_SLINEAR, 0));
  135. return o;
  136. }
  137. static int apply_outgoing(struct outgoing *o, FILE *f)
  138. {
  139. char buf[256];
  140. char *c, *c2;
  141. int lineno = 0;
  142. struct ast_variable *var, *last = o->vars;
  143. while (last && last->next) {
  144. last = last->next;
  145. }
  146. while(fgets(buf, sizeof(buf), f)) {
  147. lineno++;
  148. /* Trim comments */
  149. c = buf;
  150. while ((c = strchr(c, '#'))) {
  151. if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t'))
  152. *c = '\0';
  153. else
  154. c++;
  155. }
  156. c = buf;
  157. while ((c = strchr(c, ';'))) {
  158. if ((c > buf) && (c[-1] == '\\')) {
  159. memmove(c - 1, c, strlen(c) + 1);
  160. c++;
  161. } else {
  162. *c = '\0';
  163. break;
  164. }
  165. }
  166. /* Trim trailing white space */
  167. ast_trim_blanks(buf);
  168. if (ast_strlen_zero(buf)) {
  169. continue;
  170. }
  171. c = strchr(buf, ':');
  172. if (!c) {
  173. ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, o->fn);
  174. continue;
  175. }
  176. *c = '\0';
  177. c = ast_skip_blanks(c + 1);
  178. #if 0
  179. printf("'%s' is '%s' at line %d\n", buf, c, lineno);
  180. #endif
  181. if (!strcasecmp(buf, "channel")) {
  182. if ((c2 = strchr(c, '/'))) {
  183. *c2 = '\0';
  184. c2++;
  185. ast_string_field_set(o, tech, c);
  186. ast_string_field_set(o, dest, c2);
  187. } else {
  188. ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, o->fn);
  189. }
  190. } else if (!strcasecmp(buf, "callerid")) {
  191. char cid_name[80] = {0}, cid_num[80] = {0};
  192. ast_callerid_split(c, cid_name, sizeof(cid_name), cid_num, sizeof(cid_num));
  193. ast_string_field_set(o, cid_num, cid_num);
  194. ast_string_field_set(o, cid_name, cid_name);
  195. } else if (!strcasecmp(buf, "application")) {
  196. ast_string_field_set(o, app, c);
  197. } else if (!strcasecmp(buf, "data")) {
  198. ast_string_field_set(o, data, c);
  199. } else if (!strcasecmp(buf, "maxretries")) {
  200. if (sscanf(c, "%30d", &o->maxretries) != 1) {
  201. ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, o->fn);
  202. o->maxretries = 0;
  203. }
  204. } else if (!strcasecmp(buf, "codecs")) {
  205. ast_parse_allow_disallow(NULL, o->capabilities, c, 1);
  206. } else if (!strcasecmp(buf, "context")) {
  207. ast_string_field_set(o, context, c);
  208. } else if (!strcasecmp(buf, "extension")) {
  209. ast_string_field_set(o, exten, c);
  210. } else if (!strcasecmp(buf, "priority")) {
  211. if ((sscanf(c, "%30d", &o->priority) != 1) || (o->priority < 1)) {
  212. ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, o->fn);
  213. o->priority = 1;
  214. }
  215. } else if (!strcasecmp(buf, "retrytime")) {
  216. if ((sscanf(c, "%30d", &o->retrytime) != 1) || (o->retrytime < 1)) {
  217. ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, o->fn);
  218. o->retrytime = 300;
  219. }
  220. } else if (!strcasecmp(buf, "waittime")) {
  221. if ((sscanf(c, "%30d", &o->waittime) != 1) || (o->waittime < 1)) {
  222. ast_log(LOG_WARNING, "Invalid waittime at line %d of %s\n", lineno, o->fn);
  223. o->waittime = 45;
  224. }
  225. } else if (!strcasecmp(buf, "retry")) {
  226. o->retries++;
  227. } else if (!strcasecmp(buf, "startretry")) {
  228. if (sscanf(c, "%30ld", &o->callingpid) != 1) {
  229. ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n");
  230. o->callingpid = 0;
  231. }
  232. } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) {
  233. o->callingpid = 0;
  234. o->retries++;
  235. } else if (!strcasecmp(buf, "delayedretry")) {
  236. } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) {
  237. c2 = c;
  238. strsep(&c2, "=");
  239. if (c2) {
  240. var = ast_variable_new(c, c2, o->fn);
  241. if (var) {
  242. /* Always insert at the end, because some people want to treat the spool file as a script */
  243. if (last) {
  244. last->next = var;
  245. } else {
  246. o->vars = var;
  247. }
  248. last = var;
  249. }
  250. } else
  251. ast_log(LOG_WARNING, "Malformed \"%s\" argument. Should be \"%s: variable=value\"\n", buf, buf);
  252. } else if (!strcasecmp(buf, "account")) {
  253. ast_string_field_set(o, account, c);
  254. } else if (!strcasecmp(buf, "alwaysdelete")) {
  255. ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE);
  256. } else if (!strcasecmp(buf, "archive")) {
  257. ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE);
  258. } else {
  259. ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, o->fn);
  260. }
  261. }
  262. if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) {
  263. ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", o->fn);
  264. return -1;
  265. }
  266. return 0;
  267. }
  268. static void safe_append(struct outgoing *o, time_t now, char *s)
  269. {
  270. FILE *f;
  271. struct utimbuf tbuf = { .actime = now, .modtime = now + o->retrytime };
  272. ast_debug(1, "Outgoing %s/%s: %s\n", o->tech, o->dest, s);
  273. if ((f = fopen(o->fn, "a"))) {
  274. fprintf(f, "\n%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now);
  275. fclose(f);
  276. }
  277. /* Update the file time */
  278. if (utime(o->fn, &tbuf)) {
  279. ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno));
  280. }
  281. }
  282. /*!
  283. * \brief Remove a call file from the outgoing queue optionally moving it in the archive dir
  284. *
  285. * \param o the pointer to outgoing struct
  286. * \param status the exit status of the call. Can be "Completed", "Failed" or "Expired"
  287. */
  288. static int remove_from_queue(struct outgoing *o, const char *status)
  289. {
  290. FILE *f;
  291. char newfn[256];
  292. const char *bname;
  293. if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) {
  294. struct stat current_file_status;
  295. if (!stat(o->fn, &current_file_status)) {
  296. if (time(NULL) < current_file_status.st_mtime) {
  297. return 0;
  298. }
  299. }
  300. }
  301. if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) {
  302. unlink(o->fn);
  303. return 0;
  304. }
  305. if (ast_mkdir(qdonedir, 0777)) {
  306. ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir);
  307. unlink(o->fn);
  308. return -1;
  309. }
  310. if (!(bname = strrchr(o->fn, '/'))) {
  311. bname = o->fn;
  312. } else {
  313. bname++;
  314. }
  315. snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname);
  316. /* If there is already a call file with the name in the archive dir, it will be overwritten. */
  317. unlink(newfn);
  318. if (rename(o->fn, newfn) != 0) {
  319. unlink(o->fn);
  320. return -1;
  321. }
  322. /* Only append to the file AFTER we move it out of the watched directory,
  323. * otherwise the fclose() causes another event for inotify(7) */
  324. if ((f = fopen(newfn, "a"))) {
  325. fprintf(f, "Status: %s\n", status);
  326. fclose(f);
  327. }
  328. return 0;
  329. }
  330. static void *attempt_thread(void *data)
  331. {
  332. struct outgoing *o = data;
  333. int res, reason;
  334. if (!ast_strlen_zero(o->app)) {
  335. ast_verb(3, "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries);
  336. res = ast_pbx_outgoing_app(o->tech, o->capabilities, (void *) o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
  337. o->vars = NULL;
  338. } else {
  339. ast_verb(3, "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries);
  340. res = ast_pbx_outgoing_exten(o->tech, o->capabilities, (void *) o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL);
  341. o->vars = NULL;
  342. }
  343. if (res) {
  344. ast_log(LOG_NOTICE, "Call failed to go through, reason (%d) %s\n", reason, ast_channel_reason2str(reason));
  345. if (o->retries >= o->maxretries + 1) {
  346. /* Max retries exceeded */
  347. ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
  348. remove_from_queue(o, "Expired");
  349. } else {
  350. /* Notate that the call is still active */
  351. safe_append(o, time(NULL), "EndRetry");
  352. #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
  353. queue_file(o->fn, time(NULL) + o->retrytime);
  354. #endif
  355. }
  356. } else {
  357. ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest);
  358. remove_from_queue(o, "Completed");
  359. }
  360. free_outgoing(o);
  361. return NULL;
  362. }
  363. static void launch_service(struct outgoing *o)
  364. {
  365. pthread_t t;
  366. int ret;
  367. if ((ret = ast_pthread_create_detached(&t, NULL, attempt_thread, o))) {
  368. ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
  369. free_outgoing(o);
  370. }
  371. }
  372. /* Called from scan_thread or queue_file */
  373. static int scan_service(const char *fn, time_t now)
  374. {
  375. struct outgoing *o;
  376. FILE *f;
  377. int res;
  378. o = new_outgoing(fn);
  379. if (!o) {
  380. return -1;
  381. }
  382. /* Attempt to open the file */
  383. f = fopen(o->fn, "r");
  384. if (!f) {
  385. #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
  386. /*!
  387. * \todo XXX There is some odd delayed duplicate servicing of
  388. * call files going on. We need to suppress the error message
  389. * if the file does not exist as a result.
  390. */
  391. if (errno != ENOENT)
  392. #endif
  393. {
  394. ast_log(LOG_WARNING, "Unable to open %s: '%s'(%d), deleting\n",
  395. o->fn, strerror(errno), (int) errno);
  396. }
  397. remove_from_queue(o, "Failed");
  398. free_outgoing(o);
  399. return -1;
  400. }
  401. /* Read in and verify the contents */
  402. res = apply_outgoing(o, f);
  403. fclose(f);
  404. if (res) {
  405. ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", o->fn);
  406. remove_from_queue(o, "Failed");
  407. free_outgoing(o);
  408. return -1;
  409. }
  410. ast_debug(1, "Filename: %s, Retries: %d, max: %d\n", o->fn, o->retries, o->maxretries);
  411. if (o->retries <= o->maxretries) {
  412. now += o->retrytime;
  413. if (o->callingpid && (o->callingpid == ast_mainpid)) {
  414. safe_append(o, time(NULL), "DelayedRetry");
  415. ast_debug(1, "Delaying retry since we're currently running '%s'\n", o->fn);
  416. free_outgoing(o);
  417. } else {
  418. /* Increment retries */
  419. o->retries++;
  420. /* If someone else was calling, they're presumably gone now
  421. so abort their retry and continue as we were... */
  422. if (o->callingpid)
  423. safe_append(o, time(NULL), "AbortRetry");
  424. safe_append(o, now, "StartRetry");
  425. launch_service(o);
  426. }
  427. return now;
  428. }
  429. ast_log(LOG_NOTICE, "Queued call to %s/%s expired without completion after %d attempt%s\n",
  430. o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : "");
  431. remove_from_queue(o, "Expired");
  432. free_outgoing(o);
  433. return 0;
  434. }
  435. #if defined(HAVE_INOTIFY) || defined(HAVE_KQUEUE)
  436. struct direntry {
  437. AST_LIST_ENTRY(direntry) list;
  438. time_t mtime;
  439. char name[0];
  440. };
  441. static AST_LIST_HEAD_STATIC(dirlist, direntry);
  442. #if defined(HAVE_INOTIFY)
  443. /* Only one thread is accessing this list, so no lock is necessary */
  444. static AST_LIST_HEAD_NOLOCK_STATIC(createlist, direntry);
  445. static AST_LIST_HEAD_NOLOCK_STATIC(openlist, direntry);
  446. #endif
  447. static void queue_file(const char *filename, time_t when)
  448. {
  449. struct stat st;
  450. struct direntry *cur, *new;
  451. int res;
  452. time_t now = time(NULL);
  453. if (!strchr(filename, '/')) {
  454. char *fn = ast_alloca(strlen(qdir) + strlen(filename) + 2);
  455. sprintf(fn, "%s/%s", qdir, filename); /* SAFE */
  456. filename = fn;
  457. }
  458. if (when == 0) {
  459. if (stat(filename, &st)) {
  460. ast_log(LOG_WARNING, "Unable to stat %s: %s\n", filename, strerror(errno));
  461. return;
  462. }
  463. if (!S_ISREG(st.st_mode)) {
  464. return;
  465. }
  466. when = st.st_mtime;
  467. }
  468. /* Need to check the existing list in order to avoid duplicates. */
  469. AST_LIST_LOCK(&dirlist);
  470. AST_LIST_TRAVERSE(&dirlist, cur, list) {
  471. if (cur->mtime == when && !strcmp(filename, cur->name)) {
  472. AST_LIST_UNLOCK(&dirlist);
  473. return;
  474. }
  475. }
  476. if ((res = when) > now || (res = scan_service(filename, now)) > 0) {
  477. if (!(new = ast_calloc(1, sizeof(*new) + strlen(filename) + 1))) {
  478. AST_LIST_UNLOCK(&dirlist);
  479. return;
  480. }
  481. new->mtime = res;
  482. strcpy(new->name, filename);
  483. /* List is ordered by mtime */
  484. if (AST_LIST_EMPTY(&dirlist)) {
  485. AST_LIST_INSERT_HEAD(&dirlist, new, list);
  486. } else {
  487. int found = 0;
  488. AST_LIST_TRAVERSE_SAFE_BEGIN(&dirlist, cur, list) {
  489. if (cur->mtime > new->mtime) {
  490. AST_LIST_INSERT_BEFORE_CURRENT(new, list);
  491. found = 1;
  492. break;
  493. }
  494. }
  495. AST_LIST_TRAVERSE_SAFE_END
  496. if (!found) {
  497. AST_LIST_INSERT_TAIL(&dirlist, new, list);
  498. }
  499. }
  500. }
  501. AST_LIST_UNLOCK(&dirlist);
  502. }
  503. #ifdef HAVE_INOTIFY
  504. static void queue_file_create(const char *filename)
  505. {
  506. struct direntry *cur;
  507. AST_LIST_TRAVERSE(&createlist, cur, list) {
  508. if (!strcmp(cur->name, filename)) {
  509. return;
  510. }
  511. }
  512. if (!(cur = ast_calloc(1, sizeof(*cur) + strlen(filename) + 1))) {
  513. return;
  514. }
  515. strcpy(cur->name, filename);
  516. /* We'll handle this file unless an IN_OPEN event occurs within 2 seconds */
  517. cur->mtime = time(NULL) + 2;
  518. AST_LIST_INSERT_TAIL(&createlist, cur, list);
  519. }
  520. static void queue_file_open(const char *filename)
  521. {
  522. struct direntry *cur;
  523. AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
  524. if (!strcmp(cur->name, filename)) {
  525. AST_LIST_REMOVE_CURRENT(list);
  526. AST_LIST_INSERT_TAIL(&openlist, cur, list);
  527. break;
  528. }
  529. }
  530. AST_LIST_TRAVERSE_SAFE_END
  531. }
  532. static void queue_created_files(void)
  533. {
  534. struct direntry *cur;
  535. time_t now = time(NULL);
  536. AST_LIST_TRAVERSE_SAFE_BEGIN(&createlist, cur, list) {
  537. if (cur->mtime > now) {
  538. break;
  539. }
  540. AST_LIST_REMOVE_CURRENT(list);
  541. queue_file(cur->name, 0);
  542. ast_free(cur);
  543. }
  544. AST_LIST_TRAVERSE_SAFE_END
  545. }
  546. static void queue_file_write(const char *filename)
  547. {
  548. struct direntry *cur;
  549. /* Only queue entries where an IN_CREATE preceded the IN_CLOSE_WRITE */
  550. AST_LIST_TRAVERSE_SAFE_BEGIN(&openlist, cur, list) {
  551. if (!strcmp(cur->name, filename)) {
  552. AST_LIST_REMOVE_CURRENT(list);
  553. ast_free(cur);
  554. queue_file(filename, 0);
  555. break;
  556. }
  557. }
  558. AST_LIST_TRAVERSE_SAFE_END
  559. }
  560. #endif
  561. static void *scan_thread(void *unused)
  562. {
  563. DIR *dir;
  564. struct dirent *de;
  565. time_t now;
  566. struct timespec ts = { .tv_sec = 1 };
  567. #ifdef HAVE_INOTIFY
  568. ssize_t res;
  569. int inotify_fd = inotify_init();
  570. struct inotify_event *iev;
  571. char buf[8192] __attribute__((aligned (sizeof(int))));
  572. struct pollfd pfd = { .fd = inotify_fd, .events = POLLIN };
  573. #else
  574. struct timespec nowait = { 0, 1 };
  575. int inotify_fd = kqueue();
  576. struct kevent kev;
  577. #endif
  578. struct direntry *cur;
  579. while (!ast_fully_booted) {
  580. nanosleep(&ts, NULL);
  581. }
  582. if (inotify_fd < 0) {
  583. ast_log(LOG_ERROR, "Unable to initialize "
  584. #ifdef HAVE_INOTIFY
  585. "inotify(7)"
  586. #else
  587. "kqueue(2)"
  588. #endif
  589. "\n");
  590. return NULL;
  591. }
  592. #ifdef HAVE_INOTIFY
  593. inotify_add_watch(inotify_fd, qdir, IN_CREATE | IN_OPEN | IN_CLOSE_WRITE | IN_MOVED_TO);
  594. #endif
  595. /* First, run through the directory and clear existing entries */
  596. if (!(dir = opendir(qdir))) {
  597. ast_log(LOG_ERROR, "Unable to open directory %s: %s\n", qdir, strerror(errno));
  598. return NULL;
  599. }
  600. #ifndef HAVE_INOTIFY
  601. EV_SET(&kev, dirfd(dir), EVFILT_VNODE, EV_ADD | EV_ENABLE | EV_CLEAR, NOTE_WRITE, 0, NULL);
  602. if (kevent(inotify_fd, &kev, 1, NULL, 0, &nowait) < 0 && errno != 0) {
  603. ast_log(LOG_ERROR, "Unable to watch directory %s: %s\n", qdir, strerror(errno));
  604. }
  605. #endif
  606. now = time(NULL);
  607. while ((de = readdir(dir))) {
  608. queue_file(de->d_name, 0);
  609. }
  610. #ifdef HAVE_INOTIFY
  611. /* Directory needs to remain open for kqueue(2) */
  612. closedir(dir);
  613. #endif
  614. /* Wait for either a) next timestamp to occur, or b) a change to happen */
  615. for (;/* ever */;) {
  616. time_t next = AST_LIST_EMPTY(&dirlist) ? INT_MAX : AST_LIST_FIRST(&dirlist)->mtime;
  617. time(&now);
  618. if (next > now) {
  619. #ifdef HAVE_INOTIFY
  620. int stage = 0;
  621. /* Convert from seconds to milliseconds, unless there's nothing
  622. * in the queue already, in which case, we wait forever. */
  623. int waittime = next == INT_MAX ? -1 : (next - now) * 1000;
  624. if (!AST_LIST_EMPTY(&createlist)) {
  625. waittime = 1000;
  626. }
  627. /* When a file arrives, add it to the queue, in mtime order. */
  628. if ((res = poll(&pfd, 1, waittime)) > 0 && (stage = 1) &&
  629. (res = read(inotify_fd, &buf, sizeof(buf))) >= sizeof(*iev)) {
  630. ssize_t len = 0;
  631. /* File(s) added to directory, add them to my list */
  632. for (iev = (void *) buf; res >= sizeof(*iev); iev = (struct inotify_event *) (((char *) iev) + len)) {
  633. /* For an IN_MOVED_TO event, simply process the file. However, if
  634. * we get an IN_CREATE event it *might* be an open(O_CREAT) or it
  635. * might be a hardlink (like smsq does, since rename() might
  636. * overwrite an existing file). So we have to see if we get a
  637. * subsequent IN_OPEN event on the same file. If we do, keep it
  638. * on the openlist and wait for the corresponding IN_CLOSE_WRITE.
  639. * If we *don't* see an IN_OPEN event, then it was a hard link so
  640. * it can be processed immediately.
  641. *
  642. * Unfortunately, although open(O_CREAT) is an atomic file system
  643. * operation, the inotify subsystem doesn't give it to us in a
  644. * single event with both IN_CREATE|IN_OPEN set. It's two separate
  645. * events, and the kernel doesn't even give them to us at the same
  646. * time. We can read() from inotify_fd after the IN_CREATE event,
  647. * and get *nothing* from it. The IN_OPEN arrives only later! So
  648. * we have a very short timeout of 2 seconds. */
  649. if (iev->mask & IN_CREATE) {
  650. queue_file_create(iev->name);
  651. } else if (iev->mask & IN_OPEN) {
  652. queue_file_open(iev->name);
  653. } else if (iev->mask & IN_CLOSE_WRITE) {
  654. queue_file_write(iev->name);
  655. } else if (iev->mask & IN_MOVED_TO) {
  656. queue_file(iev->name, 0);
  657. } else {
  658. ast_log(LOG_ERROR, "Unexpected event %d for file '%s'\n", (int) iev->mask, iev->name);
  659. }
  660. len = sizeof(*iev) + iev->len;
  661. res -= len;
  662. }
  663. } else if (res < 0 && errno != EINTR && errno != EAGAIN) {
  664. ast_debug(1, "Got an error back from %s(2): %s\n", stage ? "read" : "poll", strerror(errno));
  665. }
  666. time(&now);
  667. }
  668. queue_created_files();
  669. #else
  670. struct timespec ts2 = { next - now, 0 };
  671. if (kevent(inotify_fd, NULL, 0, &kev, 1, &ts2) <= 0) {
  672. /* Interrupt or timeout, restart calculations */
  673. continue;
  674. } else {
  675. /* Directory changed, rescan */
  676. rewinddir(dir);
  677. while ((de = readdir(dir))) {
  678. queue_file(de->d_name, 0);
  679. }
  680. }
  681. time(&now);
  682. }
  683. #endif
  684. /* Empty the list of all entries ready to be processed */
  685. AST_LIST_LOCK(&dirlist);
  686. while (!AST_LIST_EMPTY(&dirlist) && AST_LIST_FIRST(&dirlist)->mtime <= now) {
  687. cur = AST_LIST_REMOVE_HEAD(&dirlist, list);
  688. queue_file(cur->name, cur->mtime);
  689. ast_free(cur);
  690. }
  691. AST_LIST_UNLOCK(&dirlist);
  692. }
  693. return NULL;
  694. }
  695. #else
  696. static void *scan_thread(void *unused)
  697. {
  698. struct stat st;
  699. DIR *dir;
  700. struct dirent *de;
  701. char fn[256];
  702. int res;
  703. int force_poll = 1;
  704. time_t last = 0;
  705. time_t next = 0;
  706. time_t now;
  707. struct timespec ts = { .tv_sec = 1 };
  708. while (!ast_fully_booted) {
  709. nanosleep(&ts, NULL);
  710. }
  711. for (;;) {
  712. /* Wait a sec */
  713. nanosleep(&ts, NULL);
  714. time(&now);
  715. if (stat(qdir, &st)) {
  716. ast_log(LOG_WARNING, "Unable to stat %s\n", qdir);
  717. continue;
  718. }
  719. /* Make sure it is time for us to execute our check */
  720. if (!force_poll && st.st_mtime == last && (!next || now < next)) {
  721. /*
  722. * The directory timestamp did not change and any delayed
  723. * call-file is not ready to be executed.
  724. */
  725. continue;
  726. }
  727. #if 0
  728. printf("atime: %ld, mtime: %ld, ctime: %ld\n", st.st_atime, st.st_mtime, st.st_ctime);
  729. printf("Ooh, something changed / timeout\n");
  730. #endif
  731. if (!(dir = opendir(qdir))) {
  732. ast_log(LOG_WARNING, "Unable to open directory %s: %s\n", qdir, strerror(errno));
  733. continue;
  734. }
  735. /*
  736. * Since the dir timestamp is available at one second
  737. * resolution, we cannot know if it was updated within the same
  738. * second after we scanned it. Therefore, we will force another
  739. * scan if the dir was just modified.
  740. */
  741. force_poll = (st.st_mtime == now);
  742. next = 0;
  743. last = st.st_mtime;
  744. while ((de = readdir(dir))) {
  745. snprintf(fn, sizeof(fn), "%s/%s", qdir, de->d_name);
  746. if (stat(fn, &st)) {
  747. ast_log(LOG_WARNING, "Unable to stat %s: %s\n", fn, strerror(errno));
  748. continue;
  749. }
  750. if (!S_ISREG(st.st_mode)) {
  751. /* Not a regular file. */
  752. continue;
  753. }
  754. if (st.st_mtime <= now) {
  755. res = scan_service(fn, now);
  756. if (res > 0) {
  757. /* The call-file is delayed or to be retried later. */
  758. if (!next || res < next) {
  759. /* This delayed call file expires earlier. */
  760. next = res;
  761. }
  762. } else if (res) {
  763. ast_log(LOG_WARNING, "Failed to scan service '%s'\n", fn);
  764. } else if (!next) {
  765. /* Expired entry: must recheck on the next go-around */
  766. next = st.st_mtime;
  767. }
  768. } else {
  769. /* The file's timestamp is in the future. */
  770. if (!next || st.st_mtime < next) {
  771. /* This call-file's timestamp expires earlier. */
  772. next = st.st_mtime;
  773. }
  774. }
  775. }
  776. closedir(dir);
  777. }
  778. return NULL;
  779. }
  780. #endif
  781. static int unload_module(void)
  782. {
  783. return -1;
  784. }
  785. static int load_module(void)
  786. {
  787. pthread_t thread;
  788. int ret;
  789. snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing");
  790. if (ast_mkdir(qdir, 0777)) {
  791. ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir);
  792. return AST_MODULE_LOAD_DECLINE;
  793. }
  794. snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done");
  795. if ((ret = ast_pthread_create_detached_background(&thread, NULL, scan_thread, NULL))) {
  796. ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret);
  797. return AST_MODULE_LOAD_FAILURE;
  798. }
  799. return AST_MODULE_LOAD_SUCCESS;
  800. }
  801. AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Outgoing Spool Support");