sfJobQueuePlugin - 0.3.0

Symfony Job Queues

This plugin is deprecated and is no longer maintained. Please use Gearman or a comparable tool to manage asynchronous tasks. See http://www.symfony-project.org/plugins/sfGearmanPlugin for more information.

sfJobQueuePlugin

Introduction

What the **** ?

This plugin brings job queues to Symfony. It covers the common job-queue operations (start, stop, scheduling through job election strategies, logging, etc.), command-line tasks, and a graphical interface for managing queues and jobs.

A job queue is useful when asynchronous server-side operations have to be performed (periodically grabbing an RSS feed, automatically sending emails, re-sampling a video, etc.) or in environments without cron access.

The advantages compared to cron are:

  • more accurate job-execution scheduling (cron's best precision is one minute, sfJobQueuePlugin's is one second)
  • detailed error logging
  • job creation from a graphical interface
  • an API to create jobs programmatically

Requirements

  • shell access
  • the right to run PHP from the command line
  • the right to override max_execution_time

Windows users: the plugin has not been tested on Windows.

Glossary

A "job" is a task that has to be done. In this plugin, it is modeled as a call to a JobHandler, which understands the parameters associated with the job in order to perform a dedicated action.

A "job queue" is a group of jobs to be run, possibly several times, associated with a job election strategy. A job queue can only run one job at a time, and there is currently no dependency mechanism between jobs.

The "job queue manager" is the process responsible for launching the job queues. It creates a PHP process for every queue the user has asked to start.

A "job scheduler" is a class responsible for electing the next job to run, according to a given election strategy.

Features

  • multi-queues support
  • one given queue can contain heterogeneous job types
  • scheduling (job election) strategy abstraction
  • support for recurring jobs
  • CLI tasks
  • detailed logging capabilities
  • Queue and jobs management graphical module:
    • job-queues start/stop
    • job-queues statistics
    • job creation
    • possibility to run a job manually, directly from the graphical interface

Job-queues admin panel

sfJobQueuePlugin_admin_panel.png

Get it installed

  • go to your project's root

  • Install the plugin:

     ./symfony plugin-install http://plugins.symfony-project.com/sfJobQueuePlugin
    
  • rebuild the model and the SQL instructions, and load them into the database:

     ./symfony propel-build-all
    
  • enable the "sfJob" and "sfJobQueue" modules in your app's settings.yml. As these modules are administration interfaces, you will probably want to enable them only in your administration application:

      enabled_modules:   [default, sfJob, sfJobQueue]
    
  • clear cache:

     ./symfony cc
    

Usage

Starting the job queues manager

The "job queue manager" is the process that is responsible for the management of the job queues. When requested from the graphical interface, it is able to start a given job queue. No job will be run until you start the job queue manager.

As this process must be able to create child processes, it has to be run from the command line. This means in particular that you must have shell access to your server in order to use this plugin.

Starting the queue manager can be done with the following Symfony task:

  ./symfony sfqueue-start-queuemanager YOUR_APP_NAME

You will usually want execution to continue after you disconnect from your server. In that case, run the following command instead:

  nohup ./symfony sfqueue-start-queuemanager YOUR_APP_NAME >> /tmp/nohup.out  &

The /tmp/nohup.out file will contain basic messages from the job queue manager, which may be useful if the job queue manager process dies unexpectedly.

With nohup, you cannot stop the job queue manager with CTRL+C; you have to use the "kill" command instead. If this is a problem for you, you will probably be interested in the screen utility.

Creating a queue

In order to create a job queue, you must first activate the administration module. You can then create a job queue by defining:

  • its name
  • its job election strategy (either FIFO or priority-based)

Other job-election strategies may be developed in the future.

You usually won't need it, but it is also possible to create job queues dynamically, directly from your application's code:

<?php
$queue = new sfJobQueue();
$queue->setName('RSS grabbing queue');
$queue->setSchedulerName('fifo');
$queue->save();

By default, a job queue is stopped when created. To make it active, you need to start it.

Running a job queue

Starting a job queue can either be done from the graphical interface (simply click on the symbol in the "is_running" column), or directly in the code:

<?php
$queue->setRequestedStatus(sfJobQueue::RUNNING);
$queue->save();

A job queue can also be started with Symfony's command-line utility:

  ./symfony sfqueue-start-queue YOUR_APP_NAME QUEUE_NAME

So, for instance:

  ./symfony sfqueue-start-queue frontend 'RSS grabbing queue'

The sfqueue-start-queue task can be used even if the job queue manager is not launched. This is particularly useful if you don't want to use the job queue manager, but prefer to use cron to periodically start/stop a specific job queue.

It also means something else: when using the sfqueue-start-queue task, the process that will be created won't be a child of the job queue manager and therefore, if the job queue manager is stopped or killed, the job queues that have been launched with sfqueue-start-queue will continue their execution.

Adding a job to the JobQueue

Adding a job to one job queue is rather simple. You only have to give the type of the job which has to be created, and set its execution parameters.

<?php
// retrieve the job queue
$queue = sfJobQueuePeer::retrieveByQueueName('mailings queue');
 
// add a new job of type "mail" in it
$queue->addJob('mail', 
               array('to'    => 'xavier@lacot.org', 
                     'topic' => 'Test of the mailings queue :-)'));

This means that you will have to create, somewhere in your project (why not in another plugin ;)) a JobHandler that supports the "mail" job type. For more details, see the paragraph "Creating a new job type".

Monitoring the job queues and the job queue manager

Suppose the job queue manager and its job queues have been running for several hours or days. To check that a job queue process is still running and has not died since you launched it, you can use the ps UNIX command:

  ps aux |grep 'php symfony sfqueue-start-queue YOUR_APP_NAME YOUR_QUEUE_NAME' |wc |awk '{print ($1 >= 2)}'

So, for instance:

  ps aux |grep 'php symfony sfqueue-start-queue backend rss_feeds' |wc |awk '{print ($1 >= 2)}'

If the process is found, this command displays "1". Otherwise, it displays "0".

Stopping the job queue manager

To stop the job queue manager, you simply have to kill it. However, make sure that all the job queues launched from the graphical interface (which are therefore child processes of the job queue manager) are stopped beforehand, using the graphical interface. This ensures that all running jobs have completed when the job queue manager is killed.

In order to kill the job queue manager, please run the following commands:

$ ps aux| grep 'php ./symfony sfqueue-start-queuemanager backend'
xavier    2691   0.0 -0.5    52940  10156  p1  S    08:04AM   0:00.85 php ./symfony sfqueue-start-queuemanager backend
xavier    2732   0.0 -0.0    27368    456  p1  S+   08:26AM   0:00.00 grep php ./symfony sfqueue-start-queuemanager backend

The job queue manager's process ID is 2691. Let's kill it:

$ kill -9 2691

Job parameters

A job queue can be tuned using several parameters:

  • scheduler_name: the scheduler name defines the job election policy. For instance, with a "fifo" scheduler, the oldest eligible job is processed next. With a "priority" scheduler, the eligible job with the highest priority is processed first.
  • polling_delay: the time between the end of the execution of one job and the next job election

sfJobQueuePlugin_job_election.png
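
The two election strategies described above can be sketched in a few lines of plain PHP. This is only an illustration, not the plugin's scheduler code: jobs are represented by plain arrays, the field names (created_at, scheduled_at, priority) and the functions eligibleJobs(), electFifo() and electPriority() are hypothetical, and breaking priority ties by age is an assumption.

```php
<?php
// Illustrative only: plain arrays stand in for job records, and the field
// names (created_at, scheduled_at, priority) are assumptions.

/** Keep only the jobs whose scheduled_at time has been reached. */
function eligibleJobs(array $jobs, $now)
{
  return array_values(array_filter($jobs, function ($job) use ($now) {
    return $job['scheduled_at'] <= $now;
  }));
}

/** "fifo": the oldest eligible job is elected. */
function electFifo(array $jobs, $now)
{
  $eligible = eligibleJobs($jobs, $now);
  usort($eligible, function ($a, $b) {
    return $a['created_at'] - $b['created_at'];
  });

  return $eligible ? $eligible[0] : null;
}

/** "priority": the eligible job with the highest priority is elected. */
function electPriority(array $jobs, $now)
{
  $eligible = eligibleJobs($jobs, $now);
  usort($eligible, function ($a, $b) {
    // Higher priority first; ties fall back to the oldest job (an assumption).
    if ($a['priority'] != $b['priority']) {
      return $b['priority'] - $a['priority'];
    }

    return $a['created_at'] - $b['created_at'];
  });

  return $eligible ? $eligible[0] : null;
}

$jobs = array(
  array('id' => 1, 'created_at' => 100, 'scheduled_at' => 100, 'priority' => 2),
  array('id' => 2, 'created_at' => 110, 'scheduled_at' => 110, 'priority' => 9),
  array('id' => 3, 'created_at' => 90,  'scheduled_at' => 500, 'priority' => 9),
);

$fifo = electFifo($jobs, 200);     // job 3 is not eligible yet
$prio = electPriority($jobs, 200);
echo $fifo['id'], ' ', $prio['id'], "\n"; // prints "1 2"
```

With the same set of jobs, the "fifo" policy picks job 1 (the oldest eligible one), while the "priority" policy picks job 2.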

At the job level, it is also possible to set some general execution parameters:

  • max_tries: the maximum number of tries for a non-recurring job
  • is_recurring: if the job is recurring, it will be executed every retry_delay seconds
  • retry_delay: retry delay, in seconds (minimum delay between two tries of the same job)
  • priority: priority, from 0 (lowest) to 9 (highest)
  • params: an array of parameters for the job execution
  • scheduled_at: the date after which the job can be run
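
To make the interplay of these job-level parameters more concrete, here is a minimal sketch of an eligibility check. It is not taken from the plugin: canRunNow() is a hypothetical function, the job is a plain array, and the "tries" and "last_try_at" fields are assumed bookkeeping values that the parameter list above does not document.

```php
<?php
// Illustrative only: a plain array stands in for a job record. The "tries"
// and "last_try_at" fields are hypothetical bookkeeping, not documented columns.

function canRunNow(array $job, $now)
{
  // scheduled_at: the job cannot run before this date.
  if ($now < $job['scheduled_at']) {
    return false;
  }

  // max_tries only limits non-recurring jobs.
  if (!$job['is_recurring'] && $job['tries'] >= $job['max_tries']) {
    return false;
  }

  // retry_delay: minimum number of seconds between two tries.
  if ($job['last_try_at'] !== null && $now - $job['last_try_at'] < $job['retry_delay']) {
    return false;
  }

  return true;
}

$job = array(
  'is_recurring' => false,
  'max_tries'    => 3,
  'retry_delay'  => 60,
  'scheduled_at' => 1000,
  'tries'        => 1,
  'last_try_at'  => 1010,
);

var_dump(canRunNow($job, 1030)); // bool(false): only 20 seconds since the last try
var_dump(canRunNow($job, 1070)); // bool(true): the 60-second retry delay has elapsed
```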

Creating a new job type

The sfJobQueuePlugin has been designed so that creating a new job type is as painless as possible for the developer. Developing a new job type only requires writing one single JobHandler class that extends the class sfJobHandler. Nothing else.

The name of this class is important, as the plugin derives the name of the job type from it. For instance, if you create a "sfClearCacheJobHandler", the plugin's graphical interface will automatically offer the job type "ClearCache" when creating new jobs.
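
The naming rule can be pictured with the following sketch. It assumes that the job type is simply the class name without the "sf" prefix and the "JobHandler" suffix; the plugin's real lookup code may differ (for instance in how it handles letter case), and jobTypeFromHandlerClass() is a hypothetical helper, not a plugin function.

```php
<?php
// Hypothetical helper, not part of the plugin: derives a job type name
// from a handler class name, assuming the pattern "sf<Type>JobHandler".
function jobTypeFromHandlerClass($className)
{
  if (preg_match('/^sf(.+)JobHandler$/', $className, $matches)) {
    return $matches[1];
  }

  // The class name does not follow the expected pattern.
  return null;
}

echo jobTypeFromHandlerClass('sfClearCacheJobHandler'), "\n"; // prints "ClearCache"
```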

Here is for instance a minimal JobHandler:

<?php
class sfMailJobHandler extends sfJobHandler
{
  public function getParamFields()
  {
    return array('from', 'to', 'subject', 'message');
  }
 
  public function run(array $params)
  {
    if (mail($params['to'],
             $params['subject'],
             $params['message'],
             'From: '.$params['from']))
    {
      $this->logger->log(sprintf('{sfMailJobHandler} successfully sent mail %s -> %s', $params['from'], $params['to']));
      return sfJob::SUCCESS;
    }
    else
    {
      throw new Exception('There was an error while sending the mail.');
    }
  }
}

Several remarks about it:

getParamFields()

This must return the names of the parameters expected when running the job. It is used by the graphical interface to ease job creation. In the graphical interface, the fields are displayed in the order of the elements in this array:

sfJobQueuePlugin_job_creation_detail.png

run(array $params)

The run() method receives an array of parameters for completing the job. It must return a proper status: sfJob::SUCCESS if the job succeeded, or sfJob::ERROR if it failed. Remember that if a PHP error is raised while running the job, its execution will be stopped and the job will be marked as failed. If you have enabled logging, you will then be able to track the error in the graphical interface.

The run() method may throw exceptions. In this case, the job will be marked as failed, unless it is a recurring job.
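
As a hedged illustration of this contract, the sketch below shows a handler that returns sfJob::ERROR for an expected failure and throws an exception for unusable input. The sfFileExistsJobHandler class and its "path" parameter are hypothetical. The sfJob and sfJobHandler classes at the top are minimal stand-ins with arbitrary constant values, present only so that the snippet runs outside a symfony project; remove them when the real plugin classes are loaded.

```php
<?php
// Stand-ins so that the sketch runs on its own. In a real project these
// classes come from the plugin; the constant values here are arbitrary.
class sfJob
{
  const SUCCESS = 1;
  const ERROR   = 0;
}

abstract class sfJobHandler
{
  abstract public function getParamFields();
  abstract public function run(array $params);
}

// Hypothetical job type "FileExists": succeeds if the given path exists.
class sfFileExistsJobHandler extends sfJobHandler
{
  public function getParamFields()
  {
    return array('path');
  }

  public function run(array $params)
  {
    if (empty($params['path']))
    {
      // Unusable input: an exception also makes the job fail.
      throw new InvalidArgumentException('Missing "path" parameter.');
    }

    // Expected outcomes are reported through the return status.
    return file_exists($params['path']) ? sfJob::SUCCESS : sfJob::ERROR;
  }
}

$handler = new sfFileExistsJobHandler();
$status  = $handler->run(array('path' => sys_get_temp_dir()));
echo $status === sfJob::SUCCESS ? "success\n" : "error\n";
```

Returning sfJob::ERROR keeps "the job ran but did not succeed" separate from "the job could not run at all", which is signalled here by the exception.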

static postSave($job, $params)

This "hook" is called after one job object is created. Depending on the job type, you may want to do several operations along with the job creation. And these operations may also modify the parameters of the job (for instance, for adding the id of a new object created during the hook, etc.)

If the job creation must perform other operations that the job execution must be aware of, then the optional static method postSave($job, $params) is the right place for this.

For instance:

public static function postSave($job, $params)
{
  $feed = new AggregatorFeed();
  $feed->setUri($params['uri']);
  $feed->save();
  $params['feed_id'] = $feed->getId();
  $job->setParams(serialize($params));
  $job->save();
}

Logging

The plugin provides a logging system that works alongside Symfony's default logging system. This logger uses Propel, and is particularly useful when debugging JobHandlers under development:

sfJobQueuePlugin_successful_job.png

If you believe that the JobHandlers that you developed are robust and don't require such a precise logging, you may want to disable this logging by setting the parameter app_sfJobQueuePlugin_logging_enabled in the app.yml file:

all:
  sfJobQueuePlugin:
    logging_enabled: false

Enabling this logging is completely independent from Symfony's own logging. If you want to enable logging only for the jobs, then disable Symfony's logging, and leave the sfJobQueuePlugin's one enabled.

The plugin's logging system lets you add log messages from within a job handler:

if ($condition)
{
  $this->logger->log('{sfTestJobHandler} successfully passed condition');
  return sfJob::SUCCESS;
}
else
{
  $this->logger->err('{sfTestJobHandler} i will launch an exception');
  throw new Exception('There was an error while sending the mail.');
}

The class sfPropelJobLogger is compatible with sfLogger, which means that you can use the methods debug($message), info($message), notice($message), warning($message), err($message), crit($message), alert($message), emerg($message) and log($message, $priority = SF_LOG_INFO).

Error messages will be displayed in red:

sfJobQueuePlugin_error_job.png

API

sfJobQueue:

  • addJob($type = '', $options = null) - Creates a new job in the queue
  • getNbActiveJobs() - Returns the number of active jobs, i.e. jobs that are neither failed, nor successful, nor canceled
  • getNbActiveReadyJobs() - Returns the number of active jobs ready to be run
  • getNbActiveRecurringJobs() - Returns the number of active recurring jobs
  • getNbActiveScheduledJobs() - Returns the number of active scheduled jobs (i.e. scheduled in the future)
  • getNbActiveWaitingJobs() - Returns the number of active waiting jobs (i.e. active, but not ready to be run)
  • getNbCompletedCancelledJobs() - Returns the number of jobs that have been canceled
  • getNbCompletedFailureJobs() - Returns the number of jobs completed on failure
  • getNbCompletedSuccessfulJobs() - Returns the number of successfully run jobs
  • getNbCompletedJobs() - Returns the number of completed jobs
  • getStatusText() - Returns the status of the queue, as a text
  • isRunning() - Indicates whether the queue is running or not
  • run() - Runs the queue

sfJob:

  • getStatusText() - Returns the status of the job, as a text
  • run() - Runs the job

Plugin's configuration

There are several app.yml configuration variables:

all:
  sfJobQueuePlugin:
    logfile:         /tmp/sfJobQueuePlugin.log    # path of the file that logs the activity of the queues started by the job queue manager
    logging_enabled: false   # whether the detailed logging should be enabled, or not
    max_tries:       3       # default value for max_tries for a new job
    memory:          512M    # max memory to be used when running a job manually from the graphical interface
    php:             php     # path of the php binary to be used. If not set, the plugin will use the value of sfToolkit::getPHPCli()
    queues_polling:  5       # delay in seconds of sleep time between two verifications if queues must be started by the job queue manager

Unit testing

To be done.

Roadmap

  • unit-testing
  • job reporting hooks (measure jobs progress)
  • handle dependency between jobs

Long term runners might want to see:

  • integration with http://yowl.googlecode.com/svn/trunk/test/test-yowl.html
  • exposure as Web services

Bibliography

  • http://en.wikipedia.org/wiki/Queue_%28data_structure%29
  • http://en.wikipedia.org/wiki/FIFO
  • http://en.wikipedia.org/wiki/Priority_queue

License and credits

This plugin is licensed under the MIT license and maintained by Xavier Lacot xavier@lacot.org. External contributions and comments are welcome!

Changelog

version 0.3 - 2008-02-04

  • introduced sfPropelJobLogger, for a better job activity logging
  • upgraded the graphical interface:
    • possibility to manually launch jobs
    • added display of logging messages for the last job execution
  • improved documentation
  • improved job queue manager robustness

version 0.2 - 2007-10-23

  • upgraded the graphical interface:
    • possibility to create jobs
    • support for recurring jobs
    • support for jobs scheduling
  • improved documentation
  • introduced job queue manager concept

version 0.1 - 2007-09-17

Initial public release.