summaryrefslogtreecommitdiff
path: root/modules/system
diff options
context:
space:
mode:
Diffstat (limited to 'modules/system')
-rw-r--r--modules/system/system.info1
-rw-r--r--modules/system/system.install131
-rw-r--r--modules/system/system.module10
-rw-r--r--modules/system/system.queue.inc255
-rw-r--r--modules/system/system.test92
5 files changed, 489 insertions, 0 deletions
diff --git a/modules/system/system.info b/modules/system/system.info
index 8bf71536f..0afcdaf9a 100644
--- a/modules/system/system.info
+++ b/modules/system/system.info
@@ -6,6 +6,7 @@ version = VERSION
core = 7.x
files[] = system.module
files[] = system.admin.inc
+files[] = system.queue.inc
files[] = image.gd.inc
files[] = system.install
required = TRUE
diff --git a/modules/system/system.install b/modules/system/system.install
index 48dfd9b0e..8dc92d45c 100644
--- a/modules/system/system.install
+++ b/modules/system/system.install
@@ -1052,6 +1052,67 @@ function system_schema() {
'primary key' => array('mlid'),
);
+ $schema['queue'] = array(
+ 'description' => 'Stores items in queues.',
+ 'fields' => array(
+ 'item_id' => array(
+ 'type' => 'serial',
+ 'unsigned' => TRUE,
+ 'not null' => TRUE,
+ 'description' => 'Primary Key: Unique item ID.',
+ ),
+ 'name' => array(
+ 'type' => 'varchar',
+ 'length' => 255,
+ 'not null' => TRUE,
+ 'default' => '',
+ 'description' => 'The queue name.',
+ ),
+ 'consumer_id' => array(
+ 'type' => 'int',
+ 'not null' => TRUE,
+ 'default' => 0,
+ 'description' => 'The ID of the dequeuing consumer.',
+ ),
+ 'data' => array(
+ 'type' => 'text',
+ 'not null' => FALSE,
+ 'size' => 'big',
+ 'serialize' => TRUE,
+ 'description' => 'The arbitrary data for the item.',
+ ),
+ 'expire' => array(
+ 'type' => 'int',
+ 'not null' => TRUE,
+ 'default' => 0,
+ 'description' => 'Timestamp when the claim lease expires on the item.',
+ ),
+ 'created' => array(
+ 'type' => 'int',
+ 'not null' => TRUE,
+ 'default' => 0,
+ 'description' => 'Timestamp when the item was created.',
+ ),
+ ),
+ 'primary key' => array('item_id'),
+ 'indexes' => array(
+ 'consumer_queue' => array('consumer_id', 'name', 'created'),
+ 'consumer_expire' => array('consumer_id', 'expire'),
+ ),
+ );
+
+ $schema['queue_consumer_id'] = array(
+ 'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.',
+ 'fields' => array(
+ 'consumer_id' => array(
+ 'type' => 'serial',
+ 'not null' => TRUE,
+ 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
+ ),
+ ),
+ 'primary key' => array('consumer_id'),
+ );
+
$schema['registry'] = array(
'description' => "Each record is a function, class, or interface name and the file it is in.",
'fields' => array(
@@ -3299,6 +3360,76 @@ function system_update_7021() {
}
/**
+ * Add the queue tables.
+ */
+function system_update_7022() {
+ $schema['queue'] = array(
+ 'description' => 'Stores items in queues.',
+ 'fields' => array(
+ 'item_id' => array(
+ 'type' => 'serial',
+ 'unsigned' => TRUE,
+ 'not null' => TRUE,
+ 'description' => 'Primary Key: Unique item ID.',
+ ),
+ 'name' => array(
+ 'type' => 'varchar',
+ 'length' => 255,
+ 'not null' => TRUE,
+ 'default' => '',
+ 'description' => 'The queue name.',
+ ),
+ 'consumer_id' => array(
+ 'type' => 'int',
+ 'not null' => TRUE,
+ 'default' => 0,
+ 'description' => 'The ID of the dequeuing consumer.',
+ ),
+ 'data' => array(
+ 'type' => 'text',
+ 'not null' => FALSE,
+ 'size' => 'big',
+ 'serialize' => TRUE,
+ 'description' => 'The arbitrary data for the item.',
+ ),
+ 'expire' => array(
+ 'type' => 'int',
+ 'not null' => TRUE,
+ 'default' => 0,
+ 'description' => 'Timestamp when the claim lease expires on the item.',
+ ),
+ 'created' => array(
+ 'type' => 'int',
+ 'not null' => TRUE,
+ 'default' => 0,
+ 'description' => 'Timestamp when the item was created.',
+ ),
+ ),
+ 'primary key' => array('item_id'),
+ 'indexes' => array(
+ 'consumer_queue' => array('consumer_id', 'name', 'created'),
+ 'consumer_expire' => array('consumer_id', 'expire'),
+ ),
+ );
+
+ $schema['queue_consumer_id'] = array(
+ 'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.',
+ 'fields' => array(
+ 'consumer_id' => array(
+ 'type' => 'serial',
+ 'not null' => TRUE,
+ 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
+ ),
+ ),
+ 'primary key' => array('consumer_id'),
+ );
+ db_create_table($ret, 'queue', $schema['queue']);
+ db_create_table($ret, 'queue_consumer_id', $schema['queue_consumer_id']);
+
+ return $ret;
+}
+
+/**
* @} End of "defgroup updates-6.x-to-7.x"
* The next series of updates should start at 8000.
*/
diff --git a/modules/system/system.module b/modules/system/system.module
index 98d1b2f00..861c3c51c 100644
--- a/modules/system/system.module
+++ b/modules/system/system.module
@@ -1587,6 +1587,16 @@ function system_cron() {
foreach ($cache_tables as $table) {
cache_clear_all(NULL, $table);
}
+
+ // Reset expired items in the default queue implementation table. If that's
+ // not used, this will simply be a no-op.
+ db_update('queue')
+ ->fields(array(
+ 'consumer_id' => 0,
+ 'expire' => 0,
+ ))
+ ->condition('expire', REQUEST_TIME, '<')
+ ->execute();
}
/**
diff --git a/modules/system/system.queue.inc b/modules/system/system.queue.inc
new file mode 100644
index 000000000..9fbed26eb
--- /dev/null
+++ b/modules/system/system.queue.inc
@@ -0,0 +1,255 @@
+<?php
+// $Id$
+
+/**
+ * @file
+ * Queue functionality.
+ */
+
+/**
+ * @defgroup queue Queue operations
+ * @{
+ * The queue system allows placing items in a queue and processing them later.
+ * The system tries to ensure that only one consumer can process an item.
+ *
+ * Before a queue can be used it needs to be created by
+ * DrupalQueueInterface::createQueue().
+ *
+ * Items can be added to the queue by passing an arbitrary data object to
+ * DrupalQueueInterface::createItem().
+ *
+ * To process an item, call DrupalQueueInterface::claimItem() and specify how
+ * long you want to have a lease for working on that item. When finished
+ * processing, the item needs to be deleted by calling
+ * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
+ * made available again by the DrapalQueueInterface implementation once the
+ * lease expires. Another consumer will then be able to receive it when calling
+ * DrupalQueueInterface::claimItem().
+ *
+ * The $item object used by the DrupalQueueInterface can contain arbitrary
+ * metadata depending on the implementation. Systems using the interface should
+ * only rely on the data property which will contain the information passed to
+ * DrupalQueueInterface::createItem(). The full queue item returned by
+ * DrupalQueueInterface::createItem() needs to be passed to
+ * DrupalQueueInterface::deleteItem() once processing is completed.
+ *
+ * While the queue system makes a best effort to preserve order in messages,
+ * due to the pluggable nature of the queue, there is no guarantee that items
+ * will be delivered on claim in the order they were sent. For example, some
+ * implementations like beanstalkd or others with distributed back-ends like
+ * Amazon SQS will be managing jobs for a large set of producers and consumers
+ * where a strict FIFO ordering will likely not be preserved.
+ *
+ * The system also makes no guarantees about a task only being executed once:
+ * callers that have non-idempotent tasks either need to live with the
+ * possiblity of the task being invoked multiple times in cases where a claim
+ * lease expires, or need to implement their own transactions to make their
+ * tasks idempotent.
+ */
+
+/**
+ * Factory class for interacting with queues.
+ */
+class DrupalQueue {
+ /**
+ * Get a queue object for a given name.
+ *
+ * @param $name
+ * Arbitrary string. The name of the queue to work with.
+ * @return
+ * The queue object for a given name.
+ */
+ public static function get($name) {
+ static $queues;
+ if (!isset($queues[$name])) {
+ $class = variable_get('queue_module_'. $name, 'System') . 'Queue';
+ $queues[$name] = new $class($name);
+ }
+ return $queues[$name];
+ }
+}
+
+interface DrupalQueueInterface {
+ /**
+ * Start working with a queue.
+ *
+ * @param $name
+ * Arbitrary string. The name of the queue to work with.
+ */
+ public function __construct($name);
+
+ /**
+ * Add a queue item and store it directly to the queue.
+ *
+ * @param $data
+ * Arbitrary data to be associated with the new task in the queue.
+ * @return
+ * TRUE if the item was successfully created and was (best effort) added
+ * to the queue, otherwise FALSE. We don't guarantee the item was
+ * committed to disk, that your disk wasn't hit by a meteor, etc, but as
+ * far as we know, the item is now in the queue.
+ */
+ public function createItem($data);
+
+ /**
+ * Retrieve the number of items in the queue.
+ *
+ * This is intended to provide a "best guess" count of the number of items in
+ * the queue. Depending on the implementation and the setup, the accuracy of
+ * the results of this function may vary.
+ *
+ * e.g. On a busy system with a large number of consumers and items, the
+ * result might only be valid for a fraction of a second and not provide an
+ * accurate representation.
+ *
+ * @return
+ * An integer estimate of the number of items in the queue.
+ */
+ public function numberOfItems();
+
+ /**
+ * Claim an item in the queue for processing.
+ *
+ * @param $lease_time
+ * How long the processing is expected to take in seconds, defaults to an
+ * hour. After this lease expires, the item will be reset and another
+ * consumer can claim the item. For idempotent tasks (which can be run
+ * multiple times without side effects), shorter lease times would result
+ * in lower latency in case a consumer fails. For tasks that should not be
+ * run more than once (non-idempotent), a larger lease time will make it
+ * more rare for a given task to run multiple times in cases of failure,
+ * at the cost of higher latency.
+ * @return
+ * On success we return an item object. If the queue is unable to claim an
+ * item it returns false. This implies a best effort to retrieve an item
+ * and either the queue is empty or there is some other non-recoverable
+ * problem.
+ */
+ public function claimItem($lease_time = 3600);
+
+ /**
+ * Delete a finished item from the queue.
+ *
+ * @param $item
+ * The item returned by claimItem().
+ */
+ public function deleteItem($item);
+
+ /**
+ * Create a queue.
+ *
+ * Called during installation and should be used to perform any necessary
+ * initialization operations. This should not be confused with the
+ * constructor for these objects, which is called every time an object is
+ * instantiated to operate on a queue. This operation is only needed the
+ * first time a given queue is going to be initialized (for example, to make
+ * a new database table or directory to hold tasks for the queue -- it
+ * depends on the queue implementation if this is necessary at all).
+ */
+ public function createQueue();
+
+ /**
+ * Delete a queue and every item in the queue.
+ */
+ public function deleteQueue();
+}
+
+/**
+ * Default queue implementation.
+ */
+class SystemQueue implements DrupalQueueInterface {
+ /**
+ * Our internal consumer ID for this queue instance.
+ *
+ * This is created lazily when we start consuming items with claimItem().
+ *
+ * @var integer
+ */
+ protected $consumerId;
+
+ /**
+ * The name of the queue this instance is working with.
+ *
+ * @var string
+ */
+ protected $name;
+
+ public function __construct($name) {
+ $this->name = $name;
+ }
+
+ public function createItem($data) {
+ $record = new stdClass();
+ $record->name = $this->name;
+ $record->data = $data;
+ $record->consumer_id = 0;
+ // We cannot rely on REQUEST_TIME because many items might be created by a
+ // single request which takes longer than 1 second.
+ $record->created = time();
+ return drupal_write_record('queue', $record) !== FALSE;
+ }
+
+ public function numberOfItems() {
+ return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
+ }
+
+ public function claimItem($lease_time = 30) {
+ if (!isset($this->consumerId)) {
+ $this->consumerId = db_insert('queue_consumer_id')
+ ->useDefaults(array('consumer_id'))
+ ->execute();
+ }
+ // Claim an item by updating its consumer_id and expire fields. If claim
+ // is not successful another thread may have claimed the item in the
+ // meantime. Therefore loop until an item is successfully claimed or we are
+ // reasonably sure there are no unclaimed items left.
+ while (TRUE) {
+ $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', array(':name' => $this->name), 0, 1)->fetchObject();
+ if ($item) {
+ // Try to mark the item as ours. We cannot rely on REQUEST_TIME
+ // because items might be claimed by a single consumer which runs
+ // longer than 1 second. If we continue to use REQUEST_TIME instead of
+ // the current time(), we steal time from the lease, and will tend to
+ // reset items before the lease should really expire.
+ $update = db_update('queue')
+ ->fields(array(
+ 'consumer_id' => $this->consumerId,
+ 'expire' => time() + $lease_time,
+ ))
+ ->condition('item_id', $item->item_id)
+ ->condition('consumer_id', 0);
+ // If there are affected rows, this update succeeded.
+ if ($update->execute()) {
+ $item->data = unserialize($item->data);
+ return $item;
+ }
+ }
+ else {
+ // No items currently available to claim.
+ return FALSE;
+ }
+ }
+ }
+
+ public function deleteItem($item) {
+ db_delete('queue')
+ ->condition('item_id', $item->item_id)
+ ->execute();
+ }
+
+ public function createQueue() {
+ // All tasks are stored in a single database table (which is created when
+ // Drupal is first installed) so there is nothing we need to do to create
+ // a new queue.
+ }
+
+ public function deleteQueue() {
+ db_delete('queue')
+ ->condition('name', $this->name)
+ ->execute();
+ }
+}
+
+/**
+ * @} End of "defgroup queue".
+ */
diff --git a/modules/system/system.test b/modules/system/system.test
index 1e26330a9..60c872df0 100644
--- a/modules/system/system.test
+++ b/modules/system/system.test
@@ -895,3 +895,95 @@ class SystemThemeFunctionalTest extends DrupalWebTestCase {
$this->assertRaw('themes/garland', t('Site default theme used on the add content page.'));
}
}
+
+
+/**
+ * Test the basic queue functionality.
+ */
+class QueueTestCase extends DrupalWebTestCase {
+ public static function getInfo() {
+ return array(
+ 'name' => t('Queue functionality'),
+ 'description' => t('Queues and dequeues a set of items to check the basic queue functionality.'),
+ 'group' => t('System'),
+ );
+ }
+
+ /**
+ * Queues and dequeues a set of items to check the basic queue functionality.
+ */
+ function testQueue() {
+ // Create two queues.
+ $queue1 = DrupalQueue::get($this->randomName());
+ $queue1->createQueue();
+ $queue2 = DrupalQueue::get($this->randomName());
+ $queue2->createQueue();
+
+ // Create four items.
+ $data = array();
+ for ($i = 0; $i < 4; $i++) {
+ $data[] = array($this->randomName() => $this->randomName());
+ }
+
+ // Queue items 1 and 2 in the queue1.
+ $queue1->createItem($data[0]);
+ $queue1->createItem($data[1]);
+
+ // Retrieve two items from queue1.
+ $items = array();
+ $new_items = array();
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ // First two dequeued items should match the first two items we queued.
+ $this->assertEqual($this->queueScore($data, $new_items), 2, t('Two items matched'));
+
+ // Add two more items.
+ $queue1->createItem($data[2]);
+ $queue1->createItem($data[3]);
+
+ $this->assertTrue($queue1->numberOfItems(), t('Queue 1 is not empty after adding items.'));
+ $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty while Queue 1 has items'));
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ $items[] = $item = $queue1->claimItem();
+ $new_items[] = $item->data;
+
+ // All dequeued items should match the items we queued exactly once,
+ // therefore the score must be exactly 4.
+ $this->assertEqual($this->queueScore($data, $new_items), 4, t('Four items matched'));
+
+ // There should be no duplicate items.
+ $this->assertEqual($this->queueScore($new_items, $new_items), 4, t('Four items matched'));
+
+ // Delete all items from queue1.
+ foreach ($items as $item) {
+ $queue1->deleteItem($item);
+ }
+
+ // Check that both queues are empty.
+ $this->assertFalse($queue1->numberOfItems(), t('Queue 1 is empty'));
+ $this->assertFalse($queue2->numberOfItems(), t('Queue 2 is empty'));
+ }
+
+ /**
+ * This function returns the number of equal items in two arrays.
+ */
+ function queueScore($items, $new_items) {
+ $score = 0;
+ foreach ($items as $item) {
+ foreach ($new_items as $new_item) {
+ if ($item === $new_item) {
+ $score++;
+ }
+ }
+ }
+ return $score;
+ }
+}