Commit b520ef6e authored by 朱晨阳's avatar 朱晨阳

Merge remote-tracking branch 'origin/developer' into developer

parents 2644dea7 3490be9b
......@@ -16,11 +16,13 @@ import com.yeejoin.amos.boot.module.jxiop.biz.service.impl.*;
import com.yeejoin.amos.boot.module.jxiop.biz.tdmapper.IndicatorDataMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.typroject.tyboot.component.emq.EmqKeeper;
import tech.tablesaw.api.ColumnType;
import tech.tablesaw.api.DoubleColumn;
import tech.tablesaw.api.Table;
......@@ -90,6 +92,9 @@ public class KafkaConsumerService {
@Value("${last.month.num:12}")
private Integer lastMonthNum;
@Autowired
EmqKeeper emqKeeper;
ExecutorService service = Executors.newFixedThreadPool(threadNum);
BlockingQueue<PointData> queue = new LinkedBlockingQueue<>();
......@@ -567,6 +572,13 @@ public class KafkaConsumerService {
resultMap.put("processVariable", data1);
resultMap.put("processVariableId", fanPointVarCorrelation.getSequenceNbr());
if (fanPointVarCorrelation.getEquipmentName().equals("升压站")){
try {
emqKeeper.getMqttClient().publish("gkblfx",JSON.toJSONString(resultMap).getBytes(), 2 ,false);
} catch (MqttException e) {
e.printStackTrace();
}
}
String response = HttpUtil.createPost(baseUrlGKHF).body(JSON.toJSONString(resultMap)).execute()
.body();
......@@ -606,7 +618,7 @@ public class KafkaConsumerService {
* @param consumerRecords messages
* @param ack ack
*/
@KafkaListener(id = "GKHFPvConsumer", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFPv)
@KafkaListener(id = "GKHFPvConsume12r", groupId = "consumerGroup", topics = kafkaTopicConsumerGKHFPv)
public void listenGKHFPv(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
try {
......@@ -653,7 +665,14 @@ public class KafkaConsumerService {
double[] data1 = table.where(selection).doubleColumn("value").asDoubleArray();
resultMap.put("processVariable", data1);
resultMap.put("processVariableId", pvPointVarCorrelation.getSequenceNbr());
if (pvPointVarCorrelation.getEquipmentName().equals("升压站")){
System.out.println("升压站数据进来了");
try {
emqKeeper.getMqttClient().publish("gkblfx",JSON.toJSONString(resultMap).getBytes(), 2 ,false);
} catch (MqttException e) {
e.printStackTrace();
}
}
String response = HttpUtil.createPost(baseUrlGKHF).body(JSON.toJSONString(resultMap)).execute()
.body();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment