2013-05-01 5 views
9

Ho implementato alcuni modelli di progettazione di oggetti attivi basati su moduli. È un'implementazione molto semplice. Ho Scheduler, ActivationList, Requests e Futures per ottenere risposta. miei requisiti erano così:boost :: asio e oggetto attivo

  • L'accesso all'oggetto attiva è serializzato eseguendo i suoi metodi nel proprio thread (req principale e assunzione di attivo design pattern oggetto )
  • chiamante deve essere in grado di specificare la priorità dell'esecuzione delle richieste. Significa che se ci sono più di zero richieste in attesa di esecuzione, devono essere ordinate in base alla priorità assegnata a ciascuna richiesta. Le richieste con priorità più alta devono essere eseguite per prime, quindi se ci saranno sempre richieste in sospeso su ActivationList e avranno priorità più alta di una determinata richiesta, questa richiesta non verrà mai eseguita - è OK per me
  • Deve essere possibile specificare il numero massimo di richieste in sospeso nell'elenco (limitare l'utilizzo della memoria)
  • Deve essere possibile invalidare tutte le richieste in sospeso
  • Le richieste devono essere in grado di restituire valori (bloccando il chiamante) OPPURE devono essere eseguite senza valore ritorno ma il chiamante deve essere bloccato fino a quando la richiesta non viene elaborata OPPURE il chiamante non deve essere bloccato e non è importante per esso se la richiesta data è stata elaborata o meno g
  • Poco prima dell'esecuzione della richiesta, deve essere eseguito un metodo di guardia per verificare se la richiesta data deve essere eseguita o meno. In caso contrario - deve tornare un valore indefinito a chiamante (nel mio attuale implementazione è boost :: nessuno, perché ogni tipo di ritorno alla richiesta non è boost :: opzionale)

OK ora domanda: E 'possibile utilizzare boost :: asio e soddisfare tutte le mie esigenze? La mia implementazione sta funzionando, ma vorrei usare qualcosa che probabilmente è implementato in modo molto migliore di quanto ho fatto io. Inoltre mi piacerebbe conoscerlo per il futuro e non "reinventare la ruota" ancora una volta.

+0

spinta ASIO non bloccherà. L'ultima parte del tuo penultimo è coperta dall'ultima dichiarazione. tutto il resto è completamente in grado di C++ regolare con una spinta in più, anche se, certamente, più facile con esso. Potrebbe volere verificare anche la serializzazione di boost, se non la stai già utilizzando. – johnathon

+0

L'ho già implementato usando il semplice C++. In realtà con un grande aiuto di boost thread e boost conatizzatore multi indice. Ma l'obiettivo è non usare la mia ipmplementation e invece di usare boost :: asio. – user2301299

risposta

28

Boost.Asio può essere utilizzato per comprendere l'intenzione di Active Object: disaccoppiamento dell'esecuzione del metodo dall'invocazione del metodo. Requisiti aggiuntivi dovranno essere gestiti ad un livello superiore, ma non è eccessivamente complesso quando si utilizza Boost.Asio in combinazione con altre librerie Boost.

Scheduler potrebbe usare:

ActivationList potrebbero essere attuate come:

  • Un Boost.MultiIndex per l'ottenimento di elevate richieste di metodo priorità. Con una posizione suggerita insert(), l'ordine di inserimento viene conservato per la richiesta con la stessa priorità.
  • std::multiset o std::multimap può essere utilizzato. Tuttavia, in C++ 03 non è specificato l'ordine di richiesta con la stessa chiave (priorità).
  • Se Request non è necessario un metodo di protezione, è possibile utilizzare std::priority_queue.

Request potrebbe essere un tipo specificato:

  • boost::function e boost::bind potrebbero essere utilizzati per fornire un tipo-cancellazione, mentre il legame ai tipi richiamabili senza introdurre una gerarchia Request.

