JoyLau's Blog

JoyLau 的技术学习与思考

背景

有时我们想在同一个流程实例中查看流程从上到下的流转顺序, 效果像这样

Camunda-1

这时会到 ACT_HI_ACTINST 表里去查流程实例的节点信息

但是我们根据流程是 ID 去查数据的时候发现, 并没有很好的根据时间顺序进行排列,就比如上面的数据,在数据库反应的情况如下:

Camunda-2

可以看到 发起人和抄送人是几乎同时进行的,因为发起人发起后,第一个是抄送人任务, sendTask 的操作很快,创建时间 START_TIME 都是 2023-07-20 09:11:21
于是同一流程实例根据创建时间 START_TIME 排序就会出现问题,上面的数据就是抄送人跑到发起人前面去了

于是想着用没用其他的方法,来根据其他字段进行排序

解决方案

被我找到一个方案, 就是表里的主键 UUID

Camunda 默认主键策略是 UUID 这个配置可以在配置文件中修改

1
2
3
camunda:
bpm:
id-generator: strong

可以修改为 simplestrongprefixed, 默认是 strong

strong 策略的实现是 StrongUuidGenerator, 源码位置在 org.camunda.bpm.engine.impl.persistence.StrongUuidGenerator

其中生成 ID 的 getNextId 方法是使用的 fasterxmlTimeBasedGenerator, 源码位置在 com.fasterxml.uuid.impl.TimeBasedGenerator

从名字可以看出是基于时间生成的,实际上生成的是 UUID 的 version 1 版本

于是就是可以根据时间来排序了

具体操作

继续翻源码, 我们来到 java-uuid-generator-3.2.0.jar 这个依赖包, 其中 2 个类引起了我的注意
UUIDUtilUUIDComparator

那么排序就很简单了, 如下代码就可轻松排序:

1
2
3
4
5
6
7
8
9
public class UUIDTimeOrderTest {
public static void main(String[] args) {
String[] demos = ("c0767f45-0367-11ee-9698-8edff34f00ff,c07aec28-0367-11ee-9698-8edff34f00ff,c167211a-0367-11ee-9698-8edff34f00ff,c1687fb5-0367-11ee-9698-8edff34f00ff,c169b840-0367-11ee-9698-8edff34f00ff,c16b8d0b-0367-11ee-9698-8edff34f00ff").split(",");
Arrays.stream(demos)
.sorted((s1, s2) -> UUIDComparator.staticCompare(UUIDUtil.uuid(s1), UUIDUtil.uuid(s2)))
.forEach(s -> System.out.println(s));
}

}

背景

特斯拉开了将近 2 年了, 没有进行任何保养过, 最近想去保养下, 发下没有什么值得保养的

就我目前收集的信息, 目前可以保养的有

  1. 空调滤芯, 官方建议每年更换一次
  2. 空调管道支架,很早之前有发召回的公告,需要加装支架
  3. 前支臂,原地方向盘打满的时候, 前轮会发出哒哒哒的异响,不是刹车片摩擦的声音,我查询了下,应该通病

其实前 2 个问题, 并没有什么安全隐患,也并不是强制性的, 然后我就看了下空调滤芯的事情, 发现如果到官方的售后去更换的话, 材料费加上人工工时费,大概在 340
我在京东买个 2 个前置的曼牌空调滤芯, 打算自己动手更换

准备工具

  1. T20 螺丝刀
  2. 撬棒,家里没有,我拿了个金属勺子代替

