Commit 6ba8a7fd authored by helinlin's avatar helinlin

对接语音数据

parent 46f970df
...@@ -8,9 +8,7 @@ import org.springframework.stereotype.Component; ...@@ -8,9 +8,7 @@ import org.springframework.stereotype.Component;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress; import java.net.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
...@@ -26,13 +24,34 @@ public class SocketClient { ...@@ -26,13 +24,34 @@ public class SocketClient {
"D:\\ffmpeg-4.4-full_build-shared\\bin\\record2.pcm", "D:\\ffmpeg-4.4-full_build-shared\\bin\\record2.pcm",
}; };
public static void main(String[] args) { public static void main(String[] args) throws SocketException {
SocketClient socketClient = new SocketClient(); SocketClient socketClient = new SocketClient();
socketClient.process(0, 0); //socketClient.processTcp(0, 0);
socketClient.processUdp(10001, 2);
} }
@Async @Async
public void process(int port, int type) { public void processUdp(int port, int type) throws SocketException {
if (type < 0) type = 0;
if (type >= testFilePath.length) type -= 1;
DatagramSocket datagramSocket = new DatagramSocket();
try {
FileInputStream fis = new FileInputStream(new File(testFilePath[type]));
byte[] b = new byte[4096];
int len;
while ((len = fis.read(b)) > 0) {
logger.info("send data pack length: " + len);
datagramSocket.send(new DatagramPacket(b, b.length,InetAddress.getLocalHost(), port));
TimeUnit.MILLISECONDS.sleep(200);
}
datagramSocket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Async
public void processTcp(int port, int type) {
if (type < 0) type = 0; if (type < 0) type = 0;
if (type >= testFilePath.length) type -= 1; if (type >= testFilePath.length) type -= 1;
......
...@@ -7,14 +7,25 @@ import com.yeejoin.amos.speech.AppSpeechTranscriber; ...@@ -7,14 +7,25 @@ import com.yeejoin.amos.speech.AppSpeechTranscriber;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.web.client.RestTemplate;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.IOException; import java.net.DatagramSocket;
import java.net.ServerSocket; import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.UUID;
/** /**
* 实时语音转文字 * 实时语音转文字
...@@ -22,10 +33,16 @@ import java.util.List; ...@@ -22,10 +33,16 @@ import java.util.List;
@Component @Component
public class RealTimeStream2Text { public class RealTimeStream2Text {
private static final Logger logger = LoggerFactory.getLogger(RealTimeStream2Text.class); private static final Logger logger = LoggerFactory.getLogger(RealTimeStream2Text.class);
public static int serverPort = 10001; public static int serverPort = 25000;
@Autowired @Autowired
private EmqKeeper emqKeeper; EmqKeeper emqKeeper;
@Autowired
RestTemplate restTemplate;
@Value("${ifc.url}")
String audioSystemAddress;
/** /**
* 开始语音转文字 * 开始语音转文字
...@@ -35,25 +52,33 @@ public class RealTimeStream2Text { ...@@ -35,25 +52,33 @@ public class RealTimeStream2Text {
*/ */
public HashMap<String, Object> doTranslate(String cid, String myNumber, String callerNumber) { public HashMap<String, Object> doTranslate(String cid, String myNumber, String callerNumber) {
//启动两个监听端口监听推送进来的2路语音流 //启动两个监听端口监听推送进来的2路语音流
ServerSocket serverSocket1 = initServerSocketPort(); DatagramSocket serverSocket1 = initServerSocketPort();
ServerSocket serverSocket2 = initServerSocketPort(); DatagramSocket serverSocket2 = initServerSocketPort();
//识别的记录 //识别的记录
List<AudioRecord> audioRecords = new ArrayList<>(); List<AudioRecord> audioRecords = new ArrayList<>();
//识别的关键字 //识别的关键字
AudioKeyWord audioKeyWord = new AudioKeyWord(); AudioKeyWord audioKeyWord = new AudioKeyWord();
logger.warn("myNumber监听的端口为:" + serverSocket1.getLocalPort() + " callerNumber监听的端口为:" + serverSocket2.getLocalPort()); logger.warn("myNumber监听的端口为:" + serverSocket1.getLocalPort() + " callerNumber监听的端口为:" + serverSocket2.getLocalPort());
//我的语音流 //我的语音流
Thread thread1 = new Thread(() -> { Thread thread1 =
new AppSpeechTranscriber(new RealTimeSpeechTranscriberListener(myNumber, myNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket1).process(); new Thread(() -> {
}); new AppSpeechTranscriber(
new RealTimeSpeechTranscriberListener(myNumber, myNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket1)
.process();
}, "我的语音流");
//呼入的语音流 //呼入的语音流
Thread thread2 = new Thread(() -> { Thread thread2 =
new AppSpeechTranscriber(new RealTimeSpeechTranscriberListener(myNumber, callerNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket2).process(); new Thread(() -> {
}); new AppSpeechTranscriber(
new RealTimeSpeechTranscriberListener(myNumber, callerNumber, emqKeeper, audioRecords, audioKeyWord), serverSocket2)
.process();
}, "呼入的语音流");
thread1.setUncaughtExceptionHandler(new SubUncaughtExceptionHandler(serverSocket1)); thread1.setUncaughtExceptionHandler(new SubUncaughtExceptionHandler(serverSocket1));
thread2.setUncaughtExceptionHandler(new SubUncaughtExceptionHandler(serverSocket2)); thread2.setUncaughtExceptionHandler(new SubUncaughtExceptionHandler(serverSocket2));
thread1.start(); thread1.start();
thread2.start(); thread2.start();
startNotifyAudioStreamSystem(cid, myNumber, serverSocket1.getLocalPort());
startNotifyAudioStreamSystem(cid, callerNumber, serverSocket2.getLocalPort());
HashMap<String, Object> map = new HashMap<>(); HashMap<String, Object> map = new HashMap<>();
map.put(myNumber, serverSocket1.getLocalPort()); map.put(myNumber, serverSocket1.getLocalPort());
map.put(callerNumber, serverSocket2.getLocalPort()); map.put(callerNumber, serverSocket2.getLocalPort());
...@@ -61,15 +86,53 @@ public class RealTimeStream2Text { ...@@ -61,15 +86,53 @@ public class RealTimeStream2Text {
} }
/** /**
* 获取一个ServerSocket端口号 * @param cid 会议ID
* @param number 需要的号码的音频流
* @param port 推流端口
*/ */
private ServerSocket initServerSocketPort() { @Async
// Posts a multipart form to audioSystemAddress + "/StartPushingVoiceStream" asking for the
// audio stream of one number to be pushed to this host on the given port, then logs the
// outcome reported in the response body.
//
// cid:    conference/call id forwarded as form field "cid".
// number: phone number whose audio stream is requested.
// port:   local port the stream should be pushed to (form field "dstport").
//
// Every failure is logged and swallowed; nothing is thrown to the caller.
// NOTE(review): doTranslate invokes this method directly on the same instance; with Spring's
// proxy-based @Async such self-invocations presumably run synchronously - confirm.
public void startNotifyAudioStreamSystem(String cid, String number, int port) {
    try {
        LinkedMultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
        form.add("cid", cid);
        form.add("number", number);
        form.add("codec", "PCM");
        form.add("uuid", UUID.randomUUID().toString());
        form.add("dstip", InetAddress.getLocalHost().getHostAddress());
        form.add("dstport", String.valueOf(port));
        form.add("marker", "amos");

        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.setContentType(MediaType.MULTIPART_FORM_DATA);
        HttpEntity<LinkedMultiValueMap<String, Object>> request = new HttpEntity<>(form, httpHeaders);

        ResponseEntity<AudioResponseEntity> response = restTemplate.postForEntity(
                audioSystemAddress + "/StartPushingVoiceStream", request, AudioResponseEntity.class);
        AudioResponseEntity body = response.getBody();
        if (body == null) {
            logger.error("调用语音融合系统接口获取音频流返回异常:响应体为空");
            return;
        }
        if (body.getState() == 200) {
            logger.warn("调用语音融合系统接口获取音频流返回正常:结果:" + body.toString());
        } else {
            logger.error("调用语音融合系统接口获取音频流返回异常:响应码:" + body.getState());
            logger.error("调用语音融合系统接口获取音频流返回异常:失败原因:" + body.getDescribe());
        }
    } catch (UnknownHostException e) {
        // Pass the exception to the logger so the stack trace lands in the log file
        // instead of on stderr.
        logger.error(e.getMessage(), e);
    } catch (org.springframework.web.client.RestClientException e) {
        // Connection failures and non-2xx answers from the remote system must not
        // escape into doTranslate, whose listener threads are already running.
        logger.error("调用语音融合系统接口获取音频流返回异常:" + e.getMessage(), e);
    }
}
/**
* 获取一个ServerSocket
*/
private DatagramSocket initServerSocketPort() {
while (true) { while (true) {
try { try {
return new ServerSocket(serverPort); return new DatagramSocket(serverPort);
} catch (IOException exception) { } catch (SocketException exception) {
serverPort++; serverPort++;
if (serverPort == 65535) serverPort = 10000; if (serverPort == 27999) serverPort = 25000;
} }
} }
} }
...@@ -79,22 +142,51 @@ public class RealTimeStream2Text { ...@@ -79,22 +142,51 @@ public class RealTimeStream2Text {
*/ */
static class SubUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { static class SubUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(SubUncaughtExceptionHandler.class); private static final Logger logger = LoggerFactory.getLogger(SubUncaughtExceptionHandler.class);
ServerSocket serverSocket; DatagramSocket serverSocket;
public SubUncaughtExceptionHandler(ServerSocket serverSocket) { public SubUncaughtExceptionHandler(DatagramSocket serverSocket) {
this.serverSocket = serverSocket; this.serverSocket = serverSocket;
} }
@Override @Override
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {
if (serverSocket != null && !serverSocket.isClosed()) { if (serverSocket != null && !serverSocket.isClosed()) {
try { serverSocket.close();
serverSocket.close(); logger.error("子线程出现异常,已关闭音频监听端口。" + e.getMessage());
logger.error("子线程出现异常,已关闭音频监听端口。" + e.getMessage());
} catch (IOException exception) {
exception.printStackTrace();
}
} }
} }
} }
/**
 * Response body returned by the voice-fusion system.
 *
 * <p>Plain mutable bean with a status code, a description and an untyped payload.
 * {@link #toString()} is overridden so that logging an instance shows its field
 * values rather than the default identity hash.
 */
static class AudioResponseEntity {
    /** Status code reported by the remote system; the caller treats 200 as success. */
    private int state;
    /** Description text; logged as the failure reason when the state is not 200. */
    private String describe;
    /** Untyped payload of the response. */
    private Object data;

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public String getDescribe() {
        return describe;
    }

    public void setDescribe(String describe) {
        this.describe = describe;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "AudioResponseEntity{state=" + state
                + ", describe='" + describe + '\''
                + ", data=" + data + '}';
    }
}
} }
package com.yeejoin.amos.boot.module.jcs.biz.controller; package com.yeejoin.amos.boot.module.jcs.biz.controller;
import com.yeejoin.amos.boot.module.jcs.biz.audioToText.streamToText.RealTimeStream2Text;
import com.yeejoin.amos.boot.module.jcs.biz.audioToText.SocketClient; import com.yeejoin.amos.boot.module.jcs.biz.audioToText.SocketClient;
import com.yeejoin.amos.boot.module.jcs.biz.audioToText.streamToText.RealTimeStream2Text;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -12,8 +12,8 @@ import org.springframework.web.bind.annotation.RestController; ...@@ -12,8 +12,8 @@ import org.springframework.web.bind.annotation.RestController;
import org.typroject.tyboot.core.foundation.enumeration.UserType; import org.typroject.tyboot.core.foundation.enumeration.UserType;
import org.typroject.tyboot.core.restful.doc.TycloudOperation; import org.typroject.tyboot.core.restful.doc.TycloudOperation;
import java.net.SocketException;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@RestController @RestController
@Api(tags = "语音转文字Api") @Api(tags = "语音转文字Api")
...@@ -26,67 +26,58 @@ public class Audio2TextController { ...@@ -26,67 +26,58 @@ public class Audio2TextController {
@Autowired @Autowired
SocketClient socketClient; SocketClient socketClient;
/**
 * Combined speech-to-text test endpoint. The intended flow is:
 * <ol>
 * <li>on receiving the conversion request, start two server sockets listening on different ports;</li>
 * <li>call the voice-fusion system API, passing the two listening ports and the local IP address;</li>
 * <li>when a socket receives data, push it to the Alibaba Cloud speech recognition system;</li>
 * <li>in the callback, take the recognition result and publish it to the MQTT server with the MQTT client;</li>
 * <li>the front end subscribes to the messages and displays them.</li>
 * </ol>
 *
 * @param cid          call id
 * @param myNumber     my phone number
 * @param callerNumber incoming caller's phone number
 * @return the map produced by {@code audio2Text.doTranslate} for these arguments
 */
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvertAndSendAudio")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字融合接口", notes = "测试语音转文字融合接口")
public HashMap<String, Object> startConvertAndSendAudio(@RequestParam String cid, @RequestParam String myNumber, @RequestParam String callerNumber) {
    // Pure delegation; this endpoint adds no processing of its own.
    return audio2Text.doTranslate(cid, myNumber, callerNumber);
}
/** /**
* 测试语音转文字第一步 * 测试语音转文字分步接口第一步
* *
* @param myNumber 我的手机号 * @param myNumber 我的手机号
*/ */
@TycloudOperation(ApiLevel = UserType.AGENCY) @TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvert") @GetMapping("/startConvert")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字第一步", notes = "number为当前用户的手机号") @ApiOperation(httpMethod = "GET", value = "测试语音转文字分步接口第一步", notes = "测试语音转文字分步接口第一步")
public HashMap<String, Object> startConvert(@RequestParam String myNumber, @RequestParam String callerNumber) { public HashMap<String, Object> startConvert(@RequestParam String myNumber, @RequestParam String callerNumber) {
return audio2Text.doTranslate("", myNumber, callerNumber); return audio2Text.doTranslate("", myNumber, callerNumber);
} }
/** /**
* 测试语音转文字第二步 * 测试语音转文字分步接口第二步
*/ */
@TycloudOperation(ApiLevel = UserType.AGENCY) @TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startSendAudio") @GetMapping("/startSendAudio")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字第二步", notes = "测试语音转文字第二步") @ApiOperation(httpMethod = "GET", value = "测试语音转文字分步接口第二步", notes = "测试语音转文字分步接口第二步")
public String startSendAudio(@RequestParam int port, Integer type) { public String startSendAudio(@RequestParam int port, Integer type) {
if (type == null) type = 0; if (type == null) type = 0;
socketClient.process(port, type);
return "success";
}
/**
* 测试语音转文字融合接口
*
* @param myNumber 我的手机号
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvertAndSendAudio")
@ApiOperation(httpMethod = "GET", value = "测试语音转文字融合接口", notes = "测试语音转文字融合接口")
public HashMap<String, Object> startConvertAndSendAudio(@RequestParam String myNumber, @RequestParam String callerNumber) {
HashMap<String, Object> convert = audio2Text.doTranslate("", myNumber, callerNumber);
try { try {
TimeUnit.SECONDS.sleep(1); socketClient.processUdp(port, type);
socketClient.process((Integer) convert.get(myNumber), 2); } catch (SocketException e) {
socketClient.process((Integer) convert.get(callerNumber), 3);
} catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
return convert; return "success";
}
/**
* 第一步收到转换请求后,启动两个serverSocket,监听不同端口
* 第二步调用语音融合系统的API并传递两个监听的端口号和本机IP地址
* 第三步serverSocket收到数据请求,开始将数据推至阿里云语音识别系统进行识别
* 第四步回调函数中获取识别结果,使用mqtt客户端推送至mqtt服务器
* 第五步前端订阅消息并进行展示
*
* @param cid 通话id
* @param myPhone 我的手机号
* @param caller 呼入手机号
*/
@TycloudOperation(ApiLevel = UserType.AGENCY)
@GetMapping("/startConvertText")
@ApiOperation(httpMethod = "GET", value = "接听电话回调后端开始转文字", notes = "接听电话回调后端开始转文字")
public void startConvertText(String cid, String myPhone, String caller) {
} }
} }
......
...@@ -26,6 +26,7 @@ import com.yeejoin.amos.boot.module.jcs.api.mapper.ControllerEquipMapper; ...@@ -26,6 +26,7 @@ import com.yeejoin.amos.boot.module.jcs.api.mapper.ControllerEquipMapper;
*/ */
@Service @Service
public class ControllerEquipServiceImpl extends BaseService<ControllerEquipDto, ControllerEquip, ControllerEquipMapper> implements IControllerEquipService { public class ControllerEquipServiceImpl extends BaseService<ControllerEquipDto, ControllerEquip, ControllerEquipMapper> implements IControllerEquipService {
@Autowired @Autowired
JcsControlServerClient jcsControlServerClient; JcsControlServerClient jcsControlServerClient;
......
...@@ -7,11 +7,10 @@ import com.alibaba.nls.client.protocol.asr.SpeechTranscriberListener; ...@@ -7,11 +7,10 @@ import com.alibaba.nls.client.protocol.asr.SpeechTranscriberListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.net.DatagramPacket;
import java.io.InputStream; import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.util.Arrays;
import java.net.Socket;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
...@@ -26,11 +25,11 @@ public class AppSpeechTranscriber { ...@@ -26,11 +25,11 @@ public class AppSpeechTranscriber {
private static final Logger logger = LoggerFactory.getLogger(AppSpeechTranscriber.class); private static final Logger logger = LoggerFactory.getLogger(AppSpeechTranscriber.class);
private final SpeechTranscriberListener listener; private final SpeechTranscriberListener listener;
private final ServerSocket serverSocket; private final DatagramSocket serverSocket;
private final Timer serverSocketTimeoutTimer; private final Timer serverSocketTimeoutTimer;
private final TimerTask timerTask; private final TimerTask timerTask;
public AppSpeechTranscriber(SpeechTranscriberListener listener, ServerSocket serverSocket) { public AppSpeechTranscriber(SpeechTranscriberListener listener, DatagramSocket serverSocket) {
this.listener = listener; this.listener = listener;
this.serverSocket = serverSocket; this.serverSocket = serverSocket;
serverSocketTimeoutTimer = new Timer(); serverSocketTimeoutTimer = new Timer();
...@@ -38,7 +37,7 @@ public class AppSpeechTranscriber { ...@@ -38,7 +37,7 @@ public class AppSpeechTranscriber {
timerTask = new TimerTask() { timerTask = new TimerTask() {
@Override @Override
public void run() { public void run() {
logger.warn("serverSocket,port:" + serverSocket.getLocalPort() + " 等待60s无应答即将自动关闭!"); logger.warn("serverSocket,port:" + serverSocket.getLocalPort() + " 等待60s无数据回复即将自动关闭!");
closeServerSocket(); closeServerSocket();
} }
}; };
...@@ -51,29 +50,26 @@ public class AppSpeechTranscriber { ...@@ -51,29 +50,26 @@ public class AppSpeechTranscriber {
public void process() { public void process() {
SpeechTranscriber transcriber = null; SpeechTranscriber transcriber = null;
try { try {
//启动ServerSocket等待接收音频数据,只接受一次请求 //创建实例、建立连接。
byte[] b = new byte[332];
DatagramPacket datagramPacket = new DatagramPacket(b, b.length);
logger.warn("serverSocket已启动,地址:" + InetAddress.getLocalHost().getHostAddress() logger.warn("serverSocket已启动,地址:" + InetAddress.getLocalHost().getHostAddress()
+ "监听端口:" + serverSocket.getLocalPort() + " 等待语音融合系统推送数据..."); + "监听端口:" + serverSocket.getLocalPort() + " 等待语音融合系统推送数据...");
Socket socket = serverSocket.accept(); while (true) {
timerTask.cancel(); serverSocket.receive(datagramPacket);
serverSocketTimeoutTimer.cancel(); if (transcriber == null) {
logger.warn("收到用户连接请求,开始读取数据"); logger.warn("收到第一个数据包:" + b.length + " 开始进行语音翻译");
//创建实例、建立连接。 transcriber = new SpeechTranscriber(AppNslClient.instance(), listener);
transcriber = new SpeechTranscriber(AppNslClient.instance(), listener); //设置识别参数
//设置识别参数 setParam(transcriber);
setParam(transcriber); transcriber.start();
}
transcriber.start(); serverSocketTimeoutTimer.cancel();
InputStream inputStream = socket.getInputStream(); logger.warn("收到数据包:" + b.length);
byte[] b = new byte[4096]; //去掉前12个字节的rtp包头,后面的332字节为语音数据
int len; transcriber.send(Arrays.copyOfRange(b, 12, b.length));
while ((len = inputStream.read(b)) > 0) { serverSocketTimeoutTimer.schedule(timerTask, 1000 * 60);
logger.info("receive data pack length: " + len);
transcriber.send(b, len);
} }
socket.close();
transcriber.stop();
logger.warn("语音转文字已结束");
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage()); logger.error(e.getMessage());
} finally { } finally {
...@@ -81,6 +77,7 @@ public class AppSpeechTranscriber { ...@@ -81,6 +77,7 @@ public class AppSpeechTranscriber {
transcriber.close(); transcriber.close();
} }
closeServerSocket(); closeServerSocket();
logger.warn("语音转文字已结束");
} }
} }
...@@ -89,11 +86,7 @@ public class AppSpeechTranscriber { ...@@ -89,11 +86,7 @@ public class AppSpeechTranscriber {
*/ */
public void closeServerSocket() { public void closeServerSocket() {
if (serverSocket != null && !serverSocket.isClosed()) { if (serverSocket != null && !serverSocket.isClosed()) {
try { serverSocket.close();
serverSocket.close();
} catch (IOException exception) {
exception.printStackTrace();
}
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment