downloadGrib2.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468
  1. const { EventEmitter } = require('eventemitter3');
  2. const PQueue = require('p-queue').default;
  3. const got = require('./got').default;
  4. const fs = require('fs-extra');
  5. const path = require('path');
  6. const { exec } = require('child_process');
  7. const { promisify } = require('util');
  8. const execAsync = promisify(exec);
  9. class grb2DownloadManager extends EventEmitter {
  10. constructor(options = {}) {
  11. super();
  12. // 配置默认选项
  13. this.options = {
  14. concurrency: options.concurrency || 10,
  15. retryCount: options.retryCount || 3,
  16. retryDelay: options.retryDelay || 5000,
  17. wgrib2Path: options.wgrib2Path || path.join(__dirname, "../wgrib2/wgrib2.exe"),
  18. ...options
  19. };
  20. // 下载队列
  21. this.queue = new PQueue({
  22. concurrency: this.options.concurrency,
  23. autoStart: false
  24. });
  25. // 状态管理
  26. this.status = 'idle'; // idle, downloading, paused, stopped
  27. this.pendingTasks = [];
  28. this.activeTasks = new Map();
  29. this.completedTasks = [];
  30. this.failedTasks = [];
  31. // 事件类型
  32. this.EVENTS = {
  33. STATUS_CHANGE: 'statusChange',
  34. TASK_ADDED: 'taskAdded',
  35. TASK_START: 'taskStart',
  36. TASK_PROGRESS: 'taskProgress',
  37. TASK_COMPLETE: 'taskComplete',
  38. TASK_SKIP: 'taskSkip',
  39. TASK_ERROR: 'taskError',
  40. QUEUE_EMPTY: 'queueEmpty',
  41. ALL_COMPLETE: 'allComplete'
  42. };
  43. // 绑定方法
  44. this.start = this.start.bind(this);
  45. this.pause = this.pause.bind(this);
  46. this.resume = this.resume.bind(this);
  47. this.stop = this.stop.bind(this);
  48. this.addTasks = this.addTasks.bind(this);
  49. console.log('气象文件下载器初始化完成');
  50. console.log(`最大并发数: ${this.options.concurrency}`);
  51. }
  52. // 设置状态
  53. setStatus(status) {
  54. if (this.status !== status) {
  55. this.status = status;
  56. this.emit(this.EVENTS.STATUS_CHANGE, status);
  57. const statusMap = {
  58. 'idle': '空闲',
  59. 'downloading': '下载中',
  60. 'paused': '已暂停',
  61. 'stopped': '已停止'
  62. };
  63. console.log(`下载器状态变更为: ${statusMap[status] || status}`);
  64. }
  65. }
  66. // 检查文件是否存在并使用wgrib2验证
  67. async validateFile(filePath, fileName) {
  68. const fullPath = path.join(filePath, fileName);
  69. try {
  70. // 检查文件是否存在
  71. const exists = await fs.pathExists(fullPath);
  72. if (!exists) {
  73. console.log(`文件不存在: ${fullPath}`);
  74. return false;
  75. }
  76. // 检查文件大小,如果文件大小为0,则直接返回无效
  77. const stats = await fs.stat(fullPath);
  78. if (stats.size === 0) {
  79. console.log(`文件为空: ${fileName}`);
  80. return false;
  81. }
  82. // 使用wgrib2验证文件
  83. console.log(`正在验证文件: ${fileName}`);
  84. const { stdout, stderr } = await execAsync(`${this.options.wgrib2Path} "${fullPath}"`);
  85. // 如果wgrib2有输出,通常表示文件有效
  86. if (stdout && stdout.length > 0) {
  87. console.log(`文件验证成功: ${fileName}`);
  88. return true;
  89. } else {
  90. console.log(`文件验证失败: ${fileName}`);
  91. return false;
  92. }
  93. } catch (error) {
  94. console.log(`文件验证出错: ${fileName}`, error.message);
  95. return false;
  96. }
  97. }
  98. // 下载文件
  99. async downloadFile(task, retryCount = 0) {
  100. const { url, filePath, fileName } = task;
  101. const fullPath = path.join(filePath, fileName);
  102. // 确保目录存在
  103. await fs.ensureDir(filePath);
  104. try {
  105. console.log(`开始下载: ${fileName}`);
  106. const downloadStream = got.stream(url);
  107. // 使用got的流式下载API
  108. // const downloadStream = got(url, {
  109. // isStream: true,
  110. // timeout: 300000, // 5分钟超时
  111. // retry: 0 // 禁用内置重试,因为我们自己实现了重试逻辑
  112. // });
  113. const fileWriterStream = fs.createWriteStream(fullPath);
  114. // 返回Promise以便等待下载完成
  115. return new Promise((resolve, reject) => {
  116. let downloadedBytes = 0;
  117. let totalBytes = 0;
  118. // 监听下载进度
  119. downloadStream.on('downloadProgress', (progress) => {
  120. downloadedBytes = progress.transferred;
  121. totalBytes = progress.total || 0;
  122. this.emit(this.EVENTS.TASK_PROGRESS, {
  123. task,
  124. downloaded: downloadedBytes,
  125. total: totalBytes,
  126. percent: totalBytes ? (downloadedBytes / totalBytes * 100) : 0
  127. });
  128. });
  129. // 管道流到文件
  130. downloadStream.pipe(fileWriterStream);
  131. // 处理下载完成
  132. fileWriterStream.on('finish', () => {
  133. console.log(`下载完成: ${fileName}`);
  134. resolve();
  135. });
  136. // 处理错误
  137. fileWriterStream.on('error', (error) => {
  138. console.log(`文件写入错误: ${fileName}`, error.message);
  139. reject(error);
  140. });
  141. downloadStream.on('error', (error) => {
  142. console.log(`下载错误: ${fileName}`, error.message);
  143. // 重试逻辑
  144. if (retryCount < this.options.retryCount) {
  145. console.log(`准备第 ${retryCount + 1} 次重试: ${fileName}`);
  146. setTimeout(() => {
  147. this.downloadFile(task, retryCount + 1)
  148. .then(resolve)
  149. .catch(reject);
  150. }, this.options.retryDelay);
  151. } else {
  152. reject(error);
  153. }
  154. });
  155. });
  156. } catch (error) {
  157. console.log(`下载过程出错: ${fileName}`, error.message);
  158. // 重试逻辑
  159. if (retryCount < this.options.retryCount) {
  160. console.log(`准备第 ${retryCount + 1} 次重试: ${fileName}`);
  161. await new Promise(resolve => setTimeout(resolve, this.options.retryDelay));
  162. return this.downloadFile(task, retryCount + 1);
  163. } else {
  164. throw error;
  165. }
  166. }
  167. }
  168. // 处理单个任务
  169. async processTask(task) {
  170. const { fileName, filePath } = task;
  171. // 标记任务开始
  172. this.activeTasks.set(fileName, task);
  173. this.emit(this.EVENTS.TASK_START, task);
  174. console.log(`开始处理任务: ${fileName}`);
  175. try {
  176. // 检查文件是否已存在且有效
  177. const isValid = await this.validateFile(filePath, fileName);
  178. if (isValid) {
  179. // 文件已存在且有效,跳过下载
  180. this.emit(this.EVENTS.TASK_SKIP, task, '文件已存在且有效');
  181. // console.log(`跳过下载: ${fileName} (文件已存在且有效)`);
  182. this.completedTasks.push(task);
  183. return;
  184. }
  185. // 文件不存在或无效,开始下载
  186. console.log(`开始下载: ${fileName}`);
  187. await this.downloadFile(task);
  188. // 下载完成后再次验证文件
  189. const postDownloadValid = await this.validateFile(filePath, fileName);
  190. if (postDownloadValid) {
  191. // 文件下载并验证成功
  192. this.emit(this.EVENTS.TASK_COMPLETE, task);
  193. console.log(`任务完成: ${fileName}`);
  194. this.completedTasks.push(task);
  195. } else {
  196. // 文件下载后验证失败
  197. throw new Error('文件下载后验证失败');
  198. }
  199. } catch (error) {
  200. // 任务失败
  201. this.emit(this.EVENTS.TASK_ERROR, task, error);
  202. console.log(`任务失败: ${fileName}`, error.message);
  203. this.failedTasks.push({ task, error });
  204. } finally {
  205. // 从活跃任务中移除
  206. this.activeTasks.delete(fileName);
  207. }
  208. }
  209. // 添加任务到队列
  210. addTasks(tasks) {
  211. if (!Array.isArray(tasks)) {
  212. tasks = [tasks];
  213. }
  214. tasks.forEach(task => {
  215. if (!task.url || !task.fileName || !task.filePath) {
  216. console.log('任务格式错误,缺少必要字段', task);
  217. return;
  218. }
  219. this.pendingTasks.push(task);
  220. this.emit(this.EVENTS.TASK_ADDED, task);
  221. console.log(`任务已添加到队列: ${task.fileName}`);
  222. });
  223. // 如果下载器处于运行状态,自动开始处理新任务
  224. if (this.status === 'downloading') {
  225. this.processQueue();
  226. }
  227. return this.pendingTasks.length;
  228. }
  229. // 处理队列中的任务
  230. processQueue() {
  231. if (this.status !== 'downloading') {
  232. return;
  233. }
  234. // 将待处理任务添加到队列
  235. while (this.pendingTasks.length > 0 &&
  236. this.queue.size < this.options.concurrency * 2) {
  237. const task = this.pendingTasks.shift();
  238. this.queue.add(() => this.processTask(task))
  239. .then(() => {
  240. // 检查是否所有任务都已完成
  241. if (this.pendingTasks.length === 0 &&
  242. this.queue.size === 0 &&
  243. this.queue.pending === 0) {
  244. this.setStatus('idle');
  245. this.emit(this.EVENTS.QUEUE_EMPTY);
  246. this.emit(this.EVENTS.ALL_COMPLETE, {
  247. completed: this.completedTasks,
  248. failed: this.failedTasks
  249. });
  250. console.log('所有任务处理完成');
  251. console.log(`成功: ${this.completedTasks.length}, 失败: ${this.failedTasks.length}`);
  252. }
  253. })
  254. .catch(error => {
  255. console.log('任务处理出错', error.message);
  256. });
  257. }
  258. }
  259. // 开始下载
  260. start() {
  261. if (this.status === 'downloading') {
  262. console.log('下载器已经在运行中');
  263. return;
  264. }
  265. this.setStatus('downloading');
  266. this.queue.start();
  267. this.processQueue();
  268. console.log('下载器已启动');
  269. }
  270. // 暂停下载
  271. pause() {
  272. if (this.status !== 'downloading') {
  273. console.log('下载器未运行,无法暂停');
  274. return;
  275. }
  276. this.setStatus('paused');
  277. this.queue.pause();
  278. console.log('下载器已暂停');
  279. }
  280. // 恢复下载
  281. resume() {
  282. if (this.status !== 'paused') {
  283. console.log('下载器未暂停,无法恢复');
  284. return;
  285. }
  286. this.setStatus('downloading');
  287. this.queue.start();
  288. this.processQueue();
  289. console.log('下载器已恢复');
  290. }
  291. // 停止下载
  292. stop() {
  293. this.setStatus('stopped');
  294. this.queue.clear();
  295. this.pendingTasks = [];
  296. console.log('下载器已停止,所有待处理任务已清除');
  297. }
  298. // 获取状态信息
  299. getStatus() {
  300. return {
  301. status: this.status,
  302. pending: this.pendingTasks.length,
  303. active: this.activeTasks.size,
  304. completed: this.completedTasks.length,
  305. failed: this.failedTasks.length
  306. };
  307. }
  308. }
  309. // 使用示例
  310. async function main() {
  311. // 创建下载器实例
  312. const downloader = new grb2DownloadManager({
  313. concurrency: 5, // 减少并发数以避免服务器压力
  314. retryCount: 3,
  315. retryDelay: 5000,
  316. wgrib2Path: 'wgrib2' // 确保系统已安装wgrib2
  317. });
  318. // 监听事件
  319. downloader.on(downloader.EVENTS.STATUS_CHANGE, (status) => {
  320. console.log('状态变更:', status);
  321. });
  322. downloader.on(downloader.EVENTS.TASK_ADDED, (task) => {
  323. console.log('任务已添加:', task.fileName);
  324. });
  325. downloader.on(downloader.EVENTS.TASK_START, (task) => {
  326. console.log('任务开始:', task.fileName);
  327. });
  328. downloader.on(downloader.EVENTS.TASK_PROGRESS, (progress) => {
  329. console.log(`下载进度: ${progress.task.fileName} - ${progress.percent.toFixed(2)}%`);
  330. });
  331. downloader.on(downloader.EVENTS.TASK_COMPLETE, (task) => {
  332. console.log('任务完成:', task.fileName);
  333. });
  334. downloader.on(downloader.EVENTS.TASK_SKIP, (task, reason) => {
  335. console.log('任务跳过:', task.fileName, '-', reason);
  336. });
  337. downloader.on(downloader.EVENTS.TASK_ERROR, (task, error) => {
  338. console.log('任务错误:', task.fileName, '-', error.message);
  339. });
  340. downloader.on(downloader.EVENTS.QUEUE_EMPTY, () => {
  341. console.log('队列已空');
  342. });
  343. downloader.on(downloader.EVENTS.ALL_COMPLETE, (result) => {
  344. console.log('所有任务完成');
  345. console.log('成功:', result.completed.length);
  346. console.log('失败:', result.failed.length);
  347. });
  348. // 添加任务
  349. const tasks = [
  350. {
  351. "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f000&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
  352. "filePath": "./model/grb2/gfs.20250901/18/atmos",
  353. "fileName": "gfs.t18z.pgrb2full.0p50.f000"
  354. },
  355. {
  356. "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f003&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
  357. "filePath": "./model/grb2/gfs.20250901/18/atmos",
  358. "fileName": "gfs.t18z.pgrb2full.0p50.f003"
  359. },
  360. {
  361. "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f006&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
  362. "filePath": "./model/grb2/gfs.20250901/18/atmos",
  363. "fileName": "gfs.t18z.pgrb2full.0p50.f006"
  364. }
  365. ];
  366. console.log('添加任务到下载器');
  367. downloader.addTasks(tasks);
  368. // 开始下载
  369. console.log('开始下载任务');
  370. downloader.start();
  371. // 模拟中途添加新任务
  372. setTimeout(() => {
  373. const newTasks = [
  374. {
  375. "url": "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f009&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90",
  376. "filePath": "./model/grb2/gfs.20250901/18/atmos",
  377. "fileName": "gfs.t18z.pgrb2full.0p50.f009"
  378. }
  379. ];
  380. console.log('添加新任务到下载器');
  381. downloader.addTasks(newTasks);
  382. }, 10000);
  383. // 监控下载状态
  384. setInterval(() => {
  385. const status = downloader.getStatus();
  386. console.log('当前状态:', status);
  387. }, 5000);
  388. }
  389. // 运行示例
  390. if (require.main === module) {
  391. main().catch(console.error);
  392. }
  393. module.exports = { grb2DownloadManager };