异步转同步
业务需求
有些接口查询反馈结果是异步返回的,无法立刻获取查询结果。
- 正常处理逻辑
触发异步操作,然后传递一个唯一标识。
等到异步结果返回,根据传入的唯一标识,匹配此次结果。
- 如何转换为同步
正常的应用场景很多,但是有时候不想做数据存储,只是想简单获取调用结果。
即想达到同步操作的结果,怎么办呢?
思路
-
发起异步操作
-
在异步结果返回之前,一直等待(可以设置超时)
-
结果返回之后,异步操作结果统一返回
循环等待
- LoopQuery.java
使用 query()
,将异步的操作 remoteCallback()
执行完成后,同步返回。
public class LoopQuery implements Async { private String result; private static final Logger LOGGER = LogManager.getLogger(LoopQuery.class.getName()); @Override public String query(String key) { startQuery(key); new Thread(new Runnable() { @Override public void run() { remoteCallback(key); } }).start(); final String queryResult = endQuery(); LOGGER.info("查询结果: {}", queryResult); return queryResult; } /** * 开始查询 * @param key 查询条件 */ private void startQuery(final String key) { LOGGER.info("执行查询: {}", key); } /** * 远程的回调是等待是随机的 * * @param key 查询条件 */ private void remoteCallback(final String key) { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } this.result = key + "-result"; LOGGER.info("remoteCallback set result: {}", result); } /** * 结束查询 * @return 返回结果 */ private String endQuery() { while (true) { if (null == result) { try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } else { return result; } } }}复制代码
- main()
public static void main(String[] args) { new LoopQuery().query("12345");}复制代码
- 测试结果
18:14:16.491 [main] INFO com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 执行查询: 1234518:14:21.498 [Thread-1] INFO com.github.houbb.thread.learn.aysnc.loop.LoopQuery - remoteCallback set result: 12345-result18:14:21.548 [main] INFO com.github.houbb.thread.learn.aysnc.loop.LoopQuery - 查询结果: 12345-result复制代码
CountDownLatch
- AsyncQuery.java
使用 CountDownLatch
类达到同步的效果。
import org.apache.logging.log4j.LogManager;import org.apache.logging.log4j.Logger;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class AsyncQuery { private static final Logger LOGGER = LogManager.getLogger(AsyncQuery.class.getName()); /** * 结果 */ private String result; /** * 异步转同步查询 * @param key */ public void asyncQuery(final String key) { final CountDownLatch latch = new CountDownLatch(1); this.startQuery(key); new Thread(new Runnable() { @Override public void run() { LOGGER.info("远程回调线程开始"); remoteCallback(key, latch); LOGGER.info("远程回调线程结束"); } }).start(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } this.endQuery(); } private void startQuery(final String key) { LOGGER.info("执行查询: {}", key); } /** * 远程的回调是等待是随机的 * @param key */ private void remoteCallback(final String key, CountDownLatch latch) { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } this.result = key + "-result"; latch.countDown(); } private void endQuery() { LOGGER.info("查询结果: {}", result); }}复制代码
- main()
public static void main(String[] args) { AsyncQuery asyncQuery = new AsyncQuery(); final String key = "123456"; asyncQuery.asyncQuery(key);}复制代码
- 日志
18:19:12.714 [main] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 执行查询: 12345618:19:12.716 [Thread-1] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 远程回调线程开始18:19:17.720 [main] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 查询结果: 123456-result18:19:17.720 [Thread-1] INFO com.github.houbb.thread.learn.aysnc.countdownlatch.AsyncQuery - 远程回调线程结束复制代码
Spring EventListener
使用观察者模式也可以。(对方案一的优化)
此处结合 spring 进行使用。
- BookingCreatedEvent.java
定义一个传输属性的对象。
public class BookingCreatedEvent extends ApplicationEvent { private static final long serialVersionUID = -1387078212317348344L; private String info; public BookingCreatedEvent(Object source) { super(source); } public BookingCreatedEvent(Object source, String info) { super(source); this.info = info; } public String getInfo() { return info; }}复制代码
- BookingService.java
说明:当 this.context.publishEvent(bookingCreatedEvent);
触发时, 会被 @EventListener
指定的方法监听到。
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.context.event.EventListener;import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Servicepublic class BookingService { @Autowired private ApplicationContext context; private volatile BookingCreatedEvent bookingCreatedEvent; /** * 异步转同步查询 * @param info * @return */ public String asyncQuery(final String info) { query(info); new Thread(new Runnable() { @Override public void run() { remoteCallback(info); } }).start(); while(bookingCreatedEvent == null) { //.. 空循环 // 短暂等待。 try { TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { //... } //2. 使用两个单独的 event... } final String result = bookingCreatedEvent.getInfo(); bookingCreatedEvent = null; return result; } @EventListener public void onApplicationEvent(BookingCreatedEvent bookingCreatedEvent) { System.out.println("监听到远程的信息: " + bookingCreatedEvent.getInfo()); this.bookingCreatedEvent = bookingCreatedEvent; System.out.println("监听到远程消息后: " + this.bookingCreatedEvent.getInfo()); } /** * 执行查询 * @param info */ public void query(final String info) { System.out.println("开始查询: " + info); } /** * 远程回调 * @param info */ public void remoteCallback(final String info) { System.out.println("远程回调开始: " + info); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // 重发结果事件 String result = info + "-result"; BookingCreatedEvent bookingCreatedEvent = new BookingCreatedEvent(this, result); //触发event this.context.publishEvent(bookingCreatedEvent); }}复制代码
- 测试方法
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration(classes = SpringConfig.class)public class BookServiceTest { @Autowired private BookingService bookingService; @Test public void asyncQueryTest() { bookingService.asyncQuery("1234"); }}复制代码
- 日志
2018-08-10 18:27:05.958 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:84 - 开始查询:12342018-08-10 18:27:05.959 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:93 - 远程回调开始:1234接收到信息: 1234-result2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:73 - 监听到远程的信息: 1234-result2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:75 - 监听到远程消息后: 1234-result2018-08-10 18:27:07.964 INFO [Thread-2] com.github.houbb.spring.lean.core.ioc.event.BookingService:106 - 已经触发event2018-08-10 18:27:07.964 INFO [main] com.github.houbb.spring.lean.core.ioc.event.BookingService:67 - 查询结果: 1234-result2018-08-10 18:27:07.968 INFO [Thread-1] org.springframework.context.support.GenericApplicationContext:993 - Closing org.springframework.context.support.GenericApplicationContext@5cee5251: startup date [Fri Aug 10 18:27:05 CST 2018]; root of context hierarchy复制代码
超时和空循环
空循环
空循环会导致 cpu 飙升
while(true) {}复制代码
- 解决方式
while(true) { // 小睡即可 TimeUnit.sleep(1);}复制代码
超时编写
不可能一直等待反馈,可以设置超时时间。
/** * 循环等待直到获取结果 * @param key key * @param timeoutInSeconds 超时时间 * @param泛型 * @return 结果。如果超时则抛出异常 */public T loopWaitForValue(final String key, long timeoutInSeconds) { long startTime = System.nanoTime(); long deadline = startTime + TimeUnit.SECONDS.toNanos(timeoutInSeconds); //1. 如果没有新回调,或者 key 对应元素不存在。则一直循环 while(ObjectUtil.isNull(map.get(key))) { try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { LOGGER.warn("Loop meet InterruptedException, just ignore it.", e); } // 超时判断 long currentTime = System.nanoTime(); if(currentTime >= deadline) { throw new BussinessException(ErrorCode.READ_TIME_OUT); } } final T target = (T) map.get(key); LOGGER.debug("loopWaitForValue get value:{} for key:{}", JSON.toJSON(target), key); //2. 获取到元素之后,需要移除掉对应的值 map.remove(key); return target;}复制代码