博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java 异步查询转同步多种实现方式:循环等待,CountDownLatch,Spring EventListener,超时处理和空循环性能优化...
阅读量:5894 次
发布时间:2019-06-19

本文共 9096 字,大约阅读时间需要 30 分钟。

异步转同步

业务需求

有些接口查询反馈结果是异步返回的,无法立刻获取查询结果。

  • 正常处理逻辑

触发异步操作,然后传递一个唯一标识。

等到异步结果返回,根据传入的唯一标识,匹配此次结果。

  • 如何转换为同步

正常的应用场景很多,但是有时候不想做数据存储,只是想简单获取调用结果。

即想达到同步操作的结果,怎么办呢?

思路

  1. 发起异步操作

  2. 在异步结果返回之前,一直等待(可以设置超时)

  3. 结果返回之后,异步操作结果统一返回

循环等待

  • LoopQuery.java

使用 query(),将异步的操作 remoteCallback() 执行完成后,同步返回。

public class LoopQuery implements Async {    private String result;    private static final Logger LOGGER = LogManager.getLogger(LoopQuery.class.getName());    @Override    public String query(String key) {        startQuery(key);        new Thread(new Runnable() {            @Override            public void run() {                remoteCallback(key);            }        }).start();        final String queryResult = endQuery();        LOGGER.info("查询结果: {}", queryResult);        return queryResult;    }    /**     * 开始查询     * @param key 查询条件     */    private void startQuery(final String key) {        LOGGER.info("执行查询: {}", key);    }    /**     * 远程的回调是等待是随机的     *     * @param key 查询条件     */    private void remoteCallback(final String key) {        try {            TimeUnit.SECONDS.sleep(5);        } catch (InterruptedException e) {            e.printStackTrace();        }        this.result = key + "-result";        LOGGER.info("remoteCallback set result: {}", result);    }    /**     * 结束查询     * @return 返回结果     */    private String endQuery() {        while (true) {            if (null == result) {                try {                    TimeUnit.MILLISECONDS.sleep(100);                } catch (InterruptedException e) {                    e.printStackTrace();                }            } else {                return result;            }        }    }}复制代码
  • main()
public static void main(String[] args) {    new LoopQuery().query("12345");}复制代码
  • 测试结果
18:14:16.491 [main] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 执行查询: 1234518:14:21.498 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - remoteCallback set result: 12345-result18:14:21.548 [main] INFO  com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 查询结果: 12345-result复制代码

CountDownLatch

  • AsyncQuery.java

使用 CountDownLatch 类达到同步的效果。

import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class AsyncQuery {    private static final Logger LOGGER = LogManager.getLogger(AsyncQuery.class.getName());    /**     * 结果     */    private String result;    /**     * 异步转同步查询     * @param key     */    public void asyncQuery(final String key) {        final CountDownLatch latch = new CountDownLatch(1);        this.startQuery(key);        new Thread(new Runnable() {            @Override            public void run() {                LOGGER.info("远程回调线程开始");                remoteCallback(key, latch);                LOGGER.info("远程回调线程结束");            }        }).start();        try {            latch.await();        } catch (InterruptedException e) {            e.printStackTrace();        }        this.endQuery();    }    private void startQuery(final String key) {        LOGGER.info("执行查询: {}", key);    }    /**     * 远程的回调是等待是随机的     * @param key     */    private void remoteCallback(final String key, CountDownLatch latch) {        try {            TimeUnit.SECONDS.sleep(5);        } catch (InterruptedException e) {            e.printStackTrace();        }        this.result = key + "-result";        latch.countDown();    }    private void endQuery() {        LOGGER.info("查询结果: {}", result);    }}复制代码
  • main()
public static void main(String[] args) {    AsyncQuery asyncQuery = new AsyncQuery();    final String key = "123456";    asyncQuery.asyncQuery(key);}复制代码
  • 日志
18:19:12.714 [main] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 执行查询: 12345618:19:12.716 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 远程回调线程开始18:19:17.720 [main] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 查询结果: 123456-result18:19:17.720 [Thread-1] INFO  com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 远程回调线程结束复制代码

Spring EventListener

使用观察者模式也可以。(对方案一的优化)

此处结合 spring 进行使用。

  • BookingCreatedEvent.java

定义一个传输属性的对象。

public class BookingCreatedEvent extends ApplicationEvent {    private static final long serialVersionUID = -1387078212317348344L;    private String info;    public BookingCreatedEvent(Object source) {        super(source);    }    public BookingCreatedEvent(Object source, String info) {        super(source);        this.info = info;    }    public String getInfo() {        return info;    }}复制代码
  • BookingService.java

