Queue_item.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. <?php
  2. /**
  3. * Table Definition for queue_item
  4. */
  5. require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
  6. class Queue_item extends Managed_DataObject
  7. {
  8. ###START_AUTOCODE
  9. /* the code below is auto generated do not remove the above tag */
  10. public $__table = 'queue_item'; // table name
  11. public $id; // int(4) primary_key not_null
  12. public $frame; // blob not_null
  13. public $transport; // varchar(32)
  14. public $created; // datetime() not_null
  15. public $claimed; // datetime()
  16. /* the code above is auto generated do not remove the tag below */
  17. ###END_AUTOCODE
  18. public static function schemaDef()
  19. {
  20. return array(
  21. 'fields' => array(
  22. 'id' => array('type' => 'serial', 'not null' => true, 'description' => 'unique identifier'),
  23. 'frame' => array('type' => 'blob', 'not null' => true, 'description' => 'data: object reference or opaque string'),
  24. 'transport' => array('type' => 'varchar', 'length' => 32, 'not null' => true, 'description' => 'queue for what? "email", "xmpp", "sms", "irc", ...'),
  25. 'created' => array('type' => 'datetime', 'not null' => true, 'description' => 'date this record was created'),
  26. 'claimed' => array('type' => 'datetime', 'description' => 'date this item was claimed'),
  27. ),
  28. 'primary key' => array('id'),
  29. 'indexes' => array(
  30. 'queue_item_created_idx' => array('created'),
  31. ),
  32. );
  33. }
  34. /**
  35. * @param mixed $transports name of a single queue or array of queues to pull from
  36. * If not specified, checks all queues in the system.
  37. */
  38. static function top($transports=null, array $ignored_transports=array()) {
  39. $qi = new Queue_item();
  40. if ($transports) {
  41. if (is_array($transports)) {
  42. // @fixme use safer escaping
  43. $list = implode("','", array_map(array($qi, 'escape'), $transports));
  44. $qi->whereAdd("transport in ('$list')");
  45. } else {
  46. $qi->transport = $transports;
  47. }
  48. }
  49. if (!empty($ignored_transports)) {
  50. // @fixme use safer escaping
  51. $list = implode("','", array_map(array($qi, 'escape'), $ignored_transports));
  52. $qi->whereAdd("transport NOT IN ('$list')");
  53. }
  54. $qi->orderBy('created');
  55. $qi->whereAdd('claimed is null');
  56. $qi->limit(1);
  57. $cnt = $qi->find(true);
  58. if ($cnt) {
  59. // XXX: potential race condition
  60. // can we force it to only update if claimed is still null
  61. // (or old)?
  62. common_log(LOG_INFO, 'claiming queue item id = ' . $qi->getID() . ' for transport ' . $qi->transport);
  63. $orig = clone($qi);
  64. $qi->claimed = common_sql_now();
  65. $result = $qi->update($orig);
  66. if ($result) {
  67. common_log(LOG_DEBUG, 'claim succeeded.');
  68. return $qi;
  69. } else {
  70. common_log(LOG_ERR, 'claim of queue item id= ' . $qi->getID() . ' for transport ' . $qi->transport . ' failed.');
  71. }
  72. }
  73. $qi = null;
  74. return null;
  75. }
  76. /**
  77. * Release a claimed item.
  78. */
  79. function releaseClaim()
  80. {
  81. // DB_DataObject doesn't let us save nulls right now
  82. $sql = sprintf("UPDATE queue_item SET claimed=NULL WHERE id=%d", $this->getID());
  83. $this->query($sql);
  84. $this->claimed = null;
  85. $this->encache();
  86. }
  87. }