summaryrefslogtreecommitdiff
path: root/modules/system
diff options
context:
space:
mode:
Diffstat (limited to 'modules/system')
-rw-r--r--modules/system/system.install45
-rw-r--r--modules/system/system.module1
-rw-r--r--modules/system/system.queue.inc30
3 files changed, 16 insertions, 60 deletions
diff --git a/modules/system/system.install b/modules/system/system.install
index 77ef11498..4febc30e8 100644
--- a/modules/system/system.install
+++ b/modules/system/system.install
@@ -1226,12 +1226,6 @@ function system_schema() {
'default' => '',
'description' => 'The queue name.',
),
- 'consumer_id' => array(
- 'type' => 'int',
- 'not null' => TRUE,
- 'default' => 0,
- 'description' => 'The ID of the dequeuing consumer.',
- ),
'data' => array(
'type' => 'text',
'not null' => FALSE,
@@ -1254,21 +1248,9 @@ function system_schema() {
),
'primary key' => array('item_id'),
'indexes' => array(
- 'consumer_queue' => array('consumer_id', 'name', 'created'),
- 'consumer_expire' => array('consumer_id', 'expire'),
- ),
- );
-
- $schema['queue_consumer_id'] = array(
- 'description' => 'Stores queue consumer IDs, used to auto-increment the consumer ID so that a unique consumer ID is used.',
- 'fields' => array(
- 'consumer_id' => array(
- 'type' => 'serial',
- 'not null' => TRUE,
- 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
- ),
+ 'name_created' => array('name', 'created'),
+ 'expire' => array('expire'),
),
- 'primary key' => array('consumer_id'),
);
$schema['registry'] = array(
@@ -2253,12 +2235,6 @@ function system_update_7022() {
'default' => '',
'description' => 'The queue name.',
),
- 'consumer_id' => array(
- 'type' => 'int',
- 'not null' => TRUE,
- 'default' => 0,
- 'description' => 'The ID of the dequeuing consumer.',
- ),
'data' => array(
'type' => 'text',
'not null' => FALSE,
@@ -2281,25 +2257,12 @@ function system_update_7022() {
),
'primary key' => array('item_id'),
'indexes' => array(
- 'consumer_queue' => array('consumer_id', 'name', 'created'),
- 'consumer_expire' => array('consumer_id', 'expire'),
- ),
- );
-
- $schema['queue_consumer_id'] = array(
- 'description' => 'Stores queue consumer IDs, used to auto-incrament the consumer ID so that a unique consumer ID is used.',
- 'fields' => array(
- 'consumer_id' => array(
- 'type' => 'serial',
- 'not null' => TRUE,
- 'description' => 'Primary Key: Unique consumer ID used to make sure only one consumer gets one item.',
- ),
+ 'name_created' => array('name', 'created'),
+ 'expire' => array('expire'),
),
- 'primary key' => array('consumer_id'),
);
db_create_table('queue', $schema['queue']);
- db_create_table('queue_consumer_id', $schema['queue_consumer_id']);
}
/**
diff --git a/modules/system/system.module b/modules/system/system.module
index 23db12b56..9b6a90bd4 100644
--- a/modules/system/system.module
+++ b/modules/system/system.module
@@ -2426,7 +2426,6 @@ function system_cron() {
// not used, this will simply be a no-op.
db_update('queue')
->fields(array(
- 'consumer_id' => 0,
'expire' => 0,
))
->condition('expire', REQUEST_TIME, '<')
diff --git a/modules/system/system.queue.inc b/modules/system/system.queue.inc
index 2132012fe..c384f581d 100644
--- a/modules/system/system.queue.inc
+++ b/modules/system/system.queue.inc
@@ -182,7 +182,6 @@ class SystemQueue implements DrupalQueueInterface {
$record = new stdClass();
$record->name = $this->name;
$record->data = $data;
- $record->consumer_id = 0;
// We cannot rely on REQUEST_TIME because many items might be created by a
// single request which takes longer than 1 second.
$record->created = time();
@@ -194,30 +193,25 @@ class SystemQueue implements DrupalQueueInterface {
}
public function claimItem($lease_time = 30) {
- if (!isset($this->consumerId)) {
- $this->consumerId = db_insert('queue_consumer_id')
- ->useDefaults(array('consumer_id'))
- ->execute();
- }
- // Claim an item by updating its consumer_id and expire fields. If claim
- // is not successful another thread may have claimed the item in the
- // meantime. Therefore loop until an item is successfully claimed or we are
- // reasonably sure there are no unclaimed items left.
+ // Claim an item by updating its expire fields. If claim is not successful
+ // another thread may have claimed the item in the meantime. Therefore loop
+ // until an item is successfully claimed or we are reasonably sure there
+ // are no unclaimed items left.
while (TRUE) {
- $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE consumer_id = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
+ $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
if ($item) {
- // Try to mark the item as ours. We cannot rely on REQUEST_TIME
- // because items might be claimed by a single consumer which runs
- // longer than 1 second. If we continue to use REQUEST_TIME instead of
- // the current time(), we steal time from the lease, and will tend to
- // reset items before the lease should really expire.
+ // Try to update the item. Only one thread can succeed in UPDATEing the
+ // same row. We cannot rely on REQUEST_TIME because items might be
+ // claimed by a single consumer which runs longer than 1 second. If we
+ // continue to use REQUEST_TIME instead of the current time(), we steal
+ // time from the lease, and will tend to reset items before the lease
+ // should really expire.
$update = db_update('queue')
->fields(array(
- 'consumer_id' => $this->consumerId,
'expire' => time() + $lease_time,
))
->condition('item_id', $item->item_id)
- ->condition('consumer_id', 0);
+ ->condition('expire', 0);
// If there are affected rows, this update succeeded.
if ($update->execute()) {
$item->data = unserialize($item->data);