| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
const fs = require('fs');
const path = require('path');
const https = require('https');
const { exec, execFile } = require('child_process');
const { EventEmitter } = require('events');
/**
 * Sequential batch downloader for GRIB2 (.grb2) weather files.
 *
 * Tasks are plain objects of the form `{ url, filePath, fileName }`, where
 * `filePath` is the destination directory. Tasks are downloaded one at a time
 * over HTTPS and each downloaded file is validated by running the `wgrib2`
 * executable on it.
 *
 * Events emitted (payload is the status object unless noted):
 *   - 'queueUpdated'      tasks were appended to the queue
 *   - 'downloadStarted'   processing of the queue began
 *   - 'taskStarted'       a task was taken from the queue
 *   - 'taskProgress'      `{ ...status, progress }` (percentage, 0 when the size is unknown)
 *   - 'taskSkipped'       `{ ...status, task, reason }` (a valid file already exists)
 *   - 'taskCompleted'     a task was downloaded and verified
 *   - 'taskRetry'         `{ ...status, task, retryCount, maxRetries }`
 *   - 'taskFailed'        `{ ...status, task, error }` (all attempts failed)
 *   - 'downloadCompleted' the queue ran empty
 *   - 'downloadStopped'   stopDownload() was called
 *   - 'reset'             reset() was called
 *
 * @example
 * const { grb2DownloadManager } = require('<path to>/schema/downloadGrb2');
 * const manager = new grb2DownloadManager();
 * manager.on('taskProgress', ({ progress, currentTask }) => {
 *   console.log(`${currentTask.fileName}: ${progress}%`);
 * });
 * manager.on('taskFailed', ({ task, error }) => {
 *   console.log(`${task.fileName} failed: ${error}`);
 * });
 * await manager.addTasks([{ url, filePath, fileName }]);
 * manager.getStatus();
 */
