const fs = require('fs');
const path = require('path');
const https = require('https');
const { exec, execFile } = require('child_process');
const { EventEmitter } = require('events');

/*
 * Batch downloader for grb2 (GRIB2) weather files.
 *
 * Usage:
 *   // Import it where needed
 *   const { grb2DownloadManager } = require("<path-to>/schema/downloadGrb2");
 *
 *   // Create a download manager instance
 *   const manager = new grb2DownloadManager();
 *
 *   // Read the current state of the downloader
 *   manager.getStatus();
 *
 *   // Event hooks
 *   manager.on('queueUpdated', (status) => {
 *     console.log(`Queue updated: ${status.queueLength} task(s) waiting`);
 *   });
 *   manager.on('downloadStarted', (status) => {
 *     console.log('Download started');
 *   });
 *   manager.on('taskStarted', (status) => {
 *     console.log(`Downloading: ${status.currentTask.fileName}`);
 *   });
 *   manager.on('taskProgress', ({ progress, currentTask }) => {
 *     console.log(`Progress: ${currentTask.fileName} - ${progress}%`);
 *   });
 *   manager.on('taskCompleted', (status) => {
 *     console.log(`Task done: ${status.successfulDownloads}/${status.totalTasks} succeeded`);
 *   });
 *   manager.on('taskFailed', ({ task, error }) => {
 *     console.log(`Task failed: ${task.fileName} - ${error}`);
 *   });
 *   manager.on('downloadCompleted', (status) => {
 *     console.log(`All done: ${status.successfulDownloads} ok, ${status.failedDownloads} failed`);
 *   });
 *   manager.on('downloadStopped', () => {
 *     console.log('Download stopped');
 *   });
 */

/** Number of retries used when the caller does not supply a usable value. */
const DEFAULT_RETRY_ATTEMPTS = 3;
/** Pause between two attempts of the same task, in milliseconds. */
const RETRY_DELAY_MS = 5000;
/** Socket inactivity timeout for a download request, in milliseconds. */
const REQUEST_TIMEOUT_MS = 30000;
/** Upper bound for the wgrib2 inventory output captured during verification. */
const WGRIB2_MAX_BUFFER = 64 * 1024 * 1024;

/**
 * Builds a fresh status record.
 * `state` is one of 'idle', 'downloading', 'completed', 'stopped'.
 */
function createInitialStatus() {
  return {
    state: 'idle',
    totalTasks: 0,
    completedTasks: 0,
    successfulDownloads: 0,
    failedDownloads: 0,
    currentTask: null,
    progress: 0,
    queueLength: 0
  };
}

/**
 * Normalises the `retryAttempts` option.
 * An explicit 0 means "never retry" and is honoured; negative numbers are
 * clamped to 0; anything that is not a finite number falls back to the default.
 */
function resolveRetryAttempts(value) {
  const isNumericString = typeof value === 'string' && value.trim() !== '';
  if (typeof value === 'number' || isNumericString) {
    const parsed = Number(value);
    if (Number.isFinite(parsed)) {
      return Math.max(0, parsed);
    }
  }
  return DEFAULT_RETRY_ATTEMPTS;
}

/**
 * Throws a TypeError unless `task` carries string `url`, `filePath` and
 * `fileName` fields, so malformed input fails when it is added rather than
 * deep inside the download loop.
 */
function assertValidTask(task) {
  const isValid = task !== null &&
    typeof task === 'object' &&
    ['url', 'filePath', 'fileName'].every((key) => typeof task[key] === 'string');
  if (!isValid) {
    throw new TypeError('无效的下载任务: 需要字符串类型的 url、filePath、fileName');
  }
}

/**
 * Sequential download queue for grb2 files; every downloaded file is checked
 * with the wgrib2 executable.
 *
 * Events emitted: 'queueUpdated', 'downloadStarted', 'taskStarted',
 * 'taskProgress', 'taskSkipped', 'taskCompleted', 'taskRetry', 'taskFailed',
 * 'downloadCompleted', 'downloadStopped', 'reset'.
 *
 * A task is a plain object `{ url, filePath, fileName }` where `filePath` is
 * the target directory and `fileName` the name of the file inside it.
 */
