123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- <?php
// Stop executing this file unless the GNUSOCIAL constant is defined.
// NOTE(review): presumably the application's entry point defines it, so that
// this file cannot be requested directly — confirm.
- defined('GNUSOCIAL') || die();
/**
 * Data object for rows of the `queue_item` table.
 *
 * Each row carries a payload (`frame`) destined for a named `transport`,
 * together with the time it was created and the time it was claimed.
 */
class Queue_item extends Managed_DataObject
{
    /** @var string Name of the backing table. */
    public $__table = 'queue_item';

    /** @var int Serial primary key. */
    public $id;

    /** @var string Blob payload. */
    public $frame;

    /** @var string Transport name (varchar, at most 32 characters). */
    public $transport;

    /** @var string|null Creation datetime. */
    public $created;

    /** @var string|null Claim datetime; NULL while the item is unclaimed. */
    public $claimed;

    /**
     * Describe the table layout: columns, primary key and indexes.
     *
     * @return array Schema definition with the keys 'fields', 'primary key' and 'indexes'.
     */
    public static function schemaDef()
    {
        $fields = [
            'id' => ['type' => 'serial', 'not null' => true, 'description' => 'unique identifier'],
            'frame' => ['type' => 'blob', 'not null' => true, 'description' => 'data: object reference or opaque string'],
            'transport' => ['type' => 'varchar', 'length' => 32, 'not null' => true, 'description' => 'queue for what? "email", "xmpp", "sms", "irc", ...'],
            'created' => ['type' => 'datetime', 'description' => 'date this record was created'],
            'claimed' => ['type' => 'datetime', 'description' => 'date this item was claimed'],
        ];

        $indexes = [
            'queue_item_created_id_idx' => ['created', 'id'],
        ];

        return [
            'fields' => $fields,
            'primary key' => ['id'],
            'indexes' => $indexes,
        ];
    }

    /**
     * Fetch the first unclaimed item (ordered by created, then id) and mark it claimed.
     *
     * @param string|array|null $transports         A single transport name, a list of
     *                                              transport names, or a falsy value for
     *                                              no transport restriction.
     * @param array             $ignored_transports Transport names to exclude.
     *
     * @return Queue_item|null The claimed item, or null when no row was found or
     *                         the claiming update reported failure.
     */
    public static function top($transports = null, array $ignored_transports = [])
    {
        $item = new Queue_item();

        // Restrict to the requested transport(s); a falsy value adds no restriction.
        if (is_array($transports)) {
            if ($transports) {
                $item->whereAddIn('transport', $transports, $item->columnType('transport'));
            }
        } elseif ($transports) {
            $item->transport = $transports;
        }

        // The '!' prefix on the column name is passed through to whereAddIn() as-is.
        if (count($ignored_transports) > 0) {
            $item->whereAddIn('!transport', $ignored_transports, $item->columnType('transport'));
        }

        $item->whereAdd('claimed IS NULL');
        $item->orderBy('created, id');
        $item->limit(1);

        if (!$item->find(true)) {
            return null;
        }

        // NOTE(review): the lookup above and the update below are separate
        // statements, so two workers could presumably pick the same row —
        // confirm whether callers serialize access.
        common_log(LOG_INFO, 'claiming queue item id = ' . $item->getID() . ' for transport ' . $item->transport);

        $unclaimed = clone $item;
        $item->claimed = common_sql_now();

        if ($item->update($unclaimed)) {
            common_log(LOG_DEBUG, 'claim succeeded.');
            return $item;
        }

        common_log(LOG_ERR, 'claim of queue item id= ' . $item->getID() . ' for transport ' . $item->transport . ' failed.');
        return null;
    }

    /**
     * Clear the claim on this item so that it counts as unclaimed again.
     *
     * Issues a direct UPDATE setting `claimed` to NULL for this row's id; when
     * the query result is truthy, the in-memory value is reset and encache()
     * is called.
     *
     * @return void
     */
    public function releaseClaim()
    {
        $sql = sprintf(
            'UPDATE queue_item SET claimed = NULL WHERE id = %d',
            $this->getID()
        );

        if (!$this->query($sql)) {
            return;
        }

        $this->claimed = null;
        $this->encache();
    }
}
|