On Multithreading: Design and Use of a Reentrant Lock (1)

2008-05-24 05:35:23.0     Source: CCID Net forum (赛迪网论坛)

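A Mutex is an exclusive lock: only one thread can own it at a time, and even the owning thread blocks if it tries to acquire the same Mutex again. Sometimes we need a lock that behaves like the Java synchronized keyword, where a thread that already owns the lock can re-enter it without blocking. We can adapt the Mutex implementation from the previous article into a reentrant lock, ReentrantLock. To do this, ReentrantLock must record the current owner of the lock (a thread) and keep an integer counter of how many times that thread has entered.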

  public class ReentrantLock implements Sync {
      protected Thread owner_ = null;  // thread that currently owns the lock, or null
      protected long holds_ = 0;       // number of times the owner has entered
      //......
  }
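
To make the reentrancy of synchronized mentioned above concrete, here is a minimal sketch. The Counter and ReentrancyDemo classes are hypothetical names introduced only for illustration; they are not part of the concurrent package.

```java
// Hypothetical illustration: Java's built-in monitor is reentrant.
class Counter {
    private int value = 0;

    synchronized void increment() {
        value++;
    }

    // Already holds the monitor on this object, then calls another
    // synchronized method on the same object without blocking.
    synchronized void incrementTwice() {
        increment();
        increment();
    }

    synchronized int get() {
        return value;
    }
}

class ReentrancyDemo {
    public static void main(String[] args) {
        Counter c = new Counter();
        c.incrementTwice();
        System.out.println(c.get()); // prints 2
    }
}
```

With a non-reentrant Mutex guarding these methods instead, the nested call inside incrementTwice() would block forever, which is the problem the owner and hold-count fields are meant to solve.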


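When acquiring or releasing the lock, the code first checks whether the calling thread is the owner. If the calling thread already owns the lock, each acquire() increments the count by 1 and each release() decrements it by 1. When the count drops to 0, the owner has fully released the lock and no longer owns it, so release() sets the owner to null and calls notify() to wake a waiting thread. If the calling thread is not the owner, acquire() waits on the lock object with wait() until no thread owns it, and release() throws an Error.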

  public void acquire() throws InterruptedException {
      if (Thread.interrupted()) throw new InterruptedException();
      Thread caller = Thread.currentThread();
      synchronized(this) { // synchronize on this
          if (caller == owner_)
              ++holds_;
          else {
              try {
                  while (owner_ != null) wait();
                  owner_ = caller;
                  holds_ = 1;
              }
              catch (InterruptedException ex) {
                  notify();
                  throw ex;
              }
          }
      }
  }

  public synchronized void release() { // synchronize on this
      if (Thread.currentThread() != owner_)
          throw new Error("Illegal Lock usage");
      if (--holds_ == 0) {
          owner_ = null;
          notify();
      }
  }
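
The acquire()/release() pairing described above could be used roughly as follows. This is a self-contained sketch, not the concurrent package itself: SketchReentrantLock and AcquireReleaseDemo are hypothetical names, the lock is a compact stand-in that throws IllegalStateException rather than Error, and it omits the initial Thread.interrupted() check.

```java
// Compact stand-in for the lock described above; the class name is hypothetical.
class SketchReentrantLock {
    private Thread owner = null; // current owner, or null if the lock is free
    private long holds = 0;      // how many times the owner has entered

    public synchronized void acquire() throws InterruptedException {
        Thread caller = Thread.currentThread();
        if (caller == owner) {   // re-entry: just count it
            ++holds;
            return;
        }
        try {
            while (owner != null) wait(); // block until the lock is free
        } catch (InterruptedException ex) {
            notify();            // pass on a wakeup this thread may have consumed
            throw ex;
        }
        owner = caller;
        holds = 1;
    }

    public synchronized void release() {
        if (Thread.currentThread() != owner)
            throw new IllegalStateException("Illegal lock usage");
        if (--holds == 0) {      // outermost release: free the lock
            owner = null;
            notify();
        }
    }

    public synchronized long holds() {
        return Thread.currentThread() == owner ? holds : 0;
    }
}

class AcquireReleaseDemo {
    // Acquires the lock, then calls inner(), which acquires it again.
    static long outer(SketchReentrantLock lock) throws InterruptedException {
        lock.acquire();
        try {
            return inner(lock);
        } finally {
            lock.release();      // pair every acquire() with a release()
        }
    }

    static long inner(SketchReentrantLock lock) throws InterruptedException {
        lock.acquire();          // same thread: does not block
        try {
            return lock.holds(); // hold count seen inside the nested section
        } finally {
            lock.release();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchReentrantLock lock = new SketchReentrantLock();
        System.out.println(outer(lock));  // prints 2
        System.out.println(lock.holds()); // prints 0
    }
}
```

Putting release() in a finally block keeps the hold count balanced even if the guarded code throws, so the lock is actually freed once the outermost section exits.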



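Note that the code above must synchronize on this whenever it touches owner_ and holds_, to avoid race conditions on these two variables. The attempt() method is implemented much like the one in Mutex, with the added owner check and hold count: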

  public boolean attempt(long msecs) throws InterruptedException {
      if (Thread.interrupted()) throw new InterruptedException();
      Thread caller = Thread.currentThread();
      synchronized(this) {
          if (caller == owner_) {
              ++holds_;
              return true;
          }
          else if (owner_ == null) {
              owner_ = caller;
              holds_ = 1;
              return true;
          }
          else if (msecs <= 0)
              return false;
          else {
              long waitTime = msecs;
              long start = System.currentTimeMillis();
              try {
                  for (;;) {
                      wait(waitTime);
                      if (caller == owner_) {
                          ++holds_;
                          return true;
                      }
                      else if (owner_ == null) {
                          owner_ = caller;
                          holds_ = 1;
                          return true;
                      }
                      else {
                          waitTime = msecs - (System.currentTimeMillis() - start);
                          if (waitTime <= 0)
                              return false;
                      }
                  }
              }
              catch (InterruptedException ex) {
                  notify();
                  throw ex;
              }
          }
      }
  }
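
The timed behaviour of attempt() can be illustrated with a small experiment. The sketch below is an assumption-laden stand-in, not the concurrent package: TimedLockSketch and AttemptDemo are hypothetical names, and the wait loop is written in a slightly condensed form that recomputes the remaining time from the start timestamp, as the code above does.

```java
// Hypothetical stand-in that keeps only attempt() and release().
class TimedLockSketch {
    private Thread owner = null;
    private long holds = 0;

    // Tries to take the lock, waiting at most msecs milliseconds.
    public synchronized boolean attempt(long msecs) throws InterruptedException {
        Thread caller = Thread.currentThread();
        long start = System.currentTimeMillis();
        while (owner != null && owner != caller) {
            long remaining = msecs - (System.currentTimeMillis() - start);
            if (remaining <= 0) return false; // time budget used up
            wait(remaining);                  // may wake early; the loop re-checks
        }
        if (owner == caller) {
            ++holds;                          // re-entry by the owner
        } else {
            owner = caller;                   // lock was free
            holds = 1;
        }
        return true;
    }

    public synchronized void release() {
        if (Thread.currentThread() != owner)
            throw new IllegalStateException("Illegal lock usage");
        if (--holds == 0) {
            owner = null;
            notify();
        }
    }
}

class AttemptDemo {
    // Returns what a second thread gets from attempt(msecs) while the
    // calling thread keeps holding the lock.
    static boolean contendedAttempt(long msecs) throws InterruptedException {
        TimedLockSketch lock = new TimedLockSketch();
        lock.attempt(0);                      // uncontended: succeeds at once
        boolean[] result = new boolean[1];
        Thread other = new Thread(() -> {
            try {
                result[0] = lock.attempt(msecs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        other.join();                         // the lock is still held here
        lock.release();
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(contendedAttempt(50)); // prints false
    }
}
```

Recomputing the remaining time after every wakeup matters because wait(long) can return before the timeout (for example after a notify() that another thread wins), and the total wait should still not exceed msecs.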



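Because ReentrantLock keeps a hold count for its owner, it also provides two extra methods, holds() and release(long). holds() returns the number of times the current thread has entered the lock (0 if the current thread does not own it), and release(long) releases several holds at once: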

  public synchronized long holds() {
      if (Thread.currentThread() != owner_) return 0;
      return holds_;
  }

  public synchronized void release(long n) {
      if (Thread.currentThread() != owner_ || n > holds_)
          throw new Error("Illegal Lock usage");
      holds_ -= n;
      if (holds_ == 0) {
          owner_ = null;
          notify();
      }
  }
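
A short sketch of how holds() and release(long) interact is given below. CountingLockSketch and HoldsDemo are hypothetical names; the lock is a reduced stand-in, and the n <= 0 check in release(long) is an addition made for this sketch rather than something taken from the code above.

```java
// Hypothetical stand-in keeping only acquire(), holds() and release(long).
class CountingLockSketch {
    private Thread owner = null;
    private long holds = 0;

    public synchronized void acquire() throws InterruptedException {
        Thread caller = Thread.currentThread();
        if (caller == owner) {
            ++holds;
            return;
        }
        while (owner != null) wait();
        owner = caller;
        holds = 1;
    }

    // Hold count of the calling thread, or 0 if it is not the owner.
    public synchronized long holds() {
        return Thread.currentThread() == owner ? holds : 0;
    }

    // Releases n holds in one call.
    public synchronized void release(long n) {
        // The n <= 0 check is an assumption added in this sketch.
        if (Thread.currentThread() != owner || n <= 0 || n > holds)
            throw new IllegalStateException("Illegal lock usage");
        holds -= n;
        if (holds == 0) {
            owner = null;
            notify();
        }
    }
}

class HoldsDemo {
    public static void main(String[] args) throws InterruptedException {
        CountingLockSketch lock = new CountingLockSketch();
        lock.acquire();
        lock.acquire();
        lock.acquire();
        System.out.println(lock.holds()); // prints 3
        lock.release(2);
        System.out.println(lock.holds()); // prints 1
        lock.release(1);
        System.out.println(lock.holds()); // prints 0
    }
}
```

A typical use of this pair is to record holds(), drop the lock completely with release(holds()) before a long blocking operation, and later re-acquire it the same number of times.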



Usage Example


Doug Lea's concurrent package is now widely used. In JBoss, org.jboss.mx.loading.UnifiedLoaderRepository uses the package's ReentrantLock to control synchronization during class loading. Here is how org.jboss.mx.loading.UnifiedLoaderRepository uses ReentrantLock:

  public class UnifiedLoaderRepository
      extends LoaderRepository
      implements NotificationBroadcaster, UnifiedLoaderRepositoryMBean
  {
      private ReentrantLock reentrantLock = new ReentrantLock(); // create a reentrant lock

      public Class loadClass(String name, boolean resolve, ClassLoader cl)
          throws ClassNotFoundException
      {
          try
          {
              try
              {
                  // Only one thread at a time can load classes
                  // Pass the classloader to release its lock when blocking the thread
                  // We cannot use synchronized (this), as we MUST release the lock
                  // on the classloader. Change this only after discussion on the
                  // developer's list!
                  synchronize(cl); // synchronize on the ClassLoader passed in
                  // This synchronized block is necessary to synchronize with add/removeClassLoader
                  // See comments in add/removeClassLoader; we iterate on the classloaders, must avoid
                  // someone removes or adds a classloader in the meanwhile.
                  synchronized (this)
                  {
                      // Try the cache before anything else.
                      Class cls = loadClassFromCache(name, cl);
                      // Found in cache, we're done
                      if (cls != null) {return cls;}
                      // Not found in cache, ask the calling classloader
                      cls = loadClassFromClassLoader(name, resolve, cl);
                      // The calling classloader sees the class, we're done
                      if (cls != null) {return cls;}
                      // Not visible by the calling classloader, iterate on the other classloaders
                      cls = loadClassFromRepository(name, resolve, cl);
                      // Some other classloader sees the class, we're done
                      if (cls != null) {return cls;}
                      // This class is not visible
                      throw new ClassNotFoundException(name);
                  }
              }
              finally
              {
                  unsynchronize(cl); // undo synchronize(cl) once loading is finished
              }
          }
          catch (ClassCircularityError x)
          {
              //........
          }
      }
      //.........
  }


    The synchronize() and unsynchronize() methods called in the code above are as follows:

    private void synchronize(ClassLoader cl)
    {
        // This method
        // 1- must allow only one thread at a time,
        // 2- must allow a re-entrant thread,
        // 3- must unlock the given classloader waiting on it,
        // 4- must not hold any other lock.
        // If these 4 are not done, deadlock will happen.
        // Point 3 is necessary to fix Jung's RFE#4670071
        // Beware also that is possible that a classloader arrives here already locked
        // (for example via loadClassInternal()) and here we cannot synchronize on 'this'
        // otherwise we deadlock in loadClass() where we first synchronize on 'this' and
        // then on the classloader (resource ordering problem).
        // We solve this by using a reentrant lock.
        // Save and clear the interrupted state of the incoming thread
        boolean threadWasInterrupted = Thread.currentThread().interrupted();
        try
        {
            // Only one thread can pass this barrier
            // Other will accumulate here and let passed one at a time to wait on the classloader,
            // like a dropping sink
            reentrantLock.acquire();
            while (!isThreadAllowed(Thread.currentThread()))
            {
                // This thread is not allowed to run (another one is already running)
                // so I release() to let another thread to enter (will come here again)
                // and they will wait on the classloader to release its lock.
                // It is important that the wait below is not wait(0) since it may be
                // possible that a notifyAll arrives before the wait.
                // It is also important that this release() is outside the sync block on
                // the classloader, to avoid deadlock with threads that triggered
                // loadClassInternal(), locking the classloader
                reentrantLock.release();
                synchronized (cl)
                {
                    // Threads will wait here on the classloader object.
                    // Waiting on the classloader is fundamental to workaround Jung's RFE#4670071
                    // However, we cannot wait(0), since it is possible that 2 threads will try to load
                    // classes with different classloaders, so one will enter, the other wait, but
                    // since they're using different classloaders, nobody will wake up the waiting one.
                    // So we wait for some time and then try again.
                    try {cl.wait(137);}
                    catch (InterruptedException ignored) {}
                }
                // A notifyAll() has been issued, all threads will accumulate here
                // and only one at a time will pass, exactly equal to the barrier
                // before the 'while' statement (dropping sink).
                // Must be outside the synchronized block on the classloader to avoid that
                // waiting on the reentrant lock will hold the lock on the classloader
                try
                {
                    reentrantLock.acquire();
                }
                catch (InterruptedException ignored)
                {
                }
            }
        }
        catch(InterruptedException ignored)
        {
        }
        finally
        {
            // I must keep track of the threads that entered, also of the reentrant ones,
            // see unsynchronize()
            increaseThreadsCount();
            // I release the lock, allowing another thread to enter.
            // This new thread will not be allowed and will wait() on the classloader object,
            // releasing its lock.
            reentrantLock.release();
            // Restore the interrupted state of the thread
            if( threadWasInterrupted )
                Thread.currentThread().interrupt();
        }
    }

    private void unsynchronize(ClassLoader cl)
    {
        // Save and clear the interrupted state of the incoming thread
        boolean threadWasInterrupted = Thread.currentThread().interrupted();
        try
        {
            reentrantLock.acquire();
            // Reset the current thread only if we're not reentrant
            if (decreaseThreadsCount() == 0)
            {
                setCurrentThread(null);
            }
        }
        catch (InterruptedException ignored)
        {
        }
        finally
        {
            reentrantLock.release();
            // Notify all threads waiting on this classloader
            // This notification must be after the reentrantLock's release() to avoid this scenario:
            // - Thread A is loading a class in the ULR
            // - Thread B triggers a loadClassInternal which locks the UCL
            // - Thread A calls unsynchronize, locks the reentrantLock
            //   and waits to acquire the lock on the UCL
            // - Thread B calls synchronize and waits to lock the reentrantLock
            synchronized (cl)
            {
                cl.notifyAll();
            }
            // Restore the interrupted state of the thread
            if( threadWasInterrupted )
                Thread.currentThread().interrupt();
        }
    }
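
Both methods above save and clear the thread's interrupted state on entry and restore it on exit. That idiom can be isolated in a small sketch; InterruptPreservingTask and runUninterrupted are hypothetical names introduced for illustration and do not appear in JBoss.

```java
// Hypothetical helper isolating the "save, clear, restore" interrupt idiom.
class InterruptPreservingTask {
    // Runs work with the interrupt flag cleared, then restores the flag
    // if it was set when this method was entered.
    static void runUninterrupted(Runnable work) {
        // Thread.interrupted() is static: it reads AND clears the flag
        // of the current thread.
        boolean wasInterrupted = Thread.interrupted();
        try {
            work.run();
        } finally {
            if (wasInterrupted)
                Thread.currentThread().interrupt(); // put the flag back
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate a pending interrupt
        runUninterrupted(() ->
            // Inside the section the flag is cleared.
            System.out.println(Thread.currentThread().isInterrupted())); // prints false
        // After the section the flag is set again.
        System.out.println(Thread.currentThread().isInterrupted());      // prints true
    }
}
```

Clearing the flag first means blocking calls inside the section, such as acquire() or wait(), do not fail immediately with InterruptedException, while restoring it afterwards lets the caller still observe the interrupt.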