Futures potrebbe utilizzare il supporto di Boost.Thread Futures.

  • future.valid() tornerà vero se Request è stato aggiunto a ActivationList.
  • future.wait() bloccherà in attesa che un risultato sia disponibile.
  • future.get() bloccherà in attesa del risultato.
  • Se il chiamante non fa nulla con future, il chiamante non verrà bloccato.
  • Un altro vantaggio dell'utilizzo di Futures di Boost.Thread è che le eccezioni provenienti da un Request verranno passate allo Future.

Ecco un esempio completo sfruttando varie librerie Boost e deve soddisfare i requisiti:

// Standard includes 
#include <algorithm> // std::find_if 
#include <iostream> 
#include <string> 

// 3rd party includes 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 
#include <boost/function.hpp> 
#include <boost/make_shared.hpp> 
#include <boost/multi_index_container.hpp> 
#include <boost/multi_index/ordered_index.hpp> 
#include <boost/multi_index/member.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/thread.hpp> 
#include <boost/utility/result_of.hpp> 

/// @brief scheduler that provides limits with prioritized jobs. 
template <typename Priority, 
      typename Compare = std::less<Priority> > 
class scheduler 
{ 
public: 
    typedef Priority priority_type; 
private: 

    /// @brief method_request is used to couple the guard and call 
    ///  functions for a given method. 
    struct method_request 
    { 
    typedef boost::function<bool()> ready_func_type; 
    typedef boost::function<void()> run_func_type; 

    template <typename ReadyFunctor, 
       typename RunFunctor> 
    method_request(ReadyFunctor ready, 
        RunFunctor run) 
     : ready(ready), 
     run(run) 
    {} 

    ready_func_type ready; 
    run_func_type run; 
    }; 

    /// @brief Pair type used to associate a request with its priority. 
    typedef std::pair<priority_type, 
        boost::shared_ptr<method_request> > pair_type; 

    static bool is_method_ready(const pair_type& pair) 
    { 
    return pair.second->ready(); 
    } 

public: 

    /// @brief Construct scheduler. 
    /// 
    /// @param max_threads Maximum amount of concurrent task. 
    /// @param max_request Maximum amount of request. 
    scheduler(std::size_t max_threads, 
      std::size_t max_request) 
    : work_(io_service_), 
     max_request_(max_request), 
     request_count_(0) 
    { 
    // Spawn threads, dedicating them to the io_service. 
    for (std::size_t i = 0; i < max_threads; ++i) 
     threads_.create_thread(
     boost::bind(&boost::asio::io_service::run, &io_service_)); 
    } 