说明:当 this.context.publishEvent(bookingCreatedEvent); 触发时, 会被 @EventListener 指定的方法监听到。

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.context.event.EventListener;import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Servicepublic class BookingService {    @Autowired    private ApplicationContext context;    private volatile BookingCreatedEvent bookingCreatedEvent;    /**     * 异步转同步查询     * @param info     * @return     */    public String asyncQuery(final String info) {        query(info);        new Thread(new Runnable() {            @Override            public void run() {                remoteCallback(info);            }        }).start();        while(bookingCreatedEvent == null) {            //.. 空循环            // 短暂等待。            try {                TimeUnit.MILLISECONDS.sleep(1);            } catch (InterruptedException e) {                //...            }            //2. 使用两个单独的 event...        }        final String result = bookingCreatedEvent.getInfo();        bookingCreatedEvent = null;        return result;    }    @EventListener    public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) {        System.out.println("监听到远程的信息: " + bookingCreatedEvent.getInfo());        this.bookingCreatedEvent = bookingCreatedEvent;        System.out.println("监听到远程消息后: " + this.bookingCreatedEvent.getInfo());    }    /**     * 执行查询     * @param info     */    public void query(final String info) {        System.out.println("开始查询: " + info);    }    /**     * 远程回调     * @param info     */    public void remoteCallback(final String info) {        System.out.println("远程回调开始: " + info);        try {            TimeUnit.SECONDS.sleep(2);        } catch (InterruptedException e) {            e.printStackTrace();        }        // 重发结果事件        String result = info + "-result";        BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result);        //触发event        this.context.publishEvent(bookingCreatedEvent);    }}复制代码
  • 测试方法
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(classes = SpringConfig.class)public class BookServiceTest {    @Autowired    private BookingService bookingService;    @Test    public void asyncQueryTest() {        bookingService.asyncQuery("1234");    }}复制代码
  • 日志
2018-08-10 18:27:05.958  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 开始查询:12342018-08-10 18:27:05.959  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 远程回调开始:1234接收到信息: 1234-result2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 监听到远程的信息: 1234-result2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 监听到远程消息后: 1234-result2018-08-10 18:27:07.964  INFO  [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已经触发event2018-08-10 18:27:07.964  INFO  [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查询结果: 1234-result2018-08-10 18:27:07.968  INFO  [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy复制代码

超时和空循环

空循环

空循环会导致 cpu 飙升

while(true) {}复制代码
  • 解决方式
while(true) {    // 小睡即可    TimeUnit.sleep(1);}复制代码

超时编写

不可能一直等待反馈,可以设置超时时间。

/** * 循环等待直到获取结果 * @param key key * @param timeoutInSeconds 超时时间 * @param 
泛型 * @return 结果。如果超时则抛出异常 */public
T loopWaitForValue(final String key, long timeoutInSeconds) { long startTime = System.nanoTime(); long deadline = startTime + TimeUnit.SECONDS.toNanos(timeoutInSeconds); //1. 如果没有新回调,或者 key 对应元素不存在。则一直循环 while(ObjectUtil.isNull(map.get(key))) { try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { LOGGER.warn("Loop meet InterruptedException, just ignore it.", e); } // 超时判断 long currentTime = System.nanoTime(); if(currentTime >= deadline) { throw new BussinessException(ErrorCode.READ_TIME_OUT); } } final T target = (T) map.get(key); LOGGER.debug("loopWaitForValue get value:{} for key:{}", JSON.toJSON(target), key); //2. 获取到元素之后,需要移除掉对应的值 map.remove(key); return target;}复制代码

代码地址

转载地址:http://npisx.baihongyu.com/

你可能感兴趣的文章
c/c++性能优化--I/O优化(上)
查看>>
将HTML特殊转义为实体字符的两种实现方式
查看>>
jquery 保留两个小数的方法
查看>>
网站架构设计的误区
查看>>
iis 故障导致网站无法访问
查看>>
C++ 基础笔记(一)
查看>>
System.Func<>与System.Action<>
查看>>
asp.net开源CMS推荐
查看>>
csharp skype send message in winform
查看>>
MMORPG 游戏服务器端设计--转载
查看>>
《星辰傀儡线》人物续:“灭世者”、“疯狂者”、“叛逆者”三兄妹
查看>>
安装系统字体
查看>>
SILK 的 Tilt的意思
查看>>
Html学习笔记3
查看>>
批处理学习笔记8 - 深入学习For命令1
查看>>
微信支付开发(11) Native支付
查看>>
HDFS dfsclient写文件过程 源码分析
查看>>
关于多线程的那些事
查看>>
JavaScript获取DOM元素位置和尺寸大小
查看>>
EL 表达式小结
查看>>