// downloadGrib2_旧版本.js (legacy version, ~14 KB) — batch downloader for GRIB2 weather files
  const fs = require('fs');
  const path = require('path');
  const https = require('https');
  const { exec, execFile } = require('child_process');
  const { EventEmitter } = require('events');
  6. // 批量下载 grb2 气象文件
  7. // 调用方法:
  8. // 在需要使用的地方引入
  9. // const { grb2DownloadManager } = require("手动引导至/schema/downloadGrb2");
  10. // 在需要使用的地方创建实例
  11. // 创建下载管理器实例
  12. // const manager = new grb2DownloadManager()
  13. // 获取下载工具当前状态
  14. // manager.getStatus();
  15. // 相关监听事件钩子
  16. // manager.on('queueUpdated', (status) => {
  17. // console.log(`队列更新: 还有 ${status.queueLength} 个任务等待下载`);
  18. // });
  19. // manager.on('downloadStarted', (status) => {
  20. // console.log('下载开始');
  21. // });
  22. // manager.on('taskStarted', (status) => {
  23. // console.log(`开始下载: ${status.currentTask.fileName}`);
  24. // });
  25. // manager.on('taskProgress', ({ progress, currentTask }) => {
  26. // console.log(`下载进度: ${currentTask.fileName} - ${progress}%`);
  27. // });
  28. // manager.on('taskCompleted', (status) => {
  29. // console.log(`任务完成: 成功 ${status.successfulDownloads}/${status.totalTasks}`);
  30. // });
  31. // manager.on('taskFailed', ({ task, error }) => {
  32. // console.log(`任务失败: ${task.fileName} - ${error}`);
  33. // });
  34. // manager.on('downloadCompleted', (status) => {
  35. // console.log(`所有下载完成: 成功 ${status.successfulDownloads}, 失败: ${status.failedDownloads}`);
  36. // });
  37. // manager.on('downloadStopped', () => {
  38. // console.log('下载已停止');
  39. // });
  /**
   * Sequential download manager for GRIB2 weather files.
   *
   * Tasks are plain objects of the form `{ url, filePath, fileName }`, where
   * `filePath` is the target directory and `fileName` the file to create in it.
   * Tasks are processed one at a time; every downloaded file is validated by
   * running the wgrib2 executable on it.
   *
   * Emitted events: `queueUpdated`, `downloadStarted`, `taskStarted`,
   * `taskProgress`, `taskSkipped`, `taskRetry`, `taskCompleted`, `taskFailed`,
   * `downloadCompleted`, `downloadStopped`, `reset`.
   */
  class grb2DownloadManager extends EventEmitter {
    /**
     * @param {object} [options]
     * @param {number} [options.retryAttempts=3] Retries per task after the first
     *   failed attempt. `0` disables retries.
     * @param {string} [options.wgrib2Path] Path to the wgrib2 executable. Defaults
     *   to `../wgrib2/wgrib2.exe` relative to this file.
     * @param {number} [options.retryDelay=5000] Milliseconds to wait before a retry.
     * @param {number} [options.requestTimeout=30000] Timeout in milliseconds passed
     *   to `request.setTimeout` for each download request.
     */
    constructor(options = {}) {
      super();
      const { retryAttempts, retryDelay, requestTimeout, wgrib2Path } = options;
      this.queue = [];
      this.isDownloading = false;
      // Handle of the transfer in flight: { request, file, cancel } or null.
      this.currentDownload = null;
      // Task taken off the queue and not yet finished; re-queued by stopDownload().
      this.activeTask = null;
      // Bumped on every stop; work started under an older id abandons itself.
      this.runId = 0;
      this.status = this.createInitialStatus();
      // `== null` keeps an explicit 0 instead of replacing it with the default.
      this.retryAttempts = retryAttempts == null ? 3 : retryAttempts;
      this.retryDelay = retryDelay == null ? 5000 : retryDelay;
      this.requestTimeout = requestTimeout == null ? 30000 : requestTimeout;
      this.wgrib2Path = wgrib2Path || path.join(__dirname, '../wgrib2/wgrib2.exe');
      this.downloadInterval = null;
    }

    /**
     * Build a fresh status record.
     * @returns {object} Status with state 'idle' and all counters at zero.
     */
    createInitialStatus() {
      return {
        state: 'idle', // 'idle', 'downloading', 'completed', 'stopped'
        totalTasks: 0,
        completedTasks: 0,
        successfulDownloads: 0,
        failedDownloads: 0,
        currentTask: null,
        progress: 0,
        queueLength: 0
      };
    }

    /**
     * Collect the URLs of every queued task and of the task being processed.
     * @returns {Set<string>}
     */
    trackedUrls() {
      const urls = new Set(this.queue.map((queued) => queued.url));
      if (this.activeTask) {
        urls.add(this.activeTask.url);
      }
      return urls;
    }

    /**
     * Add one task or an array of tasks to the queue.
     *
     * A task is skipped when its URL is already queued or in progress, when it
     * duplicates an earlier task of the same call, or when its target file already
     * exists and passes wgrib2 validation. Downloading starts automatically unless
     * the manager is already downloading or has been stopped.
     *
     * @param {object|object[]} tasks
     * @returns {Promise<{added: number, message: string}>}
     */
    async addTasks(tasks) {
      const incoming = Array.isArray(tasks) ? tasks : [tasks];

      const candidates = [];
      for (const task of incoming) {
        // Cheap check first so wgrib2 is not spawned for tasks already tracked.
        if (this.trackedUrls().has(task.url)) {
          continue;
        }
        if (await this.isTaskCompleted(task)) {
          continue;
        }
        candidates.push(task);
      }

      // Re-check synchronously: the queue may have changed while awaiting above,
      // and the same URL may appear more than once in this batch.
      const seen = this.trackedUrls();
      const newTasks = [];
      for (const task of candidates) {
        if (!seen.has(task.url)) {
          seen.add(task.url);
          newTasks.push(task);
        }
      }

      if (newTasks.length === 0) {
        return {
          added: 0,
          message: '没有新任务可添加,所有任务已完成或已在队列中'
        };
      }

      this.queue.push(...newTasks);
      this.status.totalTasks += newTasks.length;
      this.status.queueLength = this.queue.length;
      this.emit('queueUpdated', this.status);
      if (!this.isDownloading && this.status.state !== 'stopped') {
        this.startDownload();
      }
      return {
        added: newTasks.length,
        message: `添加了 ${newTasks.length} 个新任务到下载队列`
      };
    }

    /**
     * Check whether a task's target file already exists and passes validation.
     * @param {{filePath: string, fileName: string}} task
     * @returns {Promise<boolean>}
     */
    async isTaskCompleted(task) {
      const filePath = path.join(task.filePath, task.fileName);
      if (!fs.existsSync(filePath)) {
        return false;
      }
      try {
        await this.verifyFileWithWgrib2(filePath);
        return true;
      } catch (error) {
        console.log(error.message);
        return false;
      }
    }

    /**
     * Validate a file by running wgrib2 on it.
     *
     * The file counts as valid when the process exits without error and writes
     * nothing (other than whitespace) to stderr.
     *
     * @param {string} filePath
     * @returns {Promise<void>} Rejects with an Error describing the failure.
     */
    verifyFileWithWgrib2(filePath) {
      return new Promise((resolve, reject) => {
        // execFile passes the path as an argument without a shell, so spaces,
        // quotes and shell metacharacters in either path are safe.
        // maxBuffer is raised so a long stdout listing is not reported as an error.
        const options = { maxBuffer: 16 * 1024 * 1024 };
        execFile(this.wgrib2Path, [filePath], options, (error, stdout, stderr) => {
          if (error) {
            reject(new Error(`文件验证失败: ${error.message}`));
          } else if (stderr && stderr.trim() !== '') {
            reject(new Error(`文件验证失败: ${stderr}`));
          } else {
            resolve();
          }
        });
      });
    }

    /**
     * Schedule the next queue step on a later tick.
     * @param {number} runId Run the step belongs to.
     */
    scheduleNext(runId) {
      setImmediate(() => {
        this.processNextTask(runId).catch((error) => {
          console.error(error);
          // Release the busy flag so the manager can be started again.
          if (runId === this.runId) {
            this.isDownloading = false;
          }
        });
      });
    }

    /**
     * Start processing the queue.
     * @returns {{message: string}}
     */
    startDownload() {
      if (this.isDownloading) {
        return { message: '当前已有下载任务在进行中,新任务已加入队列' };
      }
      this.isDownloading = true;
      this.status.state = 'downloading';
      // Deferred so 'downloadStarted' is emitted before the first task begins.
      this.scheduleNext(this.runId);
      this.emit('downloadStarted', this.status);
      return { message: '下载已开始' };
    }

    /**
     * Take the next task off the queue, process it, then schedule the following one.
     * @param {number} [runId] Run this step belongs to; defaults to the current run.
     * @returns {Promise<void>}
     */
    async processNextTask(runId = this.runId) {
      // A stop happened after this step was scheduled; a newer run may be active.
      if (runId !== this.runId) {
        return;
      }
      if (this.status.state === 'stopped') {
        this.isDownloading = false;
        return;
      }
      if (this.queue.length === 0) {
        this.isDownloading = false;
        this.status.state = 'completed';
        this.emit('downloadCompleted', this.status);
        return;
      }

      const task = this.queue.shift();
      this.activeTask = task;
      this.status.queueLength = this.queue.length;
      this.status.currentTask = {
        ...task,
        retryCount: 0
      };
      this.emit('taskStarted', this.status);

      try {
        await this.downloadTask(task, 0, runId);
      } catch (error) {
        // Errors outside the download/verify step (e.g. the directory cannot be
        // created) are recorded as a failed task instead of stalling the queue.
        if (runId === this.runId) {
          console.error(`下载失败: ${error.message}`);
          this.status.completedTasks++;
          this.status.failedDownloads++;
          this.emit('taskFailed', {
            ...this.status,
            task,
            error: error.message
          });
        }
      }

      // Stopped or reset while the task was running: stopDownload() has already
      // put the task back on the queue, so do not continue.
      if (runId !== this.runId) {
        return;
      }
      this.activeTask = null;
      if (this.isDownloading) {
        this.scheduleNext(runId);
      }
    }

    /**
     * Download and validate a single task, retrying on failure.
     *
     * @param {{url: string, filePath: string, fileName: string}} task
     * @param {number} [retryCount=0] Number of retries already performed.
     * @param {number} [runId] Run this attempt belongs to; defaults to the current
     *   run. The attempt ends silently once stopDownload() has started a new run.
     * @returns {Promise<void>}
     */
    async downloadTask(task, retryCount = 0, runId = this.runId) {
      const fileDir = task.filePath;
      const filePath = path.join(fileDir, task.fileName);

      if (!fs.existsSync(fileDir)) {
        fs.mkdirSync(fileDir, { recursive: true });
      }

      // Skip the transfer when a valid file is already on disk.
      let alreadyValid = false;
      try {
        alreadyValid = await this.isTaskCompleted(task);
      } catch (error) {
        console.log(`文件检查失败: ${error.message}`);
      }
      if (runId !== this.runId) {
        return;
      }
      if (alreadyValid) {
        this.status.completedTasks++;
        this.status.successfulDownloads++;
        this.emit('taskSkipped', { ...this.status, task, reason: '文件已存在且有效' });
        return;
      }

      // currentTask is only set by processNextTask; guard for direct callers.
      if (this.status.currentTask) {
        this.status.currentTask.retryCount = retryCount;
      }
      this.status.progress = 0;
      this.emit('taskProgress', { ...this.status, progress: 0 });

      try {
        await this.downloadFile(task.url, filePath, (progress) => {
          this.status.progress = progress;
          this.emit('taskProgress', { ...this.status, progress });
        });
        if (runId !== this.runId) {
          return;
        }
        await this.verifyFileWithWgrib2(filePath);
        if (runId !== this.runId) {
          return;
        }
        this.status.completedTasks++;
        this.status.successfulDownloads++;
        this.emit('taskCompleted', this.status);
      } catch (error) {
        // Stopped or reset: neither a failure nor a reason to retry.
        if (runId !== this.runId) {
          return;
        }
        console.error(`下载失败: ${error.message}`);
        if (retryCount < this.retryAttempts) {
          console.log(`尝试重新下载 (${retryCount + 1}/${this.retryAttempts})`);
          this.emit('taskRetry', {
            ...this.status,
            task,
            retryCount: retryCount + 1,
            maxRetries: this.retryAttempts
          });
          await new Promise((resolve) => setTimeout(resolve, this.retryDelay));
          if (runId !== this.runId) {
            return;
          }
          return this.downloadTask(task, retryCount + 1, runId);
        }
        this.status.completedTasks++;
        this.status.failedDownloads++;
        this.emit('taskFailed', {
          ...this.status,
          task,
          error: error.message
        });
      }
    }

    /**
     * Stream `url` into `filePath` over HTTPS.
     *
     * On any failure (non-200 status, request/response/file error, timeout, or
     * cancellation) the request and the write stream are destroyed, the partial
     * file is removed, and only then is the promise rejected.
     *
     * @param {string} url
     * @param {string} filePath
     * @param {(percent: number) => void} [onProgress] Called with the rounded
     *   percentage each time it changes; stays at 0 when the response carries no
     *   usable content-length.
     * @returns {Promise<void>} Resolves once the file is written and closed.
     */
    downloadFile(url, filePath, onProgress) {
      return new Promise((resolve, reject) => {
        const file = fs.createWriteStream(filePath);
        const handle = { request: null, file, cancel: null };
        let settled = false;
        let receivedBytes = 0;
        let lastPercent = -1;

        const release = () => {
          if (this.currentDownload === handle) {
            this.currentDownload = null;
          }
        };

        // Single failure path; later calls are ignored so the promise settles once.
        const fail = (error) => {
          if (settled) {
            return;
          }
          settled = true;
          release();
          if (handle.request) {
            handle.request.destroy();
          }
          // Remove the partial file only after the stream is closed, and reject
          // only after removal, so a retry cannot race with this cleanup.
          file.once('close', () => {
            fs.unlink(filePath, () => reject(error));
          });
          file.destroy();
        };

        handle.cancel = () => {
          const error = new Error('下载已停止');
          error.code = 'DOWNLOAD_STOPPED';
          fail(error);
        };

        file.on('error', fail);
        file.on('finish', () => {
          if (settled) {
            return;
          }
          settled = true;
          release();
          // Resolve after the descriptor is closed so validation reads the full file.
          file.close(() => resolve());
        });

        try {
          handle.request = https.get(url, (response) => {
            // With a listener attached, a response cut short surfaces as an error.
            response.on('error', fail);
            if (response.statusCode !== 200) {
              fail(new Error(`服务器返回状态码: ${response.statusCode}`));
              return;
            }
            const totalBytes = parseInt(response.headers['content-length'], 10);
            response.on('data', (chunk) => {
              receivedBytes += chunk.length;
              // A missing content-length parses to NaN, which is falsy here.
              const percent = totalBytes ? Math.round(receivedBytes / totalBytes * 100) : 0;
              if (percent !== lastPercent) {
                lastPercent = percent;
                if (typeof onProgress === 'function') {
                  onProgress(percent);
                }
              }
            });
            response.pipe(file);
          });
        } catch (error) {
          // https.get throws synchronously for an invalid URL or protocol.
          fail(error);
          return;
        }

        handle.request.on('error', fail);
        handle.request.setTimeout(this.requestTimeout, () => {
          fail(new Error('请求超时'));
        });
        // Registered before the response arrives so a pending request can be aborted.
        this.currentDownload = handle;
      });
    }

    /**
     * Stop processing. The transfer in flight is aborted and the task being
     * processed is put back at the front of the queue, so a later
     * startDownload() resumes with it.
     * @returns {{message: string}}
     */
    stopDownload() {
      // Invalidate the running loop, including a pending retry wait.
      this.runId += 1;

      const active = this.currentDownload;
      this.currentDownload = null;
      if (active) {
        if (typeof active.cancel === 'function') {
          active.cancel();
        } else {
          if (active.request) {
            active.request.destroy();
          }
          if (active.file) {
            active.file.destroy();
          }
        }
      }

      if (this.activeTask) {
        this.queue.unshift(this.activeTask);
        this.activeTask = null;
        this.status.queueLength = this.queue.length;
      }

      this.isDownloading = false;
      this.status.state = 'stopped';
      this.emit('downloadStopped', this.status);
      return { message: '下载已停止' };
    }

    /**
     * Stop any work in progress, drop all queued tasks and clear the counters.
     * @returns {{message: string}}
     */
    reset() {
      this.stopDownload();
      this.queue = [];
      this.status = this.createInitialStatus();
      this.emit('reset', this.status);
      return { message: '下载管理器已重置' };
    }

    /**
     * @returns {object} The live status object (not a copy).
     */
    getStatus() {
      return this.status;
    }
  }
  300. // 使用示例
  /**
   * Usage example: queue three GFS downloads, stop after 5 seconds, then
   * start again 2 seconds later.
   *
   * @returns {Promise<void>} Resolves once the tasks have been queued; the
   *   stop/restart timers fire afterwards.
   */
  async function main() {
    // The class defined in this file is grb2DownloadManager; the previous
    // `new DownloadManager(...)` referenced an undefined name.
    const downloadManager = new grb2DownloadManager({
      retryAttempts: 3,
      wgrib2Path: 'wgrib2' // requires wgrib2 to be installed and on PATH
    });

    // Log every lifecycle event.
    downloadManager.on('queueUpdated', (status) => {
      console.log(`队列更新: 还有 ${status.queueLength} 个任务等待下载`);
    });
    downloadManager.on('downloadStarted', () => {
      console.log('下载开始');
    });
    downloadManager.on('taskStarted', (status) => {
      console.log(`开始下载: ${status.currentTask.fileName}`);
    });
    downloadManager.on('taskProgress', ({ progress, currentTask }) => {
      console.log(`下载进度: ${currentTask.fileName} - ${progress}%`);
    });
    downloadManager.on('taskCompleted', (status) => {
      console.log(`任务完成: 成功 ${status.successfulDownloads}/${status.totalTasks}`);
    });
    downloadManager.on('taskFailed', ({ task, error }) => {
      console.log(`任务失败: ${task.fileName} - ${error}`);
    });
    downloadManager.on('downloadCompleted', (status) => {
      console.log(`所有下载完成: 成功 ${status.successfulDownloads}, 失败: ${status.failedDownloads}`);
    });
    downloadManager.on('downloadStopped', () => {
      console.log('下载已停止');
    });

    // Three forecast hours of the same GFS cycle.
    const tasks = [
      {
        url: 'https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f000&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90',
        filePath: './model/grb2/gfs.20250901/18/atmos',
        fileName: 'gfs.t18z.pgrb2full.0p50.f000'
      },
      {
        url: 'https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f003&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90',
        filePath: './model/grb2/gfs.20250901/18/atmos',
        fileName: 'gfs.t18z.pgrb2full.0p50.f003'
      },
      {
        url: 'https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p50.pl?dir=%2Fgfs.20250901%2F18/atmos&file=gfs.t18z.pgrb2full.0p50.f006&all_lev=on&all_var=on&leftlon=0&rightlon=360&toplat=90&bottomlat=-90',
        filePath: './model/grb2/gfs.20250901/18/atmos',
        fileName: 'gfs.t18z.pgrb2full.0p50.f006'
      }
    ];

    const { message } = await downloadManager.addTasks(tasks);
    console.log(message);

    // Simulate stopping part-way through, then starting again.
    setTimeout(() => {
      console.log(downloadManager.stopDownload().message);
      setTimeout(() => {
        console.log(downloadManager.startDownload().message);
      }, 2000);
    }, 5000);
  }
  360. // 如果直接运行此脚本,则执行示例
  // Run the usage example only when this file is the process entry point,
  // not when it is loaded through require().
  const isEntryScript = require.main === module;
  if (isEntryScript) {
    main().catch((error) => console.error(error));
  }

  // Public API of this module.
  module.exports = { grb2DownloadManager };