libevent 多線程,使用 Boost 的 IPC 和 MPI 庫進行并發編程

 2023-11-19 阅读 25 评论 0

摘要:使用非常流行的 Boost 庫進行并發編程非常有意思。Boost 有幾個用于并發編程領域的庫:Interprocess (IPC) 庫用于實現共享內存、內存映射的 I/O 和消息隊列;Thread 庫用于實現可移植的多線程;Message Passing Interface (MPI) 庫用于分布式計算中的消息

使用非常流行的 Boost 庫進行并發編程非常有意思。Boost 有幾個用于并發編程領域的庫:Interprocess (IPC) 庫用于實現共享內存、內存映射的 I/O 和消息隊列;Thread 庫用于實現可移植的多線程;Message Passing Interface (MPI) 庫用于分布式計算中的消息傳遞;Asio 庫用于使用套接字和其他低層功能實現可移植的連網功能。本文介紹 IPC 和 MPI 庫以及它們提供的一些功能。

本文中將學習如何使用 Boost IPC 庫實現共享內存對象、消息隊列和同步文件鎖。通過使用 Boost MPI 庫,了解?environment?和communicator?類,以及如何實現分布式通信。

注意:本文中的代碼已經用?gcc-4.3.4?和?boost-1.45?包測試過了。

常用縮寫詞

  • API:應用程序編程接口
  • I/O:輸入/輸出
  • POSIX:針對 UNIX 的便攜式操作系統接口?
  • SDK:軟件開發工具包

使用 Boost IPC 庫

Boost Interprocess 是一個只由頭文件組成的庫,所以您需要做的只是在自己的源代碼中包含適當的頭文件并讓編譯器知道?include?路徑。這是一個非常好的特性;您只需下載 Boost 源代碼(見?參考資料?中的鏈接),然后就可以開始使用了。例如,要想在自己的代碼中使用共享內存,就使用?清單 1?所示的?include


清單 1. Boost IPC 庫只由頭文件組成

				
#include <boost/interprocess/shared_memory_object.hpp>
using namespace boost::interprocess; 
//… your sources follow … 

?

在把信息傳遞給編譯器時,您要求進程根據安裝相應地修改?include?路徑。然后,編譯代碼:

bash-4.1$  g++ ipc1.cpp –I../boost_1_45_0

?

創建共享內存對象

我們先從傳統的 "Hello World!" 程序開始。有兩個進程:第一個進程把字符串 "Hello World!" 寫入內存,另一個進程讀取并顯示此字符串。像?清單 2?這樣創建共享內存對象。