步骤

  1. 将副驾驶的座位移到最后面
  2. 拿掉副驾驶脚垫
  3. 使用 T20 螺丝刀拧下副驾驶下方右侧的螺丝, 左侧的没有螺丝, 是个塑料的膨胀螺丝, 拿东西撬一下就行
  4. 沿着手套箱下面的边缝撬起,去下下方的塑料壳盖板,注意有 2 根排线相连,取得时候用力不能过大,否则会扯断排线
  5. 我这里为了顺利拆掉侧方的挡板, 我拔掉了喇叭的排线,这样能够把整个塑料壳放到傍边,也可以把氛围灯的排线一并拆掉,这样可以整体拿下来
  6. 使用撬棒沿着侧边挡板的缝隙慢慢撬起, 随后用手沿着挡板把整个挡板拽下来,要用点力气, 这里注意拔下来的挡板白色卡扣,有时候不会被一起带出来,需要手动去下来,再装回卡勾上, 不然还原装回去的时候容易掉下去,导致车里异响

挡板

  1. 拆下挡板后可以看到最里面的滤芯槽盖板, 上方有一个 T20 的螺丝, 拆下他, 可以去除盖板

滤芯槽盖板
滤芯槽盖板2

  1. 取出 2 片空调滤芯,再放入新买的

新旧对比1
新旧对比2

对比了下新旧, 发现我之前旧的滤芯并不是很脏,但是有点潮湿了,还有异味

  1. 还原

我的 Tesla

Spring Cloud Gateway 配合 Sentinel 实现限流
在 sentinel dashboard 配置贵州后,重启服务会失效

本篇介绍如何持久化

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

配置服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
spring:
cloud:
sentinel:
filter:
enabled: false
scg:
# 请求被 ban 的响应
fallback:
response-status: 429
response-body: 操作频繁,您已被限流,请稍后重试
mode: "response"
content-type: "text/plain; charset=utf-8"
datasource:
# 接口分组配置
gw-api-group:
nacos:
server-addr: ${nacos.server-addr}
username: ${spring.cloud.nacos.username}
password: ${spring.cloud.nacos.password}
namespace: sentinel
group-id: DEFAULT_GROUP
data-id: gw-api-group-rules
rule-type: gw-api-group
# 接口限流配置
gw-rule:
nacos:
server-addr: ${nacos.server-addr}
username: ${spring.cloud.nacos.username}
password: ${spring.cloud.nacos.password}
namespace: sentinel
group-id: DEFAULT_GROUP
data-id: gw_flow-rules
rule-type: gw_flow
eager: true

创建配置文件

在 Nacos 上新建命名空间 sentinel,
再分别新建配置文件 gw-api-group-rulesgw_flow-rules

分别配置如下:

gw-api-group-rules:

1
2
3
4
5
6
7
8
9
10
11
[
{
"apiName": "cipher_group",
"predicateItems": [
{
"pattern": "/im-wildfirechat/support/cipher",
"matchStrategy": 0
}
]
}
]

gw_flow-rules:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[
{
"resource": "cipher_group",
"resourceMode": 1,
"grade": 1,
"count": 2,
"intervalSec": 60,
"controlBehavior": 0,
"burst": 0,
"maxQueueingTimeoutMs": 500,
"paramItem": {
"parseStrategy": 1,
"fieldName": null,
"pattern": null,
"matchStrategy": 0
}
}
]

保存

配置文件的配置项参考官方文档: https://sentinelguard.io/zh-cn/docs/api-gateway-flow-control.html

或者参考 代码类 com.alibaba.csp.sentinel.adapter.gateway.common.rule.GatewayFlowRulecom.alibaba.csp.sentinel.adapter.gateway.common.api.ApiDefinition

具有源码可查看 com.alibaba.cloud.sentinel.datasource.RuleType

验证

启动服务

  1. 查看 sentinel dashboard 的配置
  2. 调用接口,看是否返回被 ban 信息

api-group
aip=rule

ssh 配置开启隧道的 tcp 转发

有时我们开启 本地端口转发或者远程端口转发想在内网的其他服务器上使用该端口,修改修改 /etc/ssh/sshd_config 配置文件来开启转发

  1. 打开配置项 AllowTcpForwarding yes
  2. 打开配置项 GatewayPorts yes
  3. 如果长时间保持连接,那么还需要开启 TCPKeepAlive yes

