Queue_item.php 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. <?php
  2. // This file is part of GNU social - https://www.gnu.org/software/social
  3. //
  4. // GNU social is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Affero General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // GNU social is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Affero General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Affero General Public License
  15. // along with GNU social. If not, see <http://www.gnu.org/licenses/>.
  16. /**
  17. * Table Definition for queue_item
  18. */
  19. defined('GNUSOCIAL') || die();
  20. class Queue_item extends Managed_DataObject
  21. {
  22. ###START_AUTOCODE
  23. /* the code below is auto generated do not remove the above tag */
  24. public $__table = 'queue_item'; // table name
  25. public $id; // int(4) primary_key not_null
  26. public $frame; // blob not_null
  27. public $transport; // varchar(32)
  28. public $created; // datetime()
  29. public $claimed; // datetime()
  30. /* the code above is auto generated do not remove the tag below */
  31. ###END_AUTOCODE
  32. public static function schemaDef()
  33. {
  34. return [
  35. 'fields' => [
  36. 'id' => ['type' => 'serial', 'not null' => true, 'description' => 'unique identifier'],
  37. 'frame' => ['type' => 'blob', 'not null' => true, 'description' => 'data: object reference or opaque string'],
  38. 'transport' => ['type' => 'varchar', 'length' => 32, 'not null' => true, 'description' => 'queue for what? "email", "xmpp", "sms", "irc", ...'],
  39. 'created' => ['type' => 'datetime', 'description' => 'date this record was created'],
  40. 'claimed' => ['type' => 'datetime', 'description' => 'date this item was claimed'],
  41. ],
  42. 'primary key' => ['id'],
  43. 'indexes' => [
  44. 'queue_item_created_id_idx' => ['created', 'id'],
  45. ],
  46. ];
  47. }
  48. /**
  49. * @param mixed $transports name of a single queue or array of queues to pull from
  50. * If not specified, checks all queues in the system.
  51. */
  52. public static function top($transports = null, array $ignored_transports = [])
  53. {
  54. $qi = new Queue_item();
  55. if ($transports) {
  56. if (is_array($transports)) {
  57. $qi->whereAddIn(
  58. 'transport',
  59. $transports,
  60. $qi->columnType('transport')
  61. );
  62. } else {
  63. $qi->transport = $transports;
  64. }
  65. }
  66. if (!empty($ignored_transports)) {
  67. $qi->whereAddIn(
  68. '!transport',
  69. $ignored_transports,
  70. $qi->columnType('transport')
  71. );
  72. }
  73. $qi->whereAdd('claimed IS NULL');
  74. $qi->orderBy('created, id');
  75. $qi->limit(1);
  76. $cnt = $qi->find(true);
  77. if ($cnt) {
  78. // XXX: potential race condition
  79. // can we force it to only update if claimed is still null
  80. // (or old)?
  81. common_log(LOG_INFO, 'claiming queue item id = ' . $qi->getID() . ' for transport ' . $qi->transport);
  82. $orig = clone($qi);
  83. $qi->claimed = common_sql_now();
  84. $result = $qi->update($orig);
  85. if ($result) {
  86. common_log(LOG_DEBUG, 'claim succeeded.');
  87. return $qi;
  88. } else {
  89. common_log(LOG_ERR, 'claim of queue item id= ' . $qi->getID() . ' for transport ' . $qi->transport . ' failed.');
  90. }
  91. }
  92. unset($qi);
  93. return null;
  94. }
  95. /**
  96. * Release a claimed item.
  97. */
  98. public function releaseClaim()
  99. {
  100. // @fixme Consider $this->sqlValue('NULL')
  101. $ret = $this->query(sprintf(
  102. 'UPDATE queue_item SET claimed = NULL WHERE id = %d',
  103. $this->getID()
  104. ));
  105. if ($ret) {
  106. $this->claimed = null;
  107. $this->encache();
  108. }
  109. }
  110. }