清單 2. 創建共享內存對象

				
#include <boost/interprocess/shared_memory_object.hpp>int main(int argc, char* argv[ ]) 
{using namespace using boost::interprocess; try { // creating our first shared memory object.shared_memory_object sharedmem1 (create_only, "Hello", read_write);// setting the size of the shared memorysharedmem1.truncate (256);// … more code follows} catch (interprocess_exception& e) { // .. .  clean up } 
}

?

sharedmem1?對象的類型是?shared_memory_object(在 Boost 頭文件中聲明并定義),它的構造函數有三個參數:

  • 第一個參數 —?create_only?— 表示要創建這個共享內存對象而且還沒有創建它。如果已經存在同名的共享對象,就會拋出異常。對于希望訪問已經創建的共享內存的進程,第一個參數應該是?open_only
  • 第二個參數 —?Hello?— 是共享內存區域的名稱。另一個進程將使用這個名稱訪問這個共享內存。
  • 第三個參數 —?read_write?— 是共享內存對象的訪問指示符。因為這個進程要修改共享內存對象的內容,所以使用read_write。只從共享內存讀取數據的進程使用?read_only?指示符。

truncate?方法以字節為單位設置共享內存的大小。最好把代碼放在?try-catch?塊中。例如,如果無法創建共享內存對象,就拋出類型為?boost::interprocess_exception?的異常。

使用共享內存對象寫數據

使用共享內存對象的進程必須在自己的地址空間中映射對象。使用在頭文件 mapped_region.hpp 中聲明并定義的?mapped_region?類執行映射。使用?mapped_region?的另一個好處是可以對共享內存對象進行完全和部分訪問。清單 3?演示如何使用?mapped_region


清單 3. 使用 mapped_region 訪問共享內存對象

				
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>int main(int argc, char* argv[ ]) 
{using namespace boost::interprocess; try { // creating our first shared memory object.shared_memory_object sharedmem1 (create_only, "Hello", read_write);// setting the size of the shared memorysharedmem1.truncate (256);// map the shared memory to current process mapped_region mmap (sharedmem1, 256); // access the mapped region using get_addressstd::strcpy(static_cast<char* >(region.get_address()), "Hello World!\n");} catch (interprocess_exception& e) { // .. .  clean up } 
}

?

就這么簡單。現在已經創建了您自己的?mapped_region?對象并使用?get_address?方法訪問了它。執行了?static_cast,因為get_address?返回一個?void*

當主進程退出時共享內存會怎么樣?

當主進程退出時,并不刪除共享內存。要想刪除共享內存,需要調用?shared_memory_object::remove。第二個進程的訪問機制也很簡單:清單 4?證明了這一點。


清單 4. 從第二個進程訪問共享內存對象?

				
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstring>
#include <cstdlib>
#include <iostream>int main(int argc, char *argv[ ])
{using namespace boost::interprocess; try { // opening an existing shared memory object shared_memory_object sharedmem2 (open_only, "Hello", read_only);// map shared memory object in current address spacemapped_region mmap (sharedmem2, read_only);// need to type-cast since get_address returns void* char *str1 = static_cast<char*> (mmap.get_address());std::cout << str1 << std::endl;} catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} return 0;
}

?

在清單 4 中,使用?open_only?和?read_only?屬性創建共享內存對象。如果無法找到這個共享內存對象,就會拋出異常。現在,構建并運行?清單 3?和?清單 4?中的代碼。應該會在終端上看到 "Hello World!"。

接下來,在第二個進程的代碼(清單 4)中?std::cout?后面添加以下代碼并重新構建代碼:

// std::cout code here
shared_memory_object::remove("Hello");
// } catch(interprocess_exception& e) { 

?

連續執行代碼兩次,第二次執行會顯示 "No such file or directory",這證明共享內存已經被刪除了。

使用消息隊列實現進程間通信

現在,研究另一種流行的進程間通信機制:消息隊列。每個參與通信的進程都可以在隊列中添加消息和從隊列讀取消息。消息隊列具有以下性質:

  • 它有名稱,進程使用名稱訪問它。
  • 在創建隊列時,用戶必須指定隊列的最大長度和一個消息的最大大小。
  • 隊列是持久的,這意味著當創建它的進程死亡之后它仍然留在內存中。可以通過顯式地調用boost::interprocess::message_queue::remove?刪除隊列。

在?清單 5?所示的代碼片段中,進程創建了一個可包含 20 個整數的消息隊列。


清單 5. 創建一個可包含 20 個整數的消息隊列

				
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> int main(int argc, char* argv[ ]) 
{using namespace boost::interprocess;try { // creating a message queuemessage_queue mq (create_only,   // only create"mq",              // name20,                 //max message countsizeof(int)      //max message size);// … more code follows} catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} 
}

?

注意傳遞給?message_queue?的構造函數的?create_only?屬性。與共享內存對象相似,對于以只讀方式打開消息隊列,應該把open_only?屬性傳遞給構造函數。

發送和接收數據

在發送方,使用隊列的?send?方法添加數據。send?方法有三個輸入參數:原始數據的指針 (void*)、數據的大小和優先級。目前,以相同的優先級發送所有數據。清單 6?給出代碼。


清單 6. 向隊列發送消息

				
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> int main(int argc, char* argv[ ]) 
{using namespace boost::interprocess;try { // creating a message queuemessage_queue mq (create_only,   // only create"mq",              // name20,                 //max message countsizeof(int)      //max message size);// now send the messages to the queuefor (int i=0; i<20; ++i) mq.send(&i, sizeof(int), 0); // the 3rd argument is the priority } catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} 
}

?

在接收方,使用?open_only?屬性創建隊列。通過調用?message_queue?類的?receive?方法從隊列獲取消息。清單 7?給出?receive?的方法簽名。


清單 7. message_queue::receive 的方法簽名

				
void receive (void *buffer,           std::size_t buffer_size, std::size_t &recvd_size,unsigned int &priority);

?

我們來仔細看一下。第一個參數是從隊列接收的數據將被存儲到的位置。第二個參數是接收的數據的預期大小。第三個參數是接收的數據的實際大小。第四個參數是接收的消息的優先級。顯然,如果在執行程序期間第二個和第三個參數不相等,就是出現錯誤了。清單 8?給出接收者進程的代碼。


清單 8. 從消息隊列接收消息

				
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> int main(int argc, char* argv[ ]) 
{using namespace boost::interprocess;try { // opening the message queue whose name is mqmessage_queue mq (open_only,     // only open"mq"               // name);size_t recvd_size; unsigned int priority; // now send the messages to the queuefor (int i=0; i<20; ++i) { int buffer; mq.receive ((void*) &buffer, sizeof(int), recvd_size, priority); if (recvd_size != sizeof(int)) ; // do the error handlingstd::cout << buffer << " " << recvd_size << " " << priority;} } catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;} 
}

?

這相當簡單。注意,仍然沒有從內存中刪除消息隊列;與共享內存對象一樣,這個隊列是持久的。要想刪除隊列,應該在使用完隊列之后添加以下行:

message_queue::remove("mq"); // remove the queue using its name

?

消息優先級

在發送方,做?清單 9?所示的修改。接收方代碼不需要修改。


清單 9. 修改消息的優先級

				message_queue::remove("mq"); // remove the old queuemessage_queue mq (…); // create as beforefor (int i=0; i<20; ++i) mq.send(&i, sizeof(int), i%2); // 第 3 個參數為消息的優先級// … rest as usual

?

再次運行代碼時,應該會看到?清單 10?所示的輸出。


清單 10. 在接收進程中看到的輸出

				
1 4 1
3 4 1
5 4 1
7 4 1
9 4 1
11 4 1
13 4 1
15 4 1
17 4 1
19 4 1
0 4 0
2 4 0
4 4 0
6 4 0
8 4 0
10 4 0
12 4 0
14 4 0
16 4 0
18 4 0

?

清單 10?證實,第二個進程優先接收優先級高的消息。

同步對文件的訪問

共享內存和消息隊列很不錯,但是文件 I/O 也是重要的進程間通信工具。對并發進程用于通信的文件訪問進行同步并非易事,但是 Boost IPC 庫提供的文件鎖功能讓同步變得簡單了。在進一步解釋之前,來看一下?清單 11,了解?file_lock?對象是如何工作的。


清單 11. 使用 file_lock 對象同步文件訪問?

				
#include <fstream> 
#include <iostream> 
#include <boost/interprocess/sync/file_lock.hpp> 
#include <cstdlib>int main() 
{ using namespace boost::interprocess; std::string fileName("test"); std::fstream file;file.open(fileName.c_str(), std::ios::out | std::ios::binary | 
std::ios::trunc); if (!file.is_open() || file.bad()) { std::cout << "Open failed" << std::endl; exit(-1); }try { file_lock f_lock(fileName.c_str());f_lock.lock();std::cout << "Locked in Process 1" << std::endl;file.write("Process 1", 9);file.flush(); f_lock.unlock();std::cout << "Unlocked from Process 1" << std::endl;} catch (interprocess_exception& e) { std::cout << e.what( ) << std::endl;}file.close();return 0; 
}

?

代碼首先打開一個文件,然后使用?file_lock?鎖定它。寫操作完成之后,它刷新文件緩沖區并解除文件鎖。使用?lock?方法獲得對文件的獨占訪問。如果另一個進程也試圖對此文件進行寫操作并已經請求了鎖,那么它會等待,直到第一個進程使用?unlock?自愿地放棄鎖。file_lock?類的構造函數接受要鎖定的文件的名稱,一定要在調用?lock?之前打開文件;否則會拋出異常。

現在,復制?清單 11?中的代碼并做一些修改。具體地說,讓第二個進程請求這個鎖。清單 12?給出相關修改。


清單 12. 試圖訪問文件的第二個進程的代碼

				// .. as in Listing 11file_lock f_lock(fileName.c_str());f_lock.lock();std::cout << "Locked in Process 2" << std::endl;system("sleep 4"); file.write("Process 2", 9);file.flush(); f_lock.unlock();std::cout << "Unlocked from Process 2" << std::endl;// file.close(); 

?

現在,如果這兩個進程同時運行,有 50% 的機會看到第一個進程等待 4 秒后才獲得?file_lock,其他情況都不變。

在使用?file_lock?時,必須記住幾點。這里討論的主題是進程間通信,重點在進程?上。這意味著,不是使用?file_lock?來同步同一進程中各個線程的數據訪問。在與 POSIX 兼容的系統上,文件句柄是進程屬性,而不是?線程屬性。下面是使用文件鎖的幾條規則:

  • 對于每個進程,每個文件使用一個?file_lock?對象。
  • 使用相同的線程來鎖定和解鎖文件。
  • 在解鎖文件之前,通過調用?C?的?flush?庫例程或?flush?方法(如果喜歡使用?C++ fstream?的話),刷新寫入者進程中的數據。

結合使用 file_lock 和有范圍(scope)的鎖

在執行程序時,可能會出現拋出異常而文件沒有解鎖的情況。這種情況可能會導致意外的程序行為。為了避免這種情況,可以考慮把file_lock?對象放在(boost/interprocess/sync/scoped_lock.hpp 中定義的)scoped_lock?中。如果使用?scoped_lock,就不需要顯式地鎖定或解鎖文件;鎖定發生在構造器內,每當您退出該范圍,就會自動發生解鎖。清單 13?給出對?清單 11?的修改,使之使用有范圍的鎖。


清單 13. 結合使用 scoped_lock 和 file_lock

				
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/file_lock.hpp>//… code as in Listing 11
file_lock f_lock(fileName.c_str());
scoped_lock<file_lock> s_lock(f_lock);  // internally calls f_lock.lock( ); // No need to call explicit lock anymore
std::cout << "Locked in Process 1" << std::endl;
file.write("Process 1", 9);
// … code as in Listing 11

?

注意:關于 Resource Acquisition Is Initialization (RAII) 編程習慣法的更多信息,參見?參考資料?中的鏈接。

了解 Boost MPI

如果您不熟悉 Message Passing Interface,那么在討論 Boost MPI 之前,應該先瀏覽?參考資料?中提供的 MPI 參考資料。MPI 是一個容易使用的標準,它采用通過傳遞消息實現進程間通信的模型。不需要使用套接字或其他通信原語;MPI 后端管理所有底層處理。那么,使用 Boost MPI 有什么好處?Boost MPI 的創建者提供了更高層的抽象,并在 MPI 提供的 API 之上構建了一套簡單的例程,比如?MPI_Init?和?MPI_Bcast

Boost MPI 不是一個單獨的庫,不能在下載和構建之后直接使用。相反,必須安裝任意 MPI 實現(比如 MPICH 或 Open MPI)并構建 Boost Serialization 庫。關于如何構建 Boost MPI 的詳細信息參見?參考資料。通常,使用以下命令構建 Boost MPI:

bash-4.1$ bjam –with-mpi

?

Windows? 用戶可以從 BoostPro 下載預先構建的 MPI 庫(見?參考資料)。這些庫與 Microsoft? HPC Pack 2008 和 2008 R2 兼容(見?參考資料),適用于帶 Service Pack 3 的 Windows XP 或更高版本的客戶機操作環境。

用 MPI 實現 Hello World 程序

您必須了解 Boost MPI 庫中的兩個主要類:environment?類和?communicator?類。前者負責分布式環境的初始化;后者用于進程之間的通信。因為這里討論的是分布式計算,我們有四個進程,它們都在終端上輸出 "Hello World"。清單 14?給出代碼。


清單 14. 使用 Boost MPI 的 Hello World

				
#include <boost/mpi.hpp>
#include <iostream>int main(int argc, char* argv[])
{boost::mpi::environment env(argc, argv);boost::mpi::communicator world;std::cout << argc << std::endl;std::cout << argv[0] << std::endl;std::cout << "Hello World! from process " << world.rank() << std::endl;return 0;
}

?

現在,構建?清單 14?中的代碼并鏈接 Boost MPI 和 Serialization 庫。在 shell 提示上運行可執行程序。應該會看到 "Hello World! from process 0"。接下來,使用 MPI 分派器工具(例如,對于 Open MPI 用戶,使用?mpirun;對于 Microsoft HPC Pack 2008,使用mpiexec)并運行可執行程序:

mpirun –np 4 <executable name> ORmpiexec –n 4 <executable name>

?

現在應該會看到與?清單 15?相似的輸出,其中的?mympi1?是可執行程序名稱。


清單 15. 運行 MPI 代碼的輸出

				
1
mympi1
Hello, World! from process 3
1
mympi1
1
mympi1
Hello, World! from process 1
Hello, World! from process 2
1
mympi1
Hello, World! from process 0 

?

在 MPI 框架中,已經創建了相同進程的四個拷貝。在 MPI 環境中,每個進程有惟一的 ID(由?communicator?對象決定)。現在,試試進程間通信。使用?send?和?receive?函數調用讓一個進程與另一個進程通信。發送消息的進程稱為主進程,接收消息的進程稱為工作者進程。主進程和接收者進程的源代碼是相同的,使用?world?對象提供的等級決定功能(見?清單 16)。


清單 16. 相互通信的進程 0、1 和 2 的代碼?

				
#include <boost/mpi.hpp>
#include <iostream>int main(int argc, char* argv[]) 
{boost::mpi::environment env(argc, argv);boost::mpi::communicator world;if (world.rank() == 0) {world.send(1, 9, 32);world.send(2, 9, 33);} else { int data;world.recv(0, 9, data);std::cout << "In process " << world.rank( ) << "with data " << data<< std::endl;} return 0;
}

?

先看一下?send?函數。第一個參數是接收者進程的 ID;第二個是消息數據的 ID;第三個是實際數據。為什么需要消息標簽?接收者進程在執行期間的特定點上可能希望處理具有特定標簽的消息,所以這個方案會有幫助。對于進程 1 和 2,recv?函數被阻塞,這意味著程序會等待,直到從進程 0 收到標簽 ID 為 9 的消息。當收到這個消息時,把信息存儲在?data?中。下面是運行代碼的輸出:

In process 1 with data 32
In process 2 with data 33

?

如果在接收方有?world.recv(0, 1, data);?這樣的代碼,會發生什么?代碼阻塞,但實際上是,接收者進程在等待一個永遠不會到達的消息。

結束語

本文只討論了這兩個庫提供的功能的很小一部分。這些庫提供的其他功能包括 IPC 的內存映射 I/O 和 MPI 的廣播功能。從易用性的角度來說,IPC 更好。MPI 庫依賴于原生的 MPI 實現,而原生 MPI 庫以及預先構建的 Boost MPI 和 Serialization 庫的現成可用性仍然是個問題。但是,花點兒精力構建 MPI 實現和 Boost 的源代碼是值得的。

?

參考資料

學習

  • 了解關于?進程間通信?的更多信息。?

  • 了解如何?構建 Boost MPI。?

  • 了解所有?mpirun?選項。?

  • 了解關于?RAII 習慣法?的更多信息。?

  • 閱讀?Message Passing Interface 標準。?

  • 了解關于?Microsoft HPC Pack 2008/R2 SDK?的更多信息。?

  • AIX and UNIX 專區:developerWorks 的“AIX and UNIX 專區”提供了大量與 AIX 系統管理的所有方面相關的信息,您可以利用它們來擴展自己的 UNIX 技能。

  • AIX and UNIX 新手入門:訪問“AIX and UNIX 新手入門”頁面可了解更多關于 AIX 和 UNIX 的內容。

  • AIX and UNIX 專題匯總:AIX and UNIX 專區已經為您推出了很多的技術專題,為您總結了很多熱門的知識點。我們在后面還會繼續推出很多相關的熱門專題給您,為了方便您的訪問,我們在這里為您把本專區的所有專題進行匯總,讓您更方便的找到您需要的內容。

  • AIX and UNIX 下載中心:在這里你可以下載到可以運行在 AIX 或者是 UNIX 系統上的 IBM 服務器軟件以及工具,讓您可以提前免費試用他們的強大功能。

  • IBM Systems Magazine for AIX 中文版:本雜志的內容更加關注于趨勢和企業級架構應用方面的內容,同時對于新興的技術、產品、應用方式等也有很深入的探討。IBM Systems Magazine 的內容都是由十分資深的業內人士撰寫的,包括 IBM 的合作伙伴、IBM 的主機工程師以及高級管理人員。所以,從這些內容中,您可以了解到更高層次的應用理念,讓您在選擇和應用 IBM 系統時有一個更好的認識。

  • 技術書店:瀏覽關于這些和其他技術主題的圖書。?

獲得產品和技術

  • 進一步了解并下載?Boost Thread 庫。?

  • 進一步了解并下載?Boost MPI 庫。?

  • 從?BoostPro?下載預先構建的 MPI 庫。?

  • 下載?Boost IPC?庫。?

  • 訪問?MPICH2 下載站點。?

  • 訪問?Open MPI v1.4 下載站點。?

討論

  • developerWorks 博客:閱讀我們的博客并加入?developerWorks 社區。?

  • 參與 AIX 和 UNIX 論壇:
    • AIX 5L — 技術論壇
    • AIX for Developers 論壇
    • 集群系統管理
    • IBM Support Assistant
    • 性能工具 — 技術
    • 更多 AIX 和 UNIX 論壇

關于作者

Arpan Sen 是致力于電子設計自動化行業的軟件開發首席工程師。他使用各種 UNIX 版本(包括 Solaris、SunOS、HP-UX 和 IRIX)以及 Linux 和 Microsoft Windows 已經多年。他熱衷于各種軟件性能優化技術、圖論和并行計算。Arpan 獲得了軟件系統碩士學位。

?

轉載于:https://www.cnblogs.com/xinyuyuanm/archive/2013/04/09/3010813.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/181073.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息