配置场景:
有时我们有多个前缀需要反向代理到同一个后端服务,
比如 /s1, /s2, /s3-xxx, 都代理到同一个后端服务
最普通的写法可以写多个 location 来匹配, 这里介绍使用一个 location 完成匹配

1
2
3
4
5
6
7
8
9
10
11
12
location ~* ^/(s1|s2|s3-*)/ {
proxy_pass http://GATEWAY;

proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-Host $host;
}

上面还配置了支持升级http协议支持了https

注意

上篇中,我们获取到了请求数据报文,这篇继续获取响应报文并发往持久化存储
这里获取响应报文需要排除掉文件下载的情况

使用

  1. 新建类 ModifyResponseBodyGatewayFilterFactoryCopy
    该类照抄子 spring 源码 ModifyResponseBodyGatewayFilterFactory 添加了判断,当请求返回的头信息非 json 响应时, 将不再解析报文
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/**
* 根据 spring 源码优化
* 优化了文件下载请求解析报文带来的内存使用
* 添加了如下代码判断,当请求返回的头信息非 json 响应时, 将不再解析报文
* public Mono<Void> writeWith(Publisher<? extends DataBuffer> body)
* if (!Utils.isJsonResponse(exchange)) {
* config.getRewriteFunction().apply(exchange, null);
* return getDelegate().writeWith(body);
* }
*
*/
public class ModifyResponseBodyGatewayFilterFactoryCopy extends AbstractGatewayFilterFactory<ModifyResponseBodyGatewayFilterFactoryCopy.Config> {

private final Map<String, MessageBodyDecoder> messageBodyDecoders;

private final Map<String, MessageBodyEncoder> messageBodyEncoders;

private final List<HttpMessageReader<?>> messageReaders;

public ModifyResponseBodyGatewayFilterFactoryCopy(List<HttpMessageReader<?>> messageReaders,
Set<MessageBodyDecoder> messageBodyDecoders, Set<MessageBodyEncoder> messageBodyEncoders) {
super(Config.class);
this.messageReaders = messageReaders;
this.messageBodyDecoders = messageBodyDecoders.stream()
.collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity()));
this.messageBodyEncoders = messageBodyEncoders.stream()
.collect(Collectors.toMap(MessageBodyEncoder::encodingType, identity()));
}

@Override
public GatewayFilter apply(Config config) {
ModifyResponseGatewayFilter gatewayFilter = new ModifyResponseGatewayFilter(config);
gatewayFilter.setFactory(this);
return gatewayFilter;
}

public static class Config {

private Class inClass;

private Class outClass;

private Map<String, Object> inHints;

private Map<String, Object> outHints;

private String newContentType;

private RewriteFunction rewriteFunction;

public Class getInClass() {
return inClass;
}

public Config setInClass(Class inClass) {
this.inClass = inClass;
return this;
}

public Class getOutClass() {
return outClass;
}

public Config setOutClass(Class outClass) {
this.outClass = outClass;
return this;
}

public Map<String, Object> getInHints() {
return inHints;
}

public Config setInHints(Map<String, Object> inHints) {
this.inHints = inHints;
return this;
}

public Map<String, Object> getOutHints() {
return outHints;
}

public Config setOutHints(Map<String, Object> outHints) {
this.outHints = outHints;
return this;
}

public String getNewContentType() {
return newContentType;
}

public Config setNewContentType(String newContentType) {
this.newContentType = newContentType;
return this;
}

public RewriteFunction getRewriteFunction() {
return rewriteFunction;
}

public Config setRewriteFunction(RewriteFunction rewriteFunction) {
this.rewriteFunction = rewriteFunction;
return this;
}

public <T, R> Config setRewriteFunction(Class<T> inClass, Class<R> outClass,
RewriteFunction<T, R> rewriteFunction) {
setInClass(inClass);
setOutClass(outClass);
setRewriteFunction(rewriteFunction);
return this;
}

}

