Commit 9e4f4f3c authored by tangwei

Merge branch 'developer' of http://172.16.10.76/moa/amos-boot-biz into developer

parents 97f7b070 1df592b9
......@@ -8,9 +8,7 @@ import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileInputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.*;
import java.util.concurrent.TimeUnit;
/**
......@@ -26,13 +24,34 @@ public class SocketClient {
"D:\\ffmpeg-4.4-full_build-shared\\bin\\record2.pcm",
};
public static void main(String[] args) {
public static void main(String[] args) throws SocketException {
SocketClient socketClient = new SocketClient();
socketClient.process(0, 0);
//socketClient.processTcp(0, 0);
socketClient.processUdp(10001, 2);
}
@Async
public void process(int port, int type) {
public void processUdp(int port, int type) throws SocketException {
if (type < 0) type = 0;
if (type >= testFilePath.length) type = testFilePath.length - 1;
DatagramSocket datagramSocket = new DatagramSocket();
try {
FileInputStream fis = new FileInputStream(new File(testFilePath[type]));
byte[] b = new byte[4096];
int len;
while ((len = fis.read(b)) > 0) {
logger.info("send data pack length: " + len);
datagramSocket.send(new DatagramPacket(b, len, InetAddress.getLocalHost(), port));
TimeUnit.MILLISECONDS.sleep(200);
}
datagramSocket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Async
public void processTcp(int port, int type) {
if (type < 0) type = 0;
if (type >= testFilePath.length) type = testFilePath.length - 1;
......
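For orientation, here is a minimal sketch (not part of the commit; the port and buffer size mirror the test client above) of the receiving side that processUdp pushes to:

import java.net.DatagramPacket;
import java.net.DatagramSocket;

// Illustrative receiver for the datagrams sent by SocketClient.processUdp.
public class UdpReceiveSketch {
    public static void main(String[] args) throws Exception {
        try (DatagramSocket socket = new DatagramSocket(10001)) { // port used in main() above
            byte[] buf = new byte[4096]; // matches the sender's buffer size
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            while (true) {
                socket.receive(packet); // blocks until a datagram arrives
                System.out.println("received data pack length: " + packet.getLength());
            }
        }
    }
}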
......@@ -7,14 +7,25 @@ import com.yeejoin.amos.speech.AppSpeechTranscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.client.RestTemplate;
import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
/**
* Real-time speech-to-text
......@@ -22,10 +33,16 @@ import java.util.List;
@Component
public class RealTimeStream2Text {
private static final Logger logger = LoggerFactory.getLogger(RealTimeStream2Text.class);
public static int serverPort = 10001;
public static int serverPort = 25000;
@Autowired
private EmqKeeper emqKeeper;
EmqKeeper emqKeeper;
@Autowired
RestTemplate restTemplate;
@Value("${ifc.url}")
String audioSystemAddress;
/**
* Start speech-to-text
......@@ -35,25 +52,33 @@ public class RealTimeStream2Text {
*/
public HashMap<String, Object> doTranslate(String cid, String myNumber, String callerNumber) {
// Start two listening ports for the two pushed voice streams
ServerSocket serverSocket1 = initServerSocketPort();
ServerSocket serverSocket2 = initServerSocketPort();
DatagramSocket serverSocket1 = initServerSocketPort();
DatagramSocket serverSocket2 = initServerSocketPort();
// Recognition records
List<AudioRecord> audioRecords = new ArrayList<>();
// Recognized keywords
AudioKeyWord audioKeyWord = new AudioKeyWord();
logger.warn("myNumber监听的端口为:" + serverSocket1.getLocalPort() + " callerNumber监听的端口为:" + serverSocket2.getLocalPort());
// My voice stream
Thread thread1 = new Thread(() -> {
new AppSpeechTranscriber(new RealTimeSpeechTranscriberListener(myNumber, myNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket1).process();
});
Thread thread1 =
new Thread(() -> {
new AppSpeechTranscriber(
new RealTimeSpeechTranscriberListener(myNumber, myNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket1)
.process();
}, "我的语音流");
// Incoming voice stream
Thread thread2 = new Thread(() -> {
new AppSpeechTranscriber(new RealTimeSpeechTranscriberListener(myNumber, callerNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket2).process();
});
Thread thread2 =
new Thread(() -> {
new AppSpeechTranscriber(
new RealTimeSpeechTranscriberListener(myNumber, callerNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket2)
.process();
}, "呼入的语音流");
thread1.setUncaughtExceptionHandler(new SubUncaughtExceptionHandler(serverSocket1));
thread2.setUncaughtExceptionHandler(new SubUncaughtExceptionHandler(serverSocket2));
thread1.start();
thread2.start();
startNotifyAudioStreamSystem(cid, myNumber, serverSocket1.getLocalPort());
startNotifyAudioStreamSystem(cid, callerNumber, serverSocket2.getLocalPort());
HashMap<String, Object> map = new HashMap<>();
map.put(myNumber, serverSocket1.getLocalPort());
map.put(callerNumber, serverSocket2.getLocalPort());
......@@ -61,15 +86,53 @@ public class RealTimeStream2Text {
}
/**
* Obtain a ServerSocket port number
* @param cid conference ID
* @param number the number whose audio stream is requested
* @param port the port the stream will be pushed to
*/
private ServerSocket initServerSocketPort() {
@Async
public void startNotifyAudioStreamSystem(String cid, String number, int port) {
try {
HttpHeaders httpHeaders = new HttpHeaders();
LinkedMultiValueMap<String, Object> map = new LinkedMultiValueMap<>();
map.add("cid", cid);
map.add("number", number);
map.add("codec", "PCM");
map.add("uuid", UUID.randomUUID().toString());
map.add("dstip", InetAddress.getLocalHost().getHostAddress());
map.add("dstport", String.valueOf(port));
map.add("marker", "amos");
httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA);
HttpEntity<LinkedMultiValueMap<String, Object>> linkedMultiValueMapHttpEntity = new HttpEntity<>(map, httpHeaders);
ResponseEntity<AudioResponseEntity> audioResponseEntity = restTemplate.postForEntity(
audioSystemAddress + "/StartPushingVoiceStream", linkedMultiValueMapHttpEntity, AudioResponseEntity.class);
AudioResponseEntity responseEntityBody = audioResponseEntity.getBody();
if (responseEntityBody == null) {
logger.error("调用语音融合系统接口获取音频流返回异常:响应体为空");
return;
}
if (responseEntityBody.getState() == 200) {
logger.warn("调用语音融合系统接口获取音频流返回正常:结果:" + responseEntityBody.toString());
} else {
logger.error("调用语音融合系统接口获取音频流返回异常:响应码:" + responseEntityBody.getState());
logger.error("调用语音融合系统接口获取音频流返回异常:失败原因:" + responseEntityBody.getDescribe());
}
} catch (UnknownHostException e) {
logger.error("Failed to notify the voice-fusion system: " + e.getMessage(), e);
}
}
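Note that Spring does not auto-configure a RestTemplate bean, so the @Autowired RestTemplate above must be defined somewhere in the context; a minimal sketch (the class name here is hypothetical, not part of this commit):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

// Illustrative configuration providing the RestTemplate injected into RealTimeStream2Text.
@Configuration
public class RestTemplateConfig {
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}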
/**
* Obtain a DatagramSocket on a free port
*/
private DatagramSocket initServerSocketPort() {
while (true) {
try {
return new ServerSocket(serverPort);
} catch (IOException exception) {
return new DatagramSocket(serverPort);
} catch (SocketException exception) {
serverPort++;
if (serverPort == 65535) serverPort = 10000;
if (serverPort == 27999) serverPort = 25000;
}
}
}
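As a design note, the scan above hands out ports through a shared static counter; if the voice-fusion system accepts arbitrary destination ports, an alternative sketch (an assumption, not the committed behavior) is to bind port 0 and let the OS pick a free one:

// Illustrative alternative: let the OS assign a free port instead of scanning 25000-27999.
DatagramSocket socket = new DatagramSocket(0); // 0 = any free local port
int assignedPort = socket.getLocalPort();      // report this port to the voice-fusion system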
......@@ -79,22 +142,51 @@ public class RealTimeStream2Text {
*/
static class SubUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(SubUncaughtExceptionHandler.class);
ServerSocket serverSocket;
DatagramSocket serverSocket;
public SubUncaughtExceptionHandler(ServerSocket serverSocket) {
public SubUncaughtExceptionHandler(DatagramSocket serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void uncaughtException(Thread t, Throwable e) {
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
logger.error("子线程出现异常,已关闭音频监听端口。" + e.getMessage());
} catch (IOException exception) {
exception.printStackTrace();
}
serverSocket.close();
logger.error("子线程出现异常,已关闭音频监听端口。" + e.getMessage());
}
}
}
/**
* Response body from the voice-fusion system
*/
static class AudioResponseEntity {
private int state;
private String describe;
private Object data;
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public String getDescribe() {
return describe;
}
public void setDescribe(String describe) {
this.describe = describe;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}
}
package com.yeejoin.amos.boot.module.jcs.biz.controller;
import com.yeejoin.amos.boot.module.jcs.biz.audioToText.streamToText.RealTimeStream2Text;
import com.yeejoin.amos.boot.module.jcs.biz.audioToText.SocketClient;
import com.yeejoin.amos.boot.module.jcs.biz.audioToText.streamToText.RealTimeStream2Text;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -12,8 +12,8 @@ import org.springframework.web.bind.annotation.RestController;
import org.typroject.tyboot.core.foundation.enumeration.UserType;
import org.typroject.tyboot.core.restful.doc.TycloudOperation;
import java.net.SocketException;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@RestController
@Api(tags = "语音转文字Api")
......@@ -26,67 +26,58 @@ public class Audio2TextController {
@Autowired
SocketClient socketClient;
/**
* Step 1: on receiving a conversion request, start two server sockets listening on different ports
* Step 2: call the voice-fusion system API, passing the two listening ports and the local IP address
* Step 3: when the sockets receive data, push it to the Alibaba Cloud speech-recognition service
* Step 4: in the callback, take the recognition result and push it to the MQTT broker via the MQTT client
* Step 5: the front end subscribes to the messages and renders them
*
* @param cid call ID
* @param myNumber my phone number
* @param callerNumber caller's phone number
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvertAndSendAudio")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字融合接口", notes = "测试语音转文字融合接口")
public HashMap<String, Object> startConvertAndSendAudio(@RequestParam String cid, @RequestParam String myNumber, @RequestParam String callerNumber) {
HashMap<String, Object> convert = audio2Text.doTranslate(cid, myNumber, callerNumber);
/* try {
TimeUnit.SECONDS.sleep(1);
socketClient.process((Integer) convert.get(myNumber), 2);
socketClient.process((Integer) convert.get(callerNumber), 3);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
return convert;
}
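Step 5 above happens outside this service; a minimal subscriber sketch using Eclipse Paho (the broker URL and topic name are hypothetical placeholders, the real topic is whatever RealTimeSpeechTranscriberListener publishes through EmqKeeper):

import org.eclipse.paho.client.mqttv3.MqttClient;

// Illustrative front-end-style consumer of the transcription messages.
public class TranscriptSubscriberSketch {
    public static void main(String[] args) throws Exception {
        MqttClient client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId());
        client.connect();
        // Hypothetical topic name; use the topic the listener actually publishes to.
        client.subscribe("audio2text/transcript", (topic, message) ->
                System.out.println(topic + ": " + new String(message.getPayload())));
    }
}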
/**
* Test speech-to-text, step 1
* Test the step-by-step speech-to-text API, step 1
*
* @param myNumber my phone number
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvert")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字第一步", notes = "number为当前用户的手机号")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字分步接口第一步", notes = "测试语音转文字分步接口第一步")
public HashMap<String, Object> startConvert(@RequestParam String myNumber, @RequestParam String callerNumber) {
return audio2Text.doTranslate("", myNumber, callerNumber);
}
/**
* Test speech-to-text, step 2
* Test the step-by-step speech-to-text API, step 2
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startSendAudio")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字第二步", notes = "测试语音转文字第二步")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字分步接口第二步", notes = "测试语音转文字分步接口第二步")
public String startSendAudio(@RequestParam int port, Integer type) {
if (type == null) type = 0;
socketClient.process(port, type);
return "success";
}
/**
* Test the combined speech-to-text API
*
* @param myNumber my phone number
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvertAndSendAudio")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字融合接口", notes = "测试语音转文字融合接口")
public HashMap<String, Object> startConvertAndSendAudio(@RequestParam String myNumber, @RequestParam String callerNumber) {
HashMap<String, Object> convert = audio2Text.doTranslate("", myNumber, callerNumber);
try {
TimeUnit.SECONDS.sleep(1);
socketClient.process((Integer) convert.get(myNumber), 2);
socketClient.process((Integer) convert.get(callerNumber), 3);
} catch (InterruptedException e) {
socketClient.processUdp(port, type);
} catch (SocketException e) {
e.printStackTrace();
}
return convert;
}
/**
* Step 1: on receiving a conversion request, start two server sockets listening on different ports
* Step 2: call the voice-fusion system API, passing the two listening ports and the local IP address
* Step 3: when the sockets receive data, push it to the Alibaba Cloud speech-recognition service
* Step 4: in the callback, take the recognition result and push it to the MQTT broker via the MQTT client
* Step 5: the front end subscribes to the messages and renders them
*
* @param cid call ID
* @param myPhone my phone number
* @param caller caller's phone number
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvertText")
@ApiOperation(httpMethod = "GET", value = "接听电话回调后端开始转文字", notes = "接听电话回调后端开始转文字")
public String startConvertText(String cid, String myPhone, String caller) {
return "success";
}
}
......
......@@ -26,6 +26,7 @@ import com.yeejoin.amos.boot.module.jcs.api.mapper.ControllerEquipMapper;
*/
@Service
public class ControllerEquipServiceImpl extends BaseService<ControllerEquipDto, ControllerEquip, ControllerEquipMapper> implements IControllerEquipService {
@Autowired
JcsControlServerClient jcsControlServerClient;
......
......@@ -7,11 +7,10 @@ import com.alibaba.nls.client.protocol.asr.SpeechTranscriberListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
......@@ -26,11 +25,11 @@ public class AppSpeechTranscriber {
private static final Logger logger = LoggerFactory.getLogger(AppSpeechTranscriber.class);
private final SpeechTranscriberListener listener;
private final ServerSocket serverSocket;
private final DatagramSocket serverSocket;
private final Timer serverSocketTimeoutTimer;
private final TimerTask timerTask;
public AppSpeechTranscriber(SpeechTranscriberListener listener, ServerSocket serverSocket) {
public AppSpeechTranscriber(SpeechTranscriberListener listener, DatagramSocket serverSocket) {
this.listener = listener;
this.serverSocket = serverSocket;
serverSocketTimeoutTimer = new Timer();
......@@ -38,7 +37,7 @@ public class AppSpeechTranscriber {
timerTask = new TimerTask() {
@Override
public void run() {
logger.warn("serverSocket,port:" + serverSocket.getLocalPort() + " 等待60s无应答即将自动关闭!");
logger.warn("serverSocket,port:" + serverSocket.getLocalPort() + " 等待60s无数据回复即将自动关闭!");
closeServerSocket();
}
};
......@@ -51,29 +50,26 @@ public class AppSpeechTranscriber {
public void process() {
SpeechTranscriber transcriber = null;
try {
// Start the ServerSocket and wait for audio data; only one request is accepted
// Create the instance and establish the connection.
byte[] b = new byte[332];
DatagramPacket datagramPacket = new DatagramPacket(b, b.length);
logger.warn("serverSocket已启动,地址:" + InetAddress.getLocalHost().getHostAddress()
+ "监听端口:" + serverSocket.getLocalPort() + " 等待语音融合系统推送数据...");
Socket socket = serverSocket.accept();
timerTask.cancel();
serverSocketTimeoutTimer.cancel();
logger.warn("收到用户连接请求,开始读取数据");
// Create the instance and establish the connection.
transcriber = new SpeechTranscriber(AppNslClient.instance(), listener);
// Set the recognition parameters
setParam(transcriber);
transcriber.start();
InputStream inputStream = socket.getInputStream();
byte[] b = new byte[4096];
int len;
while ((len = inputStream.read(b)) > 0) {
logger.info("receive data pack length: " + len);
transcriber.send(b, len);
while (true) {
serverSocket.receive(datagramPacket);
if (transcriber == null) {
logger.warn("收到第一个数据包:" + b.length + " 开始进行语音翻译");
transcriber = new SpeechTranscriber(AppNslClient.instance(), listener);
// Set the recognition parameters
setParam(transcriber);
transcriber.start();
}
serverSocketTimeoutTimer.cancel();
logger.warn("收到数据包:" + b.length);
// Strip the 12-byte RTP header; the remaining 320 bytes are voice data
transcriber.send(Arrays.copyOfRange(b, 12, b.length));
serverSocketTimeoutTimer.schedule(timerTask, 1000 * 60);
}
socket.close();
transcriber.stop();
logger.warn("语音转文字已结束");
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
......@@ -81,6 +77,7 @@ public class AppSpeechTranscriber {
transcriber.close();
}
closeServerSocket();
logger.warn("语音转文字已结束");
}
}
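The 12 bytes stripped in the loop above are the fixed RTP header (RFC 3550); a short sketch of decoding its standard fields from one received buffer (the class and method names are illustrative):

import java.nio.ByteBuffer;

// Illustrative decoder for the fixed 12-byte RTP header preceding the voice payload.
public class RtpHeaderSketch {
    static void dumpRtpHeader(byte[] b) {
        ByteBuffer buf = ByteBuffer.wrap(b);          // big-endian, matching network byte order
        int vpxcc = buf.get() & 0xFF;                 // version(2) padding(1) extension(1) CSRC count(4)
        int mpt = buf.get() & 0xFF;                   // marker(1) payload type(7)
        int sequence = buf.getShort() & 0xFFFF;       // sequence number
        long timestamp = buf.getInt() & 0xFFFFFFFFL;  // media timestamp
        long ssrc = buf.getInt() & 0xFFFFFFFFL;       // synchronization source identifier
        System.out.printf("v=%d pt=%d seq=%d ts=%d ssrc=%d payload=%d bytes%n",
                vpxcc >> 6, mpt & 0x7F, sequence, timestamp, ssrc, b.length - 12);
    }
}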
......@@ -89,11 +86,7 @@ public class AppSpeechTranscriber {
*/
public void closeServerSocket() {
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException exception) {
exception.printStackTrace();
}
serverSocket.close();
}
}
......