Hello World

吞风吻雨葬落日 欺山赶海踏雪径

0%

spring retry

Spring retry
https://github.com/spring-projects/spring-retry

前言

假设A,B两个系统,当A->B的调用失败时,我们可以采取何种策略?以下是常见的策略:

  • failfast,即快速失败像上层抛出远程调用异常
  • failover,即A->B失败,选择集群其他机器A->Bn(1…N)
  • failsafe,失败吞异常
  • failback, 过一会再重试,比如网络抖动,等一小会在重试。

failback则是本文所要讨论的核心内容。

maven依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>

框架概览

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class SpringRetryKest {

public static int i = 0;

public static void main(String[] args) {

RetryTemplate template = new RetryTemplate();

RetryCallback<String, RuntimeException> retryCallback = context -> {
i++ ;
if( i < 3){
if(i%2 == 0){
System.out.println("in fuck");
throw new IllegalArgumentException("fuck");
}
System.out.println("in no");
throw new IllegalStateException("no");
}
System.out.println("normal return");
return "hello";
};

// 恢复策略
RecoveryCallback<String> recoveryCallback = context -> {System.out.println("in recovery.."); return "recovery";};
// 设置回避策略
template.setBackOffPolicy(new FixedBackOffPolicy());
// 设置策略
template.setRetryPolicy(new SimpleRetryPolicy(5));
// 设置listener
template.setListeners(new RetryListener[]{new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {

System.out.println("RetryListener-open");
return true;
}

@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("RetryListener-close");
}

@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
System.out.println("RetryListener-onError");
}
}});
// 执行模板
String word = template.execute(retryCallback, recoveryCallback);

System.out.println(word);
}
}

输出

1
2
3
4
5
6
7
8
RetryListener-open
in no
RetryListener-onError
in fuck
RetryListener-onError
normal return
RetryListener-close
hello

RetryTemplate

重试模板,是进入spring-retry框架的整体流程入口

RetryCallback

重试回调,用户包装业务流,第一次执行和产生重试执行都会调用这个callback代码

RetryPolicy

重试策略,不同策略有不同的重试方式

BackOffPolicy

两次重试之间的回避策略,一般采用超时时间机制

RecoveryCallback

当所有重试都失败后,回调该接口,提供给业务重试回复机制

RetryContext

每次重试都会将其作为参数传入RetryCallback中使用

RetryListener

监听重试行为,主要用于监控。

当RetryCallback的调用产生异常的时候,框架首先会通过我们设置的RetryPolicy判断本次异常是否需要重试,如果需要重试,则调用BackOffPolicy,回退一定时间后,在重新调用RetryCallback。如果所有重试都失败了,则退出重试,调用RecoveryCallback退出框架。

重试策略

  • NeverRetryPolicy:只调用RetryCallback一次,不重试;
  • AlwaysRetryPolicy:无限重试,最好不要用
  • SimpleRetryPolicy:重试n次,默认3,也是模板默认的策略。很常用
  • TimeoutRetryPolicy:在n毫秒内不断进行重试,超过这个时间后停止重试
  • CircuitBreakerRetryPolicy:熔断功能的重试,关于熔断,请参考:使用hystrix保护你的应用
  • ExceptionClassifierRetryPolicy: 可以根据不同的异常,执行不同的重试策略,很常用
  • CompositeRetryPolicy:将不同的策略组合起来,有悲观组合和乐观组合。悲观默认重试,有不重试的策略则不重试。乐观默认不重试,有需要重试的策略则重试。

以上策略的实现方式

  • NeverRetryPolicy:判断是否重试的时候,直接返回false
  • AlwaysRetryPolicy:判断是否重试的时候,直接返回true
  • SimpleRetryPolicy:通过一个计数n,每次重试自增
  • TimeoutRetryPolicy:保存第一次重试时间,每次进行重试判断 当前毫秒时间-第一次重试时间 > 设置的时间间隔
  • CircuitBreakerRetryPolicy:与4类似
  • ExceptionClassifierRetryPolicy:采用一个Map实现,每次异常的时候,拿到对应重试策略,在重试即可
  • CompositeRetryPolicy:使用数据依次保存策略,执行的时候,顺序执行即可