public class ModifyResponseGatewayFilter implements GatewayFilter, Ordered {

private final Config config;

private GatewayFilterFactory<Config> gatewayFilterFactory;

public ModifyResponseGatewayFilter(Config config) {
this.config = config;
}

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange.mutate().response(new ModifiedServerHttpResponse(exchange, config)).build());
}

@Override
public int getOrder() {
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
}

@Override
public String toString() {
Object obj = (this.gatewayFilterFactory != null) ? this.gatewayFilterFactory : this;
return filterToStringCreator(obj).append("New content type", config.getNewContentType())
.append("In class", config.getInClass()).append("Out class", config.getOutClass()).toString();
}

public void setFactory(GatewayFilterFactory<Config> gatewayFilterFactory) {
this.gatewayFilterFactory = gatewayFilterFactory;
}

}

protected class ModifiedServerHttpResponse extends ServerHttpResponseDecorator {

private final ServerWebExchange exchange;

private final Config config;

public ModifiedServerHttpResponse(ServerWebExchange exchange, Config config) {
super(exchange.getResponse());
this.exchange = exchange;
this.config = config;
}

@SuppressWarnings("unchecked")
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (!Utils.isJsonResponse(exchange)) {
config.getRewriteFunction().apply(exchange, null);
return getDelegate().writeWith(body);
}

Class inClass = config.getInClass();
Class outClass = config.getOutClass();

String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);

HttpHeaders httpHeaders = new HttpHeaders();
// explicitly add it in this way instead of
// 'httpHeaders.setContentType(originalResponseContentType)'
// this will prevent exception in case of using non-standard media
// types like "Content-Type: image"
httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);

ClientResponse clientResponse = prepareClientResponse(body, httpHeaders);

// TODO: flux or mono
Mono modifiedBody = extractBody(exchange, clientResponse, inClass)
.flatMap(originalBody -> config.getRewriteFunction().apply(exchange, originalBody))
.switchIfEmpty(Mono.defer(() -> (Mono) config.getRewriteFunction().apply(exchange, null)));

BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, outClass);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
Mono<DataBuffer> messageBody = writeBody(getDelegate(), outputMessage, outClass);
HttpHeaders headers = getDelegate().getHeaders();
if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
|| headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
}
// TODO: fail if isStreamingMediaType?
return getDelegate().writeWith(messageBody);
}));
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMapSequential(p -> p));
}

private ClientResponse prepareClientResponse(Publisher<? extends DataBuffer> body, HttpHeaders httpHeaders) {
ClientResponse.Builder builder;
builder = ClientResponse.create(exchange.getResponse().getStatusCode(), messageReaders);
return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
}

private <T> Mono<T> extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class<T> inClass) {
// if inClass is byte[] then just return body, otherwise check if
// decoding required
if (byte[].class.isAssignableFrom(inClass)) {
return clientResponse.bodyToMono(inClass);
}

List<String> encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
for (String encoding : encodingHeaders) {
MessageBodyDecoder decoder = messageBodyDecoders.get(encoding);
if (decoder != null) {
return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode)
.map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes))
.map(buffer -> prepareClientResponse(Mono.just(buffer),
exchange.getResponse().getHeaders()))
.flatMap(response -> response.bodyToMono(inClass));
}
}

return clientResponse.bodyToMono(inClass);
}