class grb2DownloadManager extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.retryAttempts=3] Retries per task after the first attempt (0 disables retries).
   * @param {string} [options.wgrib2Path] Path of the wgrib2 executable; defaults to ../wgrib2/wgrib2.exe next to this file.
   */
  constructor(options = {}) {
    super();
    this.queue = [];
    this.isDownloading = false;
    this.currentDownload = null;
    this.status = createInitialStatus();
    this.retryAttempts = resolveRetryAttempts(options.retryAttempts);
    this.wgrib2Path = options.wgrib2Path || path.join(__dirname, "../wgrib2/wgrib2.exe");
    this.downloadInterval = null;
    // Incremented on every start/stop; a processing chain holding an older id
    // must stop touching shared state.
    this._runId = 0;
    // Task taken from the queue and not yet finished; re-queued on stop.
    this._activeTask = null;
  }

  /**
   * Adds one task or an array of tasks to the queue and starts downloading
   * unless the manager is stopped.
   * Tasks are skipped when their URL is already queued, is being downloaded,
   * appears earlier in the same batch, or the target file already verifies.
   *
   * @param {object|object[]} tasks
   * @returns {Promise<{added: number, message: string}>}
   * @throws {TypeError} when a task lacks string url/filePath/fileName fields.
   */
  async addTasks(tasks) {
    const candidates = Array.isArray(tasks) ? tasks : [tasks];
    // Validate everything first so a bad entry cannot leave a half-added batch.
    candidates.forEach(assertValidTask);

    const knownUrls = new Set(this.queue.map((queued) => queued.url));
    if (this.isDownloading && this._activeTask) {
      knownUrls.add(this._activeTask.url);
    }

    const newTasks = [];
    for (const task of candidates) {
      if (knownUrls.has(task.url)) {
        continue;
      }
      if (await this.isTaskCompleted(task)) {
        continue;
      }
      // The queue may have changed while the verification was awaited.
      if (this.queue.some((queued) => queued.url === task.url)) {
        continue;
      }
      knownUrls.add(task.url);
      newTasks.push(task);
    }

    if (newTasks.length === 0) {
      return {
        added: 0,
        message: '没有新任务可添加,所有任务已完成或已在队列中'
      };
    }

    this.queue.push(...newTasks);
    this.status.totalTasks += newTasks.length;
    this.status.queueLength = this.queue.length;
    this.emit('queueUpdated', this.status);

    if (!this.isDownloading && this.status.state !== 'stopped') {
      this.startDownload();
    }

    return {
      added: newTasks.length,
      message: `添加了 ${newTasks.length} 个新任务到下载队列`
    };
  }

  /**
   * Reports whether the task's target file exists and passes wgrib2
   * verification. Verification errors are logged and reported as `false`.
   *
   * @param {object} task
   * @returns {Promise<boolean>}
   */
  async isTaskCompleted(task) {
    const filePath = path.join(task.filePath, task.fileName);

    if (!fs.existsSync(filePath)) {
      return false;
    }

    try {
      await this.verifyFileWithWgrib2(filePath);
      return true;
    } catch (error) {
      console.log(error.message);
      return false;
    }
  }

  /**
   * Runs wgrib2 on the file. Resolves when the process exits cleanly with
   * nothing on stderr, rejects otherwise.
   *
   * The executable is started without a shell (execFile), so paths containing
   * spaces or shell metacharacters are passed through verbatim.
   *
   * @param {string} filePath
   * @returns {Promise<void>}
   */
  verifyFileWithWgrib2(filePath) {
    return new Promise((resolve, reject) => {
      const options = { maxBuffer: WGRIB2_MAX_BUFFER, windowsHide: true };
      execFile(this.wgrib2Path, [filePath], options, (error, stdout, stderr) => {
        if (error) {
          reject(new Error(`文件验证失败: ${error.message}`));
        } else if (stderr && stderr.trim() !== '') {
          reject(new Error(`文件验证失败: ${stderr}`));
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Starts processing the queue if no run is active.
   *
   * @returns {{message: string}}
   */
  startDownload() {
    if (this.isDownloading) {
      return { message: '当前已有下载任务在进行中,新任务已加入队列' };
    }

    this.isDownloading = true;
    this.status.state = 'downloading';
    this._runId += 1;

    // Deferred so that 'downloadStarted' is emitted before the first task event.
    this._scheduleNext(this._runId);

    this.emit('downloadStarted', this.status);
    return { message: '下载已开始' };
  }

  /**
   * Schedules the next queue step on a later tick and makes sure an
   * unexpected failure is logged instead of becoming an unhandled rejection.
   */
  _scheduleNext(runId) {
    setImmediate(() => {
      this.processNextTask(runId).catch((error) => {
        console.error(`任务处理异常: ${error.message}`);
        if (runId === this._runId) {
          // Let a later addTasks()/startDownload() restart the queue.
          this.isDownloading = false;
        }
      });
    });
  }

  /**
   * Takes the next task from the queue, downloads it and schedules the
   * following one. Emits 'downloadCompleted' once the queue is empty.
   *
   * @param {number} [runId] Id of the run this step belongs to; steps of a
   *   superseded run return without side effects.
   * @returns {Promise<void>}
   */
  async processNextTask(runId = this._runId) {
    if (runId !== this._runId) {
      return;
    }

    if (this.status.state === 'stopped') {
      this.isDownloading = false;
      return;
    }

    if (this.queue.length === 0) {
      this.isDownloading = false;
      this.status.state = 'completed';
      this.emit('downloadCompleted', this.status);
      return;
    }

    const task = this.queue.shift();
    this._activeTask = task;
    this.status.queueLength = this.queue.length;
    this.status.currentTask = { ...task, retryCount: 0 };
    this.emit('taskStarted', this.status);

    try {
      await this.downloadTask(task, 0, runId);
    } finally {
      // After a stop/reset the task has already been handed back to the queue.
      if (runId === this._runId) {
        this._activeTask = null;
      }
    }

    if (runId === this._runId && this.isDownloading) {
      this._scheduleNext(runId);
    }
  }

  /**
   * Downloads and verifies a single task, retrying up to `retryAttempts`
   * times with a fixed pause between attempts. A task whose file already
   * verifies is counted as successful and reported through 'taskSkipped'.
   *
   * @param {object} task
   * @param {number} [retryCount=0] Attempt number to start from.
   * @param {number} [runId] Id of the owning run; when the run is stopped or
   *   reset the method returns without retrying or updating counters.
   * @returns {Promise<void>}
   */
  async downloadTask(task, retryCount = 0, runId = this._runId) {
    const fileDir = task.filePath;
    const filePath = path.join(fileDir, task.fileName);
    const isSuperseded = () => runId !== this._runId;

    // Skip the download when a valid file is already present.
    try {
      const alreadyDone = await this.isTaskCompleted(task);
      if (isSuperseded()) {
        return;
      }
      if (alreadyDone) {
        this.status.completedTasks++;
        this.status.successfulDownloads++;
        this.emit('taskSkipped', { ...this.status, task, reason: '文件已存在且有效' });
        return;
      }
    } catch (error) {
      console.log(`文件检查失败: ${error.message}`);
    }

    for (let attempt = retryCount; ; attempt++) {
      if (this.status.currentTask) {
        this.status.currentTask.retryCount = attempt;
      }
      this.emit('taskProgress', { ...this.status, progress: 0 });

      try {
        // Inside the try block so that a directory error counts as a failed attempt.
        fs.mkdirSync(fileDir, { recursive: true });

        await this.downloadFile(task.url, filePath, (progress) => {
          if (isSuperseded()) {
            return;
          }
          this.status.progress = progress;
          this.emit('taskProgress', { ...this.status, progress });
        });

        await this.verifyFileWithWgrib2(filePath);
        if (isSuperseded()) {
          return;
        }

        this.status.completedTasks++;
        this.status.successfulDownloads++;
        this.emit('taskCompleted', this.status);
        return;
      } catch (error) {
        // A stop/reset is not a failure: the task is already back in the queue.
        if (isSuperseded()) {
          return;
        }

        console.error(`下载失败: ${error.message}`);

        if (attempt >= this.retryAttempts) {
          this.status.completedTasks++;
          this.status.failedDownloads++;
          this.emit('taskFailed', { ...this.status, task, error: error.message });
          return;
        }

        console.log(`尝试重新下载 (${attempt + 1}/${this.retryAttempts})`);
        this.emit('taskRetry', {
          ...this.status,
          task,
          retryCount: attempt + 1,
          maxRetries: this.retryAttempts
        });

        await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));
        if (isSuperseded()) {
          return;
        }
      }
    }
  }

  /**
   * Streams `url` into `filePath`.
   * Resolves once the file is fully written and closed. Rejects on a non-200
   * status, network/file errors, an aborted response, the inactivity timeout
   * or cancellation; in every rejecting case the partial file is removed.
   *
   * @param {string} url
   * @param {string} filePath
   * @param {(percent: number) => void} [onProgress] Called with 0-100; stays 0
   *   when the server sends no content-length header.
   * @returns {Promise<void>}
   */
  downloadFile(url, filePath, onProgress) {
    return new Promise((resolve, reject) => {
      const file = fs.createWriteStream(filePath);
      let request = null;
      let receivedBytes = 0;
      let settled = false;

      const release = () => {
        if (this.currentDownload && this.currentDownload.file === file) {
          this.currentDownload = null;
        }
      };

      const succeed = () => {
        if (settled) {
          return;
        }
        settled = true;
        release();
        resolve();
      };

      const fail = (err) => {
        if (settled) {
          return;
        }
        settled = true;
        release();
        if (request) {
          request.destroy();
        }
        // Best effort: the file may not exist yet, so unlink errors are ignored.
        const removePartial = () => fs.unlink(filePath, () => reject(err));
        if (file.closed === true) {
          removePartial();
        } else {
          // Wait for the descriptor to be released before deleting the file.
          file.once('close', removePartial);
          file.destroy();
        }
      };

      file.on('error', fail);
      // Close explicitly and resolve only afterwards so the verifier never
      // reads a file that is still open for writing.
      file.on('finish', () => file.close(succeed));

      try {
        request = https.get(url, (response) => {
          if (response.statusCode !== 200) {
            fail(new Error(`服务器返回状态码: ${response.statusCode}`));
            return;
          }

          const totalBytes = parseInt(response.headers['content-length'], 10);

          response.on('data', (chunk) => {
            receivedBytes += chunk.length;
            const progress = totalBytes ? (receivedBytes / totalBytes * 100) : 0;
            if (typeof onProgress === 'function') {
              onProgress(Math.round(progress));
            }
          });
          response.on('error', fail);
          response.on('close', () => {
            // The connection dropped before the whole body arrived.
            if (!response.complete) {
              fail(new Error('下载连接被中断'));
            }
          });

          response.pipe(file);
        });
      } catch (err) {
        // https.get throws synchronously for an invalid URL.
        fail(err);
        return;
      }

      // Registered immediately so a stop during connection setup also aborts.
      this.currentDownload = { request, file, cancel: fail };

      request.on('error', fail);
      request.setTimeout(REQUEST_TIMEOUT_MS, () => fail(new Error('请求超时')));
    });
  }

  /**
   * Aborts the active download, puts the interrupted task back at the front
   * of the queue and marks the manager as stopped. Call startDownload() to
   * resume.
   *
   * @returns {{message: string}}
   */
  stopDownload() {
    // Invalidate the running chain first so it neither retries nor counts.
    this._runId += 1;

    if (this.currentDownload) {
      const { request, file, cancel } = this.currentDownload;
      this.currentDownload = null;
      if (typeof cancel === 'function') {
        cancel(new Error('下载已停止'));
      } else {
        request.destroy();
        file.destroy();
      }
    }

    if (this._activeTask) {
      this.queue.unshift(this._activeTask);
      this._activeTask = null;
      this.status.queueLength = this.queue.length;
    }

    this.isDownloading = false;
    this.status.state = 'stopped';
    this.emit('downloadStopped', this.status);

    return { message: '下载已停止' };
  }

  /**
   * Stops any download, clears the queue and restores the initial status.
   *
   * @returns {{message: string}}
   */
  reset() {
    this.stopDownload();
    this.queue = [];
    this.status = createInitialStatus();

    this.emit('reset', this.status);
    return { message: '下载管理器已重置' };
  }

  /**
   * Returns the live status object (not a copy).
   *
   * @returns {object}
   */
  getStatus() {
    return this.status;
  }
}

// Usage example
async function main() {
  const downloadManager = new grb2DownloadManager({
    retryAttempts: 3,
    wgrib2Path: 'wgrib2' // requires wgrib2 to be installed and on PATH
  });

  // Event listeners
  downloadManager.on('queueUpdated', (status) => {
    console.log(`队列更新: 还有 ${status.queueLength} 个任务等待下载`);
  });

  downloadManager.on('downloadStarted', (status) => {
    console.log('下载开始');
  });

  downloadManager.on('taskStarted', (status) => {
    console.log(`开始下载: ${status.currentTask.fileName}`);
  });

  downloadManager.on('taskProgress', ({ progress, currentTask }) => {
    console.log(`下载进度: ${currentTask.fileName} - ${progress}%`);
  });

  downloadManager.on('taskCompleted', (status) => {
    console.log(`任务完成: 成功 ${status.successfulDownloads}/${status.totalTasks}`);
  });

  downloadManager.on('taskFailed', ({ task, error }) => {
    console.log(`任务失败: ${task.fileName} - ${error}`);
  });

  downloadManager.on('downloadCompleted', (status) => {
    console.log(`所有下载完成: 成功 ${status.successfulDownloads}, 失败: ${status.failedDownloads}`);
  });

  downloadManager.on('downloadStopped', () => {
    console.log('下载已停止');
  });

  // Download tasks
  const tasks = [
    {
      "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f000&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
      "filePath": "./model/grb2/gfs.20250901/18/atmos",
      "fileName": "gfs.t18z.pgrb2full.0p50.f000"
    },
    {
      "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f003&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
      "filePath": "./model/grb2/gfs.20250901/18/atmos",
      "fileName": "gfs.t18z.pgrb2full.0p50.f003"
    },
    {
      "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f006&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
      "filePath": "./model/grb2/gfs.20250901/18/atmos",
      "fileName": "gfs.t18z.pgrb2full.0p50.f006"
    }
  ];

  // Add the tasks to the download queue
  console.log((await downloadManager.addTasks(tasks)).message);

  // Simulate stopping the download midway
  setTimeout(async () => {
    console.log(downloadManager.stopDownload().message);

    // Simulate resuming the download
    setTimeout(() => {
      console.log(downloadManager.startDownload().message);
    }, 2000);
  }, 5000);
}

// Run the example only when this script is executed directly
if (require.main === module) {
  main().catch(console.error);
}

module.exports = { grb2DownloadManager };