回避策略

  • NoBackOffPolicy:不回避
  • FixedBackOffPolicy:n毫秒退避后再进行重试
  • UniformRandomBackOffPolicy:随机选择一个[n,m](如20ms,40ms)回避时间回避后,然后在重试
  • ExponentialBackOffPolicy:指数退避策略,休眠时间指数递增
  • ExponentialRandomBackOffPolicy:随机指数退避,指数中乘积会混入随机值

以上有两点需要注意:

  • 如何执行回避?一般使用ThreadWaitSleeper,即当前线程直接sleep一段时间。
  • 凡是带有随机性的策略,大多都是为了避免惊群效应,防止相同时间执行大量操作。

以上策略的实现方式

  • NoBackOffPolicy:直接返回即可
  • FixedBackOffPolicy`: 直接通过Sleeper设置n秒即可
  • UniformRandomBackOffPolicy: FixedBackOffPolicy + Random()
  • ExponentialBackOffPolicy:T = initial; T = T + T * multiplier
  • ExponentialRandomBackOffPolicy:T = initial; T = (T + T multiplier) (1 + randomFloat() * (multiplier - 1))

监听器

监听器接口如下

1
2
3
4
5
6
  // 第一次尝试之前会执行的方法
<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);
// 最后一次尝试之后会调用的方法
<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);
// 每次尝试失败之后调用的方法
<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);

参考例子,可以进行重试判断open,和重试的结果记录onError,close.

有状态和无状态重试

### 无状态尝试
无状态重试,是在一个循环中执行完重试策略,即重试上下文保持在一个线程上下文中,在一次调用中进行完整的重试策略判断
非常简单的情况,如远程调用某个查询方法时是最常见的无状态重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
RetryTemplate template = new RetryTemplate();
//重试策略:次数重试策略
RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
template.setRetryPolicy(retryPolicy);
//退避策略:指数退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100);
backOffPolicy.setMaxInterval(3000);
backOffPolicy.setMultiplier(2);
backOffPolicy.setSleeper(new ThreadWaitSleeper());
template.setBackOffPolicy(backOffPolicy);

//当重试失败后,抛出异常
String result = template.execute(new RetryCallback<String, RuntimeException>() {
@Override
public String doWithRetry(RetryContext context) throws RuntimeException {
throw new RuntimeException("timeout");
}
});
//当重试失败后,执行RecoveryCallback
String result = template.execute(new RetryCallback<String, RuntimeException>() {
@Override
public String doWithRetry(RetryContext context) throws RuntimeException {
System.out.println("retry count:" + context.getRetryCount());
throw new RuntimeException("timeout");
}
}, new RecoveryCallback<String>() {
@Override
public String recover(RetryContext context) throws Exception {
return "default";
}
});

有状态重试

有状态重试,有两种情况需要使用有状态重试:事务操作需要回滚或者熔断器模式
事务操作需要回滚场景时,当整个操作中抛出的是数据库异常DataAccessException,则不能进行重试需要回滚,而抛出其他异常则可以进行重试,可以通过RetryState实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//当前状态的名称,当把状态放入缓存时,通过该key查询获取
Object key = "mykey";
//是否每次都重新生成上下文还是从缓存中查询,即全局模式(如熔断器策略时从缓存中查询)
boolean isForceRefresh = true;
//对DataAccessException进行回滚
BinaryExceptionClassifier rollbackClassifier =
new BinaryExceptionClassifier(Collections.<Class<? extends Throwable>>singleton(DataAccessException.class));
RetryState state = new DefaultRetryState(key, isForceRefresh, rollbackClassifier);

String result = template.execute(new RetryCallback<String, RuntimeException>() {
@Override
public String doWithRetry(RetryContext context) throws RuntimeException {
System.out.println("retry count:" + context.getRetryCount());
throw new TypeMismatchDataAccessException("");
}
}, new RecoveryCallback<String>() {
@Override
public String recover(RetryContext context) throws Exception {
return "default";
}
}, state);

RetryTemplate中在有状态重试时,回滚场景时直接抛出异常处理代码:

1
2
3
4
5
//state != null && state.rollbackFor(context.getLastThrowable())
//在有状态重试时,如果是需要执行回滚操作的异常,则立即抛出异常
if (shouldRethrow(retryPolicy,context, state)) {
throw RetryTemplate.<E>wrapIfNecessary(e);
}

熔断器场景。在有状态重试时,且是全局模式,不在当前循环中处理重试,而是全局重试模式(不是线程上下文),如熔断器策略时测试代码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
RetryTemplate template = new RetryTemplate();
CircuitBreakerRetryPolicy retryPolicy =
new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(3));
retryPolicy.setOpenTimeout(5000);
retryPolicy.setResetTimeout(20000);
template.setRetryPolicy(retryPolicy);

for (int i = 0; i < 10; i++) {
try {
Object key = "circuit";
boolean isForceRefresh = false;
RetryState state = new DefaultRetryState(key, isForceRefresh);
String result = template.execute(new RetryCallback<String, RuntimeException>() {
@Override
public String doWithRetry(RetryContext context) throws RuntimeException {
System.out.println("retry count:" + context.getRetryCount());
throw new RuntimeException("timeout");
}
}, new RecoveryCallback<String>() {
@Override
public String recover(RetryContext context) throws Exception {
return "default";
}
}, state);
System.out.println(result);
} catch (Exception e) {
System.out.println(e);
}
}

为什么说是全局模式呢?我们配置了isForceRefresh为false,则在获取上下文时是根据key “circuit”从缓存中获取,从而拿到同一个上下文。

1
2
3
4
5
6
7
8
9
Object key = "circuit";
boolean isForceRefresh = false;
RetryState state = new DefaultRetryState(key,isForceRefresh);

如下RetryTemplate代码说明在有状态模式下,不会在循环中进行重试。
if (state != null && context.hasAttribute(GLOBAL_STATE)) {
break;
}

熔断器策略配置代码,CircuitBreakerRetryPolicy需要配置三个参数:

  • delegate:是真正判断是否重试的策略,当重试失败时,则执行熔断策略;
  • openTimeout:openWindow,配置熔断器电路打开的超时时间,当超过openTimeout之后熔断器电路变成半打开状态(主要有一次重试成功,则闭合电路); resetTimeout:timeout,配置重置熔断器重新闭合的超时时间。
    判断熔断器电路是否打开的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean isOpen() {
long time = System.currentTimeMillis() - this.start;
boolean retryable = this.policy.canRetry(this.context);
if (!retryable) {//重试失败
//在重置熔断器超时后,熔断器器电路闭合,重置上下文
if (time > this.timeout) {
this.context = createDelegateContext(policy, getParent());
this.start = System.currentTimeMillis();
retryable = this.policy.canRetry(this.context);
} else if (time < this.openWindow) {
//当在熔断器打开状态时,熔断器电路打开,立即熔断
if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
setAttribute(CIRCUIT_OPEN, true);
}
this.start = System.currentTimeMillis();
return true;
}
} else {//重试成功
//在熔断器电路半打开状态时,断路器电路闭合,重置上下文
if (time > this.openWindow) {
this.start = System.currentTimeMillis();
this.context = createDelegateContext(policy, getParent());
}
}
setAttribute(CIRCUIT_OPEN, !retryable);
return !retryable;
}

从如上代码可看出spring-retry的熔断策略相对简单:

  • 当重试失败,且在熔断器打开时间窗口[0,openWindow) 内,立即熔断;
  • 当重试失败,且在指定超时时间后(>timeout),熔断器电路重新闭合;
  • 在熔断器半打开状态[openWindow, timeout] 时,只要重试成功则重置上下文,断路器闭合。

参考

spring-retry重试与熔断详解—《亿级流量》内容补充
https://blog.csdn.net/broadview2006/article/details/72841056

重试框架Spring retry实践
https://blog.csdn.net/u011116672/article/details/77823867

利用Spring-Retry定制化你的RPC重试
http://kriszhang.com/spring-retry/

spring-retry
https://github.com/spring-projects/spring-retry

注解的使用

spring-retry注解方式使用
https://blog.csdn.net/hulei19900322/article/details/78153310?reload

https://blog.csdn.net/clj198606061111/article/details/77256033