如何創(chuàng)建長(zhǎng)時(shí)間運(yùn)行的作業(yè)
如果 Web 應(yīng)用程序中的一個(gè)特性需要超過 1 秒或 2 秒才能完成,那么應(yīng)該怎么辦?需要某種離線處理解決方案。學(xué)習(xí)幾種對(duì) PHP 應(yīng)用程序中長(zhǎng)時(shí)間運(yùn)行的作業(yè)進(jìn)行離線服務(wù)的方法。
大型的連鎖店有一個(gè)大問題。每天,在每家商店會(huì)發(fā)生數(shù)千次交易。公司執(zhí)行官希望對(duì)這些數(shù)據(jù)進(jìn)行挖掘。哪些產(chǎn)品賣得好?哪些不好?有機(jī)產(chǎn)品在哪里賣得好?冰淇淋的銷售情況怎么樣?
為了捕捉這些數(shù)據(jù),組織必須將所有事務(wù)性數(shù)據(jù)裝載進(jìn)一個(gè)數(shù)據(jù)模型,以便更適合生成公司所需的報(bào)告類型。但是,這很花費(fèi)時(shí)間,而且隨著連鎖規(guī)模的增長(zhǎng),處理一天的數(shù)據(jù)可能要花費(fèi)一天以上的時(shí)間。因此,這是個(gè)大問題。
現(xiàn)在,您的 Web 應(yīng)用程序可能不需要處理這么多數(shù)據(jù),但是任何站點(diǎn)的處理時(shí)間都有可能超過客戶愿意等待的時(shí)間。一般來說,客戶愿意等待的時(shí)間是 200 毫秒,如果超過這個(gè)時(shí)間,客戶就會(huì)覺得過程 “緩慢”。這個(gè)數(shù)字基于桌面應(yīng)用程序,而 Web 使我們更有耐心了。但無論如何,不應(yīng)該讓客戶等待的時(shí)間超過幾秒。所以,要采用一些策略來處理 PHP 中的批處理作業(yè)。
分散的方式與 cron
在 UNIX® 機(jī)器上,執(zhí)行批處理的核心程序是 cron 守護(hù)進(jìn)程。這個(gè)守護(hù)進(jìn)程讀取一個(gè)配置文件,這個(gè)文件會(huì)告訴它要運(yùn)行哪些命令行以及運(yùn)行的頻率。然后,這個(gè)守護(hù)進(jìn)程就按照配置執(zhí)行它們。在遇到錯(cuò)誤時(shí),它甚至能夠向指定的電子郵件地址發(fā)送錯(cuò)誤輸出,從而幫助對(duì)問題進(jìn)行調(diào)試。
我知道一些工程師強(qiáng)烈主張使用線程技術(shù)。“線程!線程才是進(jìn)行后臺(tái)處理的真正方法。cron 守護(hù)進(jìn)程太過時(shí)了。”
我不這么認(rèn)為。
這兩種方法我都用過,我認(rèn)為 cron 具備 “Keep It Simple, Stupid(KISS,簡(jiǎn)單就是美)” 原則的優(yōu)點(diǎn)。它使后臺(tái)處理保持簡(jiǎn)單。不需要編寫一直運(yùn)行的多線程的作業(yè)處理應(yīng)用程序(因此不會(huì)有內(nèi)存泄漏),而是由 cron 啟動(dòng)一個(gè)簡(jiǎn)單的批處理腳本。這個(gè)腳本判斷是否有作業(yè)要處理,執(zhí)行作業(yè),然后退出。不需要擔(dān)心內(nèi)存泄漏。也不需要擔(dān)心線程停止或陷入無限循環(huán)。
那么,cron 是如何工作的?這依賴于您所處的系統(tǒng)環(huán)境。我只討論老式簡(jiǎn)單的 cron 的 UNIX 命令行版本,您可以向系統(tǒng)管理員咨詢?nèi)绾卧谧约旱?Web 應(yīng)用程序中實(shí)現(xiàn)它。
下面是一個(gè)簡(jiǎn)單的 cron 配置,它在每天晚上 11 點(diǎn)運(yùn)行一個(gè) PHP 腳本:
0 23 * * * jack /usr/bin/php /users/home/jack/myscript.php
前 5 個(gè)字段定義應(yīng)該啟動(dòng)腳本的時(shí)間。然后是應(yīng)該用來運(yùn)行這個(gè)腳本的用戶名。其余的命令是要執(zhí)行的命令行。時(shí)間字段分別是分、小時(shí)、月中的日、月和周中的日。下面是幾個(gè)示例。
命令:
15 * * * * jack /usr/bin/php /users/home/jack/myscript.php
在每個(gè)小時(shí)的第 15 分鐘運(yùn)行腳本。
命令:
15,45 * * * * jack /usr/bin/php /users/home/jack/myscript.php
在每個(gè)小時(shí)的第 15 和第 45 分鐘運(yùn)行腳本。
命令:
*/1 3-23 * * * jack /usr/bin/php /users/home/jack/myscript.php
在早上 3 點(diǎn)到晚上 11 點(diǎn)之間的每分鐘運(yùn)行腳本。
命令
30 23 * * 6 jack /usr/bin/php /users/home/jack/myscript.php
在每星期六的晚上 11:30 運(yùn)行腳本(星期六由 6 指定)。
可以看到,組合的數(shù)量是無限的。可以根據(jù)需要控制運(yùn)行腳本的時(shí)間。還可以指定多個(gè)要運(yùn)行的腳本,這樣的話,一些腳本可以每分鐘都運(yùn)行,而其他腳本(比如備份腳本)可以每天只運(yùn)行一次。
為了指定將報(bào)告的錯(cuò)誤發(fā)送到哪個(gè)電子郵件地址,可以使用 MAILTO 指令,如下所示:
MAILTO=jherr@pobox.com
注意:對(duì)于 Microsoft® Windows® 用戶,有一個(gè)等效的 Scheduled Tasks 系統(tǒng)可以用來定期啟動(dòng)命令行進(jìn)程(比如 PHP 腳本)。
批處理體系結(jié)構(gòu)的基礎(chǔ)知識(shí)
批處理是相當(dāng)簡(jiǎn)單的。在大多數(shù)情況下,采用兩個(gè)工作流之一。第一個(gè)工作流用于進(jìn)行報(bào)告;腳本每天運(yùn)行一次,它生成報(bào)告并將報(bào)告發(fā)送給一組用戶。第二個(gè)工作流是在響應(yīng)某種請(qǐng)求時(shí)創(chuàng)建的批作業(yè)。例如,我登錄進(jìn) Web 應(yīng)用程序中,并要求它向系統(tǒng)中注冊(cè)的所有用戶發(fā)送一個(gè)消息,將一個(gè)新的特性告訴他們。這個(gè)操作必須進(jìn)行批處理,因?yàn)橄到y(tǒng)中有 10,000 個(gè)用戶。PHP 要花費(fèi)一段時(shí)間才能完成這樣的任務(wù),所以它必須由瀏覽器之外的一個(gè)作業(yè)來執(zhí)行。
在第二個(gè)工作流中,Web 應(yīng)用程序只需將信息放在某個(gè)位置,讓批處理應(yīng)用程序共享它。這些信息指定作業(yè)的性質(zhì)(例如,“Send this e-mail to all the people on the system”。)批處理程序運(yùn)行這個(gè)作業(yè),然后刪除作業(yè)。另一種方法是,處理程序?qū)⒆鳂I(yè)標(biāo)為已完成。無論用哪種方法,作業(yè)都應(yīng)該識(shí)別為已完成,這樣就不會(huì)再次運(yùn)行它。
本文的其余部分演示在 Web 應(yīng)用程序前端和批處理后端之間共享數(shù)據(jù)的各種方法。
郵件隊(duì)列
第一種方法是使用專用的郵件隊(duì)列系統(tǒng)。在這種模型中,數(shù)據(jù)庫中的一個(gè)表包含應(yīng)該發(fā)送給各個(gè)用戶的電子郵件消息。Web 界面使用mailouts 類將電子郵件添加到隊(duì)列中。電子郵件處理程序使用 mailouts 類檢索未處理的電子郵件,然后再次使用它從隊(duì)列中刪除未處理的電子郵件。
這個(gè)模型首先需要 MySQL 模式。
清單 1. mailout.sql
DROP TABLE IF EXISTS mailouts; CREATE TABLE mailouts ( id MEDIUMINT NOT NULL AUTO_INCREMENT, from_address TEXT NOT NULL, to_address TEXT NOT NULL, subject TEXT NOT NULL, content TEXT NOT NULL, PRIMARY KEY ( id ) );
這個(gè)模式非常簡(jiǎn)單。每行中有一個(gè) from 和一個(gè) to 地址,以及電子郵件的主題和內(nèi)容。
對(duì)數(shù)據(jù)庫中的 mailouts 表進(jìn)行處理的是 PHP mailouts 類。
清單 2. mailouts.php
getMessage()); } return $db; } public static function delete( $id ) { $db = Mailouts::get_db(); $sth = $db->prepare( 'DELETE FROM mailouts WHERE id=?' ); $db->execute( $sth, $id ); return true; } public static function add( $from, $to, $subject, $content ) { $db = Mailouts::get_db(); $sth = $db->prepare( 'INSERT INTO mailouts VALUES (null,?,?,?,?)' ); $db->execute( $sth, array( $from, $to, $subject, $content ) ); return true; } public static function get_all() { $db = Mailouts::get_db(); $res = $db->query( "SELECT * FROM mailouts" ); $rows = array(); while( $res->fetchInto( $row ) ) { $rows []= $row; } return $rows; } } ?>
這個(gè)腳本包含 Pear::DB 數(shù)據(jù)庫訪問類。然后定義 mailouts 類,其中包含三個(gè)主要的靜態(tài)函數(shù):add、delete 和 get_all。add() 方法向隊(duì)列中添加一個(gè)電子郵件,這個(gè)方法由前端使用。get_all() 方法從表中返回所有數(shù)據(jù)。delete() 方法刪除一個(gè)電子郵件。
您可能會(huì)問,我為什么不只在腳本末尾調(diào)用 delete_all() 方法。不這么做有兩個(gè)原因:如果在發(fā)送每個(gè)消息之后刪除它,那么即使腳本在出現(xiàn)問題之后重新運(yùn)行,消息也不可能發(fā)送兩次;在批作業(yè)的啟動(dòng)和完成之間可能會(huì)添加新的消息。
下一步是編寫一個(gè)簡(jiǎn)單的測(cè)試腳本,這個(gè)腳本將一個(gè)條目添加到隊(duì)列中。
清單 3. mailout_test_add.php
在這個(gè)示例中,我添加一個(gè) mailout,這個(gè)消息要發(fā)送給某公司的 Molly,其中包括主題 “Test Subject” 和電子郵件主體??梢栽诿钚猩线\(yùn)行這個(gè)腳本:php mailout_test_add.php。
為了發(fā)送電子郵件,需要另一個(gè)腳本,這個(gè)腳本作為作業(yè)處理程序。
清單 4. mailout_send.php
這個(gè)腳本使用 get_all() 方法檢索所有電子郵件消息,然后使用 PHP 的 mail() 方法逐一發(fā)送消息。在每次成功發(fā)送電子郵件之后,調(diào)用delete() 方法從隊(duì)列中刪除對(duì)應(yīng)的記錄。
使用 cron 守護(hù)進(jìn)程定期運(yùn)行這個(gè)腳本。運(yùn)行這個(gè)腳本的頻率取決于您的應(yīng)用程序的需要。
注意:PHP Extension and Application Repository(PEAR)存儲(chǔ)庫包含一個(gè)出色的 郵件隊(duì)列系統(tǒng) 實(shí)現(xiàn),可以免費(fèi)下載。
更通用的方法
專門用來發(fā)送電子郵件的解決方案是很不錯(cuò),但是是否有更通用的方法?我們需要能夠發(fā)送電子郵件、生成報(bào)告或者執(zhí)行其他耗費(fèi)時(shí)間的處理,而不必在瀏覽器中等待處理完成。
為此,可以利用一個(gè)事實(shí):PHP 是一種解釋型語言??梢詫?PHP 代碼存儲(chǔ)在數(shù)據(jù)庫中的隊(duì)列中,以后再執(zhí)行它。這需要兩個(gè)表,見清單 5。
清單 5. generic.sql
DROP TABLE IF EXISTS processing_items; CREATE TABLE processing_items ( id MEDIUMINT NOT NULL AUTO_INCREMENT, function TEXT NOT NULL, PRIMARY KEY ( id ) ); DROP TABLE IF EXISTS processing_args; CREATE TABLE processing_args ( id MEDIUMINT NOT NULL AUTO_INCREMENT, item_id MEDIUMINT NOT NULL, key_name TEXT NOT NULL, value TEXT NOT NULL, PRIMARY KEY ( id ) );
第一個(gè)表 processing_items 包含作業(yè)處理程序調(diào)用的函數(shù)。第二個(gè)表 processing_args 包含要發(fā)送給函數(shù)的參數(shù),采用的形式是由鍵/值對(duì)組成的 hash 表。
與 mailouts 表一樣,這兩個(gè)表也由 PHP 類包裝,這個(gè)類稱為 ProcessingItems。
清單 6. generic.php
prepare( 'DELETE FROM processing_args WHERE item_id=?' ); $db->execute( $sth, $id ); $sth = $db->prepare( 'DELETE FROM processing_items WHERE id=?' ); $db->execute( $sth, $id ); return true; } public static function add( $function, $args ) { $db = ProcessingItems::get_db(); $sth = $db->prepare( 'INSERT INTO processing_items VALUES (null,?)' ); $db->execute( $sth, array( $function ) ); $res = $db->query( "SELECT last_insert_id()" ); $id = null; while( $res->fetchInto( $row ) ) { $id = $row[0]; } foreach( $args as $key => $value ) { $sth = $db->prepare( 'INSERT INTO processing_args VALUES (null,?,?,?)' ); $db->execute( $sth, array( $id, $key, $value ) ); } return true; } public static function get_all() { $db = ProcessingItems::get_db(); $res = $db->query( "SELECT * FROM processing_items" ); $rows = array(); while( $res->fetchInto( $row ) ) { $item = array(); $item['id'] = $row[0]; $item['function'] = $row[1]; $item['args'] = array(); $ares = $db->query( "SELECT key_name, value FROM processing_args WHERE item_id=?", $item['id'] ); while( $ares->fetchInto( $arow ) ) $item['args'][ $arow[0] ] = $arow[1]; $rows []= $item; } return $rows; } } ?>
這個(gè)類包含三個(gè)重要的方法:add()、get_all() 和 delete()。與 mailouts 系統(tǒng)一樣,前端使用 add(),處理引擎使用 get_all() 和delete()。
清單 7 所示的測(cè)試腳本將一個(gè)條目添加到處理隊(duì)列中。
清單 7. generic_test_add.php
'foo' ) ); ?>
在這個(gè)示例中,添加了一個(gè)對(duì) printvalue 函數(shù)的調(diào)用,并將 value 參數(shù)設(shè)置為 foo。我使用 PHP 命令行解釋器運(yùn)行這個(gè)腳本,并將這個(gè)方法調(diào)用放進(jìn)隊(duì)列中。然后使用以下處理腳本運(yùn)行這個(gè)方法。
清單 8. generic_process.php
這個(gè)腳本非常簡(jiǎn)單。它獲得 get_all() 返回的處理?xiàng)l目,然后使用 call_user_func_array(一個(gè) PHP 內(nèi)部函數(shù))用給定的參數(shù)動(dòng)態(tài)地調(diào)用這個(gè)方法。在這個(gè)示例中,調(diào)用本地的 printvalue 函數(shù)。
為了演示這種功能,我們看看在命令行上發(fā)生了什么:
% php generic_test_add.php % php generic_process.php Printing: foo %
輸出并不多,但是您能夠看出要點(diǎn)。通過這種機(jī)制,可以將任何 PHP 函數(shù)的處理推遲。
現(xiàn)在,如果您不喜歡將 PHP 函數(shù)名和參數(shù)放進(jìn)數(shù)據(jù)庫中,那么另一種方法是在 PHP 代碼中建立數(shù)據(jù)庫中的 “處理作業(yè)類型” 名稱和實(shí)際 PHP 處理函數(shù)之間的映射。按照這種方式,如果以后決定修改 PHP 后端,那么只要 “處理作業(yè)類型” 字符串匹配,系統(tǒng)就仍然可以工作。
放棄數(shù)據(jù)庫
最后,我演示另一種稍有不同的解決方案,它使用一個(gè)目錄中的文件來存儲(chǔ)批作業(yè),而不是使用數(shù)據(jù)庫。在這里提供這個(gè)思路并不是建議您 “采用這種方式,而不使用數(shù)據(jù)庫”,這只是一種可供選擇的方式,是否采用它由您決定。
顯然,這個(gè)解決方案中沒有模式,因?yàn)槲覀儾皇褂脭?shù)據(jù)庫。所以先編寫一個(gè)類,它包含與前面示例中相似的 add()、get_all() 和 delete()方法。
清單 9. batch_by_file.php
$v ) { fprintf( $fh, $k.":".$v."\n" ); } fclose( $fh ); return true; } public static function get_all() { $rows = array(); if (is_dir(BATCH_DIRECTORY)) { if ($dh = opendir(BATCH_DIRECTORY)) { while (($file = readdir($dh)) !== false) { $path = BATCH_DIRECTORY.$file; if ( is_dir( $path ) == false ) { $item = array(); $item['id'] = $path; $fh = fopen( $path, 'r' ); if ( $fh ) { $item['function'] = trim(fgets( $fh )); $item['args'] = array(); while( ( $line = fgets( $fh ) ) != null ) { $args = split( ':', trim($line) ); $item['args'][$args[0]] = $args[1]; } $rows []= $item; fclose( $fh ); } } } closedir($dh); } } return $rows; } } ?>
BatchFiles 類有三個(gè)主要方法:add()、get_all() 和 delete()。這個(gè)類不訪問數(shù)據(jù)庫,而是讀寫 batch_items 目錄中的文件。
使用以下測(cè)試代碼添加新的批處理?xiàng)l目。
清單 10. batch_by_file_test_add.php
'foo' ) ); ?>
有一點(diǎn)需要注意:除了類名(BatchFiles)之外,實(shí)際上沒有任何跡象能夠說明作業(yè)是如何存儲(chǔ)的。所以,以后很容易將它改為數(shù)據(jù)庫風(fēng)格的存儲(chǔ)方式,而不需要修改接口。
最后是處理程序的代碼。
清單 11. batch_by_file_processor.php
這段代碼幾乎與數(shù)據(jù)庫版本完全相同,只是修改了文件名和類名。
結(jié)語
正如前面提到的,服務(wù)器對(duì)線程提供了許多支持,可以進(jìn)行后臺(tái)批處理。在某些情況下,使用輔助線程處理小作業(yè)肯定比較容易。但是,也可以使用傳統(tǒng)工具(cron、MySQL、標(biāo)準(zhǔn)的面向?qū)ο蟮?PHP 和 Pear::DB)在 PHP 應(yīng)用程序中創(chuàng)建批作業(yè),這很容易實(shí)現(xiàn)、部署和維護(hù)。