threadedRunner.cpp 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136
  1. #ifdef _WIN32
  2. #include <winsock2.h>
  3. #endif
  4. #include <algorithm>
  5. #include "threadedRunner.h"
  6. #include <openssl/ssl.h>
  7. #include "global.h"
  8. #include "stl/stringUtils.h"
  9. #include "services/stdServiceImpl.h"
  10. #include "protocol_HTTPStyle.h"
  11. #include "protocol_HTTPSource.h"
  12. #include "protocol_shoutcastSource.h"
  13. #include "protocol_FlashPolicyServer.h"
  14. #include "protocol_uvox2Source.h"
  15. #include "uvox2Common.h"
  16. #include "banList.h"
  17. #include "ripList.h"
  18. #ifdef _MSC_VER
  19. #define MSG_DONTWAIT 0
  20. #else
  21. #include <netinet/tcp.h>
  22. #include <sys/resource.h>
  23. #ifndef EPOLLRDHUP
  24. #define EPOLLRDHUP 0x2000
  25. #endif
  26. #endif
  27. using namespace std;
  28. using namespace stringUtil;
  29. using namespace uniString;
  30. #define LOGNAME "[THREADRUNNER] "
  31. #define DEBUG_LOG(...) do { if (gOptions.threadRunnerDebug()) DLOG(__VA_ARGS__); } while (0)
  32. #ifndef SSL_OP_NO_COMPRESSION
  33. #define SSL_OP_NO_COMPRESSION 0
  34. #endif
  35. // make standard string for logging address
  36. inline utf8 addrLogString(const utf8 &addr, const u_short port, const utf8 &xff) throw()
  37. {
  38. const bool use_xff = (gOptions.useXFF() && !xff.empty());
  39. return (use_xff ? xff : addr) + ":" + tos(port) + (use_xff ? " (xff)" : "");
  40. }
  41. // make standard string for logging src address
  42. utf8 srcAddrLogString(const utf8 &addr, const u_short port, const size_t sid) throw()
  43. {
  44. return "[SRC " + addrLogString(addr, port) + (sid > 0 ? " sid=" + tos(sid) : "") + "] ";
  45. }
  46. // make standard string for loggin dst address
  47. utf8 dstAddrLogString(const utf8 &addr, const u_short port, const utf8 &xff, const size_t sid) throw()
  48. {
  49. return "[DST " + addrLogString(addr, port, xff) + (sid > 0 ? " sid=" + tos(sid) : "") + "] ";
  50. }
  51. // make standard string for logging unknown address
  52. utf8 recvAddrLogString(const utf8 &addr, const u_short port) throw()
  53. {
  54. return "[RECV " + addrLogString(addr, port) + "] ";
  55. }
  56. // make standard string for socket error
  57. string socketErrString(int err) throw() { return "err=" + socketOps::errMsg(err) + "(" + tos(err) + ")"; }
  58. static AOL_namespace::mutex sm_globalRunnerLock;
  59. static set<threadedRunner*> sm_runners;
  60. SSL_CTX *threadedRunner::m_sslCtx = NULL;
  61. AOL_namespace::mutex *threadedRunner::m_sslMutexes = NULL;
  62. static bool cmp(threadedRunner* a, threadedRunner* b) throw()
  63. {
  64. return (a->sizeOfRunList() < b->sizeOfRunList());
  65. }
  66. bool threadedRunner::scheduleRunnable(runnable *r) throw()
  67. {
  68. if (r)
  69. {
  70. stackLock sml(sm_globalRunnerLock);
  71. // diagnostics. Print load
  72. if (gOptions.threadRunnerDebug())
  73. {
  74. utf8 msg = LOGNAME;
  75. for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
  76. {
  77. msg += ((i == sm_runners.begin() ? "Thread " : ", thread ") + (*i)->threadNumber() + " amt=" + tos((*i)->sizeOfRunList()));
  78. }
  79. DEBUG_LOG(msg);
  80. }
  81. // find least busy
  82. set<threadedRunner*>::const_iterator which = min_element(sm_runners.begin(), sm_runners.end(), cmp);
  83. if ((which != sm_runners.end()) && (*which)->addRunnable(r))
  84. {
  85. return true;
  86. }
  87. // didn't work... schedule anywhere
  88. DEBUG_LOG(LOGNAME "Schedule failure, trying any thread");
  89. for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
  90. {
  91. if ((*i)->addRunnable(r))
  92. {
  93. return true;
  94. }
  95. }
  96. }
  97. return false;
  98. }
  99. void threadedRunner::wakeup() throw()
  100. {
  101. stackLock sml(sm_globalRunnerLock);
  102. for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
  103. {
  104. (*i)->wakeupRunnable();
  105. }
  106. }
  107. uniString::utf8 threadedRunner::getRunnabledetails() throw()
  108. {
  109. stackLock sml(sm_globalRunnerLock);
  110. utf8 details;
  111. for (set<threadedRunner*>::const_iterator i = sm_runners.begin(); i != sm_runners.end(); ++i)
  112. {
  113. details += (i != sm_runners.begin() ? "<br>" : (utf8)"") + "Thread #" +
  114. (*i)->threadNumber() + ": <b>" + tos((*i)->sizeOfRunList()) +
  115. "</b><div style=\"padding-left:1em;\">";
  116. map<utf8, size_t> runners;
  117. (*i)->enumRunnables(runners);
  118. for (map<utf8, size_t>::const_iterator r = runners.begin(); r != runners.end(); ++r)
  119. {
  120. details += (r != runners.begin() ? "<br>" : "") + (*r).first + " - " + tos((*r).second);
  121. }
  122. details += "</div>";
  123. }
  124. return details;
  125. }
  126. threadedRunner::threadedRunner() throw() : m_stop(false), m_threadNumber((const short)(sm_runners.size() + 1))
  127. {
  128. stackLock sml(sm_globalRunnerLock);
  129. sm_runners.insert(this);
  130. }
  131. threadedRunner::~threadedRunner() throw()
  132. {
  133. stackLock sml(sm_globalRunnerLock);
  134. sm_runners.erase(this);
  135. }
  136. // main loop of thread
  137. const unsigned threadedRunner::operator()() throw()
  138. {
  139. unsigned result = 1;
  140. try
  141. {
  142. m_lock.lock();
  143. while (!m_stop)
  144. {
  145. m_lock.unlock();
  146. std::set<size_t> readSet, writeSet;
  147. int timeout = -1;
  148. // run everyone (as long as not scheduled) and get their status information, etc
  149. set<runnable*>::const_iterator i = m_runList.begin();
  150. while (i != m_runList.end())
  151. {
  152. runnable::timeSliceResult &tsr = (*i)->m_result;
  153. __uint64 now = time_now_ms();
  154. // if we're indicated as being scheduled then we
  155. // need to skip doing anything else and look at
  156. // checking if it's ok to process or not, etc
  157. if (tsr.m_scheduleTime)
  158. {
  159. if (now < tsr.m_scheduleTime)
  160. {
  161. int time_diff = (int)(tsr.m_scheduleTime - now);
  162. if (timeout == -1)
  163. {
  164. timeout = time_diff;
  165. }
  166. else
  167. {
  168. timeout = min(timeout, time_diff);
  169. }
  170. ++i;
  171. continue;
  172. }
  173. else
  174. {
  175. // we clear this as we're reached time
  176. // but not the rest as we want what was
  177. // set to now be used in the processing
  178. tsr.m_scheduleTime = 0;
  179. }
  180. }
  181. // make sure to reset otherwise it gets weird but
  182. // we're only going to do this if we are able to
  183. // run the runnable now (i.e. it's not scheduled)
  184. tsr.reset(now);
  185. bool exception_occured = false;
  186. try
  187. {
  188. (*i)->timeSlice();
  189. }
  190. catch (const exception &ex)
  191. {
  192. exception_occured = true;
  193. utf8 what = ex.what();
  194. if (!what.empty())
  195. {
  196. ELOG(ex.what());
  197. }
  198. }
  199. if (tsr.m_done || exception_occured)
  200. {
  201. set<runnable*>::const_iterator to_go = i;
  202. DEBUG_LOG(LOGNAME "Removing " + (*i)->name() + " [done: " + tos(tsr.m_done) + ", exception: " + tos(exception_occured) + "]");
  203. removeRunnable(*to_go);
  204. ++i;
  205. m_runList.erase (to_go);
  206. continue;
  207. }
  208. if (!tsr.m_runImmediately)
  209. {
  210. if (!tsr.m_scheduleTime)
  211. {
  212. update_sets:
  213. if (tsr.m_readSet)
  214. {
  215. // filter out anything with an invalid socket
  216. if ((*i)->m_socket != socketOps::cINVALID_SOCKET)
  217. {
  218. readSet.insert(readSet.end(), (*i)->m_socket);
  219. }
  220. if (tsr.m_customSocket != socketOps::cINVALID_SOCKET)
  221. {
  222. readSet.insert(readSet.end(), tsr.m_customSocket);
  223. }
  224. }
  225. if (tsr.m_writeSet)
  226. {
  227. // filter out anything with an invalid socket
  228. if ((*i)->m_socket != socketOps::cINVALID_SOCKET)
  229. {
  230. writeSet.insert(writeSet.end(), (*i)->m_socket);
  231. }
  232. if (tsr.m_customSocket != socketOps::cINVALID_SOCKET)
  233. {
  234. writeSet.insert(writeSet.end(), tsr.m_customSocket);
  235. }
  236. }
  237. if (tsr.m_timeout != -1)
  238. {
  239. if (timeout == -1)
  240. {
  241. timeout = tsr.m_timeout;
  242. }
  243. else
  244. {
  245. timeout = min(timeout, tsr.m_timeout);
  246. }
  247. }
  248. }
  249. else
  250. {
  251. // if this is to be scheduled then we'll do a
  252. // quick check to see if we're already after
  253. // that time and if it isn't (which is how it
  254. // should be) then we'll abort, else allow it
  255. // and we get the time again to account for
  256. // the time it's taken to process the runnable
  257. now = time_now_ms();
  258. if (now < tsr.m_scheduleTime)
  259. {
  260. int time_diff = (int)(tsr.m_scheduleTime - now);
  261. if (timeout == -1)
  262. {
  263. timeout = time_diff;
  264. }
  265. else
  266. {
  267. timeout = min(timeout, time_diff);
  268. }
  269. }
  270. else
  271. {
  272. // we clear this as we're reached time
  273. // but not the rest as we want what was
  274. // set to now be used in the processing
  275. tsr.m_scheduleTime = 0;
  276. timeout = 50;
  277. goto update_sets;
  278. }
  279. }
  280. ++i;
  281. }
  282. } // for
  283. // delete the old guys, no lock required here, only we add to this set
  284. int released = 0;
  285. while (true)
  286. {
  287. set<runnable*>::const_iterator it = m_runnablesToRemove.begin();
  288. if (it == m_runnablesToRemove.end())
  289. break;
  290. if (++released > 300)
  291. {
  292. timeout &= 15; // prevent a large stall but force a quick retry
  293. break;
  294. }
  295. stlx::delete_fntr<runnable> (*it);
  296. m_runnablesToRemove.erase (it);
  297. }
  298. readSet.insert (m_signal.test());
  299. if (timeout < 0)
  300. timeout = 60000;
  301. int n = socketOps::socketSelect(readSet, writeSet, timeout);
  302. m_lock.lock();
  303. // add the new guys, requires lock as set can be added from elsewhere
  304. m_runList.insert(m_runnablesToAdd.begin(), m_runnablesToAdd.end());
  305. m_runnablesToAdd.clear();
  306. if (n > 0)
  307. m_signal.clear();
  308. }
  309. m_lock.unlock();
  310. result = 0;
  311. }
  312. catch (const exception &ex)
  313. {
  314. ELOG(LOGNAME + string(ex.what()));
  315. }
  316. catch (...)
  317. {
  318. ELOG(LOGNAME "Unknown exception");
  319. }
  320. // delete runnables in run list, and those that are queued to be added
  321. m_lock.lock();
  322. for_each(m_runnablesToAdd.begin(), m_runnablesToAdd.end(), stlx::delete_fntr<runnable>);
  323. m_lock.unlock();
  324. for_each(m_runList.begin(), m_runList.end(), stlx::delete_fntr<runnable>);
  325. return result;
  326. }
  327. const size_t threadedRunner::sizeOfRunList() throw()
  328. {
  329. stackLock sml(m_lock);
  330. const size_t result = (m_runList.size() + m_runnablesToAdd.size());
  331. const size_t subtr = m_runnablesToRemove.size();
  332. return (subtr > result ? 0 : result - subtr);
  333. }
  334. const bool threadedRunner::addRunnable(runnable* r) throw()
  335. {
  336. if (!r)
  337. {
  338. return false;
  339. }
  340. stackLock sml(m_lock);
  341. if (m_stop)
  342. {
  343. return false;
  344. }
  345. m_runnablesToAdd.insert(r);
  346. m_signal.set();
  347. DEBUG_LOG(LOGNAME "Adding " + r->name() + " to thread " + tos(m_threadNumber));
  348. return true;
  349. }
  350. const bool threadedRunner::removeRunnable(runnable *r) throw()
  351. {
  352. if (!r)
  353. {
  354. return false;
  355. }
  356. m_runnablesToRemove.insert(r);
  357. m_signal.set();
  358. DEBUG_LOG(LOGNAME "Removing " + r->name() + " from thread " + tos(m_threadNumber));
  359. return true;
  360. }
  361. void threadedRunner::enumRunnables(map<utf8, size_t>& runners) throw()
  362. {
  363. stackLock sml(m_lock);
  364. for (set<runnable*>::const_iterator i = m_runList.begin(); i != m_runList.end(); ++i)
  365. {
  366. const utf8::size_type pos = (*i)->name().find((utf8)"protocol_");
  367. if (pos != utf8::npos)
  368. {
  369. ++runners[(*i)->name().substr(pos + 9, (*i)->name().length())];
  370. }
  371. else
  372. {
  373. ++runners[(*i)->name()];
  374. }
  375. }
  376. }
  377. void threadedRunner::wakeupRunnable() throw()
  378. {
  379. if (m_lock.timedLock(1000))
  380. {
  381. m_signal.set();
  382. m_lock.unlock();
  383. }
  384. }
  385. void threadedRunner::stop() throw()
  386. {
  387. stackLock sml(m_lock);
  388. m_stop = true;
  389. m_signal.set();
  390. }
  391. ///////////////////////////
  392. #ifdef LOGNAME
  393. #undef LOGNAME
  394. #endif
  395. #define LOGNAME "[MICROSERVER] "
  396. #ifdef DEBUG_LOG
  397. #undef DEBUG_LOG
  398. #endif
  399. #define DEBUG_LOG(x) { if (gOptions.microServerDebug()) DLOG((x)); }
  400. microServer::microServer(const string &listenAddr, const u_short listenPort,
  401. const AllowableProtocols_t protocols,
  402. const ListenTypes_t types) throw(exception)
  403. : m_protocols(protocols)
  404. {
  405. try
  406. {
  407. m_socket = socketOps::createTCPSocketTHROW();
  408. #ifndef _WIN32
  409. {
  410. int bflag = 1;
  411. setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &bflag, sizeof(bflag));
  412. #if (defined PLATFORM_LINUX || defined PLATFORM_ARMv6 || defined PLATFORM_ARMv7)
  413. int wait = 1;
  414. setsockopt(m_socket, IPPROTO_TCP, TCP_DEFER_ACCEPT, &wait, sizeof(wait));
  415. #endif
  416. #ifdef PLATFORM_BSD
  417. struct accept_filter_arg af = {"dataready", ""};
  418. setsockopt(m_socket, SOL_SOCKET, SO_ACCEPTFILTER, &af, sizeof(af));
  419. #endif
  420. }
  421. #endif
  422. socketOps::bindTHROW(m_socket, listenPort, listenAddr);
  423. socketOps::listenTHROW(m_socket);
  424. socketOps::setNonblockTHROW(m_socket, true);
  425. bindMessage(types, listenPort);
  426. }
  427. catch (const exception &ex)
  428. {
  429. socketOps::forgetTCPSocket(m_socket);
  430. string error = ex.what();
  431. throw runtime_error(LOGNAME "Error opening port " + tos(listenPort) + " because " + toLower(error));
  432. }
  433. }
  434. void microServer::bindMessage(const ListenTypes_t types, const u_short listenPort) throw()
  435. {
  436. string message = "Listening for connections on port ";
  437. if ((types & microServer::L_SOURCE) && (types & microServer::L_CLIENT))
  438. {
  439. message = "Listening for source and client connections on port ";
  440. }
  441. else if ((types & microServer::L_FLASH))
  442. {
  443. message = "Listening for flash policy server connection on port ";
  444. }
  445. else if ((types & microServer::L_SOURCE))
  446. {
  447. message = "Listening for legacy source connections on port ";
  448. }
  449. else if ((types & microServer::L_SOURCE2))
  450. {
  451. message = "Listening for source connections on port ";
  452. }
  453. else if ((types & microServer::L_CLIENT_ALT))
  454. {
  455. message = "Listening for client connections on alternate port ";
  456. }
  457. else if ((types & microServer::L_CLIENT))
  458. {
  459. message = "Listening for client connections on port ";
  460. }
  461. ILOG(LOGNAME + message + tos(listenPort));
  462. }
  463. void microServer::updateProtocols(AllowableProtocols_t protocols, ListenTypes_t types, const u_short listenPort) throw()
  464. {
  465. m_protocols = protocols;
  466. bindMessage(types, listenPort);
  467. }
  468. microServer::~microServer() throw()
  469. {
  470. string addr;
  471. u_short port = 0;
  472. socketOps::getsockname(m_socket, addr, port);
  473. socketOps::forgetTCPSocket(m_socket);
  474. if (!iskilled())
  475. {
  476. ELOG(LOGNAME "Unexpected stop detected for listening for connections on port " + tos(port));
  477. ELOG(LOGNAME "This should not happen and prevents the DNAS from working correctly.");
  478. ELOG(LOGNAME "DNAS restart is required. If this keeps happening, enable all debugging options and provide the logs to Shoutcast support.");
  479. }
  480. else
  481. {
  482. DEBUG_LOG(LOGNAME "Stopped listening for connections on port " + tos(port));
  483. }
  484. }
  485. void microServer::timeSlice() throw(exception)
  486. {
  487. static int repeated = 0;
  488. // don't allow any new connections when the server is stopping
  489. if (!iskilled())
  490. {
  491. try
  492. {
  493. string addr;
  494. u_short port = 0;
  495. socketOps::tSOCKET newSock = socketOps::acceptTHROW(m_socket, addr, port, true);
  496. if (newSock != socketOps::cSOCKET_ERROR)
  497. {
  498. socketOps::getpeername(newSock, addr, port);
  499. string hostName = addr;
  500. if (gOptions.nameLookups())
  501. {
  502. if (socketOps::addressToHostName(addr, port, hostName))
  503. {
  504. hostName = addr;
  505. }
  506. }
  507. socketOps::setNonblockTHROW(newSock, true);
  508. DEBUG_LOG(LOGNAME "Connection received from " + addr + ":" + tos(port));
  509. threadedRunner::scheduleRunnable(new microConnection(newSock, hostName, addr, port, m_protocols));
  510. repeated = 0;
  511. }
  512. }
  513. catch (const tagged_error &ex)
  514. {
  515. ELOG(ex.what());
  516. }
  517. catch (const exception &ex)
  518. {
  519. string msg = ex.what();
  520. if (!msg.empty())
  521. {
  522. if (msg.find("Could not call") == 0)
  523. {
  524. // serious error, log unless repeated and delay a retry
  525. if ((repeated & 255) == 0)
  526. ELOG(LOGNAME + msg);
  527. ++repeated;
  528. m_result.schedule (1000);
  529. return;
  530. }
  531. ELOG(LOGNAME + msg);
  532. }
  533. }
  534. m_result.read();
  535. return;
  536. }
  537. m_result.done();
  538. }
  539. ///////////////////////////////////////////////////
  540. microConnection::microConnection(const socketOps::tSOCKET s, const string &hostName, const string &addr,
  541. const u_short port, const microServer::AllowableProtocols_t protocols) throw()
  542. : runnable(s), m_srcHostName(hostName),
  543. m_srcAddress(addr), m_srcPort(port),
  544. m_protocols(protocols)
  545. {
  546. }
  547. microConnection::~microConnection() throw()
  548. {
  549. socketOps::forgetTCPSocket(m_socket);
  550. }
  551. void microConnection::timeSlice() throw(exception)
  552. {
  553. time_t cur_time;
  554. const int autoDumpTime = ::detectAutoDumpTimeout(cur_time, m_lastActivityTime, (recvAddrLogString(m_srcAddress, m_srcPort) +
  555. "Got timeout waiting for data"), gOptions.microServerDebug());
  556. const int maxHeaderLineSize = gOptions.maxHeaderLineSize();
  557. const bool flash_policy = !!(m_protocols & P_FLASHPOLICYFILE);
  558. bool uvox_checked = false;
  559. runnable *runnable = NULL;
  560. char buf[MAX_MESSAGE_SIZE] = {0};
  561. // if we've got a 1.x source connection then only handle
  562. // on a per-byte basis, for everything else, try getting
  563. // a few bytes so we can use that as a guide on how then
  564. // to try to process things a bit quicker than per-byte.
  565. // int amt = (!(m_protocols & P_SHOUTCAST1SOURCE) ? UV2X_HDR_SIZE : 1);
  566. int amt = (m_ssl) ? 1 : 4096;
  567. ssize_t rval = 0;
  568. while (true)
  569. {
  570. if (iskilled() || (size_t)amt > sizeof(buf))
  571. {
  572. m_result.done();
  573. return;
  574. }
  575. int flags = (m_lineBuffer.size() || m_ssl) ? 0 : MSG_PEEK; // use PEEK initially as SSL requires bytes in the socket
  576. if ((rval = recv (buf, amt, flags|MSG_DONTWAIT)) < 1)
  577. {
  578. if (rval == 0)
  579. {
  580. throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
  581. "Remote socket closed while waiting for data.") : (utf8)""));
  582. }
  583. else if (rval < 0)
  584. {
  585. rval = socketOps::errCode();
  586. if (rval != SOCKETOPS_WOULDBLOCK)
  587. {
  588. throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
  589. "Socket error while waiting for data. " + socketErrString(rval)) : (utf8)""));
  590. }
  591. m_result.schedule();
  592. m_result.read();
  593. m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
  594. return;
  595. }
  596. }
  597. m_lineBuffer.insert (m_lineBuffer.end(), buf, buf + rval);
  598. int lineSize = (int)m_lineBuffer.size();
  599. if (lineSize > maxHeaderLineSize)
  600. {
  601. throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
  602. "Protocol header line is too large - exceeds " + tos(maxHeaderLineSize) +
  603. " bytes") : (utf8)""));
  604. }
  605. if (m_ssl == NULL && flags && (m_lineBuffer [0] == 0x16)) // SSLv3 / TLSv1.x ?
  606. {
  607. if (threadedRunner::m_sslCtx == NULL)
  608. {
  609. throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
  610. "Remote socket closed, no SSL configured.") : (utf8)""));
  611. }
  612. if (lineSize < 6)
  613. {
  614. m_result.schedule();
  615. m_result.read();
  616. m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
  617. return;
  618. }
  619. if ((m_lineBuffer [1] == 0x3) && (m_lineBuffer [5] == 0x1))
  620. {
  621. DLOG ("detected ssl request, checking further");
  622. m_ssl = SSL_new (threadedRunner::m_sslCtx);
  623. SSL_set_accept_state (m_ssl);
  624. SSL_set_fd (m_ssl, (int)m_socket);
  625. SSL_set_mode (m_ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER|SSL_MODE_ENABLE_PARTIAL_WRITE);
  626. m_lineBuffer.clear();
  627. continue;
  628. }
  629. }
  630. utf8::size_type nl = m_lineBuffer.find ((unsigned char)'\n');
  631. if (nl != utf8::npos)
  632. {
  633. rval = lineSize = (int)nl+1; // 0 offset
  634. if (flags) m_lineBuffer.erase (lineSize); // truncate line to maintain parsing
  635. }
  636. if (flags)
  637. ::recv (m_socket, buf, rval, MSG_DONTWAIT); // pull bytes from input, passed any PEEK requirement
  638. if ((lineSize > 0) && (lineSize < UV2X_HDR_SIZE) && (m_lineBuffer [lineSize - 1] != '\n'))
  639. {
  640. if (m_lineBuffer[0] == UVOX2_SYNC_BYTE)
  641. {
  642. // if it looks like it might be a uvox
  643. // frame then grab more on the next go
  644. amt = 3;
  645. }
  646. else
  647. {
  648. // no point in doing any of the checks
  649. // if there is not enough data to view
  650. // e.g. we need to see a valid newline
  651. amt = 1;
  652. }
  653. m_lastActivityTime = ::time(NULL);
  654. continue;
  655. }
  656. if ((lineSize >= UV2X_HDR_SIZE) && (uvox_checked == false))
  657. {
  658. // look at first uvox packet to see if we're running uvox 2 or uvox 2.1
  659. // NOTE: This is a protocol change. We need to add a new packet to 2.1 so request cipher key
  660. const uv2xHdr *voxhdr = reinterpret_cast<const uv2xHdr*>(m_lineBuffer.c_str());
  661. if ((voxhdr->sync == UVOX2_SYNC_BYTE) &&
  662. (ntohs(voxhdr->msgType) == (u_short)MSG_CIPHER))
  663. {
  664. const int wanted = (ntohs(voxhdr->msgLen) + UV2X_OVERHEAD);
  665. if (wanted == lineSize)
  666. {
  667. // we have uvox 2.1
  668. if (m_protocols & P_SHOUTCAST2SOURCE) // only if allowed
  669. {
  670. runnable = new protocol_uvox2Source (*this, (const __uint8 *)m_lineBuffer.c_str(), lineSize);
  671. }
  672. break;
  673. }
  674. amt = min(MAX_MESSAGE_SIZE, (wanted - lineSize));
  675. m_lastActivityTime = ::time(NULL);
  676. continue;
  677. }
  678. // if we've got enough and there's no sync
  679. // byte then there's not point to re-check.
  680. uvox_checked = true;
  681. }
  682. if ((lineSize > 0) && (m_lineBuffer[lineSize - 1] == '\n'))
  683. {
  684. // look at start of line, if it's a GET or POST or some standard HTTP thing, then we
  685. // have either a web request or a client connection request. If that is missing, then
  686. // we have to assume it's a shoutcast source, and we have just received the password.
  687. //
  688. // this should be enough to detect absolute and relative requests made to the server
  689. // if there's no / for absolute paths then we'll reject the request as a bad access.
  690. if ((m_lineBuffer.find((utf8)"GET /") == 0) ||
  691. (m_lineBuffer.find((utf8)"GET h") == 0) ||
  692. (m_lineBuffer.find((utf8)"POST /") == 0) ||
  693. (m_lineBuffer.find((utf8)"POST h") == 0) ||
  694. (m_lineBuffer.find((utf8)"HEAD /") == 0) ||
  695. (m_lineBuffer.find((utf8)"HEAD h") == 0))
  696. {
  697. if (m_protocols & (P_SHOUTCAST1CLIENT |
  698. P_SHOUTCAST2CLIENT |
  699. P_WEB | P_WEB_SETUP))
  700. {
  701. runnable = new protocol_HTTPStyle (*this, stripWhitespace(m_lineBuffer).hideAsString());
  702. }
  703. break;
  704. }
  705. else // assume shoutcast source, and this is the password (though do some checks to sanitise)
  706. {
  707. // and now look for invalid HTTP requests and
  708. // reject them as the earlier handling should
  709. // allow valid relative and absolute requests
  710. if (lineSize > 5)
  711. {
  712. if ((m_lineBuffer.find((utf8)"GET ") == 0) ||
  713. (m_lineBuffer.find((utf8)"POST ") == 0) ||
  714. (m_lineBuffer.find((utf8)"SOURCE ") == 0) ||
  715. (m_lineBuffer.find((utf8)"PUT ") == 0) ||
  716. (m_lineBuffer.find((utf8)"HEAD ") == 0))
  717. {
  718. throwEx<runtime_error>((gOptions.microServerDebug() ? (recvAddrLogString(m_srcAddress, m_srcPort) +
  719. "Invalid HTTP request detected - only valid relative and absolute paths are allowed.") : (utf8)""));
  720. }
  721. }
  722. // if we appear to have a 'PUT' or 'SOURCE' request then we'll need to
  723. // do some different handling in-order to get the correct details before
  724. // we can then actually process the stream as a valid (icecast?) source
  725. if (((m_lineBuffer.find((utf8)"SOURCE ") == 0) ||
  726. (m_lineBuffer.find((utf8)"PUT ") == 0)) &&
  727. ((m_lineBuffer.find((utf8)"HTTP/1.") != utf8::npos) ||
  728. (m_lineBuffer.find((utf8)"ICE/1.") != utf8::npos)))
  729. {
  730. runnable = new protocol_HTTPSource (*this, stripWhitespace(m_lineBuffer).hideAsString());
  731. }
  732. else
  733. {
  734. runnable = new protocol_shoutcastSource (*this, stripWhitespace(m_lineBuffer));
  735. }
  736. }
  737. break;
  738. }
  739. if (flash_policy && (m_lineBuffer.find((utf8)"<policy-file-request/>") == 0))
  740. {
  741. runnable = new protocol_FlashPolicyServer (m_socket, dstAddrLogString(m_srcHostName, m_srcPort));
  742. break;
  743. }
  744. amt = 1;
  745. m_lastActivityTime = ::time(NULL);
  746. } // while
  747. if (runnable)
  748. {
  749. threadedRunner::scheduleRunnable (runnable);
  750. }
  751. m_result.done();
  752. return;
  753. }
  754. /////////////////////////////////////////////////////////////////////////////////////////
  755. // return 0 if line is ready, or a timeout in seconds for next select call if we are still waiting
  756. // lineBuffer and lastActivityTime are updated by this call
  757. const bool runnable::getHTTPStyleHeaderLine(const size_t sid, utf8 &lineBuffer, const utf8 &logMsgPrefix, int maxLineLength) throw(exception)
  758. {
  759. time_t cur_time;
  760. const int autoDumpTime = ::detectAutoDumpTimeout (cur_time, m_lastActivityTime,
  761. (logMsgPrefix + "Timeout waiting for data"), gOptions.microServerDebug(), sid);
  762. const int maxHeaderLineSize = maxLineLength > 0 ? maxLineLength : gOptions.maxHeaderLineSize();
  763. int count = 0;
  764. bool ret = true;
  765. while (true)
  766. {
  767. int rval = 0;
  768. char buf[2] = {0};
  769. if ((rval = recv(buf, 1, 0x0)) < 1)
  770. {
  771. if (rval == 0)
  772. {
  773. if (gOptions.microServerDebug())
  774. ELOG (logMsgPrefix + "Remote socket closed while waiting for data.", LOGNAME, sid);
  775. throwEx<runtime_error>((utf8)"");
  776. }
  777. rval = socketOps::errCode();
  778. if (rval != SOCKETOPS_WOULDBLOCK)
  779. {
  780. if (gOptions.microServerDebug())
  781. ELOG (logMsgPrefix + "Socket error while waiting for data. " + socketErrString(rval), LOGNAME, sid);
  782. throwEx<runtime_error>((utf8)"");
  783. }
  784. // if we've read something then it's likely to be from a POST response
  785. if (lineBuffer.empty() == false)
  786. {
  787. ret = false;
  788. if (count) break;
  789. }
  790. // try again but wait a bit
  791. // so we don't overload it.
  792. m_result.schedule(30);
  793. m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
  794. return false;
  795. }
  796. ++count;
  797. lineBuffer.insert (lineBuffer.end(), buf, buf + rval);
  798. const int lineSize = (int)lineBuffer.size();
  799. if (lineSize == maxLineLength) break;
  800. if (lineSize > maxHeaderLineSize)
  801. {
  802. ELOG (logMsgPrefix + "Protocol header line is too large - exceeds "
  803. + tos(maxHeaderLineSize) + " bytes", LOGNAME, sid);
  804. throwEx<runtime_error> ((utf8)"");
  805. }
  806. if ((lineSize > 0) && lineBuffer [lineSize - 1] == '\n')
  807. {
  808. break;
  809. }
  810. }
  811. m_result.run();
  812. m_lastActivityTime = ::time(NULL);
  813. return ret;
  814. }
  815. // send a hunk of data out a socket - returns true if send is complete,
  816. // outBuffer and outBufferSize should be initially set to point to the
  817. // data and the size of the data - these values are moved and updated.
  818. const bool runnable::sendDataBuffer(const size_t sid, const uniString::utf8::value_type *&outBuffer,
  819. int &outBufferSize, const uniString::utf8 &logMsgPrefix) throw(std::exception)
  820. {
  821. #if defined(_DEBUG) || defined(DEBUG)
  822. DEBUG_LOG(logMsgPrefix + __FUNCTION__ + " " + tos(outBufferSize));
  823. #endif
  824. if (outBufferSize > 0) // done
  825. {
  826. time_t cur_time;
  827. const int autoDumpTime = ::detectAutoDumpTimeout(cur_time, m_lastActivityTime,
  828. (logMsgPrefix + "Timeout waiting to send data"),
  829. gOptions.microServerDebug(), sid);
  830. int rval = send ((const char *)outBuffer, outBufferSize, 0);
  831. if (rval == 0)
  832. {
  833. throwEx<std::runtime_error>((gOptions.microServerDebug() ? (logMsgPrefix +
  834. "Remote socket closed while sending data") :
  835. (uniString::utf8)""));
  836. }
  837. else if (rval < 0)
  838. {
  839. rval = socketOps::errCode();
  840. if (rval != SOCKETOPS_WOULDBLOCK)
  841. {
  842. throwEx<std::runtime_error>((gOptions.microServerDebug() ? (((
  843. #ifdef _WIN32
  844. rval == WSAECONNABORTED || rval == WSAECONNRESET
  845. #else
  846. rval == ECONNABORTED || rval == ECONNRESET || rval == EPIPE
  847. #endif
  848. ) ? (uniString::utf8)"" : logMsgPrefix +
  849. "Socket error while waiting to send data. " +
  850. socketErrString(rval))) : (uniString::utf8)""));
  851. }
  852. // try again but wait a bit
  853. // so we don't overload it.
  854. m_result.schedule();
  855. m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
  856. return false;
  857. }
  858. // move pointers
  859. outBufferSize -= rval;
  860. outBuffer += rval;
  861. // update time
  862. m_lastActivityTime = ::time(NULL);
  863. m_result.timeout((autoDumpTime - (int)(cur_time - m_lastActivityTime)));
  864. if (outBufferSize == 0) // done
  865. {
  866. m_result.schedule();
  867. return true;
  868. }
  869. m_result.schedule (160);
  870. return false;
  871. }
  872. m_result.write();
  873. m_result.schedule();
  874. m_result.timeoutSID(sid);
  875. return true;
  876. }
  877. runnable::runnable (runnable &r) throw()
  878. {
  879. m_socket = r.m_socket;
  880. m_ssl = r.m_ssl;
  881. m_lastActivityTime = ::time (NULL);
  882. // the following are handed off to this sub-protocol, so make sure they cannot affect them
  883. r.m_socket = socketOps::cINVALID_SOCKET;
  884. r.m_ssl = NULL;
  885. }
  886. ssize_t runnable::recv (void *buf, size_t len, int flags)
  887. {
  888. if (m_ssl)
  889. {
  890. ssize_t bytes = SSL_read (m_ssl, buf, len);
  891. int code = SSL_get_error (m_ssl, bytes);
  892. // char err[128];
  893. switch (code)
  894. {
  895. case SSL_ERROR_NONE:
  896. case SSL_ERROR_ZERO_RETURN:
  897. break;
  898. case SSL_ERROR_WANT_READ:
  899. case SSL_ERROR_WANT_WRITE:
  900. return -1;
  901. default:
  902. bytes = 0;
  903. }
  904. return bytes;
  905. }
  906. return (ssize_t)::recv (m_socket, (char*)buf, len, flags);
  907. }
  908. ssize_t runnable::send(const void *buf, size_t len, int flags)
  909. {
  910. if (m_ssl)
  911. {
  912. ssize_t bytes = SSL_write (m_ssl, buf, len);
  913. int code = SSL_get_error (m_ssl, bytes);
  914. // char err[128];
  915. switch (code)
  916. {
  917. case SSL_ERROR_NONE:
  918. case SSL_ERROR_ZERO_RETURN:
  919. break;
  920. case SSL_ERROR_WANT_READ:
  921. case SSL_ERROR_WANT_WRITE:
  922. return -1;
  923. default:
  924. return -1;
  925. }
  926. return bytes;
  927. }
  928. return (ssize_t)::send(m_socket, (char*)buf, len, flags);
  929. }
  930. // This pick the dump time for sources, as there is no general class for that yet, unlike listeners
  931. int runnable::detectAutoDumpTimeout (time_t &cur_time, const size_t streamID, const utf8 &msg) throw(runtime_error)
  932. {
  933. const int autoDumpTime = gOptions.stream_autoDumpSourceTime(streamID);
  934. cur_time = ::time(NULL);
  935. if ((autoDumpTime > 0) && ((cur_time - m_lastActivityTime) >= autoDumpTime))
  936. {
  937. WLOG (msg, LOGNAME, streamID);
  938. throwEx<runtime_error>("");
  939. }
  940. return autoDumpTime;
  941. }
  942. unsigned long threadedRunner::SSL_idFunction (void)
  943. {
  944. return threadedRunner::getCurrentThreadID();
  945. }
  946. void threadedRunner::SSL_lockingFunction (int mode, int n, const char * /*file*/, int /*line*/)
  947. {
  948. if (mode & CRYPTO_LOCK)
  949. m_sslMutexes[n].lock();
  950. else
  951. m_sslMutexes[n].unlock();
  952. }
  953. void threadedRunner::SSL_shutdown ()
  954. {
  955. #if !defined(WIN32) && OPENSSL_VERSION_NUMBER < 0x10000000
  956. CRYPTO_set_id_callback (NULL);
  957. #endif
  958. CRYPTO_set_locking_callback (NULL);
  959. if (m_sslCtx)
  960. {
  961. ::SSL_CTX_free (m_sslCtx);
  962. m_sslCtx = NULL;
  963. }
  964. if (m_sslMutexes)
  965. delete [] m_sslMutexes;
  966. m_sslMutexes = NULL;
  967. }
  968. void threadedRunner::SSL_init ()
  969. {
  970. SSL_load_error_strings();
  971. SSL_library_init ();
  972. utf8 cert_file = gOptions.sslCertificateFile();
  973. utf8 key_file = gOptions.sslCertificateKeyFile();
  974. do {
  975. if (cert_file == "") break;
  976. CRYPTO_set_id_callback (&threadedRunner::SSL_idFunction);
  977. #if !defined(WIN32) && OPENSSL_VERSION_NUMBER < 0x10000000
  978. CRYPTO_set_locking_callback (&threadedRunner::SSL_lockingFunction);
  979. #endif
  980. m_sslMutexes = new AOL_namespace::mutex [CRYPTO_num_locks()];
  981. if (m_sslMutexes == NULL)
  982. break;
  983. m_sslCtx = ::SSL_CTX_new (::SSLv23_server_method());
  984. long ssl_opts = ::SSL_CTX_get_options (m_sslCtx);
  985. ::SSL_CTX_set_options (m_sslCtx, ssl_opts|SSL_OP_NO_SSLv2|SSL_OP_NO_SSLv3|SSL_OP_NO_COMPRESSION);
  986. if (::SSL_CTX_use_certificate_chain_file (m_sslCtx, (char*)cert_file.c_str()) <= 0)
  987. {
  988. WLOG ("[MAIN] Invalid certificate file " + cert_file);
  989. break;
  990. }
  991. utf8 &pkfile = key_file.empty() ? cert_file : key_file;
  992. if (::SSL_CTX_use_PrivateKey_file (m_sslCtx, (char*)pkfile.c_str(), SSL_FILETYPE_PEM) <= 0)
  993. {
  994. WLOG ("[MAIN] Invalid private key file " + pkfile);
  995. break;
  996. }
  997. if (! SSL_CTX_check_private_key (m_sslCtx))
  998. {
  999. WLOG ("[MAIN] Invalid, private key does not match public key, " + pkfile);
  1000. break;
  1001. }
  1002. ILOG ("[MAIN] SSL keys installed");
  1003. return;
  1004. } while (0);
  1005. if (m_sslCtx)
  1006. {
  1007. WLOG ("[MAIN] failed to set up SSL, " + utf8(::ERR_reason_error_string (::ERR_peek_last_error())));
  1008. ::SSL_CTX_free (m_sslCtx);
  1009. m_sslCtx = NULL;
  1010. }
  1011. CRYPTO_set_id_callback (NULL);
  1012. CRYPTO_set_locking_callback (NULL);
  1013. if (m_sslMutexes)
  1014. delete [] m_sslMutexes;
  1015. m_sslMutexes = NULL;
  1016. }