多线程环境中,经常出现这样一种场景。一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,最终结束于另一个线程。这就是传说中的生产者/消费者模型(前者是生产者,后者是消费者)。

而实现这种功能比较简单的方法就是让消费者线程不断的循环变量判断是否符合预期。如下所示

while (!isOK(...)){
    Thread.sleep(1000);
}
doIt();

while 循环中设置不满足的条件,如果条件满足则退出循环(睡眠可以防止过快的“无效尝试”)

如果isOK方法的判断逻辑比较简单,操作耗时较短,而且并发冲突量不大的情况下,使用这种方案也未尝不可。但是如果判断逻辑比较复杂,或者并发冲突较大时。这种方案就不行了,while循环会执行很多很多次,CPU消耗较大。

针对这种问题,Java内置的等待通知机制能够较好的解决。当条件不满足时,线程阻塞自己,进入等待状态。当线程条件满足时,通知等待的线程重新执行。线程阻塞避免了循环等待带来的CPU消耗问题。

就诊流程

这里借用极客时间中大佬文章(文末有链接,名额十个,先到先得)中的一个例子来更好的理解一下等待通知机制。

我们去医院看病,一般都会经历以下流程:

  1. 患者先去挂号,然后到就诊门口分诊,等待叫号。
  2. 当叫到自己的号时,患者就可以去找大夫了。
  3. 就诊过程中,大夫可能会让患者去做一些辅助检查,同时叫下一位患者。
  4. 当患者做完检查之后,拿到检测报告重新分诊,等待叫号。
  5. 当大夫在此叫到自己的号时,患者再去找大夫就诊。

第一/二步中,患者去挂号分诊类似于线程去过去互斥锁的过程,当患者会叫到号时类似于拿到了锁

大夫让患者去做检查的操作类似于,线程没有满足条件。

患者去做检查类似于线程进入等待状态,然后大夫叫下一个患者,这个步骤我们在前面的等待 - 通知机制中忽略掉了,这个步骤对应到程序里,本质是线程释放持有的互斥锁。

患者做完检查,类似于线程要求的条件已经满足;患者拿检测报告重新分诊,类似于线程需要重新获取互斥锁,这个步骤我们在前面的等待 - 通知机制中也忽视了。

由此可以总结出:一个完整的等待 - 通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。

用Synchronized实现等待通知机制

通过synchronized配合wait(), notifyAll(), notify()方法来实现等待通知机制。

// 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才返回,调用wait方法会释放锁
wait();

// 通知一个在对象上等待的线程从wait()方法返回,返回的前提是该线程获取到对象锁
notify();

// 通知所有等待在该对象上的线程
notifyAll();

下面通过一个具体实例来具体实现以下

public class WaitNotify {

    static boolean flag = true;    
    static Object lock = new Object();    

    public static void main(String[] args) throws Exception {
        new Thread(new Wait(), "WaitThread").start();

        Thread.sleep(1000);   

        new Thread(new Notify(), "NotifyThread").start();    
    }

    // 等待
    static class Wait implements Runnable {        
        public void run() {            
            // 加锁,拥有lock的Monitor            
            synchronized (lock) {               
                // 当条件不满足时,继续wait,同时释放了lock的锁               
                while (flag) {                   
                    try {                        
                        System.out.println(Thread.currentThread().getName() + " flag is true. wait ");                       
                        lock.wait();                    
                    } catch (InterruptedException e) { }                
                }               
                // 条件满足时,完成工作
                System.out.println(Thread.currentThread().getName() + " flag is false. running ");            
            }        
        }    
    }

    // 通知
    static class Notify implements Runnable {        
        public void run() {            
            // 加锁,拥有lock的Monitor            
            synchronized (lock) {                
                // 获取lock的锁,然后进行通知,通知时不会释放lock的锁,               
                // 直到当前线程释放了lock后,WaitThread才能从wait方法中返回                
                System.out.println(Thread.currentThread().getName() + " hold lock. notify ");                
                lock.notifyAll();                
                flag = false; 
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }

            // 再次加锁            
            synchronized (lock) {                
                System.out.println(Thread.currentThread().getName() + " hold lock again. sleep ");                
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }           
            }        
        }    
    }
}

在上面的实例中,等待线程(WaitThread)首先获得了对象锁,然后调用对象的wait()方法,从而放弃了锁,并进入对象的等待队列中,进入等待状态。由于等待线程释放了对象锁,通知线程(NotifyThread)随后获取了对象的锁,改变状态使满足条件,并调用对象的notify()方法,通知等待队列(互斥锁的等待队列)中的线程,告诉它条件曾经满足过。(这里notify()只能保证在通知时间点,条件是满足的。而被通知线程的执行时间点和通知时间点基本上不会重合,所以当线程执行的时候,很可能线程条件已经不满足了)等待线程从等待队列中移出,状态变为阻塞状态。通知线程释放锁之后,等待线程再次获取锁从wait()方法返回继续执行。

这个过程需要注意以下几点:

  1. 使用wait(), notify(), notifyAll()时需要先对调用对象加锁。
  2. 调用wait()方法后,线程状态有RUNNING变为WAITING,并将当前线程放置到对象的等待队列
  3. notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或 notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
  4. 从wait()方法返回的前提是获得了调用对象的锁。

等待超时模式

