Commit bad05b52 authored by 刘凡's avatar 刘凡

优化:【空工大】大数据量插入时,分批次执行插入语句

parent ff779d37
...@@ -51,13 +51,13 @@ public class ClientHandler<path> implements Runnable { ...@@ -51,13 +51,13 @@ public class ClientHandler<path> implements Runnable {
} }
@Value("${spring.datasource.url}") @Value("${spring.datasource.url}")
private static String url="jdbc:mysql://172.16.3.101:3306/jd_bearing?allowMultiQueries=true&serverTimezone=GMT%2B8&characterEncoding=utf8"; private static String url = "jdbc:mysql://172.16.3.101:3306/jd_bearing?allowMultiQueries=true&serverTimezone=GMT%2B8&characterEncoding=utf8";
@Value("${spring.datasource.username}") @Value("${spring.datasource.username}")
private static String username="root"; private static String username = "root";
@Value("${spring.datasource.password}") @Value("${spring.datasource.password}")
private static String password="Yeejoin@2020"; private static String password = "Yeejoin@2020";
public static final String DATABASE_NAME = "jd_bearing"; public static final String DATABASE_NAME = "jd_bearing";
/*String*/ /*String*/
...@@ -114,7 +114,7 @@ public class ClientHandler<path> implements Runnable { ...@@ -114,7 +114,7 @@ public class ClientHandler<path> implements Runnable {
Map<String, Object> tableInfoMap = ExcelTool.readFromExcel(contentIps, filenameWithSuffix); Map<String, Object> tableInfoMap = ExcelTool.readFromExcel(contentIps, filenameWithSuffix);
// 新增mysql表结构和数据和数据源 // 新增mysql表结构和数据和数据源
if(StringUtil.isNotEmpty(tableName) && !tableInfoMap.isEmpty() && tableInfoMap.get("tableColumns") != null){ if (StringUtil.isNotEmpty(tableName) && !tableInfoMap.isEmpty() && tableInfoMap.get("tableColumns") != null) {
List<String> tableColumns = (List<String>) tableInfoMap.get("tableColumns"); List<String> tableColumns = (List<String>) tableInfoMap.get("tableColumns");
List<List<String>> tableDatas = (List<List<String>>) tableInfoMap.get("tableDatas"); List<List<String>> tableDatas = (List<List<String>>) tableInfoMap.get("tableDatas");
this.addMySqlDatasources(tableColumns, tableDatas, hostAndPort, tableName); this.addMySqlDatasources(tableColumns, tableDatas, hostAndPort, tableName);
...@@ -123,7 +123,7 @@ public class ClientHandler<path> implements Runnable { ...@@ -123,7 +123,7 @@ public class ClientHandler<path> implements Runnable {
socket.close(); socket.close();
ips.close(); ips.close();
// 销毁缓存 // 销毁缓存
if(inputStreamCacher != null) inputStreamCacher.destroyCache(); if (inputStreamCacher != null) inputStreamCacher.destroyCache();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} catch (Exception e) { } catch (Exception e) {
...@@ -204,15 +204,27 @@ public class ClientHandler<path> implements Runnable { ...@@ -204,15 +204,27 @@ public class ClientHandler<path> implements Runnable {
state.executeUpdate(createTableSql); state.executeUpdate(createTableSql);
} }
// 2.2插入数据 // 2.2插入数据
if (dataList.size() > 0) {
String insertSQL = insertDataSQLBatch(tableName, tableColumns, dataList); for (int i = 0; i < dataList.size(); i++) {
if(StringUtil.isNotEmpty(insertSQL)){ List<List<String>> lists;
if (dataList.size() > 1000 && i % 1000 == 0 && i > 0) {
lists = (i + 1000) < dataList.size() ? dataList.subList(i, i + 1000) : dataList.subList(i, dataList.size());
}else {
lists = dataList;
}
if(lists.size()>0){
String insertSQL = insertDataSQLBatch(tableName, tableColumns, lists);
if (StringUtil.isNotEmpty(insertSQL)) {
state.execute(insertSQL); state.execute(insertSQL);
} }
}catch (Exception e){ }
}
}
} catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw new Exception("数据表创建失败!"); throw new Exception("数据表创建失败!");
}finally { } finally {
// 释放资源 // 释放资源
state.close(); state.close();
conn.close(); conn.close();
...@@ -237,7 +249,7 @@ public class ClientHandler<path> implements Runnable { ...@@ -237,7 +249,7 @@ public class ClientHandler<path> implements Runnable {
/** /**
* 连接MySQL数据库 * 连接MySQL数据库
*/ */
public static Map<String, Object> connectMySQL(){ public static Map<String, Object> connectMySQL() {
Connection connection = null; Connection connection = null;
Statement statement = null; Statement statement = null;
Map<String, Object> map = new HashMap<>(); Map<String, Object> map = new HashMap<>();
...@@ -247,10 +259,10 @@ public class ClientHandler<path> implements Runnable { ...@@ -247,10 +259,10 @@ public class ClientHandler<path> implements Runnable {
// 根据连接获取可执行Statement // 根据连接获取可执行Statement
statement = connection.createStatement(); statement = connection.createStatement();
map.put("connection",connection); map.put("connection", connection);
map.put("statement",statement); map.put("statement", statement);
return map; return map;
}catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
log.error("连接数据库错误!"); log.error("连接数据库错误!");
} }
...@@ -259,99 +271,100 @@ public class ClientHandler<path> implements Runnable { ...@@ -259,99 +271,100 @@ public class ClientHandler<path> implements Runnable {
/** /**
* 创建生成表的SQL * 创建生成表的SQL
*
* @param tableModel * @param tableModel
* @return * @return
*/ */
public static String createTableSQL(TableModel tableModel){ public static String createTableSQL(TableModel tableModel) {
String tableName = tableModel.getTableName(); String tableName = tableModel.getTableName();
// 创建主键集合 // 创建主键集合
List<String> priKeyList = new ArrayList<String>(); List<String> priKeyList = new ArrayList<String>();
// 创建 StringBuffer 拼接sql // 创建 StringBuffer 拼接sql
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
sb.append("CREATE TABLE `"+ tableName.toUpperCase() +"` (\n"); sb.append("CREATE TABLE `" + tableName.toUpperCase() + "` (\n");
List<TableFieldModel> tableFields = tableModel.getTableFields(); List<TableFieldModel> tableFields = tableModel.getTableFields();
for (int i = 0; i < tableFields.size(); i++) { for (int i = 0; i < tableFields.size(); i++) {
// 当前条数据 // 当前条数据
TableFieldModel field = tableFields.get(i); TableFieldModel field = tableFields.get(i);
// 判断数据类型 // 判断数据类型
String fieldType = judgeDataType(field.getFieldType()); String fieldType = judgeDataType(field.getFieldType());
sb.append(""+ field.getFieldName() +""); sb.append("" + field.getFieldName() + "");
if ("double".equals(fieldType)){ if ("double".equals(fieldType)) {
// 追加列 // 追加列
sb.append(" "+fieldType+"("+field.getFieldLength()+","+ field.getDecimalPoint() +") "); sb.append(" " + fieldType + "(" + field.getFieldLength() + "," + field.getDecimalPoint() + ") ");
}else if ("decimal".equals(fieldType)){ } else if ("decimal".equals(fieldType)) {
// 追加列 // 追加列
sb.append(" "+fieldType+"("+field.getFieldLength()+","+ field.getDecimalPoint() +") "); sb.append(" " + fieldType + "(" + field.getFieldLength() + "," + field.getDecimalPoint() + ") ");
}else if("datetime".equals(fieldType)){ } else if ("datetime".equals(fieldType)) {
// 追加列 // 追加列
sb.append(" "+fieldType+" "); sb.append(" " + fieldType + " ");
}else { } else {
// 追加列 // 追加列
sb.append(" "+fieldType+"("+field.getFieldLength()+") "); sb.append(" " + fieldType + "(" + field.getFieldLength() + ") ");
} }
// 判断是否为主键 - 等于1是主键 // 判断是否为主键 - 等于1是主键
if ("1".equals(field.getPrimaryKey())){ if ("1".equals(field.getPrimaryKey())) {
// 字段名称放进去 // 字段名称放进去
priKeyList.add(field.getFieldName()); priKeyList.add(field.getFieldName());
// 判断是否允许为空 等于1是允许为空; 只有不为空的时候,需要设置 // 判断是否允许为空 等于1是允许为空; 只有不为空的时候,需要设置
if (!"1".equals(field.getIsNull())){ if (!"1".equals(field.getIsNull())) {
sb.append("NOT NULL COMMENT '"+field.getFieldRemark()+"',\n"); sb.append("NOT NULL COMMENT '" + field.getFieldRemark() + "',\n");
} }
// 如果到了最后一条,并且只有一个主键时 // 如果到了最后一条,并且只有一个主键时
if (i >= tableFields.size()-1 && priKeyList.size() == 1){ if (i >= tableFields.size() - 1 && priKeyList.size() == 1) {
sb.append("PRIMARY KEY (`"+ priKeyList.get(0) +"`)"); sb.append("PRIMARY KEY (`" + priKeyList.get(0) + "`)");
sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;"); sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;");
}else if (i >= tableFields.size() -1 && priKeyList.size() > 1){ } else if (i >= tableFields.size() - 1 && priKeyList.size() > 1) {
// 最后一条,并且存在多个主键时 // 最后一条,并且存在多个主键时
sb.append("PRIMARY KEY ("); sb.append("PRIMARY KEY (");
// 遍历主键集合 // 遍历主键集合
for (int j = 0; j < priKeyList.size(); j++) { for (int j = 0; j < priKeyList.size(); j++) {
// 最后一个时 // 最后一个时
if (j == priKeyList.size() -1){ if (j == priKeyList.size() - 1) {
sb.append("`"+ priKeyList.get(j) +"`) USING BTREE \n"); sb.append("`" + priKeyList.get(j) + "`) USING BTREE \n");
}else { } else {
sb.append("`"+ priKeyList.get(j) +"`,"); sb.append("`" + priKeyList.get(j) + "`,");
} }
} }
sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;"); sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;");
} }
// 非主键,直接判断是否允许为空 // 非主键,直接判断是否允许为空
}else { } else {
// 存在主键,并且为最后一个了 // 存在主键,并且为最后一个了
if (priKeyList.size() > 0 && i >= tableFields.size() -1 ){ if (priKeyList.size() > 0 && i >= tableFields.size() - 1) {
// 判断是否为空 if是可以为空 // 判断是否为空 if是可以为空
if ("1".equals(field.getIsNull())){ if ("1".equals(field.getIsNull())) {
sb.append("DEFAULT NULL COMMENT '"+ field.getFieldRemark() +"',\n"); sb.append("DEFAULT NULL COMMENT '" + field.getFieldRemark() + "',\n");
}else { } else {
sb.append("NOT NULL COMMENT '"+ field.getFieldRemark() +"',\n"); sb.append("NOT NULL COMMENT '" + field.getFieldRemark() + "',\n");
} }
// 表示只有一个主键 // 表示只有一个主键
if (priKeyList.size() == 1){ if (priKeyList.size() == 1) {
sb.append("PRIMARY KEY (`"+ priKeyList.get(0) +"`)\n"); sb.append("PRIMARY KEY (`" + priKeyList.get(0) + "`)\n");
sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;"); sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;");
}else { } else {
// 最后一条,并且存在多个主键时 // 最后一条,并且存在多个主键时
sb.append("PRIMARY KEY ("); sb.append("PRIMARY KEY (");
// 遍历主键集合 // 遍历主键集合
for (int j = 0; j < priKeyList.size(); j++) { for (int j = 0; j < priKeyList.size(); j++) {
// 最后一个时 // 最后一个时
if (j == priKeyList.size() -1){ if (j == priKeyList.size() - 1) {
sb.append("`"+ priKeyList.get(j) +"`) USING BTREE \n"); sb.append("`" + priKeyList.get(j) + "`) USING BTREE \n");
}else { } else {
sb.append("`"+ priKeyList.get(j) +"`,"); sb.append("`" + priKeyList.get(j) + "`,");
} }
} }
sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;"); sb.append(") ENGINE=InnoDB DEFAULT CHARSET=utf8;");
} }
}else { } else {
// 没有就追加 判断是否为空 // 没有就追加 判断是否为空
if ("1".equals(field.getIsNull())){ if ("1".equals(field.getIsNull())) {
sb.append("DEFAULT NULL COMMENT '"+ field.getFieldRemark() +"',\n"); sb.append("DEFAULT NULL COMMENT '" + field.getFieldRemark() + "',\n");
}else { } else {
sb.append("NOT NULL COMMENT '"+ field.getFieldRemark() +"',\n"); sb.append("NOT NULL COMMENT '" + field.getFieldRemark() + "',\n");
} }
} }
} }
...@@ -360,66 +373,66 @@ public class ClientHandler<path> implements Runnable { ...@@ -360,66 +373,66 @@ public class ClientHandler<path> implements Runnable {
} }
/** /**
* 创建单条插入数据的SQL * da
* 创建批量插入数据的SQL
*
* @param tableName 表名 * @param tableName 表名
* @param tableColumns 列名 * @param tableColumns 列名
* @param dataList 数据 * @param dataList 数据
* @return * @return
*/ */
public String insertDataSQL(String tableName, List<String> tableColumns, List<String> dataList){ public String insertDataSQLBatch(String tableName, List<String> tableColumns, List<List<String>> dataList) {
String sql = "INSERT INTO "+tableName+" VALUES "; StringBuilder sqlBuilder = new StringBuilder();
if(dataList.size()>0){ for (int i = 0; i < dataList.size(); i++) {
sql+= "("; List<String> rowData = dataList.get(i);
sql += "\""+ UUID.randomUUID() +"\","; if (!rowData.isEmpty()) {
sql += "\""+ DateUtils.getDateNowString() +"\""; //默认ID和时间
for (int n = 0; n < dataList.size(); n++){ String values = "'"+UUID.randomUUID()+"','"+DateUtils.getDateNowString()+"'";
sql += ",\""+dataList.get(n)+"\""; for (int j = 0; j < rowData.size() - 1; j++) {
Object value = rowData.get(j);
if (value instanceof Integer || value instanceof Long) {
values += value + ",";
} else {
values += "'" + value + "',";
} }
if(tableColumns.size() > dataList.size()){
for (int t = 0; t < tableColumns.size() - dataList.size(); t++){
sql += ",\"\"";
} }
if (tableColumns.size() > rowData.size()) {
for (int t = 0; t < tableColumns.size() - rowData.size(); t++) {
values += "''";
} }
sql+="),";
} }
if(sql.lastIndexOf(",")>-1){ sqlBuilder.append("INSERT INTO "+tableName+" VALUES (" + values + ");\n");
return sql.substring(0, sql.length() - 1);
} }
return null;
} }
/**da if(sqlBuilder !=null){
* 创建批量插入数据的SQL return sqlBuilder.toString();
* @param tableName 表名
* @param tableColumns 列名
* @param dataList 数据
* @return
*/
public String insertDataSQLBatch(String tableName, List<String> tableColumns, List<List<String>> dataList){
String sql = "INSERT INTO "+tableName+" VALUES ";
if(dataList.size()>0){
for (int i = 0; i < dataList.size(); i++){
List<String> rowdatas = dataList.get(i);
if(rowdatas.size()>0){
sql+= "(";
sql += "\""+ UUID.randomUUID() +"\",";
sql += "\""+ DateUtils.getDateNowString() +"\"";
for (int n = 0; n < rowdatas.size(); n++){
sql += ",\""+rowdatas.get(n)+"\"";
}
if(tableColumns.size() > rowdatas.size()){
for (int t = 0; t < tableColumns.size() - rowdatas.size(); t++){
sql += ",\"\"";
}
}
sql+="),";
} }
} // String sql = "INSERT INTO " + tableName + " VALUES ";
} // if (dataList.size() > 0) {
if(sql.lastIndexOf(",")>-1){ // for (int i = 0; i < dataList.size(); i++) {
return sql.substring(0, sql.length() - 1); // List<String> rowdatas = dataList.get(i);
} // if (rowdatas.size() > 0) {
// sql += "(";
// sql += "\"" + UUID.randomUUID() + "\",";
// sql += "\"" + DateUtils.getDateNowString() + "\"";
// for (int n = 0; n < rowdatas.size(); n++) {
// sql += ",\"" + rowdatas.get(n) + "\"";
// }
// if (tableColumns.size() > rowdatas.size()) {
// for (int t = 0; t < tableColumns.size() - rowdatas.size(); t++) {
// sql += ",\"\"";
// }
// }
// sql += "),";
// }
//
// }
// }
// if (sql.lastIndexOf(",") > -1) {
// return sql.substring(0, sql.length() - 1);
// }
return null; return null;
} }
...@@ -435,8 +448,8 @@ public class ClientHandler<path> implements Runnable { ...@@ -435,8 +448,8 @@ public class ClientHandler<path> implements Runnable {
* @param type * @param type
* @return * @return
*/ */
private static String judgeDataType(String type){ private static String judgeDataType(String type) {
switch (type){ switch (type) {
case TYPE_STRING: case TYPE_STRING:
return "varchar"; return "varchar";
case TYPE_INTEGER: case TYPE_INTEGER:
...@@ -449,10 +462,12 @@ public class ClientHandler<path> implements Runnable { ...@@ -449,10 +462,12 @@ public class ClientHandler<path> implements Runnable {
return "decimal"; return "decimal";
case TYPE_TEXT: case TYPE_TEXT:
return "text"; return "text";
default : default:
return "varchar"; return "varchar";
} }
}; }
;
//新增Excel数据源 //新增Excel数据源
private String upload2Maas(InputStream inputStream, String hostAndPort, String filename) throws IOException { private String upload2Maas(InputStream inputStream, String hostAndPort, String filename) throws IOException {
...@@ -518,7 +533,7 @@ public class ClientHandler<path> implements Runnable { ...@@ -518,7 +533,7 @@ public class ClientHandler<path> implements Runnable {
String sheetsUrl = "http://" + hostAndPort + "/maas/dsm/excel/sheets"; String sheetsUrl = "http://" + hostAndPort + "/maas/dsm/excel/sheets";
Map<String, String> sheetsParams = new HashMap<>(); Map<String, String> sheetsParams = new HashMap<>();
sheetsParams.put("fileName", result); sheetsParams.put("fileName", result);
HttpEntity<Map<String, String>> sheetsRequestEntity = new HttpEntity<>(sheetsParams, getHeader(token,product,appKey,hostAndPort,false)); HttpEntity<Map<String, String>> sheetsRequestEntity = new HttpEntity<>(sheetsParams, getHeader(token, product, appKey, hostAndPort, false));
ResponseEntity<String> sheetsResponseEntity = restTemplate.exchange(sheetsUrl, HttpMethod.POST, sheetsRequestEntity, String.class); ResponseEntity<String> sheetsResponseEntity = restTemplate.exchange(sheetsUrl, HttpMethod.POST, sheetsRequestEntity, String.class);
String sheetsResponseEntityBody = sheetsResponseEntity.getBody(); String sheetsResponseEntityBody = sheetsResponseEntity.getBody();
JSONObject sheetJsonObject = JSONObject.parseObject(sheetsResponseEntityBody); JSONObject sheetJsonObject = JSONObject.parseObject(sheetsResponseEntityBody);
...@@ -606,12 +621,13 @@ public class ClientHandler<path> implements Runnable { ...@@ -606,12 +621,13 @@ public class ClientHandler<path> implements Runnable {
} }
throw new IOException("没有开始分隔符"); throw new IOException("没有开始分隔符");
} }
public static byte[] checkDelimiter(byte[] buf, int byteRead, byte[] delimiter) { public static byte[] checkDelimiter(byte[] buf, int byteRead, byte[] delimiter) {
if (delimiter.length > buf.length) return buf; if (delimiter.length > buf.length) return buf;
for (int i = 0; i <= buf.length - delimiter.length; i ++ ) { for (int i = 0; i <= buf.length - delimiter.length; i++) {
boolean isMatch = true; boolean isMatch = true;
for (int j = 0; j < delimiter.length; j ++ ) { for (int j = 0; j < delimiter.length; j++) {
if (buf[i + j] != delimiter[j]) { if (buf[i + j] != delimiter[j]) {
isMatch = false; isMatch = false;
break; break;
...@@ -722,7 +738,7 @@ public class ClientHandler<path> implements Runnable { ...@@ -722,7 +738,7 @@ public class ClientHandler<path> implements Runnable {
log.info("文件名:" + filePath); log.info("文件名:" + filePath);
String file = string1.substring(startFile, endFile).substring("##STAFILE##".length()); String file = string1.substring(startFile, endFile).substring("##STAFILE##".length());
log.info("文件:" + file); log.info("文件:" + file);
System.out.println("file size " + file.length() ); System.out.println("file size " + file.length());
// FileOutputStream fileOutputStream = new FileOutputStream(new File("D:\\kgdcz\\data\\maas\\test.xlsx")); // FileOutputStream fileOutputStream = new FileOutputStream(new File("D:\\kgdcz\\data\\maas\\test.xlsx"));
// fileOutputStream.write(file.getBytes()); // fileOutputStream.write(file.getBytes());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment