灵魂之桥前传:追忆
83.49M · 2026-02-09
在企业级应用中,批量数据处理是一个非常常见的需求。比如月底的工资代发、银彳对账、数据报表生成等。当数据量达到几十万甚至上百万时,如何高效、可靠地处理这些数据,就成了一个技术挑战。
本文将以"50万笔工资代发"为实际场景,详细介绍如何使用Spring Batch框架来处理大规模批量数据,并重点讲解当处理失败时,如何实现部分回滚机制,确保已成功处理的数据不会因为少量失败记录而全部回滚。
Spring Batch是一个轻量级的、全面的批处理框架,由Spring团队开发,旨在帮助企业开发健壮的批处理应用程序。它于2008年首次发布,经过十多年的发展,已经成为Java批处理领域的事实标准。
Spring Batch的核心设计理念包括:
Job是批处理的核心概念,代表一个完整的批处理任务。一个Job可以包含多个Step,按顺序或并行执行。
@Bean
public Job salaryPaymentJob() {
return jobBuilderFactory.get("salaryPaymentJob")
.start(step1())
.next(step2())
.build();
}
Step是Job的基本执行单元,每个Step包含:
@Bean
public Step salaryPaymentStep() {
return stepBuilderFactory.get("salaryPaymentStep")
.<SalaryPayment, SalaryPayment>chunk(1000)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
Chunk是Spring Batch处理数据的基本单位。每次从Reader读取指定数量的记录,处理后一起提交到数据库:
读取1000条 → 处理1000条 → 写入1000条 → 提交事务
Spring Batch适用于以下典型场景:
| 场景 | 描述 | 示例 |
|---|---|---|
| 数据迁移 | 跨系统数据同步 | 从旧系统迁移数据到新系统 |
| 数据转换 | ETL过程 | 从数据库读取、转换、写入数据仓库 |
| 批量处理 | 定期批量操作 | 月底工资代发、银彳对账 |
| 报表生成 | 定期生成报表 | 每日交易汇总报表 |
| 特性 | Spring Batch | Quartz | Scheduled Executor |
|---|---|---|---|
| 批量处理 | 专用 | 需要 | 需要 |
| 事务管理 | 内置 | 无 | 无 |
| 容错机制 | 完善的Skip/Retry | 无 | 无 |
| 坚控统计 | 数据库持久化 | 基础 | 无 |
| 并行处理 | 多种模式 | 无 | 基础 |
为什么需要部分回滚?
想象一下:你需要处理50万笔工资代发,如果第49万笔记录因为银彳卡号错误而失败,在没有部分回滚机制的情况下,前面489,999笔已成功处理的数据会全部回滚!这对于业务来说是不可接受的。
为了实现50万笔工资代发的高效处理,我们设计了如下的系统架构:
上图展示了Spring Batch工资代发系统的分层架构:
| 组件 | 职责 | 实现类 |
|---|---|---|
| Job | 整个批处理任务 | SalaryPaymentJob |
| Step | 任务中的一个步骤 | SalaryPaymentStep |
| ItemReader | 数据读取器 | FlatFileItemReader(读取CSV) |
| ItemProcessor | 数据处理器 | SalaryPaymentProcessor(数据验证) |
| ItemWriter | 数据写入器 | JdbcBatchItemWriter(批量写入数据库) |
Spring Batch采用**Chunk-Oriented Processing(块级处理)**模式,这是实现部分回滚的核心机制:
上图展示了Batch处理的核心流程:Reader读取数据 → Processor处理验证 → Writer批量写入,形成完整的处理管道。
对于50万笔数据的处理,Chunk机制的工作方式如下:
50万笔数据
│
├─► Chunk 1 (1-1000笔) ──► 独立事务 ──► 成功提交
├─► Chunk 2 (1001-2000笔) ──► 独立事务 ──► 成功提交
├─► Chunk 3 (2001-3000笔) ──► 独立事务 ──► 第2500笔失败 → 重试3次 → 跳过 → 其余999笔提交
├─► Chunk 4 (3001-4000笔) ──► 独立事务 ──► 成功提交
...
└─► Chunk 500 (499001-500000笔) ──► 独立事务 ──► 成功提交
最终结果:499,999笔成功,1笔被跳过
关键配置:
每个Chunk是独立的事务单元,这是实现部分回滚的关键:
上图清晰地展示了事务边界和部分回滚的工作机制:
事务规则:
实际案例: 假设Chunk 3中有1000笔数据,第500笔验证失败:
.faultTolerant() // 启用容错
.skipLimit(100) // 最多跳过100条
.skip(IllegalArgumentException.class) // 跳过数据验证异常
.skip(NullPointerException.class) // 跳过空指针异常
.retryLimit(3) // 失败重试3次
.retry(Exception.class) // 重试所有异常
在理解了部分回滚机制后,我们来看完整的工资代发数据处理流程:
上图展示了从CSV文件读取到数据库写入的完整数据流,包含以下关键步骤:
@Configuration
public class SalaryPaymentJobConfig {
@Value("${batch.chunk.size:1000}")
private int chunkSize; // 每次处理的记录数
@Value("${batch.skip.limit:100}")
private int skipLimit; // 跳过限制
@Value("${batch.retry.limit:3}")
private int retryLimit; // 重试次数
@Bean
public Step salaryPaymentStep() {
return stepBuilderFactory
.get("salaryPaymentStep")
.<SalaryPayment, SalaryPayment>chunk(chunkSize)
.reader(salaryPaymentReader())
.processor(salaryPaymentProcessor())
.writer(salaryPaymentWriter())
.faultTolerant() // 启用容错
.skipLimit(skipLimit)
.skip(IllegalArgumentException.class)
.skip(NullPointerException.class)
.retryLimit(retryLimit)
.retry(Exception.class)
.listener(new SalaryItemReadListener())
.listener(new SalaryItemWriteListener())
.build();
}
@Bean
public Job salaryPaymentJob(Step step, SalaryJobExecutionListener listener) {
return jobBuilderFactory.get("salaryPaymentJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.start(step)
.build();
}
}
@Bean
public FlatFileItemReader<SalaryPayment> salaryPaymentReader() {
FlatFileItemReader<SalaryPayment> reader = new FlatFileItemReader<>();
reader.setName("salaryPaymentReader");
reader.setResource(new ClassPathResource("input/salary-payments.csv"));
reader.setLinesToSkip(1); // 跳过CSV标题行
// 设置列映射
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[]{
"employeeId", "employeeName", "accountNumber",
"accountName", "bankName", "amount", "currency",
"paymentDate", "remark"
});
// 设置字段映射
BeanWrapperFieldSetMapper<SalaryPayment> mapper = new BeanWrapperFieldSetMapper<>();
mapper.setTargetType(SalaryPayment.class);
DefaultLineMapper<SalaryPayment> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(mapper);
reader.setLineMapper(lineMapper);
return reader;
}
public class SalaryPaymentProcessor implements ItemProcessor<SalaryPayment, SalaryPayment> {
private static final BigDecimal MIN_AMOUNT = new BigDecimal("0.01");
private static final BigDecimal MAX_AMOUNT = new BigDecimal("1000000");
@Override
public SalaryPayment process(SalaryPayment item) throws Exception {
// 1. 数据验证
if (item.getEmployeeId() == null || item.getEmployeeId().trim().isEmpty()) {
throw new IllegalArgumentException("员工ID不能为空");
}
// 2. 金额验证
if (item.getAmount() == null) {
throw new IllegalArgumentException("发放金额不能为空");
}
if (item.getAmount().compareTo(MIN_AMOUNT) < 0) {
throw new IllegalArgumentException("发放金额不能小于0.01元");
}
if (item.getAmount().compareTo(MAX_AMOUNT) > 0) {
throw new IllegalArgumentException("发放金额不能大于100万元");
}
// 3. 银彳卡号验证
if (item.getAccountNumber() == null ||
item.getAccountNumber().length() < 16 ||
item.getAccountNumber().length() > 19) {
throw new IllegalArgumentException("银彳账号长度必须在16-19位之间");
}
// 4. 设置处理状态
item.setStatus("PROCESSING");
item.setTransactionId("SAL" + System.currentTimeMillis() + item.getEmployeeId());
return item;
}
}
public class SalaryPaymentWriter implements ItemWriter<SalaryPayment> {
private final JdbcBatchItemWriter<SalaryPayment> delegate;
public SalaryPaymentWriter(DataSource dataSource) {
this.delegate = new JdbcBatchItemWriter<>();
this.delegate.setDataSource(dataSource);
this.delegate.setSql(
"INSERT INTO salary_payment " +
"(employee_id, employee_name, account_number, account_name, " +
"bank_name, amount, currency, payment_date, remark, " +
"status, transaction_id, create_time, update_time) " +
"VALUES (:employeeId, :employeeName, :accountNumber, :accountName, " +
":bankName, :amount, :currency, :paymentDate, :remark, " +
":status, :transactionId, :createTime, :updateTime)");
this.delegate.setItemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>()
);
}
@Override
public void write(List<? extends SalaryPayment> items) throws Exception {
delegate.write(items);
}
}
@Component
public class PartialRollbackHandler implements SkipPolicy {
private static final int SKIP_LIMIT = 100;
@Override
public boolean shouldSkip(Throwable throwable, int skipCount) {
// 超过跳过限制
if (skipCount >= SKIP_LIMIT) {
return false;
}
// 文件不存在,不能跳过
if (throwable instanceof FileNotFoundException) {
return false;
}
// 数据格式错误,可以跳过
if (throwable instanceof FlatFileParseException) {
return true;
}
// 数据验证失败,可以跳过
if (throwable instanceof IllegalArgumentException ||
throwable instanceof NullPointerException) {
return true;
}
return false;
}
}
除了数据处理,Spring Batch还提供了完善的坚控和调度能力:
上图展示了完整的坚控与调度架构:
调度层:支持三种调度方式
执行层:核心执行组件
坚控层:坚控与统计
数据层:数据存储
CREATE TABLE salary_payment (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
employee_id VARCHAR(50) NOT NULL COMMENT '员工ID',
employee_name VARCHAR(100) NOT NULL COMMENT '员工姓名',
account_number VARCHAR(50) NOT NULL COMMENT '银彳账号',
account_name VARCHAR(100) NOT NULL COMMENT '账户名称',
bank_name VARCHAR(100) NOT NULL COMMENT '开户行',
amount DECIMAL(18,2) NOT NULL COMMENT '发放金额',
currency VARCHAR(10) NOT NULL DEFAULT 'chy' COMMENT '币种',
payment_date DATETIME NOT NULL COMMENT '发放日期',
remark VARCHAR(500) COMMENT '备注',
status VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '状态',
transaction_id VARCHAR(100) COMMENT '交易ID',
error_message VARCHAR(1000) COMMENT '错误信息',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_employee_id (employee_id),
INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Spring Batch框架会自动创建以下元表来存储Job执行信息:
batch_job_instance - Job实例表batch_job_execution - Job执行表batch_job_execution_params - Job参数表batch_step_execution - Step执行表batch_step_execution_context - Step上下文表@RestController
@RequestMapping("/api/batch")
public class BatchJobController {
// 启动Job
@PostMapping("/start")
public ResponseEntity<Map<String, Object>> startJob(
@RequestParam String inputFile
) {
JobParameters params = new JobParametersBuilder()
.addLong("startTime", System.currentTimeMillis())
.addString("inputFile", inputFile)
.toJobParameters();
JobExecution execution = jobLauncher.run(salaryPaymentJob, params);
return ResponseEntity.ok(result);
}
// 获取Job状态
@GetMapping("/status/{jobExecutionId}")
public ResponseEntity<Map<String, Object>> getJobStatus(
@PathVariable Long jobExecutionId
) {
JobExecution execution = jobRepository.getJobExecution(jobExecutionId);
// 返回执行详情
}
// 停止Job
@PostMapping("/stop/{jobExecutionId}")
public ResponseEntity<Map<String, Object>> stopJob(
@PathVariable Long jobExecutionId
) {
JobExecution execution = jobRepository.getJobExecution(jobExecutionId);
execution.stop();
return ResponseEntity.ok(result);
}
// 获取统计信息
@GetMapping("/statistics")
public ResponseEntity<Map<String, Object>> getStatistics() {
// 返回总数、成功数、失败数等统计
}
// 健康检查
@GetMapping("/health")
public ResponseEntity<Map<String, Object>> health() {
// 返回系统健康状态
}
}
当数据量达到50万甚至更多时,单线程处理可能成为瓶颈。Spring Batch提供了多种并行处理方式。
Spring Batch支持多线程并发处理,大幅提升处理效率:
上图展示了多线程并发处理的工作原理:
核心机制:
配置示例:
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("salary-batch-");
executor.initialize();
return executor;
}
// 在Step中使用
.step(stepName)
.chunk(chunkSize)
.taskExecutor(taskExecutor())
.throttleLimit(10) // 限制并发数
.build();
对于超大数据集,可以使用分区处理实现更高程度的并行:
上图展示了分区处理的架构:
核心组件:
配置示例:
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep().getName(), rangePartitioner(1, 10))
.step(slaveStep())
.gridSize(10) // 分成10个分区
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Partitioner rangePartitioner(int min, int max) {
return new Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
int range = (max - min) / gridSize;
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("minValue", min + i * range);
context.putInt("maxValue", min + (i + 1) * range - 1);
result.put("partition" + i, context);
}
return result;
}
};
}
| 参数 | 推荐值 | 说明 |
|---|---|---|
| chunkSize | 1000-5000 | 根据记录大小调整,越大吞吐量越高但内存占用也越大 |
| skipLimit | 100-500 | 根据数据质量设置 |
| retryLimit | 3-5 | 过多会浪费时间,过少可能误判暂时性故障 |
| 线程池大小 | CPU核心数*2 | 用于多线程处理 |
使用JDBC批量操作代替单条插入:
// 单条插入(慢)
for (SalaryPayment p : payments) {
jdbcTemplate.update(sql, p.getId(), p.getName(), ...);
}
// 批量插入(快)
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
// 设置参数
}
@Override
public int getBatchSize() {
return payments.size();
}
});
-- 为常用查询字段添加索引
CREATE INDEX idx_employee_id ON salary_payment(employee_id);
CREATE INDEX idx_status ON salary_payment(status);
CREATE INDEX idx_create_time ON salary_payment(create_time);
-- 复合索引
CREATE INDEX idx_status_employee ON salary_payment(status, employee_id);
某公司月底需要为50,000名员工发放工资,使用Spring Batch:
银彳提供100万笔交易对账文件:
每天凌晨生成T+1交易报表:
Spring Batch支持Job重启。通过JobRepository记录的执行状态,可以从上次失败的位置继续执行:
.job(salaryPaymentJob)
.allowStartIfComplete(false) // 已完成的Job不重新执行
.restartable(true) // 允许重启
使用Partitioning方式实现多线程并行处理:
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep().getName(), partitioner())
.step(slaveStep())
.gridSize(10) // 分成10个分区并行处理
.taskExecutor(taskExecutor())
.build();
}
可以通过以下方式重试:
Spring Batch作为成熟的批处理框架,提供了完整的解决方案来处理大规模批量数据。
适用场景:
注意事项: