0%

Gatway过滤器收集接口访问记录,插入到RabbitMQ

Gatway过滤器收集接口访问记录,插入到RabbitMQ

2.Gatway过滤器收集接口访问记录,插入到RabbitMQ

  1. RabbitMQ交换机和队列项目启动时初始化配置
  2. Gatway过滤器拦截所有请求,对表单请求和非表单请求分别处理,获取url和body参数。
  3. 将参数组装起来,发送到RabbitMQ。(过滤不记录日志的请求)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.xiaoruiit.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author: hxr
* @Description: 操作日志交换机和队列初始化
* @date: 2021/05/19
*/
@Configuration
public class RabbitConfig {

public final static String EXCH_LOG = "exch_log";

public final static String QUEUE_LOG = "queue_log";

@Autowired
RabbitAdmin rabbitAdmin;

@Bean
public TopicExchange exchLog() {
return new TopicExchange(this.EXCH_LOG);
}

@Bean
public Queue queueLog() {
return new Queue(this.QUEUE_LOG);
}

@Bean
Binding bindingExchangeOrderDicQueue() {
return BindingBuilder.bind(queueLog()).to(exchLog()).with("");
}

//创建初始化RabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
//创建交换机和对列
@Bean
public void createExchangeQueue (){
rabbitAdmin.declareExchange(exchLog());
rabbitAdmin.declareQueue(queueLog());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package com.xiaoruiit.filter;

import cn.hutool.json.JSONUtil;
import com.xiaoruiit.common.constant.AuthConstant;
import com.xiaoruiit.common.domain.UserSignDTO;
import com.xiaoruiit.common.utils.IPUtils;
import com.xiaoruiit.queue.LogHistory;
import com.xiaoruiit.queue.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.cloud.gateway.support.DefaultServerRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.InetSocketAddress;
import java.net.URI;
import java.time.LocalDateTime;

import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR;

/**
* @author hxr
* @date 2020/05/11
*/
@Slf4j
@Component
public class GatewayLoggingFilter implements GlobalFilter {

@Autowired
MessageService messageService;

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

// 获取用户传来的数据类型
MediaType mediaType = exchange.getRequest().getHeaders().getContentType();
ServerRequest serverRequest = new DefaultServerRequest(exchange);
HttpMethod method = exchange.getRequest().getMethod();
// 如果是表单请求
if (method == HttpMethod.POST && MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
// .log("modify_request_mono", Level.INFO)
.flatMap(body -> {
assert route != null;
recordLog(exchange.getRequest(), body, route);

return Mono.just(body);
});

return getVoidMono(exchange, chain, String.class, modifiedBody);
}

assert route != null;
recordLog(exchange.getRequest(), "", route);
return chain.filter(exchange.mutate().request(exchange.getRequest()).build());
}

/**
* 参照 ModifyRequestBodyGatewayFilterFactory.java 截取的方法
*/
private Mono<Void> getVoidMono(ServerWebExchange exchange, GatewayFilterChain chain, Class outClass, Mono<?> modifiedBody) {
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());

// the new content type will be computed by bodyInserter
// and then set in the request decorator
headers.remove(HttpHeaders.CONTENT_LENGTH);


CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(
exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
// TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}

@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
return chain.filter(exchange.mutate().request(decorator).build());
}));
}

/**
* 组装日志需要的字段信息,并发送到RabbitMQ
*
* @param request request
* @param body 请求的body内容
*/
private void recordLog(ServerHttpRequest request, Object body, Route route) {
InetSocketAddress address = request.getRemoteAddress();

String ip = IPUtils.getIpAddrForGateWay(request);

assert address != null;
// 心跳不记录操作日志 TODO 实现可配置不记录日志
if ("/message/heartbeats".equals(request.getURI().getRawPath())){
return;
}
HttpMethod method = request.getMethod();
HttpHeaders headers = request.getHeaders();
MultiValueMap<String, String> queryParams = request.getQueryParams();
URI uri = request.getURI();

LogHistory history = new LogHistory();
history.setIp(ip);
history.setUrl(uri.toString());
history.setFunc(request.getURI().getRawPath());
history.setModel(route.getUri().getHost());

try {
history.setParams(queryParams.toString().length() > 900 ? "" : queryParams.toString());
if (method == HttpMethod.POST) {
history.setBody(body.toString().length() > 900 ? "" : body.toString());
}
} catch (Exception e) {
e.printStackTrace();
}

try {
String userStr = headers.getFirst(AuthConstant.USER_TOKEN_HEADER);
UserSignDTO userSignDto = JSONUtil.toBean(userStr, UserSignDTO.class);
if (userSignDto != null) {
history.setUserId(userSignDto.getUserId());
}
} catch (Exception e) {
e.printStackTrace();
}

history.setOptTime(LocalDateTime.now());
try {
// 发送到RabbitMQ
messageService.sendTestMessage(history);
} catch (Exception e) {
e.printStackTrace();
}
}


}

发送到RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.xiaoruiit.queue;


import com.alibaba.fastjson.JSON;
import com.xiaoruiit.common.constant.RabbitQueueConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* @author xhr
*/
@Slf4j
@Component
public class MessageService {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 这个方法被controller调用是多线程执行。
* rabbitTemplate 执行时间是一个递增的过程
*/
public void sendTestMessage(LogHistory history) {
rabbitTemplate.convertAndSend(RabbitQueueConstant.EXCH_LOG, "", JSON.toJSONString(history));
}

}