    /// @brief Destructor. 
    ~scheduler() 
    { 
    // Release threads from the io_service. 
    io_service_.stop(); 
    // Cleanup. 
    threads_.join_all(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    typedef typename boost::result_of<RunFunctor()>::type result_type; 
    typedef boost::unique_future<result_type> future_type; 

    boost::unique_lock<mutex_type> lock(mutex_); 

    // If max request has been reached, then return an invalid future. 
    if (max_request_ && 
     (request_count_ == max_request_)) 
     return future_type(); 

    ++request_count_; 

    // Use a packaged task to handle populating promise and future. 
    typedef boost::packaged_task<result_type> task_type; 

    // Bind does not work with rvalue, and packaged_task is only moveable, 
    // so allocate a shared pointer. 
    boost::shared_ptr<task_type> task = 
     boost::make_shared<task_type>(run_func); 

    // Create method request. 
    boost::shared_ptr<method_request> request = 
     boost::make_shared<method_request>(
     ready_func, 
     boost::bind(&task_type::operator(), task)); 

    // Insert into priority. Hint to inserting as close to the end as 
    // possible to preserve insertion order for request with same priority. 
    activation_list_.insert(activation_list_.end(), 
          pair_type(priority, request)); 

    // There is now an outstanding request, so post to dispatch. 
    io_service_.post(boost::bind(&scheduler::dispatch, this)); 

    return task->get_future(); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param ready_func Invoked to check if method is ready to run. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename ReadyFunctor, 
      typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(const ReadyFunctor& ready_func, 
     const RunFunctor& run_func) 
    { 
    return insert(priority_type(), ready_func, run_func); 
    } 

    /// @brief Insert a method request into the scheduler. 
    /// 
    /// @param priority Priority of job. 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @return future associated with the method. 
    template <typename RunFunctor> 
    boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
    insert(priority_type priority, 
     const RunFunctor& run_func) 
    { 
    return insert(priority, &always_ready, run_func); 
    } 

    /// @brief Insert a method request with default priority into the 
    ///  scheduler. 
    /// 
    /// @param run_func Invoked when ready to run. 
    /// 
    /// @param functor Job to run. 
    /// 
    /// @return future associated with the job. 
    template <typename RunFunc> 
    boost::unique_future<typename boost::result_of<RunFunc()>::type> 
    insert(const RunFunc& run_func) 
    { 
    return insert(&always_ready, run_func); 
    } 

    /// @brief Cancel all outstanding request. 
    void cancel() 
    { 
    boost::unique_lock<mutex_type> lock(mutex_); 
    activation_list_.clear(); 
    request_count_ = 0; 
    } 

private: 

    /// @brief Dispatch a request. 
    void dispatch() 
    { 
    // Get the current highest priority request ready to run from the queue. 
    boost::unique_lock<mutex_type> lock(mutex_); 
    if (activation_list_.empty()) return; 

    // Find the highest priority method ready to run. 
    typedef typename activation_list_type::iterator iterator; 
    iterator end = activation_list_.end(); 
    iterator result = std::find_if(
     activation_list_.begin(), end, &is_method_ready); 

    // If no methods are ready, then post into dispatch, as the 
    // method may have become ready. 
    if (end == result) 
    { 
     io_service_.post(boost::bind(&scheduler::dispatch, this)); 
     return; 
    } 

    // Take ownership of request. 
    boost::shared_ptr<method_request> method = result->second; 
    activation_list_.erase(result); 

    // Run method without mutex. 
    lock.unlock(); 
    method->run();  
    lock.lock(); 

    // Perform bookkeeping. 
    --request_count_; 
    } 

    static bool always_ready() { return true; } 

private: 

    /// @brief List of outstanding request. 
    typedef boost::multi_index_container< 
    pair_type, 
    boost::multi_index::indexed_by< 
     boost::multi_index::ordered_non_unique< 
     boost::multi_index::member<pair_type, 
            typename pair_type::first_type, 
            &pair_type::first>, 
     Compare 
     > 
    > 
    > activation_list_type; 
    activation_list_type activation_list_; 

    /// @brief Thread group managing threads servicing pool. 
    boost::thread_group threads_; 

    /// @brief io_service used to function as a thread pool. 
    boost::asio::io_service io_service_; 

    /// @brief Work is used to keep threads servicing io_service. 
    boost::asio::io_service::work work_; 

    /// @brief Maximum amount of request. 
    const std::size_t max_request_; 

    /// @brief Count of outstanding request. 
    std::size_t request_count_; 

    /// @brief Synchronize access to the activation list. 
    typedef boost::mutex mutex_type; 
    mutex_type mutex_; 
}; 

typedef scheduler<unsigned int, 
        std::greater<unsigned int> > high_priority_scheduler; 

/// @brief adder is a simple proxy that will delegate work to 
///  the scheduler. 
class adder 
{ 
public: 
    adder(high_priority_scheduler& scheduler) 
    : scheduler_(scheduler) 
    {} 

    /// @brief Add a and b with a priority. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(
    high_priority_scheduler::priority_type priority, 
    const T& a, const T& b) 
    { 
    // Insert method request 
    return scheduler_.insert(
     priority, 
     boost::bind(&adder::do_add<T>, a, b)); 
    } 

    /// @brief Add a and b. 
    /// 
    /// @return Return future result. 
    template <typename T> 
    boost::unique_future<T> add(const T& a, const T& b) 
    { 
    return add(high_priority_scheduler::priority_type(), a, b); 
    } 

private: 

    /// @brief Actual add a and b. 
    template <typename T> 
    static T do_add(const T& a, const T& b) 
    { 
    std::cout << "Starting addition of '" << a 
       << "' and '" << b << "'" << std::endl; 
    // Mimic busy work. 
    boost::this_thread::sleep_for(boost::chrono::seconds(2)); 
    std::cout << "Finished addition" << std::endl; 
    return a + b; 
    } 

private: 
    high_priority_scheduler& scheduler_; 
}; 

bool get(bool& value) { return value; } 
void guarded_call() 
{ 
    std::cout << "guarded_call" << std::endl; 
} 

int main() 
{ 
    const unsigned int max_threads = 1; 
    const unsigned int max_request = 4; 

    // Sscheduler 
    high_priority_scheduler scheduler(max_threads, max_request); 

    // Proxy 
    adder adder(scheduler); 

    // Client 

    // Add guarded method to scheduler. 
    bool ready = false; 
    std::cout << "Add guarded method." << std::endl; 
    boost::unique_future<void> future1 = scheduler.insert(
    boost::bind(&get, boost::ref(ready)), 
    &guarded_call); 

    // Add 1 + 100 with default priority. 
    boost::unique_future<int> future2 = adder.add(1, 100); 

    // Force sleep to try to get scheduler to run request 2 first. 
    boost::this_thread::sleep_for(boost::chrono::seconds(1)); 

    // Add: 
    // 2 + 200 with low priority (5) 
    // "test" + "this" with high priority (99) 
    boost::unique_future<int> future3 = adder.add(5, 2, 200); 
    boost::unique_future<std::string> future4 = adder.add(99, 
    std::string("test"), std::string("this")); 

    // Max request should have been reached, so add another. 
    boost::unique_future<int> future5 = adder.add(3, 300); 

    // Check if request was added. 
    std::cout << "future1 is valid: " << future1.valid() 
      << "\nfuture2 is valid: " << future2.valid() 
      << "\nfuture3 is valid: " << future3.valid() 
      << "\nfuture4 is valid: " << future4.valid() 
      << "\nfuture5 is valid: " << future5.valid() 
      << std::endl; 

    // Get results for future2 and future3. Do nothing with future4's results. 
    std::cout << "future2 result: " << future2.get() 
      << "\nfuture3 result: " << future3.get() 
      << std::endl; 

    std::cout << "Unguarding method." << std::endl; 
    ready = true; 
    future1.wait(); 
} 

L'esecuzione utilizza pool di thread di 1 con un massimo di 4 richiesta.

  • request1 è protetto fino alla fine del programma e dovrebbe essere l'ultimo a essere eseguito.
  • request2 (1 + 100) viene inserito con priorità predefinita e dovrebbe essere il primo a essere eseguito.
  • request3 (2 + 200) è inserito a bassa priorità e deve essere eseguito dopo request4.
  • request4 ('test' + 'this') viene inserito con priorità alta e deve essere eseguito prima di request3.
  • request5 non dovrebbe essere inserito a causa della richiesta massima e non dovrebbe essere valido.

L'uscita è il seguente:

Add guarded method. 
Starting addition of '1' and '100' 
future1 is valid: 1 
future2 is valid: 1 
future3 is valid: 1 
future4 is valid: 1 
future5 is valid: 0 
Finished addition 
Starting addition of 'test' and 'this' 
Finished addition 
Starting addition of '2' and '200' 
Finished addition 
future2 result: 101 
future3 result: 202 
Unguarding method. 
guarded_call
+1

Grazie per questa risposta, vorrei poterti dare più di 1 upvote. – MrEvil

+0

Post molto utile, quello che manca sono i casi d'uso che non devo setacciare github/so per – arynaq