Queue_item.php 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. <?php
  2. /**
  3. * Table Definition for queue_item
  4. */
  5. require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
  6. class Queue_item extends Managed_DataObject
  7. {
  8. ###START_AUTOCODE
  9. /* the code below is auto generated do not remove the above tag */
  10. public $__table = 'queue_item'; // table name
  11. public $id; // int(4) primary_key not_null
  12. public $frame; // blob not_null
  13. public $transport; // varchar(32)
  14. public $created; // datetime() not_null
  15. public $claimed; // datetime()
  16. /* the code above is auto generated do not remove the tag below */
  17. ###END_AUTOCODE
  18. public static function schemaDef()
  19. {
  20. return array(
  21. 'fields' => array(
  22. 'id' => array('type' => 'serial', 'not null' => true, 'description' => 'unique identifier'),
  23. 'frame' => array('type' => 'blob', 'not null' => true, 'description' => 'data: object reference or opaque string'),
  24. 'transport' => array('type' => 'varchar', 'length' => 32, 'not null' => true, 'description' => 'queue for what? "email", "xmpp", "sms", "irc", ...'),
  25. 'created' => array('type' => 'datetime', 'not null' => true, 'description' => 'date this record was created'),
  26. 'claimed' => array('type' => 'datetime', 'description' => 'date this item was claimed'),
  27. ),
  28. 'primary key' => array('id'),
  29. 'indexes' => array(
  30. 'queue_item_created_idx' => array('created'),
  31. ),
  32. );
  33. }
  34. /**
  35. * @param mixed $transports name of a single queue or array of queues to pull from
  36. * If not specified, checks all queues in the system.
  37. */
  38. static function top($transports=null) {
  39. $qi = new Queue_item();
  40. if ($transports) {
  41. if (is_array($transports)) {
  42. // @fixme use safer escaping
  43. $list = implode("','", array_map(array($qi, 'escape'), $transports));
  44. $qi->whereAdd("transport in ('$list')");
  45. } else {
  46. $qi->transport = $transports;
  47. }
  48. }
  49. $qi->orderBy('created');
  50. $qi->whereAdd('claimed is null');
  51. $qi->limit(1);
  52. $cnt = $qi->find(true);
  53. if ($cnt) {
  54. // XXX: potential race condition
  55. // can we force it to only update if claimed is still null
  56. // (or old)?
  57. common_log(LOG_INFO, 'claiming queue item id = ' . $qi->id .
  58. ' for transport ' . $qi->transport);
  59. $orig = clone($qi);
  60. $qi->claimed = common_sql_now();
  61. $result = $qi->update($orig);
  62. if ($result) {
  63. common_log(LOG_INFO, 'claim succeeded.');
  64. return $qi;
  65. } else {
  66. common_log(LOG_INFO, 'claim failed.');
  67. }
  68. }
  69. $qi = null;
  70. return null;
  71. }
  72. /**
  73. * Release a claimed item.
  74. */
  75. function releaseClaim()
  76. {
  77. // DB_DataObject doesn't let us save nulls right now
  78. $sql = sprintf("UPDATE queue_item SET claimed=NULL WHERE id=%d", $this->id);
  79. $this->query($sql);
  80. $this->claimed = null;
  81. $this->encache();
  82. }
  83. }