《ACE程序员指南》中领导者与跟随者模式示例代码中的bug和解决方案

    技术2024-11-03  24

    《ACE程序员指南》中线程池一章中提到了两个模式:

     

    半同步/半异步(Half-Sync/Half-Async)模式和领导者/跟随者(Leader/Followers)模式。对于后者,

     

    书中给出了一个示例程序,为了方便说明,我把示例代码贴在这里:

     

    // ACE headers used by the example.  Everything up to the matching
    // #else below is compiled only when ACE was built with thread support.
    #include "ace/config-lite.h"
    #if defined (ACE_HAS_THREADS)

    #include "ace/OS_NS_string.h"
    #include "ace/OS_NS_sys_time.h"
    #include "ace/Task.h"
    #include "ace/Containers.h"
    #include "ace/Synch.h"

    // Listing 4 code/ch16class Follower{public:  Follower (ACE_Thread_Mutex &leader_lock)    : cond_(leader_lock)  {    owner_ = ACE_Thread::self ();  }

      //FUZZ: disable check_for_lack_ACE_OS  int wait (void)  {    return this->cond_.wait ();  }

      int signal (void)  {    return this->cond_.signal ();  }  //FUZZ: enable check_for_lack_ACE_OS

      ACE_thread_t owner (void)  {    return this->owner_;  }

    private:  ACE_Condition<ACE_Thread_Mutex> cond_;  ACE_thread_t owner_;};// Listing 4// Listing 1 code/ch16class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>{public:  LF_ThreadPool () : shutdown_(0), current_leader_(0)  {    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));  }

      virtual int svc (void);

      void shut_down (void)  {    shutdown_ = 1;  }

    private:  int become_leader (void);

      Follower *make_follower (void);

      int elect_new_leader (void);

      int leader_active (void)  {    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));    return this->current_leader_ != 0;  }

      void leader_active (ACE_thread_t leader)  {    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));    this->current_leader_ = leader;  }

      void process_message (ACE_Message_Block *mb);

      int done (void)  {    return (shutdown_ == 1);  }

    private:  int shutdown_;  ACE_thread_t current_leader_;  ACE_Thread_Mutex leader_lock_;  ACE_Unbounded_Queue<Follower*> followers_;  ACE_Thread_Mutex followers_lock_;  static long LONG_TIME;};// Listing 1// Listing 2 code/ch16intLF_ThreadPool::svc (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));  while (!done ())    {      become_leader ();  // Block until this thread is the leader.

          ACE_Message_Block *mb = 0;      ACE_Time_Value tv (LONG_TIME);      tv += ACE_OS::gettimeofday ();

          // Get a message, elect new leader, then process message.      if (this->getq (mb, &tv) < 0)        {          if (elect_new_leader () == 0)            break;          continue;        }

          elect_new_leader ();      process_message (mb);    }

      return 0;}// Listing 2// Listing 3 code/ch16intLF_ThreadPool::become_leader (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader"));

      ACE_GUARD_RETURN    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);  if (leader_active ())    {      Follower *fw = make_follower ();      {        // Wait until told to do so.        while (leader_active ())          fw->wait ();      }

          delete fw;    }

      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader/n")));

      // Mark yourself as the active leader.  leader_active (ACE_Thread::self ());  return 0;}

    Follower*LF_ThreadPool::make_follower (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));

      ACE_GUARD_RETURN    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);  Follower *fw;  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);  this->followers_.enqueue_tail (fw);  return fw;}// Listing 3// Listing 5 code/ch16intLF_ThreadPool::elect_new_leader (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));

      ACE_GUARD_RETURN    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);  leader_active (0);

      // Wake up a follower  if (!followers_.is_empty ())    {      ACE_GUARD_RETURN (ACE_Thread_Mutex,                        follower_mon,                        this->followers_lock_,                        -1);      // Get the old follower.      Follower *fw;      if (this->followers_.dequeue_head (fw) != 0)        return -1;      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("(%t) Resigning and Electing %d/n"),                  fw->owner ()));      return (fw->signal () == 0) ? 0 : -1;    }  else    {      ACE_DEBUG        ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left/n")));      return -1;    }}// Listing 5

    voidLF_ThreadPool::process_message (ACE_Message_Block *mb){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));  int msgId;  ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));  mb->release ();

      ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("(%t) Started processing message:%d/n"),              msgId));  ACE_OS::sleep (1);  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("(%t) Finished processing message:%d/n"),              msgId));}

    // How long svc() lets the leader block in getq(); it is passed to
    // ACE_Time_Value as a number of seconds.
    long LF_ThreadPool::LONG_TIME (5L);

    int ACE_TMAIN (int, ACE_TCHAR *[]){  LF_ThreadPool tp;  tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);

      // Wait for a few seconds...  ACE_OS::sleep (2);  ACE_Time_Value tv (1L);

      ACE_Message_Block *mb;  for (int i = 0; i < 30; i++)    {      ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));      ACE_OS::sleep (tv);

          // Add a new work item.      tp.putq (mb);    }

      ACE_Thread_Manager::instance ()->wait ();

      ACE_OS::sleep (10);

      return 0;}

    #else#include "ace/OS_main.h"#include "ace/OS_NS_stdio.h"

    int ACE_TMAIN (int, ACE_TCHAR *[]){  ACE_OS::puts (ACE_TEXT ("This example requires threads."));  return 0;}

    #endif /* ACE_HAS_THREADS */

     

    我简单解释一下程序的思路:首先创建一个线程池,这个线程池中包括5个线程,每个线程都试图成为领导者,

     

    但领导者只能有一个,因此一旦某个线程竞争成为领导者,其他线程只能把自己定位为跟随者,

     

    并把与之相应的跟随者对象放在一个跟随者队列中,然后等待机会成为领导者,等待时跟随者线程处于挂起状态。

     

    在竞争中成为领导者的那个线程会尝试从消息队列中取出消息;如果消息队列中没有消息,该线程也会挂起等待(最多等待LONG_TIME,即5秒)。

     

    一旦客户把一个消息放入队列(相当于添加了一个任务),领导者线程会被激活,并从消息队列中获取一条消息,

     

    然后它做的第一件事情是找一个新领导者,寻找算法就是简单的从跟随者队列的头部取出跟随者,

     

    指定它为新领导者,并激活该跟随者线程,当完成这些事情后,再去处理从队列中获取的消息,去执行相应的任务。

     

    等它完成任务处理后,它会再去争取成为领导者;如果发现目前已经有领导者,就只好把自己定位为跟随者,并把与之相对应的跟随者对象放入跟随者队列,等待机会成为领导者。

     

    我在Windows平台上测试了上述代码,确认它确实有问题:最开始处理几条消息时,领导者和跟随者的切换还是正确的,

     

    但到后来,有几个线程永远也无法再成为领导者,最后只有两个线程在轮流坐庄,这显然不是我们期望的结果。

     

    我仔细的研究了一下线程运行的中间过程,找到了问题的原因。

     

    elect_new_leader函数在选出某个跟随者作为新领导者后,并没有在函数中直接把current_leader_设置为新领导者的线程ID,而是把它设置为0,把"设置current_leader_为新领导者"这件事延迟到新领导者线程被唤醒、重新获得leader_lock_之后,才在该线程中执行。这样中间就存在一个时间窗口,给了老领导者(或其他刚处理完消息的线程)一个抢占领导者位置的机会。

     

    假设老领导者已经完成了任务,而这时新领导者的线程刚刚被操作系统唤醒,还没来得及把current_leader_标记为自己的线程ID;老领导者发现领导者的位置上没有人,就直接把自己设置为领导者。而新领导者本来是被正式选出来的领导者,在把自己设置为领导者之前,却发现位置上已经有领导者了,只好继续等待。但由于elect_new_leader函数已经把它的跟随者对象从队列中移除,再也不会有人唤醒它,它永远没有机会成为领导者,只能一直挂起等待。

     

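    找到问题的原因后,修改bug并不复杂:在elect_new_leader函数中直接把current_leader_设置为新领导者的线程ID即可。由于这样一来current_leader_在新领导者醒来之前就已经不为0,become_leader中原来的"while (leader_active ()) fw->wait ();"等待循环也必须相应调整,让被选中的跟随者被唤醒后结束等待,否则它会一直等下去。另外,原代码中没有跟随者时,最后一个线程在取消息超时后会一直循环,main中的ACE_Thread_Manager::instance ()->wait ()永远不会返回;因此svc中取消息超时后改为先选出新领导者、再退出线程。修改后的代码如下: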

     

    // ACE headers used by the example.  Everything up to the matching
    // #else below is compiled only when ACE was built with thread support.
    #include "ace/config-lite.h"
    #if defined (ACE_HAS_THREADS)

    #include "ace/OS_NS_string.h"
    #include "ace/OS_NS_sys_time.h"
    #include "ace/Task.h"
    #include "ace/Containers.h"
    #include "ace/Synch.h"
    // NOTE(review): <sstream> and the using-directive below are not used by
    // anything in this listing; kept as in the original post.
    #include <sstream>

    using namespace std;

    // Listing 4 code/ch16class Follower{public:  Follower (ACE_Thread_Mutex &leader_lock)    : cond_(leader_lock)  {    owner_ = ACE_Thread::self ();  }

      //FUZZ: disable check_for_lack_ACE_OS  int wait (void)  {    return this->cond_.wait ();  }

      int signal (void)  {    return this->cond_.signal ();  }  //FUZZ: enable check_for_lack_ACE_OS

      ACE_thread_t owner (void)  {    return this->owner_;  }

    private:  ACE_Condition<ACE_Thread_Mutex> cond_;  ACE_thread_t owner_;};// Listing 4// Listing 1 code/ch16class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>{public:  LF_ThreadPool () : shutdown_(0), current_leader_(0)  {    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));  }

      virtual int svc (void);

      void shut_down (void)  {    shutdown_ = 1;  }

    private:  int become_leader (void);

      Follower *make_follower (void);

      int elect_new_leader (void);

      int leader_active (void)  {    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));    return this->current_leader_ != 0;  }

      void leader_active (ACE_thread_t leader)  {    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));    this->current_leader_ = leader;  }

      void process_message (ACE_Message_Block *mb);

      int done (void)  {    return (shutdown_ == 1);  }

    private:  int shutdown_;  ACE_thread_t current_leader_;  ACE_Thread_Mutex leader_lock_;  ACE_Unbounded_Queue<Follower*> followers_;  ACE_Thread_Mutex followers_lock_;  static long LONG_TIME;};// Listing 1// Listing 2 code/ch16intLF_ThreadPool::svc (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));  while (!done ())    {      become_leader ();  // Block until this thread is the leader.

          ACE_Message_Block *mb = 0;      ACE_Time_Value tv (LONG_TIME);      tv += ACE_OS::gettimeofday ();

          // Get a message, elect new leader, then process message.      if (this->getq (mb, &tv) < 0)        {          elect_new_leader ();          break;        }

          elect_new_leader ();      process_message (mb);    }

      return 0;}// Listing 2// Listing 3 code/ch16intLF_ThreadPool::become_leader (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::become_leader"));

      ACE_GUARD_RETURN    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);  if (leader_active ())    {      Follower *fw = make_follower ();      {        // Wait until told to do so.        while (leader_active ())        {          fw->wait ();          ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) thread activate!/n")));          break;        }      }

          delete fw;    }

      ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader/n")));

      // Mark yourself as the active leader.  leader_active (ACE_Thread::self ());  return 0;}

    Follower*LF_ThreadPool::make_follower (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));

      ACE_GUARD_RETURN    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);  Follower *fw;  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);  this->followers_.enqueue_tail (fw);  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) thread follower enter queue/n")));  return fw;}// Listing 3// Listing 5 code/ch16intLF_ThreadPool::elect_new_leader (void){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));

      ACE_GUARD_RETURN    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);  leader_active (0);

      // Wake up a follower  if (!followers_.is_empty ())    {      ACE_GUARD_RETURN (ACE_Thread_Mutex,                        follower_mon,                        this->followers_lock_,                        -1);      // Get the old follower.      Follower *fw;      if (this->followers_.dequeue_head (fw) != 0)        return -1;      leader_active (fw->owner());      ACE_DEBUG ((LM_DEBUG,                  ACE_TEXT ("(%t) Resigning and Electing %d/n"),                  fw->owner ()));      return (fw->signal () == 0) ? 0 : -1;    }  else    {      ACE_DEBUG        ((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left/n")));      return -1;    }}// Listing 5

    voidLF_ThreadPool::process_message (ACE_Message_Block *mb){  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));  int msgId;  ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof(int));  mb->release ();

      ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("(%t) Started processing message:%d/n"),              msgId));  ACE_OS::sleep (1);  ACE_DEBUG ((LM_DEBUG,              ACE_TEXT ("(%t) Finished processing message:%d/n"),              msgId));}

    // How long svc() lets the leader block in getq(); it is passed to
    // ACE_Time_Value as a number of seconds.
    long LF_ThreadPool::LONG_TIME (5L);

    int ACE_TMAIN (int, ACE_TCHAR *[]){  LF_ThreadPool tp;  tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);

      // Wait for a few seconds...  ACE_OS::sleep (2);  ACE_Time_Value tv (1L);

      ACE_Message_Block *mb;  for (int i = 0; i < 30; i++)    {      ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);      ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));      ACE_OS::sleep (tv);

          // Add a new work item.      tp.putq (mb);    }

      ACE_Thread_Manager::instance ()->wait ();

      ACE_OS::sleep (10);

      return 0;}

    #else#include "ace/OS_main.h"#include "ace/OS_NS_stdio.h"

    int ACE_TMAIN (int, ACE_TCHAR *[]){  ACE_OS::puts (ACE_TEXT ("This example requires threads."));  return 0;}

    #endif /* ACE_HAS_THREADS */

     

     

    最新回复(0)