private Mono<DataBuffer> writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message,
Class<?> outClass) {
Mono<DataBuffer> response = DataBufferUtils.join(message.getBody());
if (byte[].class.isAssignableFrom(outClass)) {
return response;
}

List<String> encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
for (String encoding : encodingHeaders) {
MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
if (encoder != null) {
DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
response = response.publishOn(Schedulers.parallel()).map(buffer -> {
byte[] encodedResponse = encoder.encode(buffer);
DataBufferUtils.release(buffer);
return encodedResponse;
}).map(dataBufferFactory::wrap);
break;
}
}

return response;
}

}

}
  1. 新建自动导入类
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class ModifyResponseBodyGatewayFilterAutoConfiguration {

@Bean
@ConditionalOnEnabledFilter
public ModifyResponseBodyGatewayFilterFactoryCopy modifyResponseBodyGatewayFilterFactoryCopy(
ServerCodecConfigurer codecConfigurer, Set<MessageBodyDecoder> bodyDecoders,
Set<MessageBodyEncoder> bodyEncoders) {
return new ModifyResponseBodyGatewayFilterFactoryCopy(codecConfigurer.getReaders(), bodyDecoders, bodyEncoders);
}
}
  1. 新建响应日志全局拦截类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Component
@AllArgsConstructor
public class ResponseLogFilter implements GlobalFilter, Ordered {
private final ModifyResponseBodyGatewayFilterFactoryCopy modifyResponseBodyGatewayFilterFactoryCopy;

private final AsyncResponseHandler asyncResponseHandler;

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ModifyResponseBodyGatewayFilterFactoryCopy.Config config = new ModifyResponseBodyGatewayFilterFactoryCopy.Config()
.setRewriteFunction(byte[].class, byte[].class, (e, bytes) -> {
asyncResponseHandler.handle(e, bytes);
return Mono.justOrEmpty(bytes);
});
return modifyResponseBodyGatewayFilterFactoryCopy.apply(config).filter(exchange, chain);
}

@Override
public int getOrder() {
return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 98;
}
}

  1. 新建日志记录处理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Slf4j
@Component
@AllArgsConstructor
public class AsyncResponseHandler {

private final Jackson2HashMapper jackson2HashMapper;

private final RedisTemplate<String, AbstractLog> logRedisTemplate;

private final ObjectMapper objectMapper;

@Async("logTaskPool")
@SuppressWarnings("unchecked")
public void handle(ServerWebExchange exchange, byte[] bytes) {
ServerHttpResponse response = exchange.getResponse();
try {
Future<AccessLog> future =
(Future<AccessLog>) exchange.getAttributes().get(Constant.ACCESS_LOG_REQUEST_FUTURE_ATTR);
AccessLog accessLog = future.get(10, TimeUnit.SECONDS);
accessLog.setResponseHeaders(response.getHeaders().toSingleValueMap().toString());
try {
String aud = JwtUtils.verifyTokenSubject(accessLog.getToken());
accessLog.setOriginType(OriginType.getByValue(aud));
} catch (Exception e) {
accessLog.setOriginType(OriginType.OTHER);
}
accessLog.setTakenTime(System.currentTimeMillis() - accessLog.getTakenTime());
accessLog.setHttpCode(response.getRawStatusCode());
if (!Utils.isJsonResponse(exchange)) {
accessLog.setResponse("非 json 报文");
}
if (Utils.isDownloadResponse(exchange)) {
accessLog.setResponse("二进制文件");
}
Optional.ofNullable(bytes).ifPresent(bs -> {
if (bytes.length <= DataSize.ofKilobytes(256).toBytes()) {
// 小于指定大小报文进行转化(考虑到文件下载的响应报文)
accessLog.setResponse(new String(bytes, StandardCharsets.UTF_8));
} else {
accessLog.setResponse("报文过长");
}
});
logRedisTemplate.opsForStream().add(LogConstant.ACCESS_LOG_KEY_NAME, jackson2HashMapper.toHash(accessLog));
// 进行修剪,限制其最大长度, 防止内存过高
logRedisTemplate.opsForStream().trim(LogConstant.ACCESS_LOG_KEY_NAME, LogConstant.ACCESS_LOG_MAX_LENGTH,
true);
if (log.isDebugEnabled()) {
String logger = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(accessLog);
log.debug("log: \n{}", logger);
}
} catch (Exception e) {
log.warn("access log save error: ", e);
}
}
}
0%