每天10万份PDF的批处理优化实战
文章摘要
从单线程处理到分布式集群,从内存爆炸到毫秒级响应,记录一个PDF批处理系统从崩溃边缘到高性能运行的完整优化历程。
危机:系统在10万份PDF前投降了
去年双十一前夕,业务方突然提出要求:系统需要支持每天处理10万份PDF文档的批量操作。当时我心想,不就是把现有的单文件处理逻辑套个循环吗?
结果第一次压测时,系统在处理到第1000份文档时就开始频繁GC,第5000份时内存溢出,第10000份时直接宕机。那一刻我才意识到,批处理系统和单文件处理完全是两个概念。
接下来的两个月,我经历了从架构重设计到性能调优的完整历程...
初版系统的灾难现场
让我们先看看初版系统是怎么崩溃的:
// 初版系统的天真实现
@Service
public class NaiveBatchProcessor {
@Autowired
private PDFService pdfService;
public BatchResult processBatch(List filePaths) {
List results = new ArrayList<>();
// 致命错误1:同步串行处理
for (String filePath : filePaths) {
try {
// 致命错误2:每次都重新加载完整文件到内存
byte[] pdfData = Files.readAllBytes(Paths.get(filePath));
// 致命错误3:没有资源释放机制
PDDocument document = PDDocument.load(pdfData);
// 致命错误4:复杂操作没有进度反馈
ProcessResult result = pdfService.complexProcess(document);
results.add(result);
// 致命错误5:忘记关闭文档,内存泄漏
// document.close(); // 注释掉的代码...
} catch (Exception e) {
// 致命错误6:单个文件出错就终止整个批次
throw new BatchProcessException("处理失败: " + filePath, e);
}
}
return new BatchResult(results);
}
}
性能测试结果
- 1000份文档:耗时25分钟,内存使用8GB
- 5000份文档:内存溢出,系统崩溃
- 错误处理:一个文件出错,整批次失败
- 监控盲区:无法知道处理进度
重新设计:分布式批处理架构
痛定思痛,我重新设计了整套批处理架构:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 任务调度器 │───▶│ 任务分片器 │───▶│ 消息队列 │
│ TaskScheduler │ │ TaskSplitter │ │ Redis/RabbitMQ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 状态管理器 │◀───│ 工作节点池 │◀───│ 资源监控器 │
│ StateManager │ │ WorkerPool │ │ ResourceMonitor │
└─────────────────┘ └─────────────────┘ └─────────────────┘
核心组件实现
// 改进后的批处理控制器
@RestController
@RequestMapping("/batch")
public class OptimizedBatchController {
@Autowired
private BatchTaskManager taskManager;
@PostMapping("/submit")
public ResponseEntity submitBatch(@RequestBody BatchRequest request) {
// 创建批处理任务
BatchTask task = BatchTask.builder()
.taskId(UUID.randomUUID().toString())
.fileList(request.getFiles())
.processingType(request.getType())
.priority(request.getPriority())
.build();
// 提交到任务管理器
String taskId = taskManager.submitTask(task);
return ResponseEntity.ok(BatchTaskResponse.builder()
.taskId(taskId)
.status("SUBMITTED")
.estimatedTime(taskManager.estimateProcessingTime(task))
.build());
}
@GetMapping("/status/{taskId}")
public ResponseEntity getTaskStatus(@PathVariable String taskId) {
TaskStatus status = taskManager.getTaskStatus(taskId);
return ResponseEntity.ok(status);
}
}
// 任务管理器
@Service
public class BatchTaskManager {
@Autowired
private TaskSplitter taskSplitter;
@Autowired
private MessageQueue messageQueue;
@Autowired
private TaskStateManager stateManager;
public String submitTask(BatchTask task) {
// 1. 保存任务状态
stateManager.saveTask(task);
// 2. 任务分片
List subTasks = taskSplitter.split(task);
// 3. 提交到消息队列
for (SubTask subTask : subTasks) {
messageQueue.send("pdf.processing.queue", subTask);
}
// 4. 更新状态为处理中
stateManager.updateTaskStatus(task.getTaskId(), TaskStatus.PROCESSING);
return task.getTaskId();
}
}
工作节点优化
内存优化策略
// 内存友好的工作节点实现
@Component
public class OptimizedPDFWorker {
private final int MAX_HEAP_USAGE = 70; // 最大堆内存使用率
private final Object memoryLock = new Object();
@RabbitListener(queues = "pdf.processing.queue")
public void processSubTask(SubTask subTask) {
try {
// 内存检查
if (!checkMemoryAvailable()) {
// 内存不足,重新排队
requeueTask(subTask);
return;
}
// 流式处理,避免大文件全载入内存
processFileStream(subTask);
} catch (Exception e) {
handleProcessingError(subTask, e);
} finally {
// 强制垃圾回收
System.gc();
}
}
private boolean checkMemoryAvailable() {
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
double memoryUsage = (double) usedMemory / maxMemory * 100;
return memoryUsage < MAX_HEAP_USAGE;
}
private void processFileStream(SubTask subTask) {
// 使用流式处理,分块读取
try (FileInputStream fis = new FileInputStream(subTask.getFilePath());
BufferedInputStream bis = new BufferedInputStream(fis)) {
// 分块处理
byte[] buffer = new byte[8192]; // 8KB缓冲区
PDFStreamProcessor processor = new PDFStreamProcessor();
int bytesRead;
while ((bytesRead = bis.read(buffer)) != -1) {
processor.processChunk(buffer, bytesRead);
// 定期检查内存使用情况
if (!checkMemoryAvailable()) {
processor.pauseProcessing();
Thread.sleep(1000); // 等待GC
}
}
// 完成处理
ProcessResult result = processor.finishProcessing();
updateTaskProgress(subTask, result);
} catch (IOException e) {
throw new ProcessingException("文件处理失败", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ProcessingException("处理被中断", e);
}
}
}
并行处理优化
// 智能线程池管理
@Configuration
public class BatchProcessingConfig {
@Bean("pdfProcessingExecutor")
public ThreadPoolTaskExecutor pdfProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 动态调整线程池大小
int availableProcessors = Runtime.getRuntime().availableProcessors();
long availableMemoryGB = Runtime.getRuntime().maxMemory() / (1024 * 1024 * 1024);
// 基于CPU和内存计算最优线程数
int corePoolSize = Math.max(2, availableProcessors / 2);
int maxPoolSize = Math.min(availableProcessors * 2, (int) availableMemoryGB * 2);
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("PDF-Batch-");
// 自定义拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
// 自适应批处理器
@Service
public class AdaptiveBatchProcessor {
@Autowired
@Qualifier("pdfProcessingExecutor")
private ThreadPoolTaskExecutor executor;
private final AtomicInteger activeTaskCount = new AtomicInteger(0);
private final CircularBuffer performanceHistory = new CircularBuffer(100);
public CompletableFuture processAsync(SubTask subTask) {
return CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
activeTaskCount.incrementAndGet();
try {
ProcessResult result = doProcess(subTask);
// 记录性能数据
long processingTime = System.currentTimeMillis() - startTime;
recordPerformance(subTask.getFileSize(), processingTime);
return result;
} finally {
activeTaskCount.decrementAndGet();
}
}, executor);
}
private void recordPerformance(long fileSize, long processingTime) {
PerformanceMetric metric = new PerformanceMetric(
fileSize, processingTime, System.currentTimeMillis()
);
performanceHistory.add(metric);
// 动态调整处理策略
if (performanceHistory.isFull()) {
adjustProcessingStrategy();
}
}
private void adjustProcessingStrategy() {
double avgProcessingTime = performanceHistory.getAverageProcessingTime();
int currentActiveCount = activeTaskCount.get();
// 如果平均处理时间过长,减少并发数
if (avgProcessingTime > 30000 && currentActiveCount > 2) { // 30秒
executor.setCorePoolSize(Math.max(1, executor.getCorePoolSize() - 1));
log.info("降低并发数到: {}", executor.getCorePoolSize());
}
// 如果处理很快且CPU使用率不高,增加并发数
else if (avgProcessingTime < 5000 && currentActiveCount < executor.getMaxPoolSize()) {
executor.setCorePoolSize(Math.min(executor.getMaxPoolSize(),
executor.getCorePoolSize() + 1));
log.info("提高并发数到: {}", executor.getCorePoolSize());
}
}
}
监控和告警系统
建立了完善的监控体系,实时掌握处理状态:
// 实时监控组件
@Component
public class BatchProcessingMonitor {
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private AlertService alertService;
private final Timer processingTimer;
private final Counter successCounter;
private final Counter errorCounter;
private final Gauge activeTaskGauge;
public BatchProcessingMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.processingTimer = Timer.builder("pdf.processing.duration")
.description("PDF处理耗时")
.register(meterRegistry);
this.successCounter = Counter.builder("pdf.processing.success")
.description("处理成功数量")
.register(meterRegistry);
this.errorCounter = Counter.builder("pdf.processing.error")
.description("处理错误数量")
.register(meterRegistry);
this.activeTaskGauge = Gauge.builder("pdf.processing.active")
.description("当前活跃任务数")
.register(meterRegistry, this, BatchProcessingMonitor::getActiveTaskCount);
}
@EventListener
public void handleTaskCompleted(TaskCompletedEvent event) {
// 记录指标
processingTimer.record(event.getProcessingTime(), TimeUnit.MILLISECONDS);
if (event.isSuccess()) {
successCounter.increment();
} else {
errorCounter.increment();
// 错误率过高时告警
double errorRate = getErrorRate();
if (errorRate > 0.1) { // 错误率超过10%
alertService.sendAlert("PDF批处理错误率过高",
"当前错误率: " + String.format("%.2f%%", errorRate * 100));
}
}
}
@Scheduled(fixedDelay = 30000) // 每30秒检查一次
public void performHealthCheck() {
HealthMetrics metrics = collectHealthMetrics();
// 检查内存使用率
if (metrics.getMemoryUsage() > 0.85) {
alertService.sendAlert("内存使用率过高",
"当前内存使用率: " + String.format("%.1f%%", metrics.getMemoryUsage() * 100));
}
// 检查队列积压
if (metrics.getQueueSize() > 10000) {
alertService.sendAlert("任务队列积压严重",
"当前队列长度: " + metrics.getQueueSize());
}
// 检查处理速度
if (metrics.getProcessingRate() < 10) { // 每秒处理少于10个文件
alertService.sendAlert("处理速度过慢",
"当前处理速度: " + metrics.getProcessingRate() + " files/sec");
}
}
}
优化效果对比
经过全面优化后,系统性能有了质的飞跃:
性能指标 | 优化前 | 优化后 | 提升倍数 |
---|---|---|---|
10万文件处理时间 | 系统崩溃 | 2.5小时 | 从不可用到可用 |
并发处理能力 | 1个/秒 | 50个/秒 | 50倍 |
内存使用 | 8GB+溢出 | 2GB稳定 | 降低75% |
系统可用性 | 50% | 99.5% | 提升99% |
生产环境实战经验
关键成功因素
- 分而治之:大任务拆分成小任务,降低单点故障影响
- 资源隔离:不同优先级任务使用独立资源池
- 弹性设计:支持动态扩缩容和故障恢复
- 监控先行:全链路监控,问题早发现早处理
- 渐进优化:持续收集数据,不断调优
踩坑记录
- 内存泄漏:PDF文档处理完后必须显式关闭
- 线程死锁:要注意共享资源的锁竞争
- 消息丢失:队列消费失败要有重试和死信机制
- 数据一致性:分布式环境下状态同步很重要
运维自动化
建立了自动化运维体系,减少人工干预:
- 自动扩容:根据队列长度和处理速度自动调整节点数
- 故障恢复:节点异常时自动重启和任务重新分配
- 性能调优:基于历史数据自动调整参数
- 预警机制:异常情况及时通知运维人员
写在最后
这次PDF批处理系统的优化让我深刻体会到了"量变引起质变"的道理。单个文件处理的逻辑看起来正确,但放到批处理场景下就会暴露各种问题。
大规模系统设计不是简单的功能堆砌,而是需要从架构层面重新思考。资源管理、错误处理、监控告警、运维自动化,每一个环节都至关重要。
最重要的是,优化不是一次性的工作,而是一个持续的过程。系统上线后,我们还在不断根据实际运行数据进行调优,这种持续改进的文化才是系统长期稳定运行的保障。
如果你也在设计批处理系统,记住一个原则:先让它能跑起来,再让它跑得快,最后让它跑得稳。三个阶段缺一不可!