const { EventEmitter } = require('eventemitter3'); const PQueue = require('p-queue').default; const got = require('./got').default; const fs = require('fs-extra'); const path = require('path'); const { exec } = require('child_process'); const { promisify } = require('util'); const execAsync = promisify(exec); class grb2DownloadManager extends EventEmitter { constructor(options = {}) { super(); // 配置默认选项 this.options = { concurrency: options.concurrency || 10, retryCount: options.retryCount || 3, retryDelay: options.retryDelay || 5000, wgrib2Path: options.wgrib2Path || path.join(__dirname, "../wgrib2/wgrib2.exe"), ...options }; // 下载队列 this.queue = new PQueue({ concurrency: this.options.concurrency, autoStart: false }); // 状态管理 this.status = 'idle'; // idle, downloading, paused, stopped this.pendingTasks = []; this.activeTasks = new Map(); this.completedTasks = []; this.failedTasks = []; // 事件类型 this.EVENTS = { STATUS_CHANGE: 'statusChange', TASK_ADDED: 'taskAdded', TASK_START: 'taskStart', TASK_PROGRESS: 'taskProgress', TASK_COMPLETE: 'taskComplete', TASK_SKIP: 'taskSkip', TASK_ERROR: 'taskError', QUEUE_EMPTY: 'queueEmpty', ALL_COMPLETE: 'allComplete' }; // 绑定方法 this.start = this.start.bind(this); this.pause = this.pause.bind(this); this.resume = this.resume.bind(this); this.stop = this.stop.bind(this); this.addTasks = this.addTasks.bind(this); console.log('气象文件下载器初始化完成'); console.log(`最大并发数: ${this.options.concurrency}`); } // 设置状态 setStatus(status) { if (this.status !== status) { this.status = status; this.emit(this.EVENTS.STATUS_CHANGE, status); const statusMap = { 'idle': '空闲', 'downloading': '下载中', 'paused': '已暂停', 'stopped': '已停止' }; console.log(`下载器状态变更为: ${statusMap[status] || status}`); } } // 检查文件是否存在并使用wgrib2验证 async validateFile(filePath, fileName) { const fullPath = path.join(filePath, fileName); try { // 检查文件是否存在 const exists = await fs.pathExists(fullPath); if (!exists) { console.log(`文件不存在: ${fullPath}`); return false; } // 检查文件大小,如果文件大小为0,则直接返回无效 const stats = await fs.stat(fullPath); if (stats.size === 0) { console.log(`文件为空: ${fileName}`); return false; } // 使用wgrib2验证文件 console.log(`正在验证文件: ${fileName}`); const { stdout, stderr } = await execAsync(`${this.options.wgrib2Path} "${fullPath}"`); // 如果wgrib2有输出,通常表示文件有效 if (stdout && stdout.length > 0) { console.log(`文件验证成功: ${fileName}`); return true; } else { console.log(`文件验证失败: ${fileName}`); return false; } } catch (error) { console.log(`文件验证出错: ${fileName}`, error.message); return false; } } // 下载文件 async downloadFile(task, retryCount = 0) { const { url, filePath, fileName } = task; const fullPath = path.join(filePath, fileName); // 确保目录存在 await fs.ensureDir(filePath); try { console.log(`开始下载: ${fileName}`); const downloadStream = got.stream(url); // 使用got的流式下载API // const downloadStream = got(url, { // isStream: true, // timeout: 300000, // 5分钟超时 // retry: 0 // 禁用内置重试,因为我们自己实现了重试逻辑 // }); const fileWriterStream = fs.createWriteStream(fullPath); // 返回Promise以便等待下载完成 return new Promise((resolve, reject) => { let downloadedBytes = 0; let totalBytes = 0; // 监听下载进度 downloadStream.on('downloadProgress', (progress) => { downloadedBytes = progress.transferred; totalBytes = progress.total || 0; this.emit(this.EVENTS.TASK_PROGRESS, { task, downloaded: downloadedBytes, total: totalBytes, percent: totalBytes ? (downloadedBytes / totalBytes * 100) : 0 }); }); // 管道流到文件 downloadStream.pipe(fileWriterStream); // 处理下载完成 fileWriterStream.on('finish', () => { console.log(`下载完成: ${fileName}`); resolve(); }); // 处理错误 fileWriterStream.on('error', (error) => { console.log(`文件写入错误: ${fileName}`, error.message); reject(error); }); downloadStream.on('error', (error) => { console.log(`下载错误: ${fileName}`, error.message); // 重试逻辑 if (retryCount < this.options.retryCount) { console.log(`准备第 ${retryCount + 1} 次重试: ${fileName}`); setTimeout(() => { this.downloadFile(task, retryCount + 1) .then(resolve) .catch(reject); }, this.options.retryDelay); } else { reject(error); } }); }); } catch (error) { console.log(`下载过程出错: ${fileName}`, error.message); // 重试逻辑 if (retryCount < this.options.retryCount) { console.log(`准备第 ${retryCount + 1} 次重试: ${fileName}`); await new Promise(resolve => setTimeout(resolve, this.options.retryDelay)); return this.downloadFile(task, retryCount + 1); } else { throw error; } } } // 处理单个任务 async processTask(task) { const { fileName, filePath } = task; // 标记任务开始 this.activeTasks.set(fileName, task); this.emit(this.EVENTS.TASK_START, task); console.log(`开始处理任务: ${fileName}`); try { // 检查文件是否已存在且有效 const isValid = await this.validateFile(filePath, fileName); if (isValid) { // 文件已存在且有效,跳过下载 this.emit(this.EVENTS.TASK_SKIP, task, '文件已存在且有效'); // console.log(`跳过下载: ${fileName} (文件已存在且有效)`); this.completedTasks.push(task); return; } // 文件不存在或无效,开始下载 console.log(`开始下载: ${fileName}`); await this.downloadFile(task); // 下载完成后再次验证文件 const postDownloadValid = await this.validateFile(filePath, fileName); if (postDownloadValid) { // 文件下载并验证成功 this.emit(this.EVENTS.TASK_COMPLETE, task); console.log(`任务完成: ${fileName}`); this.completedTasks.push(task); } else { // 文件下载后验证失败 throw new Error('文件下载后验证失败'); } } catch (error) { // 任务失败 this.emit(this.EVENTS.TASK_ERROR, task, error); console.log(`任务失败: ${fileName}`, error.message); this.failedTasks.push({ task, error }); } finally { // 从活跃任务中移除 this.activeTasks.delete(fileName); } } // 添加任务到队列 addTasks(tasks) { if (!Array.isArray(tasks)) { tasks = [tasks]; } tasks.forEach(task => { if (!task.url || !task.fileName || !task.filePath) { console.log('任务格式错误,缺少必要字段', task); return; } this.pendingTasks.push(task); this.emit(this.EVENTS.TASK_ADDED, task); console.log(`任务已添加到队列: ${task.fileName}`); }); // 如果下载器处于运行状态,自动开始处理新任务 if (this.status === 'downloading') { this.processQueue(); } return this.pendingTasks.length; } // 处理队列中的任务 processQueue() { if (this.status !== 'downloading') { return; } // 将待处理任务添加到队列 while (this.pendingTasks.length > 0 && this.queue.size < this.options.concurrency * 2) { const task = this.pendingTasks.shift(); this.queue.add(() => this.processTask(task)) .then(() => { // 检查是否所有任务都已完成 if (this.pendingTasks.length === 0 && this.queue.size === 0 && this.queue.pending === 0) { this.setStatus('idle'); this.emit(this.EVENTS.QUEUE_EMPTY); this.emit(this.EVENTS.ALL_COMPLETE, { completed: this.completedTasks, failed: this.failedTasks }); console.log('所有任务处理完成'); console.log(`成功: ${this.completedTasks.length}, 失败: ${this.failedTasks.length}`); } }) .catch(error => { console.log('任务处理出错', error.message); }); } } // 开始下载 start() { if (this.status === 'downloading') { console.log('下载器已经在运行中'); return; } this.setStatus('downloading'); this.queue.start(); this.processQueue(); console.log('下载器已启动'); } // 暂停下载 pause() { if (this.status !== 'downloading') { console.log('下载器未运行,无法暂停'); return; } this.setStatus('paused'); this.queue.pause(); console.log('下载器已暂停'); } // 恢复下载 resume() { if (this.status !== 'paused') { console.log('下载器未暂停,无法恢复'); return; } this.setStatus('downloading'); this.queue.start(); this.processQueue(); console.log('下载器已恢复'); } // 停止下载 stop() { this.setStatus('stopped'); this.queue.clear(); this.pendingTasks = []; console.log('下载器已停止,所有待处理任务已清除'); } // 获取状态信息 getStatus() { return { status: this.status, pending: this.pendingTasks.length, active: this.activeTasks.size, completed: this.completedTasks.length, failed: this.failedTasks.length }; } } // 使用示例 async function main() { // 创建下载器实例 const downloader = new grb2DownloadManager({ concurrency: 5, // 减少并发数以避免服务器压力 retryCount: 3, retryDelay: 5000, wgrib2Path: 'wgrib2' // 确保系统已安装wgrib2 }); // 监听事件 downloader.on(downloader.EVENTS.STATUS_CHANGE, (status) => { console.log('状态变更:', status); }); downloader.on(downloader.EVENTS.TASK_ADDED, (task) => { console.log('任务已添加:', task.fileName); }); downloader.on(downloader.EVENTS.TASK_START, (task) => { console.log('任务开始:', task.fileName); }); downloader.on(downloader.EVENTS.TASK_PROGRESS, (progress) => { console.log(`下载进度: ${progress.task.fileName} - ${progress.percent.toFixed(2)}%`); }); downloader.on(downloader.EVENTS.TASK_COMPLETE, (task) => { console.log('任务完成:', task.fileName); }); downloader.on(downloader.EVENTS.TASK_SKIP, (task, reason) => { console.log('任务跳过:', task.fileName, '-', reason); }); downloader.on(downloader.EVENTS.TASK_ERROR, (task, error) => { console.log('任务错误:', task.fileName, '-', error.message); }); downloader.on(downloader.EVENTS.QUEUE_EMPTY, () => { console.log('队列已空'); }); downloader.on(downloader.EVENTS.ALL_COMPLETE, (result) => { console.log('所有任务完成'); console.log('成功:', result.completed.length); console.log('失败:', result.failed.length); }); // 添加任务 const tasks = [ { "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f000&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90", "filePath": "./model/grb2/gfs.20250901/18/atmos", "fileName": "gfs.t18z.pgrb2full.0p50.f000" }, { "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f003&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90", "filePath": "./model/grb2/gfs.20250901/18/atmos", "fileName": "gfs.t18z.pgrb2full.0p50.f003" }, { "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f006&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90", "filePath": "./model/grb2/gfs.20250901/18/atmos", "fileName": "gfs.t18z.pgrb2full.0p50.f006" } ]; console.log('添加任务到下载器'); downloader.addTasks(tasks); // 开始下载 console.log('开始下载任务'); downloader.start(); // 模拟中途添加新任务 setTimeout(() => { const newTasks = [ { "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f009&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90", "filePath": "./model/grb2/gfs.20250901/18/atmos", "fileName": "gfs.t18z.pgrb2full.0p50.f009" } ]; console.log('添加新任务到下载器'); downloader.addTasks(newTasks); }, 10000); // 监控下载状态 setInterval(() => { const status = downloader.getStatus(); console.log('当前状态:', status); }, 5000); } // 运行示例 if (require.main === module) { main().catch(console.error); } module.exports = { grb2DownloadManager };