在开发的过程中经常会遇到这样的情况,调用一个方法时等待一段时间(一般来说是给
定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,
超时返回默认结果或自定义处理。

而要在等待/通知模型中要加入超市等待逻辑,只需要做一些小小的改动。

假设超时时间段是T,那么可以推断出在当前时间now+T之后就会超时。

定义如下变量:

  • 等待持续时间:REMAINING=T。
  • 超时时间:FUTURE=now+T。

这时仅需要wait(REMAINING)即可,在wait(REMAINING)返回之后会将执行: REMAINING=FUTURE–now。如果REMAINING小于等于0,表示已经超时,直接退出,否则将 继续执行wait(REMAINING)。

相关实现如下

// 对当前对象加锁 
public synchronized Object get(long mills) throws InterruptedException {       
    long future = System.currentTimeMillis() + mills;       
    long remaining = mills;       
    // 当超时大于0并且result返回值不满足要求       
    while ((result == null) && remaining > 0) {              
        wait(remaining);
        remaining = future - System.currentTimeMillis();       
    }              
    return result; 
}

一个简单的数据库连接池示例

我们使用等待超时模式来构造一个简单的数据库连接池,在示例中模拟从连接池中获取、使用和释放连接的过程,而客户端获取连接的过程被设定为等待超时的模式,也就是在1000毫秒内如果无法获取到可用连接,将会返回给客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。

首先看一下连接池的定义。它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用releaseConnection(Connection)方法将连接放回线程池。

import java.sql.Connection;
import java.util.LinkedList;

public class ConnectionPool {    

    private LinkedList<Connection> pool = new LinkedList<Connection>();    

    public ConnectionPool(int initialSize) {        
        if (initialSize > 0) {            
            for (int i = 0; i < initialSize; i++) {                
                pool.addLast(ConnectionDriver.createConnection()); 
            }       
        }    
    }    

    public void releaseConnection(Connection connection) {        
        if (connection != null) {            
            synchronized (pool) {                
                // 连接释放后需要进行通知,这样其他消费者能够感知到连接池中已经归还了一个连接                
                pool.addLast(connection);                
                pool.notifyAll();            
            }        
        }    
    }    

    // 在mills内无法获取到连接,将会返回null    
    public Connection fetchConnection(long mills) throws InterruptedException {  

        synchronized (pool) {            
            // 完全超时            
            if (mills <= 0) {                
                while (pool.isEmpty()) {                    
                    pool.wait();                
                }                
                return pool.removeFirst();            
            } else {
                // 超过设置的时间mills,将推出while循环
                long future = System.currentTimeMillis() + mills;                
                long remaining = mills;                
                while (pool.isEmpty() && remaining > 0) {                    
                    pool.wait(remaining);                    
                    remaining = future - System.currentTimeMillis();                
                }  

                Connection result = null;                
                if (!pool.isEmpty()) {                    
                    result = pool.removeFirst();                
                }                
                return result;            
            }        
        }    
    } 
}

由于java.sql.Connection是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是个示例,我们通过动态代理构造了一个Connection,该Connection的代理实现仅仅是在commit()方法调用时休眠100毫秒.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.concurrent.TimeUnit;

public class ConnectionDriver {   

    static class ConnectionHandler implements InvocationHandler {     
        // commit时会回调
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {       
            if (method.getName().equals("commit")) {                
                TimeUnit.MILLISECONDS.sleep(100);            
            }            
            return null;        
        }    
    }    

    // 创建一个Connection的代理,在commit时休眠100毫秒    
    public static final Connection createConnection() {        
        return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),  
                new Class[] { Connection.class }, new ConnectionHandler());    
    } 
}

下面通过一个示例来测试简易数据库连接池的工作情况,模拟客户端ConnectionRunner获取、使用、最后释放连接的过程,当它使用时连接将会增加获取到连接的数量,反之,将会增加未获取到连接的数量。

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    static ConnectionPool pool = new ConnectionPool(10);
    // 保证所有CountDownLatch能够同时开始
    static CountDownLatch start = new CountDownLatch(1);
    // 保证main线程等待所有ConnectionRunner结束之后才继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws InterruptedException {
        // 线程数量,可以修改线程数量进行观察       
        int threadCount = 10;        
        end = new CountDownLatch(threadCount);        
        int count = 20;    
        // 成功的个数
        AtomicInteger got = new AtomicInteger();   
        // 失败的个数
        AtomicInteger notGot = new AtomicInteger();        
        for (int i = 0; i < threadCount; i++) {            
            Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
                    "ConnectionRunnerThread");            
            thread.start();        
        }      

        start.countDown();        
        end.await();    

        System.out.println("total invoke: " + (threadCount * count));   
        System.out.println("got connection:  " + got);    
        System.out.println("not got connection " + notGot); 
    }

    static class ConnetionRunner implements Runnable {        
        int count;        
        AtomicInteger got;        
        AtomicInteger notGot;        
        public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) {            
            this.count = count;            
            this.got = got;            
            this.notGot = notGot;        
        }        

        @Override
        public void run() {            
            try {                
                start.await();            
            } catch (Exception ex) {
            }   

            while (count > 0) {                
                try {                    
                    // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null                    
                    // 分别统计连接获取的数量got和未获取到的数量notGot                    
                    Connection connection = pool.fetchConnection(1000);                    
                    if (connection != null) {                        
                        try {                            
                            connection.createStatement();                            
                            connection.commit();                        
                        } finally {                            
                            pool.releaseConnection(connection);  
                            // 成功个数+1
                            got.incrementAndGet();                        
                        }                    
                    } else {  
                        // 失败个数+1
                        notGot.incrementAndGet();                    
                    }                
                } catch (Exception ex) {                
                } finally {                
                    count--;                
                }
            }            
            end.countDown();        
        }    
    } 
}

上述示例中使用了CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束之后,才使main线程从等待状态中返回。

当前设定的场景是10个线程同时运行获取连接池(10个连接)中的连接,可以通过调节线程数量来观察未获取到连接的情况。线程数、总获取次数、获取到的数量、未获取到的数量以及未获取到的比率。

评论