Просмотр исходного кода

数据适配器将实时数据添加至redis

wangb 1 день назад
Родитель
Сommit
3951617a6b

+ 4 - 0
data-adapter/pom.xml

@@ -91,6 +91,10 @@
             <artifactId>taos-jdbcdriver</artifactId>
             <version>3.3.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
 
     </dependencies>
     <dependencyManagement>

+ 2 - 0
data-adapter/src/main/java/com/gyee/dataadapter/DataAdapterApp.java

@@ -2,8 +2,10 @@ package com.gyee.dataadapter;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 @SpringBootApplication
+@EnableScheduling
 public class DataAdapterApp {
 
     public static void main(String[] args) {

+ 29 - 1
data-adapter/src/main/java/com/gyee/dataadapter/config/DataConverterManager2.java

@@ -1,10 +1,11 @@
 package com.gyee.dataadapter.config;
 
-import cn.hutool.core.date.DateUtil;
 import com.gyee.dataadapter.cache.MqttCache;
 import com.gyee.dataadapter.entity.PointData;
 import lombok.Data;
+import org.springframework.data.redis.core.RedisTemplate;
 
+import javax.annotation.Resource;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -15,6 +16,10 @@ import java.util.Date;
  */
 @Data
 public class DataConverterManager2 {
+
+    @Resource
+    private RedisTemplate<String, Object> redisTemplate;
+
     /**
      * 序号
      */
@@ -102,6 +107,29 @@ public class DataConverterManager2 {
         MqttCache.subData2.put(pname, pi);
     }
 
+
+//    private void getPointInfo() {
+//        PointData pi = new PointData();
+//        index += 1;
+//        int pnameLen = getUshort();
+//        String pname = getString(pnameLen);
+//        pi.setTagName(pname);
+//        byte vtype = getByte();
+//        int vlen = 0;
+//        if (vtype == 0 || vtype == 254) {
+//            vlen = getUshort();
+//        }
+//        setValueByType(pi, vtype, vlen);
+//        pi.setTime(getDate());
+//        index += 4;
+//        index += 1;
+//
+//        MqttCache.subData2.put(pname, pi);
+//        // Redis 存储
+//        HashOperations<String, String, PointData> hashOps = redisTemplate.opsForHash();
+//        hashOps.put("mqtt", pname, pi);
+//    }
+
     /**
      * 获取实际值
      *

+ 40 - 0
data-adapter/src/main/java/com/gyee/dataadapter/config/RedisConfig.java

@@ -0,0 +1,40 @@
+package com.gyee.dataadapter.config;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+@Configuration
+public class RedisConfig {
+
+    @Bean
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(factory);
+
+        // 使用 Jackson2JsonRedisSerializer 来序列化和反序列化 value
+        Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
+        ObjectMapper om = new ObjectMapper();
+        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
+        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
+        jacksonSerializer.setObjectMapper(om);
+
+        // 使用 StringRedisSerializer 来序列化和反序列化 key
+        StringRedisSerializer stringSerializer = new StringRedisSerializer();
+
+        template.setKeySerializer(stringSerializer);
+        template.setHashKeySerializer(stringSerializer);
+        template.setValueSerializer(jacksonSerializer);
+        template.setHashValueSerializer(jacksonSerializer);
+
+        template.afterPropertiesSet();
+        return template;
+    }
+}

+ 100 - 13
data-adapter/src/main/java/com/gyee/dataadapter/service/impl/AdapterServiceImpl.java

@@ -9,13 +9,16 @@ import com.gyee.dataadapter.cache.MqttCache;
 import com.gyee.dataadapter.dao.MyWebClient;
 import com.gyee.dataadapter.entity.*;
 import com.gyee.dataadapter.service.IAdapterService;
+import com.gyee.dataadapter.service.TsDataService;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.ssl.SSLContextBuilder;
-import org.springframework.http.*;
+import org.springframework.data.redis.core.HashOperations;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 import reactor.core.publisher.Mono;
@@ -36,6 +39,15 @@ public class AdapterServiceImpl implements IAdapterService {
     @Resource
     private MyWebClient myWebClient;
 
+    @Resource
+    private RedisTemplate redisTemplate;
+
+    @Resource
+    private IAdapterService adapterService;
+
+    @Resource
+    private TsDataService tsDataService;
+
 
     /**
      * 请求历史数据
@@ -71,22 +83,22 @@ public class AdapterServiceImpl implements IAdapterService {
         sb.append("/scadarest/api/SA/HistoryQuerySdb?start=").append(uStart).append("&end=").append(uEnd)
                 .append("&sampleType=").append(sampleType).append("&paths=").append(paths);
         sb.append("&sampleRate=");
-        if (sampleRate != null){
+        if (sampleRate != null) {
             sb.append(sampleRate);
-        }else {
-            long jg = (end.getTime()-start.getTime())/1000;
+        } else {
+            long jg = (end.getTime() - start.getTime()) / 1000;
             sb.append(jg);
         }
         sb.append("&pageIndex=");
-        if (StrUtil.isNotBlank(pageIndex)){
+        if (StrUtil.isNotBlank(pageIndex)) {
             sb.append(pageIndex);
-        }else {
+        } else {
             sb.append(1);
         }
         sb.append("&pageSize=");
-        if (StrUtil.isNotBlank(pageSize)){
+        if (StrUtil.isNotBlank(pageSize)) {
             sb.append(pageSize);
-        }else {
+        } else {
             sb.append(90000);
         }
         sb.append("&isDesc=");
@@ -133,7 +145,7 @@ public class AdapterServiceImpl implements IAdapterService {
         end = DateUtil.offsetHour(end, -8);
         String uStart = DateUtil.formatDateTime(start);
         String uEnd = DateUtil.formatDateTime(end);
-        long jg = (end.getTime()-start.getTime())/1000;
+        long jg = (end.getTime() - start.getTime()) / 1000;
 
         StringBuilder sb = new StringBuilder();
         sb.append("/scadarest/api/SA/HistoryQuerySdb?start=").append(uStart).append("&end=").append(uEnd)
@@ -228,18 +240,93 @@ public class AdapterServiceImpl implements IAdapterService {
      * @param tagNames 根据逗号隔开的测点
      * @return 测点的实时数据
      */
-    @Override
+//    @Override
+//    public Map<String, PointData> getLatestData2(List<String> tagNames) {
+//        Map<String, PointData> map = new HashMap<>();
+//        if (CollUtil.isEmpty(tagNames)) return map;
+//        for (String path : tagNames) {
+//            PointData pd = MqttCache.subData2.get(path);
+//            if(pd!=null)  map.put(path, pd);
+//        }
+//        return map;
+//    }
     public Map<String, PointData> getLatestData2(List<String> tagNames) {
         Map<String, PointData> map = new HashMap<>();
-        if (CollUtil.isEmpty(tagNames)) return map;
+        if (CollUtil.isEmpty(tagNames)) {
+            return map;
+        }
+
         for (String path : tagNames) {
-            PointData pd = MqttCache.subData2.get(path);
-            if(pd!=null)  map.put(path, pd);
+            String redisKey = "mqtt:point:" + path;   // 构造完整 Key
+
+            // 1. 优先从 Redis 获取实时数据(使用 ValueOperations)
+            PointData pd = (PointData) redisTemplate.opsForValue().get(redisKey);
+            if (pd != null) {
+                map.put(path, pd);
+                continue;
+            }
+
+            // 2. Redis 无数据,尝试从历史接口获取最新时刻的数据
+            PointData resultPd = null;
+            Date latestTime = new Date();
+            Map<String, PointData> historyResult = tsDataService.getHistorySection(latestTime, path);
+            if (historyResult != null && historyResult.containsKey(path)) {
+                resultPd = historyResult.get(path);
+            }
+
+            // 3. 仍无数据,查询最近一个月的原始历史数据,取最新一条
+            if (resultPd == null) {
+                Date endTime = new Date();
+                Date startTime = DateUtil.offsetMonth(endTime, -1);
+                List<PointData> rawHistoryList = tsDataService.getHistoryRaw(path, startTime, endTime);
+                if (CollUtil.isNotEmpty(rawHistoryList)) {
+                    resultPd = rawHistoryList.get(rawHistoryList.size() - 1);
+                }
+            }
+
+            // 4. 如果获取到了有效数据,则回填 Redis 并加入返回 Map
+            if (resultPd != null) {
+                redisTemplate.opsForValue().set(redisKey, resultPd);  // 回填 Redis
+                map.put(path, resultPd);
+            }
         }
+
         return map;
     }
 
 
+    /**
+     * 每 30 秒执行一次,将 MqttCache.subData2 全量写入 Redis
+     */
+    @Scheduled(fixedRate = 30000)  // 单位:毫秒,30000ms = 30s
+    public void scheduledWriteToRedis() {
+        writeRedis();
+    }
+
+    /**
+     * 将 MqttCache.subData2 中的所有数据写入 Redis
+     */
+    public void writeRedis() {
+        Map<String, PointData> pdMap = MqttCache.subData2;
+        if (pdMap == null || pdMap.isEmpty()) {
+            return;
+        }
+
+        // 遍历 Map,每个测点一个独立的 Redis Key
+        for (Map.Entry<String, PointData> entry : pdMap.entrySet()) {
+            String pname = entry.getKey();
+            PointData pi = entry.getValue();
+            String redisKey = "mqtt:point:" + pname;
+            try {
+                redisTemplate.opsForValue().set(redisKey, pi);
+            } catch (Exception e) {
+                // 记录日志,避免单条失败影响其他数据
+                System.err.println("写入 Redis 失败,Key: " + redisKey + ",错误:" + e.getMessage());
+            }
+        }
+    }
+
+
     public static RestTemplate createRestTemplateWithSSLDisabled() {
         try {
             // 创建一个不验证主机名的 SSLContext

+ 12 - 4
data-adapter/src/main/resources/application.yaml

@@ -7,10 +7,10 @@ publish:
     host: tcp://10.220.1.5:2883
     username: admin@scada.com
     password: Scada135}+?
-    cleansession: true
+    cleansession: false
     clientid: HUIANTOGUANGYAO
-    keepalive: 10
-    connectionTimeout: 3000
+    keepalive: 120
+    connectionTimeout: 10000
 
 
 spring:
@@ -22,6 +22,14 @@ spring:
     username: smartuser
     password: smart123!@#
 
+  redis:
+    host: 10.220.1.5
+    port: 6389
+    timeout: 100000
+    password: GYeeNXF#@!
+    database: 5
+
+
 subscribe:
   topic: scada/public/HUIANTOGUANGYAO/tag_values
   qos: 0
@@ -40,4 +48,4 @@ taoscz:
   #taos中ai测点的超级表名
   di_stable_name: pointdi
   #taos中di测点的超级表名
-  ai_stable_name: pointci
+  ai_stable_name: pointci

+ 1 - 0
runeconomy-xk/src/main/java/com/gyee/runeconomy/service/auto/impl/ProEconEquipmentInfoDayTopServiceImpl.java

@@ -578,6 +578,7 @@ public class ProEconEquipmentInfoDayTopServiceImpl extends ServiceImpl<ProEconEq
             i.setDayRank((int) rank);
             i.setDayLevel(level(tur.getScore(), DateUtil.dayOfMonth(i.getRecordDate()) - 1));
             i.setDayjfpl(NumberUtil.round(tur.getJfpl() * 100, 2).doubleValue());
+            i.setDaygl(NumberUtil.round(tur.getRfdl()/(tur.getYxMin()/60),2).doubleValue());
         });
         resultMap.put("data", ptls);
 

+ 2 - 1
runeconomy-xk/src/main/java/com/gyee/runeconomy/service/singleanalysis/SingleAnalysisService.java

@@ -1115,8 +1115,9 @@ public class SingleAnalysisService {
 
 
     public Map<String, List<SingleAnalysisVo>> SingleAnalysisListByWtIdDetiml(String wtId, Date beginDate, Date endDate) {
+        int day = DateUtil.dayOfMonth(DateUtil.date());
 
-        if (DateUtil.dayOfMonth(beginDate) == 1) {
+        if (DateUtil.dayOfMonth(beginDate) == 1 && 1 == day) {
             beginDate = DateUtil.offsetMonth(beginDate, -1);
             endDate = DateUtil.offsetMonth(endDate, -1);
         }