diff options
author | Angie Byron <webchick@24967.no-reply.drupal.org> | 2010-01-08 06:36:34 +0000 |
---|---|---|
committer | Angie Byron <webchick@24967.no-reply.drupal.org> | 2010-01-08 06:36:34 +0000 |
commit | 0dd161277046bab1ec994e8d756c4e99c717421e (patch) | |
tree | 5f33a4f472b6b5e10544924e5bf6cfc0348b8d08 /includes | |
parent | e07b9d35a1f4dcb1678c4d3bb6482daaebea6350 (diff) | |
download | brdo-0dd161277046bab1ec994e8d756c4e99c717421e.tar.gz brdo-0dd161277046bab1ec994e8d756c4e99c717421e.tar.bz2 |
#629794 by yched: Fix Scaling issues with batch API. (with tests)
Diffstat (limited to 'includes')
-rw-r--r-- | includes/batch.inc | 81 | ||||
-rw-r--r-- | includes/batch.queue.inc | 72 | ||||
-rw-r--r-- | includes/form.inc | 177 | ||||
-rw-r--r-- | includes/update.inc | 45 |
4 files changed, 302 insertions, 73 deletions
diff --git a/includes/batch.inc b/includes/batch.inc index c0a7c96e0..850eff710 100644 --- a/includes/batch.inc +++ b/includes/batch.inc @@ -6,7 +6,7 @@ * @file * Batch processing API for processes to run in multiple HTTP requests. * - * Please note that batches are usually invoked by form submissions, which is + * Note that batches are usually invoked by form submissions, which is * why the core interaction functions of the batch processing API live in * form.inc. * @@ -62,8 +62,10 @@ function _batch_page() { // Add batch-specific CSS. foreach ($batch['sets'] as $batch_set) { - foreach ($batch_set['css'] as $css) { - drupal_add_css($css); + if (isset($batch_set['css'])) { + foreach ($batch_set['css'] as $css) { + drupal_add_css($css); + } } } @@ -252,6 +254,12 @@ function _batch_process() { timer_start('batch_processing'); } + if (empty($current_set['start'])) { + $current_set['start'] = microtime(TRUE); + } + + $queue = _batch_queue($current_set); + while (!$current_set['success']) { // If this is the first time we iterate this batch set in the current // request, we check if it requires an additional file for functions @@ -261,42 +269,49 @@ function _batch_process() { } $task_message = ''; - // We assume a single pass operation and set the completion level to 1 by + // Assume a single pass operation and set the completion level to 1 by // default. $finished = 1; - if ((list($function, $args) = reset($current_set['operations'])) && function_exists($function)) { - // Build the 'context' array, execute the function call, and retrieve the - // user message. + + if ($item = $queue->claimItem()) { + list($function, $args) = $item->data; + + // Build the 'context' array and execute the function call. $batch_context = array( 'sandbox' => &$current_set['sandbox'], 'results' => &$current_set['results'], 'finished' => &$finished, 'message' => &$task_message, ); - // Process the current operation. call_user_func_array($function, array_merge($args, array(&$batch_context))); - } - if ($finished == 1) { - // Make sure this step is not counted twice when computing $current. - $finished = 0; - // Remove the processed operation and clear the sandbox. - array_shift($current_set['operations']); - $current_set['sandbox'] = array(); + if ($finished == 1) { + // Make sure this step is not counted twice when computing $current. + $finished = 0; + // Remove the processed operation and clear the sandbox. + $queue->deleteItem($item); + $current_set['count']--; + $current_set['sandbox'] = array(); + } } // When all operations in the current batch set are completed, browse - // through the remaining sets until we find a set that contains operations. - // Note that _batch_next_set() executes stored form submit handlers in - // remaining batch sets, which can add new sets to the batch. + // through the remaining sets, marking them 'successfully processed' + // along the way, until we find a set that contains operations. + // _batch_next_set() executes form submit handlers stored in 'control' + // sets (see form_execute_handlers()), which can in turn add new sets to + // the batch. $set_changed = FALSE; $old_set = $current_set; - while (empty($current_set['operations']) && ($current_set['success'] = TRUE) && _batch_next_set()) { + while (empty($current_set['count']) && ($current_set['success'] = TRUE) && _batch_next_set()) { $current_set = &_batch_current_set(); + $current_set['start'] = microtime(TRUE); $set_changed = TRUE; } + // At this point, either $current_set contains operations that need to be // processed or all sets have been completed. + $queue = _batch_queue($current_set); // If we are in progressive mode, break processing after 1 second. if ($batch['progressive'] && timer_read('batch_processing') > 1000) { @@ -312,33 +327,31 @@ function _batch_process() { // Reporting 100% progress will cause the whole batch to be considered // processed. If processing was paused right after moving to a new set, // we have to use the info from the new (unprocessed) set. - if ($set_changed && isset($current_set['operations'])) { + if ($set_changed && isset($current_set['queue'])) { // Processing will continue with a fresh batch set. - $remaining = count($current_set['operations']); + $remaining = $current_set['count']; $total = $current_set['total']; $progress_message = $current_set['init_message']; $task_message = ''; } else { // Processing will continue with the current batch set. - $remaining = count($old_set['operations']); + $remaining = $old_set['count']; $total = $old_set['total']; $progress_message = $old_set['progress_message']; } - $current = $total - $remaining + $finished; + $current = $total - $remaining + $finished; $percentage = _batch_api_percentage($total, $current); - $elapsed = $current_set['elapsed']; - // Estimate remaining with percentage in floating format. - $estimate = $elapsed * ($total - $current) / $current; $values = array( '@remaining' => $remaining, '@total' => $total, '@current' => floor($current), '@percentage' => $percentage, '@elapsed' => format_interval($elapsed / 1000), - '@estimate' => format_interval($estimate / 1000), + // If possible, estimate remaining processing time. + '@estimate' => ($current > 0) ? format_interval(($elapsed * ($total - $current) / $current) / 1000) : '-', ); $message = strtr($progress_message, $values); if (!empty($message)) { @@ -410,7 +423,7 @@ function _batch_next_set() { if (isset($current_set['form_submit']) && ($function = $current_set['form_submit']) && function_exists($function)) { // We use our stored copies of $form and $form_state to account for // possible alterations by previous form submit handlers. - $function($batch['form'], $batch['form_state']); + $function($batch['form_state']['complete form'], $batch['form_state']); } return TRUE; } @@ -426,15 +439,16 @@ function _batch_finished() { $batch = &batch_get(); // Execute the 'finished' callbacks for each batch set, if defined. - foreach ($batch['sets'] as $key => $batch_set) { + foreach ($batch['sets'] as $batch_set) { if (isset($batch_set['finished'])) { // Check if the set requires an additional file for function definitions. if (isset($batch_set['file']) && is_file($batch_set['file'])) { include_once DRUPAL_ROOT . '/' . $batch_set['file']; } if (function_exists($batch_set['finished'])) { - // Format the elapsed time when batch complete. - $batch_set['finished']($batch_set['success'], $batch_set['results'], $batch_set['operations'], format_interval($batch_set['elapsed'] / 1000)); + $queue = _batch_queue($batch_set); + $operations = $queue->getAllItems(); + $batch_set['finished']($batch_set['success'], $batch_set['results'], $operations, format_interval($batch_set['elapsed'] / 1000)); } } } @@ -444,6 +458,11 @@ function _batch_finished() { db_delete('batch') ->condition('bid', $batch['id']) ->execute(); + foreach ($batch['sets'] as $batch_set) { + if ($queue = _batch_queue($batch_set)) { + $queue->deleteQueue(); + } + } } $_batch = $batch; $batch = NULL; diff --git a/includes/batch.queue.inc b/includes/batch.queue.inc new file mode 100644 index 000000000..8193280f3 --- /dev/null +++ b/includes/batch.queue.inc @@ -0,0 +1,72 @@ +<?php +// $Id$ + + +/** + * @file + * Queue handlers used by the Batch API. + * + * Those implementations: + * - ensure FIFO ordering, + * - let an item be repeatedly claimed until it is actually deleted (no notion + * of lease time or 'expire' date), to allow multipass operations. + */ + +/** + * Batch queue implementation. + * + * Stale items from failed batches are cleaned from the {queue} table on cron + * using the 'created' date. + */ +class BatchQueue extends SystemQueue { + + public function claimItem($lease_time = 0) { + $item = db_query('SELECT data, item_id FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchObject(); + if ($item) { + $item->data = unserialize($item->data); + return $item; + } + return FALSE; + } + + /** + * Retrieve all remaining items in the queue. + * + * This is specific to Batch API and is not part of the DrupalQueueInterface, + */ + public function getAllItems() { + $result = array(); + $items = db_query('SELECT data FROM {queue} q WHERE name = :name ORDER BY item_id ASC', array(':name' => $this->name))->fetchAll(); + foreach ($items as $item) { + $result[] = unserialize($item->data); + } + return $result; + } +} + +/** + * Batch queue implementation used for non-progressive batches. + */ +class BatchMemoryQueue extends MemoryQueue { + + public function claimItem($lease_time = 0) { + if (!empty($this->queue)) { + reset($this->queue); + return current($this->queue); + } + return FALSE; + } + + /** + * Retrieve all remaining items in the queue. + * + * This is specific to Batch API and is not part of the DrupalQueueInterface, + */ + public function getAllItems() { + $result = array(); + foreach ($this->queue as $item) { + $result[] = $item->data; + } + return $result; + } +} diff --git a/includes/form.inc b/includes/form.inc index 753f421e8..f9f0a27df 100644 --- a/includes/form.inc +++ b/includes/form.inc @@ -634,12 +634,23 @@ function drupal_process_form($form_id, &$form, &$form_state) { // that is already being processed (if a batch operation performs a // drupal_form_submit). if ($batch =& batch_get() && !isset($batch['current_set'])) { - // The batch uses its own copies of $form and $form_state for - // late execution of submit handlers and post-batch redirection. - $batch['form'] = $form; - $batch['form_state'] = $form_state; + // Store $form_state information in the batch definition. + // We need the full $form_state when either: + // - Some submit handlers were saved to be called during batch + // processing. See form_execute_handlers(). + // - The form is multistep. + // In other cases, we only need the information expected by + // drupal_redirect_form(). + if ($batch['has_form_submits'] || !empty($form_state['rebuild']) || !empty($form_state['storage'])) { + $batch['form_state'] = $form_state; + } + else { + $batch['form_state'] = array_intersect_key($form_state, array_flip(array('programmed', 'rebuild', 'storage', 'no_redirect', 'redirect'))); + } + $batch['progressive'] = !$form_state['programmed']; batch_process(); + // Execution continues only for programmatic forms. // For 'regular' forms, we get redirected to the batch processing // page. Form redirection will be handled in _batch_finished(), @@ -1004,14 +1015,15 @@ function form_execute_handlers($type, &$form, &$form_state) { foreach ($handlers as $function) { if (function_exists($function)) { - // Check to see if a previous _submit handler has set a batch, but - // make sure we do not react to a batch that is already being processed - // (for instance if a batch operation performs a drupal_form_submit()). - if ($type == 'submit' && ($batch =& batch_get()) && !isset($batch['current_set'])) { - // Some previous _submit handler has set a batch. We store the call - // in a special 'control' batch set, for execution at the correct - // time during the batch processing workflow. + // Check if a previous _submit handler has set a batch, but make sure we + // do not react to a batch that is already being processed (for instance + // if a batch operation performs a drupal_form_submit()). + if ($type == 'submit' && ($batch =& batch_get()) && !isset($batch['id'])) { + // Some previous submit handler has set a batch. To ensure correct + // execution order, store the call in a special 'control' batch set. + // See _batch_next_set(). $batch['sets'][] = array('form_submit' => $function); + $batch['has_form_submits'] = TRUE; } else { $function($form, $form_state); @@ -3305,22 +3317,25 @@ function _form_set_class(&$element, $class = array()) { function batch_set($batch_definition) { if ($batch_definition) { $batch =& batch_get(); - // Initialize the batch + + // Initialize the batch if needed. if (empty($batch)) { $batch = array( 'sets' => array(), + 'has_form_submits' => FALSE, ); } + // Base and default properties for the batch set. + // Use get_t() to allow batches at install time. + $t = get_t(); $init = array( 'sandbox' => array(), 'results' => array(), 'success' => FALSE, - 'start' => microtime(TRUE), + 'start' => 0, 'elapsed' => 0, ); - // Use get_t() to allow batches at install time. - $t = get_t(); $defaults = array( 'title' => $t('Processing'), 'init_message' => $t('Initializing.'), @@ -3330,20 +3345,29 @@ function batch_set($batch_definition) { ); $batch_set = $init + $batch_definition + $defaults; - // Tweak init_message to avoid the bottom of the page flickering down after init phase. + // Tweak init_message to avoid the bottom of the page flickering down after + // init phase. $batch_set['init_message'] .= '<br/> '; + + // The non-concurrent workflow of batch execution allows us to save + // numberOfItems() queries by handling our own counter. $batch_set['total'] = count($batch_set['operations']); + $batch_set['count'] = $batch_set['total']; - // If the batch is being processed (meaning we are executing a stored submit handler), - // insert the new set after the current one. - if (isset($batch['current_set'])) { - // array_insert does not exist... - $slice1 = array_slice($batch['sets'], 0, $batch['current_set'] + 1); - $slice2 = array_slice($batch['sets'], $batch['current_set'] + 1); - $batch['sets'] = array_merge($slice1, array($batch_set), $slice2); + // Add the set to the batch. + if (empty($batch['id'])) { + // The batch is not running yet. Simply add the new set. + $batch['sets'][] = $batch_set; } else { - $batch['sets'][] = $batch_set; + // The set is being added while the batch is running. Insert the new set + // right after the current one to ensure execution order, and store its + // operations in a queue. + $index = $batch['current_set'] + 1; + $slice1 = array_slice($batch['sets'], 0, $index); + $slice2 = array_slice($batch['sets'], $index); + $batch['sets'] = array_merge($slice1, array($batch_set), $slice2); + _batch_populate_queue($batch, $index); } } } @@ -3387,11 +3411,28 @@ function batch_process($redirect = NULL, $url = 'batch', $redirect_callback = 'd ); $batch += $process_info; - // The batch is now completely built. Allow other modules to make changes to the - // batch so that it is easier to reuse batch processes in other enviroments. + // The batch is now completely built. Allow other modules to make changes + // to the batch so that it is easier to reuse batch processes in other + // enviroments. drupal_alter('batch', $batch); + // Assign an arbitrary id: don't rely on a serial column in the 'batch' + // table, since non-progressive batches skip database storage completely. + $batch['id'] = db_next_id(); + + // Move operations to a job queue. Non-progressive batches will use a + // memory-based queue. + foreach ($batch['sets'] as $key => $batch_set) { + _batch_populate_queue($batch, $key); + } + + // Initiate processing. if ($batch['progressive']) { + // Now that we have a batch id, we can generate the redirection link in + // the generic error message. + $t = get_t(); + $batch['error_message'] = $t('Please continue to <a href="@error_url">the error page</a>', array('@error_url' => url($url, array('query' => array('id' => $batch['id'], 'op' => 'finished'))))); + // Clear the way for the drupal_goto() redirection to the batch processing // page, by saving and unsetting the 'destination', if there is any. if (isset($_GET['destination'])) { @@ -3399,24 +3440,11 @@ function batch_process($redirect = NULL, $url = 'batch', $redirect_callback = 'd unset($_GET['destination']); } - // Initiate db storage in order to get a batch id. We have to provide - // at least an empty string for the (not null) 'token' column. - $batch['id'] = db_insert('batch') + // Store the batch. + db_insert('batch') ->fields(array( - 'token' => '', + 'bid' => $batch['id'], 'timestamp' => REQUEST_TIME, - )) - ->execute(); - - // Now that we have a batch id, we can generate the redirection link in - // the generic error message. - $t = get_t(); - $batch['error_message'] = $t('Please continue to <a href="@error_url">the error page</a>', array('@error_url' => url($url, array('query' => array('id' => $batch['id'], 'op' => 'finished'))))); - - // Actually store the batch data and the token generated form the batch id. - db_update('batch') - ->condition('bid', $batch['id']) - ->fields(array( 'token' => drupal_get_token($batch['id']), 'batch' => serialize($batch), )) @@ -3425,6 +3453,7 @@ function batch_process($redirect = NULL, $url = 'batch', $redirect_callback = 'd // Set the batch number in the session to guarantee that it will stay alive. $_SESSION['batches'][$batch['id']] = TRUE; + // Redirect for processing. $function = $batch['redirect_callback']; if (function_exists($function)) { $function($batch['url'], array('query' => array('op' => 'start', 'id' => $batch['id']))); @@ -3454,5 +3483,69 @@ function &batch_get() { } /** + * Populates a job queue with the operations of a batch set. + * + * Depending on whether the batch is progressive or not, the BatchQueue or + * BatchStaticQueue handler classes will be used. + * + * @param $batch + * The batch array. + * @param $set_id + * The id of the set to process. + * @return + * The name and class of the queue are added by reference to the batch set. + */ +function _batch_populate_queue(&$batch, $set_id) { + $batch_set = &$batch['sets'][$set_id]; + + if (isset($batch_set['operations'])) { + $batch_set += array( + 'queue' => array( + 'name' => 'drupal_batch:' . $batch['id'] . ':' . $set_id, + 'class' => $batch['progressive'] ? 'BatchQueue' : 'BatchMemoryQueue', + ), + ); + + $queue = _batch_queue($batch_set); + $queue->createQueue(); + foreach ($batch_set['operations'] as $operation) { + $queue->createItem($operation); + } + + unset($batch_set['operations']); + } +} + +/** + * Returns a queue object for a batch set. + * + * @param $batch_set + * The batch set. + * @return + * The queue object. + */ +function _batch_queue($batch_set) { + static $queues; + + // The class autoloader is not available when running update.php, so make + // sure the files are manually included. + if (is_null($queues)) { + $queues = array(); + require_once DRUPAL_ROOT . '/modules/system/system.queue.inc'; + require_once DRUPAL_ROOT . '/includes/batch.queue.inc'; + } + + if (isset($batch_set['queue'])) { + $name = $batch_set['queue']['name']; + $class = $batch_set['queue']['class']; + + if (!isset($queues[$class][$name])) { + $queues[$class][$name] = new $class($name); + } + return $queues[$class][$name]; + } +} + +/** * @} End of "defgroup batch". */ diff --git a/includes/update.inc b/includes/update.inc index 18dd7a8df..816f32bd0 100644 --- a/includes/update.inc +++ b/includes/update.inc @@ -338,6 +338,51 @@ function update_fix_d7_requirements() { db_create_table('date_formats', $schema['date_formats']); db_create_table('date_format_locale', $schema['date_format_locale']); + // Add the queue table. + $schema['queue'] = array( + 'description' => 'Stores items in queues.', + 'fields' => array( + 'item_id' => array( + 'type' => 'serial', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'description' => 'Primary Key: Unique item ID.', + ), + 'name' => array( + 'type' => 'varchar', + 'length' => 255, + 'not null' => TRUE, + 'default' => '', + 'description' => 'The queue name.', + ), + 'data' => array( + 'type' => 'text', + 'not null' => FALSE, + 'size' => 'big', + 'serialize' => TRUE, + 'description' => 'The arbitrary data for the item.', + ), + 'expire' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the claim lease expires on the item.', + ), + 'created' => array( + 'type' => 'int', + 'not null' => TRUE, + 'default' => 0, + 'description' => 'Timestamp when the item was created.', + ), + ), + 'primary key' => array('item_id'), + 'indexes' => array( + 'name_created' => array('name', 'created'), + 'expire' => array('expire'), + ), + ); + db_create_table('queue', $schema['queue']); + // Add column for locale context. if (db_table_exists('locales_source')) { db_add_field('locales_source', 'context', array('type' => 'varchar', 'length' => 255, 'not null' => TRUE, 'default' => '', 'description' => 'The context this string applies to.')); |