| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
- const { EventEmitter } = require('eventemitter3');
- const PQueue = require('p-queue').default;
- const got = require('./got').default;
- const fs = require('fs-extra');
- const path = require('path');
- const { exec } = require('child_process');
- const { promisify } = require('util');
- const execAsync = promisify(exec);
- class grb2DownloadManager extends EventEmitter {
- constructor(options = {}) {
- super();
- // 配置默认选项
- this.options = {
- concurrency: options.concurrency || 10,
- retryCount: options.retryCount || 3,
- retryDelay: options.retryDelay || 5000,
- wgrib2Path: options.wgrib2Path || path.join(__dirname, "../wgrib2/wgrib2.exe"),
- ...options
- };
- // 下载队列
- this.queue = new PQueue({
- concurrency: this.options.concurrency,
- autoStart: false
- });
- // 状态管理
- this.status = 'idle'; // idle, downloading, paused, stopped
- this.pendingTasks = [];
- this.activeTasks = new Map();
- this.completedTasks = [];
- this.failedTasks = [];
- // 事件类型
- this.EVENTS = {
- STATUS_CHANGE: 'statusChange',
- TASK_ADDED: 'taskAdded',
- TASK_START: 'taskStart',
- TASK_PROGRESS: 'taskProgress',
- TASK_COMPLETE: 'taskComplete',
- TASK_SKIP: 'taskSkip',
- TASK_ERROR: 'taskError',
- QUEUE_EMPTY: 'queueEmpty',
- ALL_COMPLETE: 'allComplete'
- };
- // 绑定方法
- this.start = this.start.bind(this);
- this.pause = this.pause.bind(this);
- this.resume = this.resume.bind(this);
- this.stop = this.stop.bind(this);
- this.addTasks = this.addTasks.bind(this);
- console.log('气象文件下载器初始化完成');
- console.log(`最大并发数: ${this.options.concurrency}`);
- }
- // 设置状态
- setStatus(status) {
- if (this.status !== status) {
- this.status = status;
- this.emit(this.EVENTS.STATUS_CHANGE, status);
- const statusMap = {
- 'idle': '空闲',
- 'downloading': '下载中',
- 'paused': '已暂停',
- 'stopped': '已停止'
- };
- console.log(`下载器状态变更为: ${statusMap[status] || status}`);
- }
- }
- // 检查文件是否存在并使用wgrib2验证
- async validateFile(filePath, fileName) {
- const fullPath = path.join(filePath, fileName);
- try {
- // 检查文件是否存在
- const exists = await fs.pathExists(fullPath);
- if (!exists) {
- console.log(`文件不存在: ${fullPath}`);
- return false;
- }
- // 检查文件大小,如果文件大小为0,则直接返回无效
- const stats = await fs.stat(fullPath);
- if (stats.size === 0) {
- console.log(`文件为空: ${fileName}`);
- return false;
- }
- // 使用wgrib2验证文件
- console.log(`正在验证文件: ${fileName}`);
- const { stdout, stderr } = await execAsync(`${this.options.wgrib2Path} "${fullPath}"`);
- // 如果wgrib2有输出,通常表示文件有效
- if (stdout && stdout.length > 0) {
- console.log(`文件验证成功: ${fileName}`);
- return true;
- } else {
- console.log(`文件验证失败: ${fileName}`);
- return false;
- }
- } catch (error) {
- console.log(`文件验证出错: ${fileName}`, error.message);
- return false;
- }
- }
- // 下载文件
- async downloadFile(task, retryCount = 0) {
- const { url, filePath, fileName } = task;
- const fullPath = path.join(filePath, fileName);
- // 确保目录存在
- await fs.ensureDir(filePath);
- try {
- console.log(`开始下载: ${fileName}`);
- const downloadStream = got.stream(url);
- // 使用got的流式下载API
- // const downloadStream = got(url, {
- // isStream: true,
- // timeout: 300000, // 5分钟超时
- // retry: 0 // 禁用内置重试,因为我们自己实现了重试逻辑
- // });
- const fileWriterStream = fs.createWriteStream(fullPath);
- // 返回Promise以便等待下载完成
- return new Promise((resolve, reject) => {
- let downloadedBytes = 0;
- let totalBytes = 0;
- // 监听下载进度
- downloadStream.on('downloadProgress', (progress) => {
- downloadedBytes = progress.transferred;
- totalBytes = progress.total || 0;
- this.emit(this.EVENTS.TASK_PROGRESS, {
- task,
- downloaded: downloadedBytes,
- total: totalBytes,
- percent: totalBytes ? (downloadedBytes / totalBytes * 100) : 0
- });
- });
- // 管道流到文件
- downloadStream.pipe(fileWriterStream);
- // 处理下载完成
- fileWriterStream.on('finish', () => {
- console.log(`下载完成: ${fileName}`);
- resolve();
- });
- // 处理错误
- fileWriterStream.on('error', (error) => {
- console.log(`文件写入错误: ${fileName}`, error.message);
- reject(error);
- });
- downloadStream.on('error', (error) => {
- console.log(`下载错误: ${fileName}`, error.message);
- // 重试逻辑
- if (retryCount < this.options.retryCount) {
- console.log(`准备第 ${retryCount + 1} 次重试: ${fileName}`);
- setTimeout(() => {
- this.downloadFile(task, retryCount + 1)
- .then(resolve)
- .catch(reject);
- }, this.options.retryDelay);
- } else {
- reject(error);
- }
- });
- });
- } catch (error) {
- console.log(`下载过程出错: ${fileName}`, error.message);
- // 重试逻辑
- if (retryCount < this.options.retryCount) {
- console.log(`准备第 ${retryCount + 1} 次重试: ${fileName}`);
- await new Promise(resolve => setTimeout(resolve, this.options.retryDelay));
- return this.downloadFile(task, retryCount + 1);
- } else {
- throw error;
- }
- }
- }
- // 处理单个任务
- async processTask(task) {
- const { fileName, filePath } = task;
- // 标记任务开始
- this.activeTasks.set(fileName, task);
- this.emit(this.EVENTS.TASK_START, task);
- console.log(`开始处理任务: ${fileName}`);
- try {
- // 检查文件是否已存在且有效
- const isValid = await this.validateFile(filePath, fileName);
- if (isValid) {
- // 文件已存在且有效,跳过下载
- this.emit(this.EVENTS.TASK_SKIP, task, '文件已存在且有效');
- // console.log(`跳过下载: ${fileName} (文件已存在且有效)`);
- this.completedTasks.push(task);
- return;
- }
- // 文件不存在或无效,开始下载
- console.log(`开始下载: ${fileName}`);
- await this.downloadFile(task);
- // 下载完成后再次验证文件
- const postDownloadValid = await this.validateFile(filePath, fileName);
- if (postDownloadValid) {
- // 文件下载并验证成功
- this.emit(this.EVENTS.TASK_COMPLETE, task);
- console.log(`任务完成: ${fileName}`);
- this.completedTasks.push(task);
- } else {
- // 文件下载后验证失败
- throw new Error('文件下载后验证失败');
- }
- } catch (error) {
- // 任务失败
- this.emit(this.EVENTS.TASK_ERROR, task, error);
- console.log(`任务失败: ${fileName}`, error.message);
- this.failedTasks.push({ task, error });
- } finally {
- // 从活跃任务中移除
- this.activeTasks.delete(fileName);
- }
- }
- // 添加任务到队列
- addTasks(tasks) {
- if (!Array.isArray(tasks)) {
- tasks = [tasks];
- }
- tasks.forEach(task => {
- if (!task.url || !task.fileName || !task.filePath) {
- console.log('任务格式错误,缺少必要字段', task);
- return;
- }
- this.pendingTasks.push(task);
- this.emit(this.EVENTS.TASK_ADDED, task);
- console.log(`任务已添加到队列: ${task.fileName}`);
- });
- // 如果下载器处于运行状态,自动开始处理新任务
- if (this.status === 'downloading') {
- this.processQueue();
- }
- return this.pendingTasks.length;
- }
- // 处理队列中的任务
- processQueue() {
- if (this.status !== 'downloading') {
- return;
- }
- // 将待处理任务添加到队列
- while (this.pendingTasks.length > 0 &&
- this.queue.size < this.options.concurrency * 2) {
- const task = this.pendingTasks.shift();
- this.queue.add(() => this.processTask(task))
- .then(() => {
- // 检查是否所有任务都已完成
- if (this.pendingTasks.length === 0 &&
- this.queue.size === 0 &&
- this.queue.pending === 0) {
- this.setStatus('idle');
- this.emit(this.EVENTS.QUEUE_EMPTY);
- this.emit(this.EVENTS.ALL_COMPLETE, {
- completed: this.completedTasks,
- failed: this.failedTasks
- });
- console.log('所有任务处理完成');
- console.log(`成功: ${this.completedTasks.length}, 失败: ${this.failedTasks.length}`);
- }
- })
- .catch(error => {
- console.log('任务处理出错', error.message);
- });
- }
- }
- // 开始下载
- start() {
- if (this.status === 'downloading') {
- console.log('下载器已经在运行中');
- return;
- }
- this.setStatus('downloading');
- this.queue.start();
- this.processQueue();
- console.log('下载器已启动');
- }
- // 暂停下载
- pause() {
- if (this.status !== 'downloading') {
- console.log('下载器未运行,无法暂停');
- return;
- }
- this.setStatus('paused');
- this.queue.pause();
- console.log('下载器已暂停');
- }
- // 恢复下载
- resume() {
- if (this.status !== 'paused') {
- console.log('下载器未暂停,无法恢复');
- return;
- }
- this.setStatus('downloading');
- this.queue.start();
- this.processQueue();
- console.log('下载器已恢复');
- }
- // 停止下载
- stop() {
- this.setStatus('stopped');
- this.queue.clear();
- this.pendingTasks = [];
- console.log('下载器已停止,所有待处理任务已清除');
- }
- // 获取状态信息
- getStatus() {
- return {
- status: this.status,
- pending: this.pendingTasks.length,
- active: this.activeTasks.size,
- completed: this.completedTasks.length,
- failed: this.failedTasks.length
- };
- }
- }
- // 使用示例
- async function main() {
- // 创建下载器实例
- const downloader = new grb2DownloadManager({
- concurrency: 5, // 减少并发数以避免服务器压力
- retryCount: 3,
- retryDelay: 5000,
- wgrib2Path: 'wgrib2' // 确保系统已安装wgrib2
- });
- // 监听事件
- downloader.on(downloader.EVENTS.STATUS_CHANGE, (status) => {
- console.log('状态变更:', status);
- });
- downloader.on(downloader.EVENTS.TASK_ADDED, (task) => {
- console.log('任务已添加:', task.fileName);
- });
- downloader.on(downloader.EVENTS.TASK_START, (task) => {
- console.log('任务开始:', task.fileName);
- });
- downloader.on(downloader.EVENTS.TASK_PROGRESS, (progress) => {
- console.log(`下载进度: ${progress.task.fileName} - ${progress.percent.toFixed(2)}%`);
- });
- downloader.on(downloader.EVENTS.TASK_COMPLETE, (task) => {
- console.log('任务完成:', task.fileName);
- });
- downloader.on(downloader.EVENTS.TASK_SKIP, (task, reason) => {
- console.log('任务跳过:', task.fileName, '-', reason);
- });
- downloader.on(downloader.EVENTS.TASK_ERROR, (task, error) => {
- console.log('任务错误:', task.fileName, '-', error.message);
- });
- downloader.on(downloader.EVENTS.QUEUE_EMPTY, () => {
- console.log('队列已空');
- });
- downloader.on(downloader.EVENTS.ALL_COMPLETE, (result) => {
- console.log('所有任务完成');
- console.log('成功:', result.completed.length);
- console.log('失败:', result.failed.length);
- });
- // 添加任务
- const tasks = [
- {
- "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f000&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
- "filePath": "./model/grb2/gfs.20250901/18/atmos",
- "fileName": "gfs.t18z.pgrb2full.0p50.f000"
- },
- {
- "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f003&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
- "filePath": "./model/grb2/gfs.20250901/18/atmos",
- "fileName": "gfs.t18z.pgrb2full.0p50.f003"
- },
- {
- "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f006&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
- "filePath": "./model/grb2/gfs.20250901/18/atmos",
- "fileName": "gfs.t18z.pgrb2full.0p50.f006"
- }
- ];
- console.log('添加任务到下载器');
- downloader.addTasks(tasks);
- // 开始下载
- console.log('开始下载任务');
- downloader.start();
- // 模拟中途添加新任务
- setTimeout(() => {
- const newTasks = [
- {
- "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f009&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
- "filePath": "./model/grb2/gfs.20250901/18/atmos",
- "fileName": "gfs.t18z.pgrb2full.0p50.f009"
- }
- ];
- console.log('添加新任务到下载器');
- downloader.addTasks(newTasks);
- }, 10000);
- // 监控下载状态
- setInterval(() => {
- const status = downloader.getStatus();
- console.log('当前状态:', status);
- }, 5000);
- }
- // 运行示例
- if (require.main === module) {
- main().catch(console.error);
- }
- module.exports = { grb2DownloadManager };
|