采用CompletableFuture异步编程实现数据汇总
传统的Future异步编程实现起来非常复杂,它需要实现FutureTask方法并实现Callable内部类,再结合Thread或者线程池的方式实现,获得返回值要调用FutureTask的get方法,他会阻塞后面的代码,但是如果后面的代码不依赖返回值的话,我们希望它们能以并行的方式去执行,那我们就能结合CompletableFuture去改造
它提供了两种异步任务:
- runAsync(),方法执行任务是没有返回值的
- supplyAsync()方法执行任务则支持返回值
两种组合处理:
- anyOf返回跑的最快的那个future。最快的如果异常都玩完
- allOf全部并行执行,如果需要获得返回值,需要配合thenApply,异常会抛出不影响其他任何任务
异步回调方法:不会阻碍后面的任务执行
- whenComplete()没有返回值,且返回的CompletableFuture为任务结果,而非回调结果
- handle()有返回值,且返回的CompletableFuture为回调结果
上面两个方法出现异常不会中断throwable:参数会接收前面的任务的异常异常会通过get抛出到主线程
- 链式处理:–出现异常后面的任务会中断处理任务中感知不到异常异常会通过get抛出到主线程
- thenRun(Runnable runnable): 对异步任务的结果进行操作,不能传入参,也没有返回值
- thenAccept(Consumer consumer):可传入参数
- thenApply(Function function):可传入参数,并返回结果
例如:
public static void main(String[]args)throws ExecutionException,InterruptedException{ CompletableFuture<Integer>future1 CompletableFuture.supplyAsync(()->15); CompletableFuture<Integer>future2 CompletableFuture.supplyAsync(()->10); CompletableFuture<Integer>allFutures CompletableFuture.allOf(future1,future2) thenApply(res ->{ return future1.join()+future2.join();
}); System.out.println(allFutures.join()); } }
|
然后看一下我采用CompletableFuture异步编程后的思路(简单展示思路,并非项目真实代码)
controller层
@RestController @RequestMapping("/api/surveys") public class SurveyController {
private final SurveyService surveyService;
public SurveyController(SurveyService surveyService) { this.surveyService = surveyService; }
@PostMapping("/process") public CompletableFuture<Void> processSurveyConcurrently(@RequestBody SurveyData data, @RequestParam SurveyStatus newStatus) { return surveyService.processSurveyConcurrentlyAsync(data, newStatus); } }
|
service层
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class SurveyServiceImpl implements SurveyService { private final SurveyMapper surveyMapper; private final ExecutorService executorService = Executors.newFixedThreadPool(3);
public SurveyServiceImpl(SurveyMapper surveyMapper) { this.surveyMapper = surveyMapper; }
@Override public CompletableFuture<Void> saveSurveyDataAsync(SurveyData data) { return CompletableFuture.runAsync(() -> surveyMapper.saveSurveyData(data), executorService); }
@Override public CompletableFuture<Void> updateSurveyStatusAsync(String surveyId, SurveyStatus status) { return CompletableFuture.runAsync(() -> surveyMapper.updateSurveyStatus(surveyId, status), executorService); }
public CompletableFuture<AnalysisResult> analyzeSurveyResultsAsync(SurveyData data) { return CompletableFuture.supplyAsync(() -> { return new AnalysisResult(); }, executorService); }
@Override public CompletableFuture<Void> processSurveyConcurrentlyAsync(SurveyData data, SurveyStatus newStatus) { CompletableFuture<Void> saveFuture = saveSurveyDataAsync(data); CompletableFuture<AnalysisResult> analyzeFuture = analyzeSurveyResultsAsync(data); CompletableFuture<Void> updateFuture = updateSurveyStatusAsync(data.getSurveyId(), newStatus);
return CompletableFuture.allOf(saveFuture, analyzeFuture, updateFuture) .thenRun(() -> System.out.println("All operations completed successfully!")); } }
|
mapper层
public interface SurveyMapper { void saveSurveyData(SurveyData data); void updateSurveyStatus(String surveyId, SurveyStatus status); SurveyData getSurveyDataById(String surveyId); }
|
数据库数据批量导入es
当时想的是如果项目上线的时候,肯定要把数据库中的中药材数据全部导入到es索引库中,为了以防万一,如果数据量很大,一次性导入肯定会发生OOM,所以当时我就想到可以使用线程池的方式导入,利用CountDownLatch来控制,就能避免一次性加载过多,防止内存溢出
具体流程我画成了流程图:
public class ApMedicineServiceImpl{
@Autowired private ApMedicineMapper apMedicineMapper; @Autowired private RestHighLevelclient client; @Autowired private ExecutorService executorService; private static final String ARTICLE_ES_INDEX ="app_info_article"; private static final int PAGE_SIZE=2000; @SneakyThrows public void importAll(){ int count apMedicineMapper.selectCount(); int totalPagesize count PAGE_SIZE =0 count/PAGE_SIZE count/PAGE_SIZE +1; Long startTime =System.currentTimeMillis(); CountDownLatch countDownLatch =new CountDownLatch(totalPagesize); int fromIndex; List<SearchMedicineVo> medicineList =null; for (int i =0;i<totalPagesize;i++){ fromIndex= i*PAGE_SIZE; medicineList= apMedicineMapper.LoadMedicineList(fromIndex,PAGE_SIZE); TaskThread taskThread =new TaskThread(medicineList,countDownLatch); executorService.execute(taskThread); } countDownLatch.await(); Long endTime =System.currentTimeMillis(); log.info("es索引数据批量导入共:{}条,共消耗时间:{}秒",count,(endTime-startTime)/1000); }
class TaskThread implementst Runnable{ List<SearchMedicineVo> medicineList; CountDownLatch cdl; public TaskThread(List<SearchMedicineVo> medicineList,CountDownLatch cdl){ this.medicineList =medicineList; this.cdl=cdl; } public void run(){ BulkRequest bulkRequest new BulkRequest(ARTICLE_ES_INDEX); for (SearchMedicineVo searchMedicineVo medicineList){ bulkRequest.add(new IndexRequest().id(searchMedicinevo.getId().tostring()) source(JSON.toJSONString(searchMedicineVo),XContentType.JSON)); } client.bulk(bulkRequest,Requestoptions.DEFAULT); cdl.countDown(); } } }
|