JVM堆溢出解决方法:Java大数据迁移实战
本文针对Java微服务在大数据迁移(如百万级记录)时易发的JVM堆内存溢出问题,提供了一套基于数据库分页的批处理解决方案。文章深入剖析了内存溢出的原因,并详细阐述了如何利用数据库的LIMIT和OFFSET特性,结合应用程序的迭代处理逻辑,实现数据的分批查询与处理,有效降低单次操作的内存消耗,避免服务崩溃。通过优化SQL查询构建方法和改造数据归档流程,保证数据迁移的稳定性和效率,为Java微服务处理海量数据提供了一种实用的实践方案。

1. 问题背景与内存溢出分析
在Java微服务架构中,当需要处理或迁移大量数据(例如百万级数据库记录)时,常见的做法是查询所有符合条件的数据并加载到内存中进行后续处理。然而,这种一次性加载海量数据的策略极易导致JVM堆内存耗尽(java.lang.OutOfMemoryError: Java heap space),表现为服务崩溃或响应缓慢。
错误日志通常会显示类似以下信息:
Resource exhaustion event: the JVM was unable to allocate memory from the heap. ResourceExhausted! (1/0)
这表明应用程序试图分配的内存超出了JVM堆的可用容量。在数据迁移场景中,如将一个表中的大量记录复制到另一个表,如果一次性查询并缓存所有源数据(例如使用 JdbcTemplate.queryForList()),即使后续使用了JDBC批处理进行写入,内存压力也可能在数据查询阶段就达到极限。
原始代码片段中的 List
2. 解决方案:基于数据库分页的批处理策略
为了解决一次性加载海量数据导致的内存溢出问题,核心思想是将大批量数据处理分解为多个小批次处理。这可以通过结合数据库的分页查询能力和应用程序的迭代处理逻辑来实现。
2.1 数据库层面的批次查询 (LIMIT 和 OFFSET)
数据库提供了 LIMIT(或 TOP)和 OFFSET(或 SKIP)子句,允许我们指定查询结果的数量以及从哪个位置开始返回。这是实现数据分批查询的基础。
SQL查询示例:
SELECT * FROM your_table WHERE your_condition ORDER BY unique_id_column -- 必须指定一个排序字段,确保每次查询的顺序稳定 LIMIT batch_size OFFSET current_offset;
- LIMIT batch_size: 定义每个批次要查询的记录数量。
- OFFSET current_offset: 定义从结果集的哪个位置开始返回记录。current_offset 会随着已处理记录的数量递增。
- ORDER BY unique_id_column: 至关重要! 必须根据一个稳定、唯一且通常是索引的列(如主键ID、创建时间戳等)进行排序。这确保了:
- 每次分页查询的结果顺序是确定的。
- 不会遗漏或重复获取数据。
- 对于大型数据集和高 OFFSET 值,数据库可能需要扫描大量跳过的行,因此 OFFSET 的性能会随之下降。在极端情况下,可以考虑使用基于游标(Keyset Pagination)的方法(即 WHERE id > last_id ORDER BY id LIMIT batch_size),但这超出了本教程的范围。
2.2 应用程序层面的迭代处理
在应用程序中,我们需要构建一个循环,在每次迭代中:
- 计算当前的 OFFSET 值。
- 使用 LIMIT 和计算出的 OFFSET 从源数据库中查询一个批次的数据。
- 处理这个批次的数据(例如,复制到目标表)。
- 更新 OFFSET 值,为下一次迭代做准备。
- 当查询结果为空时,表示所有数据已处理完毕,退出循环。
3. 示例代码实现
下面我们将基于原有的代码结构,展示如何修改 archiveTableRecords 和 buildSQLQueryToFetchSourceRecords 方法以实现批处理。
3.1 修改SQL查询构建方法
为了支持 LIMIT 和 OFFSET,我们需要修改 ArchiveSQLQueries.buildSQLQueryToFetchSourceRecords 方法,使其能够接收批次大小、偏移量以及用于排序的列名。
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
// 假设 ArchiveConfigDTO 和 getCSTDateNew 方法已定义
// 假设 ArchiveSQLQueries 是一个包含静态方法的类
public class DataArchiverService {
private static final Logger logger = LoggerFactory.getLogger(DataArchiverService.class);
@Value("${batch-size}")
private int batchSize; // 配置的批处理大小,例如1000或5000
// 辅助方法,假设存在
private String getCSTDateNew(String archiveMonths) {
// 实现日期计算逻辑
return "2022-09-01"; // 示例值
}
// 辅助方法,假设存在
private int getSumOfArray(int[][] array) {
int sum = 0;
for (int[] innerArray : array) {
for (int value : innerArray) {
sum += value;
}
}
return sum;
}
// --- 修改后的 ArchiveSQLQueries 类中的方法 ---
public static class ArchiveSQLQueries {
public static String buildSQLQueryToFetchSourceRecords(String sourceTable, String orderByColumn, int limit, int offset) {
StringBuilder sb = new StringBuilder("SELECT * FROM " + sourceTable + " where update_dts <= ?");
// 确保 orderByColumn 是安全的,避免SQL注入风险
if (orderByColumn != null && !orderByColumn.isEmpty()) {
sb.append(" ORDER BY ").append(orderByColumn);
}
sb.append(" LIMIT ").append(limit);
sb.append(" OFFSET ").append(offset);
return sb.toString();
}
public static String buildSQLTargetRecordInsertionQuery(String targetTable, Map<String, Object> record, String primaryKeyColumn) {
// 假设此方法已正确实现,根据Map构建INSERT语句
// 示例:INSERT INTO target_table (col1, col2) VALUES (?, ?)
StringBuilder sb = new StringBuilder("INSERT INTO ").append(targetTable).append(" (");
StringBuilder values = new StringBuilder(" VALUES (");
boolean firstColumn = true;
for (String key : record.keySet()) {
if (!key.equals(primaryKeyColumn)) { // 假设主键在插入时是自增的,不包含在VALUES中
if (!firstColumn) {
sb.append(", ");
values.append(", ");
}
sb.append(key);
values.append("?");
firstColumn = false;
}
}
sb.append(")").append(values).append(")");
return sb.toString();
}
}
// --- 原始的 copySourceRecords 方法,其内部已使用批处理写入 ---
public int copySourceRecords(JdbcTemplate targetDbTemplate, String targetTable, String primaryKeyColumn,
List<Map<String, Object>> sourceRecords, List<Object> primaryKeyValueList) {
int result = 0;
logger.info("Copying {} records to {}", sourceRecords.size(), targetTable);
if (sourceRecords.isEmpty()) {
return 0;
}
// 构建插入语句,基于第一个记录的结构
String insertSql = ArchiveSQLQueries.buildSQLTargetRecordInsertionQuery(targetTable, sourceRecords.get(0), primaryKeyColumn);
int[][] insertResult = targetDbTemplate.batchUpdate(
insertSql,
sourceRecords,
batchSize, // 这里使用了配置的batchSize进行JDBC批处理写入
new ParameterizedPreparedStatementSetter<Map<String, Object>>() {
@Override
public void setValues(PreparedStatement ps, Map<String, Object> argument) throws SQLException {
int index = 1;
for (Entry<String, Object> obj : argument.entrySet()) {
// 假设主键列在目标表是自增的,或者不作为插入参数
if (obj.getKey().equals(primaryKeyColumn)) {
primaryKeyValueList.add(obj.getValue()); // 收集主键值
} else {
ps.setObject(index++, obj.getValue());
}
}
}
});
result = getSumOfArray(insertResult);
logger.info("Inserted {} record(s) in {}", result, targetTable);
return result;
}
// --- 修改后的 archiveTableRecords 方法,实现批次循环 ---
public void archiveTableRecords(JdbcTemplate sourceDbTemplate, JdbcTemplate targetDbTemplate,
ArchiveConfigDTO archiveObj) {
try {
String sourceTable = archiveObj.getSourceTable();
String targetTable = archiveObj.getTargetTable();
String primaryKeyColumn = archiveObj.getPrimaryKeyColumn(); // 假设这是用于排序的列
String archive_months = archiveObj.getArchiveCriteriaMonths();
String compareDate1 = getCSTDateNew(archive_months);
logger.info("Archive criteria date: {}", compareDate1);
int processedRecords = 0;
List<Object> allPrimaryKeyValueList = new ArrayList<>(); // 用于收集所有已处理记录的主键,以便后续删除
while (true) {
// 1. 批次查询源数据
List<Map<String, Object>> sourceRecordsBatch = sourceDbTemplate.queryForList(
ArchiveSQLQueries.buildSQLQueryToFetchSourceRecords(sourceTable, primaryKeyColumn, batchSize, processedRecords),
compareDate1
);
if (sourceRecordsBatch.isEmpty()) {
logger.info("No more records to fetch for table {}. Total processed: {}", sourceTable, processedRecords);
break; // 没有更多记录,退出循环
}
logger.info("Fetched batch of {} records from {} (offset: {})", sourceRecordsBatch.size(), sourceTable, processedRecords);
// 2. 准备当前批次的主键列表
List<Object> currentBatchPrimaryKeys = new ArrayList<>();
// 3. 复制当前批次数据到目标表
int recordsInsertedInBatch = copySourceRecords(targetDbTemplate, targetTable, primaryKeyColumn, sourceRecordsBatch, currentBatchPrimaryKeys);
if (recordsInsertedInBatch > 0) {
// 将当前批次的主键添加到总列表中
allPrimaryKeyValueList.addAll(currentBatchPrimaryKeys);
logger.info("Copied {} record(s) to {}. Total copied: {}", recordsInsertedInBatch, targetTable, allPrimaryKeyValueList.size());
}
// 4. 更新已处理记录数,作为下一次查询的偏移量
processedRecords += sourceRecordsBatch.size();
}
// 5. 所有批次处理完毕后,根据收集到的主键删除源数据
if (!allPrimaryKeyValueList.isEmpty()) {
logger.info("Initiating deletion of {} records from source table {}.", allPrimaryKeyValueList.size(), sourceTable);
// 注意:如果 allPrimaryKeyValueList 极其庞大,deleteSourceRecords 方法也可能需要内部批处理
deleteSourceRecords(sourceDbTemplate, sourceTable, primaryKeyColumn, allPrimaryKeyValueList);
logger.info("Deletion completed for {} records from source table {}.", allPrimaryKeyValueList.size(), sourceTable);
}
} catch (Exception e) {
logger.error("Exception in archiveTableRecords: {} {}", e.getMessage(), e);
// 在实际应用中,这里需要更健壮的异常处理,例如记录失败的批次信息,以便后续重试
}
}
// 假设 deleteSourceRecords 方法已存在
public void deleteSourceRecords(JdbcTemplate sourceDbTemplate, String sourceTable, String primaryKeyColumn, List<Object> primaryKeyValueList) {
// 示例:DELETE FROM source_table WHERE primaryKeyColumn IN (?, ?, ...)
// 对于非常大的 primaryKeyValueList,需要考虑分批次执行DELETE语句,或使用其他高效的删除策略
String deleteSql = "DELETE FROM " + sourceTable + " WHERE " + primaryKeyColumn + " IN (" +
String.join(",", java.util.Collections.nCopies(primaryKeyValueList.size(), "?")) + ")";
try {
sourceDbTemplate.batchUpdate(deleteSql, new ArrayList<>(primaryKeyValueList)); // 假设可以一次性处理
} catch (Exception e) {今天带大家了解了的相关知识,希望对你有所帮助;关于文章的技术知识我们会一点点深入介绍,欢迎大家关注golang学习网公众号,一起学习编程~
GolangHTTP分块上传优化技巧
- 上一篇
- GolangHTTP分块上传优化技巧
- 下一篇
- Golang微服务版本兼容全攻略
-
- 文章 · java教程 | 1分钟前 |
- JUnit5assertThat方法详解与使用教程
- 335浏览 收藏
-
- 文章 · java教程 | 32分钟前 |
- Java环境搭建指南:JDK与IDE安装步骤
- 441浏览 收藏
-
- 文章 · java教程 | 49分钟前 |
- 解压JDK如何配置环境变量?
- 366浏览 收藏
-
- 文章 · java教程 | 1小时前 |
- Java开发投票评分系统教程实战
- 221浏览 收藏
-
- 文章 · java教程 | 1小时前 |
- KafkaConnectSinkTask隔离与对象管理解析
- 226浏览 收藏
-
- 文章 · java教程 | 1小时前 |
- Java接口回调解耦技巧分享
- 224浏览 收藏
-
- 文章 · java教程 | 1小时前 |
- JavaStream转Map技巧:toMap使用详解
- 318浏览 收藏
-
- 文章 · java教程 | 1小时前 |
- KubernetesOperator开发实战指南
- 430浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3186次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3398次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3429次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4535次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3807次使用
-
- 提升Java功能开发效率的有力工具:微服务架构
- 2023-10-06 501浏览
-
- 掌握Java海康SDK二次开发的必备技巧
- 2023-10-01 501浏览
-
- 如何使用java实现桶排序算法
- 2023-10-03 501浏览
-
- Java开发实战经验:如何优化开发逻辑
- 2023-10-31 501浏览
-
- 如何使用Java中的Math.max()方法比较两个数的大小?
- 2023-11-18 501浏览

