采用CompletableFuture异步编程实现数据汇总

传统的Future异步编程实现起来非常复杂,它需要实现FutureTask方法并实现Callable内部类,再结合Thread或者线程池的方式实现,获得返回值要调用FutureTask的get方法,他会阻塞后面的代码,但是如果后面的代码不依赖返回值的话,我们希望它们能以并行的方式去执行,那我们就能结合CompletableFuture去改造

它提供了两种异步任务:

  • runAsync(),方法执行任务是没有返回值的
  • supplyAsync()方法执行任务则支持返回值

两种组合处理:

  • anyOf返回跑的最快的那个future。最快的如果异常都玩完
  • allOf全部并行执行,如果需要获得返回值,需要配合thenApply,异常会抛出不影响其他任何任务

异步回调方法:不会阻碍后面的任务执行

  • whenComplete()没有返回值,且返回的CompletableFuture为任务结果,而非回调结果
  • handle()有返回值,且返回的CompletableFuture为回调结果

上面两个方法出现异常不会中断throwable:参数会接收前面的任务的异常异常会通过get抛出到主线程

  • 链式处理:–出现异常后面的任务会中断处理任务中感知不到异常异常会通过get抛出到主线程
  1. thenRun(Runnable runnable): 对异步任务的结果进行操作,不能传入参,也没有返回值
  2. thenAccept(Consumer consumer):可传入参数
  3. thenApply(Function function):可传入参数,并返回结果

例如:

public static void main(String[]args)throws ExecutionException,InterruptedException{
CompletableFuture<Integer>future1 CompletableFuture.supplyAsync(()->15);
CompletableFuture<Integer>future2 CompletableFuture.supplyAsync(()->10);
CompletableFuture<Integer>allFutures CompletableFuture.allOf(future1,future2)
thenApply(res ->{
return future1.join()+future2.join();
//TODO....
});
System.out.println(allFutures.join());
}
}

然后看一下我采用CompletableFuture异步编程后的思路(简单展示思路,并非项目真实代码)

controller层

@RestController
@RequestMapping("/api/surveys")
public class SurveyController {

private final SurveyService surveyService;

public SurveyController(SurveyService surveyService) {
this.surveyService = surveyService;
}

@PostMapping("/process")
public CompletableFuture<Void> processSurveyConcurrently(@RequestBody SurveyData data,
@RequestParam SurveyStatus newStatus) {
return surveyService.processSurveyConcurrentlyAsync(data, newStatus);
}
}

service层

// SurveyService.java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SurveyServiceImpl implements SurveyService {
//先用一个mapper实例
private final SurveyMapper surveyMapper;
private final ExecutorService executorService = Executors.newFixedThreadPool(3);

public SurveyServiceImpl(SurveyMapper surveyMapper) {
this.surveyMapper = surveyMapper;
}

@Override
public CompletableFuture<Void> saveSurveyDataAsync(SurveyData data) {
return CompletableFuture.runAsync(() -> surveyMapper.saveSurveyData(data), executorService);
}

@Override
public CompletableFuture<Void> updateSurveyStatusAsync(String surveyId, SurveyStatus status) {
return CompletableFuture.runAsync(() -> surveyMapper.updateSurveyStatus(surveyId, status), executorService);
}

public CompletableFuture<AnalysisResult> analyzeSurveyResultsAsync(SurveyData data) {
// 这里只是一个示例,实际的数据处理更复杂
return CompletableFuture.supplyAsync(() -> {
return new AnalysisResult();
}, executorService);
}

@Override
public CompletableFuture<Void> processSurveyConcurrentlyAsync(SurveyData data, SurveyStatus newStatus) {
CompletableFuture<Void> saveFuture = saveSurveyDataAsync(data);
CompletableFuture<AnalysisResult> analyzeFuture = analyzeSurveyResultsAsync(data);
CompletableFuture<Void> updateFuture = updateSurveyStatusAsync(data.getSurveyId(), newStatus);

return CompletableFuture.allOf(saveFuture, analyzeFuture, updateFuture)
.thenRun(() -> System.out.println("All operations completed successfully!"));
}
}

mapper层

// SurveyMapper.java
public interface SurveyMapper {
void saveSurveyData(SurveyData data);
void updateSurveyStatus(String surveyId, SurveyStatus status);
SurveyData getSurveyDataById(String surveyId);
}

数据库数据批量导入es

当时想的是如果项目上线的时候,肯定要把数据库中的中药材数据全部导入到es索引库中,为了以防万一,如果数据量很大,一次性导入肯定会发生OOM,所以当时我就想到可以使用线程池的方式导入,利用CountDownLatch来控制,就能避免一次性加载过多,防止内存溢出

具体流程我画成了流程图:

public class ApMedicineServiceImpl{

@Autowired
private ApMedicineMapper apMedicineMapper;
@Autowired
private RestHighLevelclient client;
@Autowired
private ExecutorService executorService;
private static final String ARTICLE_ES_INDEX ="app_info_article";
private static final int PAGE_SIZE=2000;
@SneakyThrows
public void importAll(){
//总条数
int count apMedicineMapper.selectCount();
//总页数
int totalPagesize count PAGE_SIZE =0 count/PAGE_SIZE count/PAGE_SIZE +1;
//开始执行时间
Long startTime =System.currentTimeMillis();
//一共有多少页,就创建多少个CountDownLatch的计数
CountDownLatch countDownLatch =new CountDownLatch(totalPagesize);
int fromIndex;
List<SearchMedicineVo> medicineList =null;
for (int i =0;i<totalPagesize;i++){
//起始分页条数
fromIndex= i*PAGE_SIZE;
//查询
medicineList= apMedicineMapper.LoadMedicineList(fromIndex,PAGE_SIZE);
//创建线程,做批量插入es数据操作
TaskThread taskThread =new TaskThread(medicineList,countDownLatch);
//执行线程
executorService.execute(taskThread);
}
//调用await()方法,用来等待计数归零
countDownLatch.await();

Long endTime =System.currentTimeMillis();
log.info("es索引数据批量导入共:{}条,共消耗时间:{}秒",count,(endTime-startTime)/1000);
}

class TaskThread implementst Runnable{
List<SearchMedicineVo> medicineList;
CountDownLatch cdl;
public TaskThread(List<SearchMedicineVo> medicineList,CountDownLatch cdl){
this.medicineList =medicineList;
this.cdl=cdl;
}

public void run(){
//批量导入
BulkRequest bulkRequest new BulkRequest(ARTICLE_ES_INDEX);
for (SearchMedicineVo searchMedicineVo medicineList){
bulkRequest.add(new IndexRequest().id(searchMedicinevo.getId().tostring())
source(JSON.toJSONString(searchMedicineVo),XContentType.JSON));
}
//发送请求,批量添加数据到es索引库中
client.bulk(bulkRequest,Requestoptions.DEFAULT);
//让计数减
cdl.countDown();
}
}
}