0%

获取RabbitMQ数据,并插入到mysql

获取RabbitMQ数据,并插入到mysql

获取RabbitMQ数据,并插入到mysql

  1. 从RabbitMQ中取数据
  2. 获取用户信息,分为登录接口与非登录接口
    1. 登录接口
      1. 对账号解密,通过远程接口查出userId、userCode、userName。
    2. 非登录接口
      1. 通过userId去Redis中查询userCode和userName
      2. Redis中没有时,调用远程接口查询,并将查询结果写入Redis
  3. 将「模块+接口」拼接为key,从Redis中获取对应的Swagger注释,作为日志的接口描述
  4. 获取其他参数,设置主键值。
  5. 每累计10条批量插入到MySQL库中;不足10条时,由每10秒执行一次的定时任务在距上次成功插入超过5秒后统一写入。(缓解插入压力)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package com.xiaoruiit.logmanager.queue;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.xiaoruiit.common.constant.RabbitQueueConstant;
import com.xiaoruiit.common.service.RedisService;
import com.xiaoruiit.common.utils.*;
import com.xiaoruiit.logmanager.constant.LogMgrIdConstants;
import com.xiaoruiit.logmanager.dao.MyLogHistoryMapper;
import com.xiaoruiit.logmanager.entity.auto.LogmgrLogHistory;
import com.xiaoruiit.logmanager.entity.vo.UserVO;
import com.xiaoruiit.logmanager.service.MdmService;
import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.crypto.InvalidCipherTextException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
* @author hxr
*/
@Slf4j
@Component
public class QueueMessageListener {
private static final String LOG_USER_INFO_HASH_KEY = "logmgr:user_info";
/**
* 合并插入值
*/
private final static int INSERT_CNT = 10;
/**
* 合并插入超时
*/
private final static int INSERT_TIME_OUT = 5000;
/**
* 合并插入集合
*/
private static final List<LogmgrLogHistory> LIST = new ArrayList<>();
private long lastInsert;
private final Lock lock = new ReentrantLock(); //可重入的互斥锁

@Autowired
private MyLogHistoryMapper myHistoryMapper;
@Autowired
private RedisService redisService;
@Autowired
private MdmService userService;

/**
* 监听器单线程执行
*/
@RabbitListener(queues = RabbitQueueConstant.QUEUE_LOG)
public void logListener(String msg) {
lock.lock();
try {
// 优化点:批量插入
LogmgrLogHistory history = JSON.parseObject(msg, LogmgrLogHistory.class);
if (StrUtil.isEmpty(history.getModel()) || history.getUserId() == null || StrUtil.isEmpty(history.getUserId() + "")) {
if (!("/oauth/token".equals(history.getFunc()))){
return;
}

// TODO 登录日志入库 登录时获取不到userId单独处理
String url = history.getUrl();
Map<String, String> params = URLUtils.getParamsByUrl(url);
signInParameterHandle(params);
String userPhone = params.get("username");
// 通过手机号获取用户信息 用户id,code,名称
Result<UserVO> userByMobile = userService.getUserByMobile(userPhone);
if (userByMobile.getData()!= null){
history.setUserId(userByMobile.getData().getUserId());
history.setUserCode(userByMobile.getData().getUserCode());
history.setUserName(userByMobile.getData().getUserName());
}
} else {
setUserInfo(history);
}
if (StrUtil.isNotEmpty(history.getModel()) && StrUtil.isNotEmpty(history.getFunc())) {
Object o = redisService.hGet(Constant.SWAGGER_HASH_KEY, history.getModel() + history.getFunc());
if (o == null) {
o = redisService.hGet(Constant.SWAGGER_HASH_KEY, history.getModel() + deleteLastSlash(history.getFunc()));
}
history.setDescription(o == null ? "" : o.toString());
}
if (history.getUrl() != null && history.getUrl().length() > 800) {
history.setUrl(history.getUrl().substring(0, 790));
}
if (history.getFunc() != null && history.getFunc().length() > 250) {
history.setFunc(history.getFunc().substring(0, 250));
}
if (StrUtil.isNotEmpty(history.getBody()) && history.getBody().length() >= 4000) {
history.setBody(history.getBody().substring(0, 4000));
}
history.setLogHisId(IdWorker.nextId(LogMgrIdConstants.LOG_HIS_ID_PREFIX));
LIST.add(history);
if (LIST.size() >= INSERT_CNT) {
insertAndReset();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

private void setUserInfo(LogmgrLogHistory history) {
try {
Object o = redisService.hGet(LOG_USER_INFO_HASH_KEY, history.getUserId() + "");
if (o != null) {
Map<String, Object> map = (Map<String, Object>) o;
history.setUserName(map.get("userName") + "");
history.setUserCode(map.get("userCode") + "");
return;
}
Result result = userService.getUserInfoById(history.getUserId());
Map<String, Object> map = (Map<String, Object>) result.getData();
if (map != null) {
history.setUserName(map.get("userName") + "");
history.setUserCode(map.get("userCode") + "");
}
redisService.hSet(LOG_USER_INFO_HASH_KEY, history.getUserId() + "", map);
} catch (Exception e) {
e.printStackTrace();
}
}

private static String deleteLastSlash(String src) {
int i = src.lastIndexOf("/");
if (i != -1) {
return src.substring(0, i);
}
return src;
}

/**
* 监听不满足合并要求的数据
*/
@Scheduled(fixedDelay = 10000)
public void listListener() {
if (lock.tryLock()) {
try {
if (!LIST.isEmpty() && System.currentTimeMillis() - lastInsert > INSERT_TIME_OUT) {
insertAndReset();
}
} finally {
lock.unlock();
}
}
}

private void insertAndReset() {
lock.lock();
try {
log.info("myHistoryMapper.insertList(LIST) size = {}", LIST.size());
myHistoryMapper.insertList(LIST);
LIST.clear();
lastInsert = System.currentTimeMillis();
} catch (Exception e) {
LIST.forEach(lh -> lh.setLogHisId(IdWorker.nextId(LogMgrIdConstants.LOG_HIS_ID_PREFIX)));
e.printStackTrace();
} finally {
lock.unlock();
}
}

// 解密
public void signInParameterHandle(Map<String, String> parameters) throws InvalidCipherTextException {
String usernamePlaintext;

String username = parameters.get("username");
String signInType = parameters.get("signInType");
if ("1".equals(signInType)) {
String sm2PublicKeyHex = parameters.get("公钥字符串");

String sm2PrivateKeyHex = (String) redisService.get("私钥redis中key" + sm2PublicKeyHex);
// 还原私钥 私钥16进制格式
BigInteger privateKeyD = new BigInteger(sm2PrivateKeyHex, 16);

usernamePlaintext = Sm2Utils.sm2PrivateKeyDecrypt(privateKeyD, username);
parameters.put("username", usernamePlaintext);
}
}


}