最近设计实现了从Excel中读取百万级数据并将其插入数据库的解决方案、主要为了应对给出了很多数据而且是以excel表格形式,怎么将这些数据既快又安全地导入到数据库。
场景分析
首先分析一下 从Excel中读取百万级数据并将其插入数据库时可能遇到的问题:
- 内存溢出风险
加载如此庞大的Excel数据可能导致内存溢出,需要注意内存管理。
- 性能瓶颈
处理百万级数据的读取和插入操作可能很耗时,性能优化至关重要。
- 异常处理策略
读取和导入过程中会有各种潜在问题,我们需妥善处理各类异常情况。
问题分析
内存溢出问题
处理百万级数据,直接加载到内存中显然不现实。解决之道在于采用流式读取,分批处理数据。
在技术选型上,我选择了EasyExcel。它专为处理大数据量和复杂Excel文件进行了优化。EasyExcel在解析Excel时,不会将整个文件一次性加载到内存中,而是按行从磁盘逐个读取数据并解析。
性能问题
针对百万级数据的处理,单线程显然效率低下。提升性能的关键在于多线程处理。
多线程应用涉及两个场景:一是多线程读取文件,另一个是多线程实现数据插入。这涉及到生产者-消费者模式,多线程读取并多线程插入,以最大程度提升整体性能。
在数据插入方面,除了利用多线程,还应当结合数据库的批量插入功能以进一步提升速度。
错误处理
在文件读取和数据库写入过程中,可能遇到诸多问题,如数据格式错误、不一致性和重复数据等。
因此,应分两步处理。首先进行数据检查,在插入操作前检查数据格式等问题,然后在插入过程中处理异常情况。
处理方式多种多样,可通过事务回滚或记录日志。但经过我们知道,但是我们知道一般不推荐直接回滚操作,而是自动重试,若尝试多次仍无效,则记录日志,随后重新插入数据。
此外,在这一过程中,需考虑数据重复问题,可在Excel中设定若干字段为数据库唯一约束。遇到数据冲突时,可覆盖、跳过或报错处理。根据实际业务情况选择合适的处理方式,一般情况下,跳过并记录日志是相对合理的选择。
解决思路
所以,我们最后的总体解决方案如下:
利用EasyExcel进行Excel数据读取,因其逐行读取数据而非一次性加载整个文件至内存。为提高并发效率,将百万级数据分布在不同的工作表中,利用线程池和多线程同时读取各个工作表。在读取过程中,借助EasyExcel的ReadListener进行数据处理。
在处理过程中,并非每条数据都直接操作数据库,以免对数据库造成过大压力。设定一个批次大小,例如每1000条数据,将从Excel中读取的数据临时存储在内存中(可使用List实现)。每读取1000条数据后,执行数据的批量插入操作,可简单地借助mybatis/mybatisPlus实现批量插入。
此外,在处理过程中,需要考虑并发问题,因此我们将使用线程安全的队列来存储内存中的临时数据,如ConcurrentLinkedQueue。
经验证,通过上述方案,读取并插入100万条数据的Excel所需时间约为100秒,不超过2分钟。
具体实现
为了提升并发处理能力,我们将百万级数据存储在同一个Excel文件的不同工作表中,然后通过EasyExcel并发地读取这些工作表数据。
EasyExcel提供了ReadListener接口,允许在每批数据读取后进行自定义处理。我们可以基于这一功能实现文件的分批读取。
推荐是使用 DataNami 来造假数据当然也能用excel公式造。
pom依赖
首先,需要添加以下依赖:
<dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>latest_version</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> </dependencies>
|
并发读取多个sheet
然后实现并发读取多个sheet的代码:
@Service public class ExcelImporterService {
@Autowired private MyDataService myDataService; public void doImport() { String filePath = "users/paidaxing/workspace/excel/test.xlsx";
int numberOfSheets = 20;
ExecutorService executor = Executors.newFixedThreadPool(numberOfSheets);
for (int sheetNo = 0; sheetNo < numberOfSheets; sheetNo++) { int finalSheetNo = sheetNo; executor.submit(() -> { EasyExcel.read(filePath, MyDataModel.class, new MyDataModelListener(myDataService)) .sheet(finalSheetNo) .doRead(); }); } executor.shutdown(); try { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
这段代码通过创建一个固定大小的线程池来并发读取一个包含多个sheet的Excel文件。每个sheet的读取作为一个单独的任务提交给线程池。我们在代码中用了一个MyDataModelListener,这个类是ReadListener的一个实现类。当EasyExcel读取每一行数据时,它会自动调用我们传入的这个ReadListener实例的invoke方法。在这个方法中,我们就可以定义如何处理这些数据。MyDataModelListener还包含doAfterAllAnalysed方法,这个方法在所有数据都读取完毕后被调用。这里可以执行一些清理工作,或处理剩余的数据。
ReadListener
import com.alibaba.excel.context.AnalysisContext; import com.alibaba.excel.read.listener.ReadListener; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.List;
public class MyDataModelListener implements ReadListener<MyDataModel> { private static final int BATCH_SIZE = 1000; private List<MyDataModel> batch = new ArrayList<>();
private MyDataService myDataService;
public MyDataModelListener(MyDataService myDataService) { this.myDataService = myDataService; }
@Override public void invoke(MyDataModel data, AnalysisContext context) { if (validateData(data)) { batch.add(data); } else { } if (batch.size() >= BATCH_SIZE) { processBatch(); } }
private boolean validateData(MyDataModel data) { int count = myDataService.countByColumn1(data.getColumn1()); if(count == 0){ return true; } return false; }
@Override public void doAfterAllAnalysed(AnalysisContext context) { if (!batch.isEmpty()) { processBatch(); } }
private void processBatch() { int retryCount = 0; while (retryCount < 3) { try { myDataService.batchInsert(batch); batch.clear(); break; } catch (Exception e) { retryCount++; if (retryCount >= 3) { logError(e, batch); } }
|
通过自定义MyDataModelListener,在读取Excel文件过程中可实现数据处理。每读取一条数据后,将其加入列表,在列表累积达到1000条时,执行一次数据库批量插入操作。若插入失败,则进行重试;若多次尝试仍失败,则记录错误日志。
批量插入
import org.apache.ibatis.annotations.Mapper; import java.util.List;
@Mapper public interface MyDataMapper{ void batchInsert(List<MyDataModel> dataList);
int countByColumn1(String column1); } <insert id="batchInsert" parameterType="list"> INSERT INTO paidaxing_test_table_name (column1, column2, ...) VALUES <foreach collection="list" item="item" index="index" separator=","> (#{item.column1}, #{item.column2}, ...) </foreach> </insert>
<select id="countByColumn1" resultType="int"> SELECT COUNT(*) FROM your_table WHERE column1 = #{column1} </select>
|