class grb2DownloadManager extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {number} [options.retryAttempts=3] Retries per task after the first attempt (0 disables retries).
   * @param {string} [options.wgrib2Path] Path to the wgrib2 executable; defaults to ../wgrib2/wgrib2.exe relative to this file.
   * @param {number} [options.retryDelayMs=5000] Delay between attempts, in milliseconds.
   * @param {number} [options.requestTimeoutMs=30000] Socket inactivity timeout per request, in milliseconds.
   */
  constructor(options = {}) {
    super();
    const nonNegative = (value, fallback) =>
      typeof value === 'number' && value >= 0 ? value : fallback;

    this.queue = [];
    this.isDownloading = false;
    this.currentDownload = null;
    this.status = grb2DownloadManager.createInitialStatus();
    // `||` alone would turn an explicit 0 ("never retry") into 3.
    this.retryAttempts = options.retryAttempts === 0 ? 0 : options.retryAttempts || 3;
    this.retryDelayMs = nonNegative(options.retryDelayMs, 5000);
    this.requestTimeoutMs = nonNegative(options.requestTimeoutMs, 30000);
    this.wgrib2Path = options.wgrib2Path || path.join(__dirname, "../wgrib2/wgrib2.exe");
    this.downloadInterval = null;
    // Bumped by every start/stop; lets an interrupted processing chain notice it is obsolete.
    this.runId = 0;
    // The task object currently being processed (null when none).
    this.activeTask = null;
  }

  /**
   * Build a fresh status object.
   * @returns {object} Status with state 'idle' and all counters at zero.
   */
  static createInitialStatus() {
    return {
      state: 'idle', // 'idle', 'downloading', 'completed', 'stopped'
      totalTasks: 0,
      completedTasks: 0,
      successfulDownloads: 0,
      failedDownloads: 0,
      currentTask: null,
      progress: 0,
      queueLength: 0
    };
  }

  /**
   * Add one task or an array of tasks to the queue.
   *
   * A task is ignored when its URL is already queued, is the task currently
   * being processed, appears earlier in the same call, or when its target
   * file already exists and passes wgrib2 verification. Processing starts
   * automatically unless the manager is already running or has been stopped.
   *
   * @param {object|object[]} tasks Task(s) of the form `{ url, filePath, fileName }`.
   * @returns {Promise<{added: number, message: string}>}
   */
  async addTasks(tasks) {
    const candidates = Array.isArray(tasks) ? tasks : [tasks];
    const acceptedUrls = new Set();
    const isPending = (task) =>
      acceptedUrls.has(task.url) ||
      this.queue.some((queued) => queued.url === task.url) ||
      (this.activeTask !== null && this.activeTask.url === task.url);

    const newTasks = [];
    for (const task of candidates) {
      if (isPending(task)) {
        continue;
      }
      if (await this.isTaskCompleted(task)) {
        continue;
      }
      // The queue may have changed while the verification was awaited.
      if (isPending(task)) {
        continue;
      }
      acceptedUrls.add(task.url);
      newTasks.push(task);
    }

    if (newTasks.length === 0) {
      return {
        added: 0,
        message: '没有新任务可添加,所有任务已完成或已在队列中'
      };
    }

    this.queue.push(...newTasks);
    this.status.totalTasks += newTasks.length;
    this.status.queueLength = this.queue.length;
    this.emit('queueUpdated', this.status);
    if (!this.isDownloading && this.status.state !== 'stopped') {
      this.startDownload();
    }
    return {
      added: newTasks.length,
      message: `添加了 ${newTasks.length} 个新任务到下载队列`
    };
  }

  /**
   * Check whether a task's target file already exists and passes verification.
   * @param {object} task Task with `filePath` (directory) and `fileName`.
   * @returns {Promise<boolean>} true only if the file exists and wgrib2 accepts it.
   */
  async isTaskCompleted(task) {
    const filePath = path.join(task.filePath, task.fileName);
    if (!fs.existsSync(filePath)) {
      return false;
    }
    try {
      await this.verifyFileWithWgrib2(filePath);
      return true;
    } catch (error) {
      console.log(error.message);
      return false;
    }
  }

  /**
   * Run wgrib2 on a file and treat a non-zero exit or any stderr output as failure.
   *
   * The executable is spawned directly (no shell), so paths containing spaces
   * or shell metacharacters are passed through verbatim.
   *
   * @param {string} filePath File to verify.
   * @returns {Promise<void>} Resolves when verification passes; rejects with an Error otherwise.
   */
  verifyFileWithWgrib2(filePath) {
    return new Promise((resolve, reject) => {
      // A larger buffer than the default so a long wgrib2 listing is not
      // reported as a failure just because the output was cut off.
      const execOptions = { maxBuffer: 16 * 1024 * 1024, windowsHide: true };
      execFile(this.wgrib2Path, [filePath], execOptions, (error, stdout, stderr) => {
        if (error) {
          reject(new Error(`文件验证失败: ${error.message}`));
        } else if (stderr && stderr.trim() !== '') {
          reject(new Error(`文件验证失败: ${stderr}`));
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Start (or resume after a stop) processing the queue.
   * @returns {{message: string}}
   */
  startDownload() {
    if (this.isDownloading) {
      return { message: '当前已有下载任务在进行中,新任务已加入队列' };
    }
    this.isDownloading = true;
    this.status.state = 'downloading';
    this.runId += 1;
    const runId = this.runId;
    // Deferred so 'downloadStarted' is emitted before the first 'taskStarted'.
    setImmediate(() => {
      this.processNextTask(runId);
    });
    this.emit('downloadStarted', this.status);
    return { message: '下载已开始' };
  }

  /**
   * Take the next task from the queue, process it, then schedule the following one.
   *
   * @param {number} [runId] Identifier of the run this call belongs to; a call
   *   whose run has been superseded by a stop/start returns without doing anything.
   * @returns {Promise<void>}
   */
  async processNextTask(runId = this.runId) {
    if (runId !== this.runId) {
      return;
    }
    if (this.status.state === 'stopped') {
      this.isDownloading = false;
      return;
    }
    if (this.queue.length === 0) {
      this.isDownloading = false;
      this.status.state = 'completed';
      this.emit('downloadCompleted', this.status);
      return;
    }

    const task = this.queue.shift();
    this.activeTask = task;
    this.status.queueLength = this.queue.length;
    this.status.currentTask = {
      ...task,
      retryCount: 0
    };
    this.emit('taskStarted', this.status);

    try {
      await this.downloadTask(task);
    } catch (error) {
      // downloadTask handles download/verification errors itself; anything
      // reaching here (e.g. a malformed task) is counted as a failure so the
      // queue keeps moving instead of ending in an unhandled rejection.
      if (runId === this.runId) {
        this.status.completedTasks++;
        this.status.failedDownloads++;
        this.emit('taskFailed', {
          ...this.status,
          task,
          error: error.message
        });
      }
    }

    // A stop (and possibly a restart) happened meanwhile: that newer run owns the state now.
    if (runId !== this.runId) {
      return;
    }
    this.activeTask = null;
    if (this.isDownloading) {
      setImmediate(() => {
        this.processNextTask(runId);
      });
    }
  }

  /**
   * Download and verify a single task, retrying on failure.
   *
   * If stopDownload() is called while this is in progress, the method returns
   * quietly without touching the counters.
   *
   * @param {object} task Task of the form `{ url, filePath, fileName }`.
   * @param {number} [retryCount=0] Number of retries already consumed.
   * @returns {Promise<void>}
   */
  async downloadTask(task, retryCount = 0) {
    const runId = this.runId;
    const isStale = () => runId !== this.runId;
    const fileDir = task.filePath;
    const filePath = path.join(fileDir, task.fileName);

    // Skip the download when a valid file is already present.
    try {
      if (await this.isTaskCompleted(task)) {
        if (isStale()) {
          return;
        }
        this.status.completedTasks++;
        this.status.successfulDownloads++;
        this.emit('taskSkipped', { ...this.status, task, reason: '文件已存在且有效' });
        return;
      }
    } catch (error) {
      console.log(`文件检查失败: ${error.message}`);
    }
    if (isStale()) {
      return;
    }

    for (let attempt = retryCount; ; attempt++) {
      if (this.status.currentTask) {
        this.status.currentTask.retryCount = attempt;
      }
      this.status.progress = 0;
      this.emit('taskProgress', { ...this.status, progress: 0 });

      try {
        // Inside the try so an uncreatable directory is reported as a task failure.
        fs.mkdirSync(fileDir, { recursive: true });
        await this.downloadFile(task.url, filePath, (progress) => {
          this.status.progress = progress;
          this.emit('taskProgress', { ...this.status, progress });
        });
        if (isStale()) {
          return;
        }
        await this.verifyFileWithWgrib2(filePath);
        if (isStale()) {
          return;
        }
        this.status.completedTasks++;
        this.status.successfulDownloads++;
        this.emit('taskCompleted', this.status);
        return;
      } catch (error) {
        if (isStale()) {
          return;
        }
        console.error(`下载失败: ${error.message}`);
        if (attempt >= this.retryAttempts) {
          this.status.completedTasks++;
          this.status.failedDownloads++;
          this.emit('taskFailed', {
            ...this.status,
            task,
            error: error.message
          });
          return;
        }
        console.log(`尝试重新下载 (${attempt + 1}/${this.retryAttempts})`);
        this.emit('taskRetry', {
          ...this.status,
          task,
          retryCount: attempt + 1,
          maxRetries: this.retryAttempts
        });
      }

      // Wait before the next attempt; give up if the run was stopped meanwhile.
      await new Promise((resolve) => setTimeout(resolve, this.retryDelayMs));
      if (isStale()) {
        return;
      }
    }
  }

  /**
   * Download `url` to `filePath` over HTTPS.
   *
   * The promise settles exactly once: it resolves after the written file has
   * been closed, and rejects on a non-200 status, a request/response/file
   * error, a connection that ends before the body is complete, a timeout, or
   * cancellation through stopDownload(). On rejection the request and file
   * stream are destroyed and the target file is removed (best effort).
   *
   * @param {string} url HTTPS URL to fetch.
   * @param {string} filePath Destination file.
   * @param {(percent: number) => void} onProgress Called per received chunk with a rounded percentage (0 when the size is unknown).
   * @returns {Promise<void>}
   */
  downloadFile(url, filePath, onProgress) {
    return new Promise((resolve, reject) => {
      const handle = { request: null, file: null, cancel: null };
      let settled = false;

      const release = () => {
        if (this.currentDownload === handle) {
          this.currentDownload = null;
        }
      };
      const succeed = () => {
        if (settled) {
          return;
        }
        settled = true;
        release();
        resolve();
      };
      const fail = (error) => {
        if (settled) {
          return;
        }
        settled = true;
        release();
        if (handle.request) {
          handle.request.destroy();
        }
        if (handle.file) {
          handle.file.destroy();
        }
        // Reject only after the unlink attempt so a retry does not race with the deletion.
        fs.unlink(filePath, () => reject(error));
      };
      handle.cancel = fail;

      handle.request = https.get(url, (response) => {
        if (settled) {
          response.resume();
          return;
        }
        if (response.statusCode !== 200) {
          response.resume(); // discard the body so the socket is released
          fail(new Error(`服务器返回状态码: ${response.statusCode}`));
          return;
        }

        // NaN when the header is missing; progress is then reported as 0.
        const totalBytes = Number.parseInt(response.headers['content-length'], 10);
        let receivedBytes = 0;
        let finished = false;

        const file = fs.createWriteStream(filePath);
        handle.file = file;

        file.on('error', fail);
        file.on('finish', () => {
          finished = true;
        });
        // Settle on 'close' rather than 'finish' so the descriptor is released
        // before the caller hands the file to wgrib2.
        file.on('close', () => {
          if (finished) {
            succeed();
          } else {
            fail(new Error('文件写入被中断'));
          }
        });

        response.on('error', fail);
        response.on('aborted', () => fail(new Error('连接在传输完成前中断')));
        response.on('close', () => {
          if (!response.complete) {
            fail(new Error('连接在传输完成前中断'));
          }
        });
        response.on('data', (chunk) => {
          receivedBytes += chunk.length;
          const progress = totalBytes ? (receivedBytes / totalBytes * 100) : 0;
          if (typeof onProgress === 'function') {
            onProgress(Math.round(progress));
          }
        });
        response.pipe(file);
      });

      // Registered before the response arrives so stopDownload() can abort a pending request too.
      this.currentDownload = handle;
      handle.request.on('error', fail);
      handle.request.setTimeout(this.requestTimeoutMs, () => {
        fail(new Error('请求超时'));
      });
    });
  }

  /**
   * Stop processing. An in-flight transfer is aborted and the interrupted task
   * is put back at the front of the queue, so a later startDownload() resumes
   * with it.
   * @returns {{message: string}}
   */
  stopDownload() {
    this.runId += 1; // invalidates the processing chain that is currently running

    const active = this.currentDownload;
    this.currentDownload = null;
    if (active) {
      if (typeof active.cancel === 'function') {
        active.cancel(new Error('下载已停止'));
      } else {
        if (active.request) {
          active.request.destroy();
        }
        if (active.file) {
          active.file.destroy();
        }
      }
    }

    if (this.activeTask !== null) {
      this.queue.unshift(this.activeTask);
      this.activeTask = null;
      this.status.queueLength = this.queue.length;
    }

    this.isDownloading = false;
    this.status.state = 'stopped';
    this.emit('downloadStopped', this.status);
    return { message: '下载已停止' };
  }

  /**
   * Stop any activity, clear the queue and restore the initial status.
   * @returns {{message: string}}
   */
  reset() {
    this.stopDownload();
    this.queue = [];
    this.status = grb2DownloadManager.createInitialStatus();
    this.emit('reset', this.status);
    return { message: '下载管理器已重置' };
  }

  /**
   * @returns {object} The live status object (the same object passed to event listeners).
   */
  getStatus() {
    return this.status;
  }
}
/**
 * Usage example: creates a download manager, logs its events, queues three
 * GFS 0.50-degree files, stops the download after 5 seconds and resumes it
 * 2 seconds later.
 *
 * @returns {Promise<void>} Settles once the tasks have been queued; the timers keep running afterwards.
 */
async function main() {
  // The class exported by this module is `grb2DownloadManager`; no `DownloadManager` is defined in this file.
  const downloadManager = new grb2DownloadManager({
    retryAttempts: 3,
    wgrib2Path: 'wgrib2' // requires wgrib2 to be installed and reachable on PATH
  });

  // Event listeners
  downloadManager.on('queueUpdated', (status) => {
    console.log(`队列更新: 还有 ${status.queueLength} 个任务等待下载`);
  });
  downloadManager.on('downloadStarted', () => {
    console.log('下载开始');
  });
  downloadManager.on('taskStarted', (status) => {
    console.log(`开始下载: ${status.currentTask.fileName}`);
  });
  downloadManager.on('taskProgress', ({ progress, currentTask }) => {
    console.log(`下载进度: ${currentTask.fileName} - ${progress}%`);
  });
  downloadManager.on('taskCompleted', (status) => {
    console.log(`任务完成: 成功 ${status.successfulDownloads}/${status.totalTasks}`);
  });
  downloadManager.on('taskFailed', ({ task, error }) => {
    console.log(`任务失败: ${task.fileName} - ${error}`);
  });
  downloadManager.on('downloadCompleted', (status) => {
    console.log(`所有下载完成: 成功 ${status.successfulDownloads}, 失败: ${status.failedDownloads}`);
  });
  downloadManager.on('downloadStopped', () => {
    console.log('下载已停止');
  });

  // Tasks to download
  const tasks = [
    {
      "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f000&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
      "filePath": "./model/grb2/gfs.20250901/18/atmos",
      "fileName": "gfs.t18z.pgrb2full.0p50.f000"
    },
    {
      "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f003&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
      "filePath": "./model/grb2/gfs.20250901/18/atmos",
      "fileName": "gfs.t18z.pgrb2full.0p50.f003"
    },
    {
      "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f006&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
      "filePath": "./model/grb2/gfs.20250901/18/atmos",
      "fileName": "gfs.t18z.pgrb2full.0p50.f006"
    }
  ];

  // Queue the tasks (downloading starts automatically)
  console.log((await downloadManager.addTasks(tasks)).message);

  // Simulate stopping halfway through, then resuming
  setTimeout(() => {
    console.log(downloadManager.stopDownload().message);
    setTimeout(() => {
      console.log(downloadManager.startDownload().message);
    }, 2000);
  }, 5000);
}
// Run the usage example only when this file is the program's entry point,
// not when it is loaded through require().
if (module === require.main) {
  main().catch((error) => console.error(error));
}

module.exports = { grb2DownloadManager };
|