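Integrating Middleware for Spring Applications Across Domestic and Overseas Environments

A middleware integration practice for deploying one codebase on both aliyun and aws.

Background

An application that uses aliyun middleware in China now needs to be deployed to aws as well, so many of its middleware components need drop-in replacements. Because there is only one codebase, it has to support both the domestic and the overseas environment.

Core idea

Use conditions such as @ConditionalOnProperty to control which implementation of an interface is instantiated as a bean, and therefore which implementation is ultimately called.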
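
To make the selection rule concrete, here is a minimal plain-Java model of how a havingValue / matchIfMissing pair decides whether a bean is created. It is a simplified sketch and not Spring's actual implementation: the class name PropertyConditionModel and the method matches are hypothetical, and only the case of a non-empty havingValue is covered.

```java
import java.util.Map;

// Simplified, hypothetical model of the @ConditionalOnProperty decision.
// It is NOT Spring's implementation and only covers a non-empty havingValue.
final class PropertyConditionModel {

    private PropertyConditionModel() {
    }

    /**
     * @param props          resolved configuration properties
     * @param name           full property name, e.g. "oss.type"
     * @param havingValue    value the property must have for the bean to be created
     * @param matchIfMissing whether the bean is created when the property is absent
     * @return true if the conditional bean would be created in this model
     */
    static boolean matches(Map<String, String> props, String name,
                           String havingValue, boolean matchIfMissing) {
        String actual = props.get(name);
        if (actual == null) {
            // Property not set at all: matchIfMissing decides.
            return matchIfMissing;
        }
        // Property set: it has to equal the expected value.
        return havingValue.equalsIgnoreCase(actual);
    }
}
```

With no oss.type configured, matches(props, "oss.type", "oss", true) is true and matches(props, "oss.type", "s3", false) is false, so the aliyun implementation is the default. Setting oss.type=s3 flips both results. As long as the property is either absent or one of the two supported values, exactly one condition holds, which is what keeps injection by interface unambiguous.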

Implementation

The middleware involved includes

  • redis
  • oss
  • tablestore
  • starrocks

redis

On aliyun, redis runs in single-instance mode, but on aws we deploy it ourselves in cluster mode, so the two modes need to be made compatible.

RedisStandaloneConfig

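import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
@Slf4j
@ConditionalOnProperty(prefix = "redis", name = "type", havingValue = "single", matchIfMissing = true)
public class RedisStandaloneConfig {

    // NOTE: host, port, username, password and database are configuration fields
    // whose declarations are not shown in this post. AssertUtil is a project helper.

    @Bean(name = "jedisPoolConfig")
    @ConfigurationProperties(prefix = "redis.pool")
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        // Defaults; tune them for the actual workload
        config.setMaxTotal(8);
        config.setMaxIdle(8);
        config.setMinIdle(2);
        // Maximum time a caller waits once the pool is exhausted (milliseconds)
        config.setMaxWait(Duration.ofMillis(6000L));
        return config;
    }

    @Bean("jedisConnectionFactory")
    public JedisConnectionFactory jedisConnectionFactory(@Qualifier("jedisPoolConfig") JedisPoolConfig jedisPoolConfig) {
        JedisConnectionFactory factory = new JedisConnectionFactory();
        factory.setPoolConfig(jedisPoolConfig);
        if (StringUtils.hasText(password)) {
            String redisPassword = password;
            // When a username is configured, authenticate with "username:password"
            if (StringUtils.hasText(username)) {
                redisPassword = username + ":" + password;
            }
            factory.setPassword(redisPassword);
        }
        if (database != 0) {
            factory.setDatabase(database);
        }
        factory.setHostName(host);
        factory.setPort(port);

        return factory;
    }

    @Bean
    public JedisPool jedisPool(@Qualifier("jedisConnectionFactory") JedisConnectionFactory jedisConnectionFactory) {
        JedisPool jedisPool = null;

        // The factory does not expose its pool, so read the private "pool" field reflectively
        Field pool = ReflectionUtils.findField(JedisConnectionFactory.class, "pool");
        if (pool != null) {
            ReflectionUtils.makeAccessible(pool);
            jedisPool = (JedisPool) ReflectionUtils.getField(pool, jedisConnectionFactory);
        }

        AssertUtil.notNull(jedisPool, "jedis pool get failed!");

        // Warm up minIdle connections. Borrow all of them before returning any:
        // borrowing and closing one at a time would keep reusing the same connection.
        JedisPoolConfig jedisPoolConfig = (JedisPoolConfig) jedisConnectionFactory.getPoolConfig();

        if (jedisPoolConfig != null) {
            int minIdle = jedisPoolConfig.getMinIdle();
            List<Jedis> warmed = new ArrayList<>(minIdle);
            try {
                for (int i = 0; i < minIdle; i++) {
                    Jedis jedis = jedisPool.getResource();
                    warmed.add(jedis);
                    jedis.ping();
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                // Return every borrowed connection to the pool
                for (Jedis jedis : warmed) {
                    try {
                        jedis.close();
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                }
            }
        }

        return jedisPool;
    }

    @Bean
    public RedisTemplate defaultRedisTemplate(@Qualifier("jedisConnectionFactory")
                                              JedisConnectionFactory jedisConnectionFactory) {
        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(jedisConnectionFactory);

        // String keys, JSON values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

        return template;
    }

}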

RedisClusterConfig

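import java.io.Serializable;
import java.time.Duration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.StringUtils;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
@Slf4j
@ConditionalOnProperty(prefix = "redis", name = "type", havingValue = "cluster")
public class RedisClusterConfig {

    // NOTE: host, port and password are configuration fields
    // whose declarations are not shown in this post.

    /**
     * Configures the JedisPoolConfig with the necessary properties.
     *
     * @return JedisPoolConfig instance
     */
    @Bean(name = "jedisPoolConfig")
    @ConfigurationProperties(prefix = "pool")
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        // Defaults; tune them for the actual workload
        config.setMaxTotal(8);
        config.setMaxIdle(8);
        config.setMinIdle(2);
        // Maximum time a caller waits once the pool is exhausted (milliseconds)
        config.setMaxWait(Duration.ofMillis(6000L));
        log.info("init redis cluster jedisPoolConfig");
        return config;
    }

    @Bean("jedisConnectionFactory")
    public JedisConnectionFactory jedisConnectionFactory(@Qualifier("jedisPoolConfig") JedisPoolConfig jedisPoolConfig) {
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
        // One seed node is enough; the client discovers the rest of the cluster
        clusterConfig.clusterNode(host, port);
        if (StringUtils.hasText(password)) {
            clusterConfig.setPassword(password);
        }
        log.info("init redis cluster JedisConnectionFactory");
        // Pass the pool config along; otherwise the injected bean is never used
        return new JedisConnectionFactory(clusterConfig, jedisPoolConfig);
    }

    @Bean
    public RedisTemplate defaultRedisTemplate(@Qualifier("jedisConnectionFactory")
                                              JedisConnectionFactory jedisConnectionFactory) {
        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(jedisConnectionFactory);

        // String keys, JSON values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        log.info("init redis cluster defaultRedisTemplate");
        return template;
    }
}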

RedisService itself uses stringRedisTemplate

@Service
public class RedisService {

    @Resource(name = "stringRedisTemplate")
    private RedisTemplate<String, String> stringRedisTemplate;

    // ...
}

Note that stringRedisTemplate is created in RedisAutoConfiguration

@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory)
        throws UnknownHostException {
    StringRedisTemplate template = new StringRedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

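The RedisConnectionFactory parameter here is the interface that JedisConnectionFactory implements, so the instance that finally gets injected is the JedisConnectionFactory bean defined above, whichever of the two configuration classes is active.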

oss

First define the interface

public interface ObjectStorageService {

    String name();
    // ...
}

Then define the corresponding implementation classes

OssService

@Slf4j
@Service
@ConditionalOnProperty(name = "oss.type", havingValue = "oss", matchIfMissing = true)
public class OssService implements ObjectStorageService {
    // ...
}

S3Service

@Slf4j
@Service
@ConditionalOnProperty(name = "oss.type", havingValue = "s3")
public class S3Service implements ObjectStorageService {
    // ...
}

Wherever it is used, inject it by interface

@Resource
ObjectStorageService objectStorageService;

tablestore

This is handled much like oss, except that there are two additional configuration classes

TableStoreConfig

@Configuration
@ConditionalOnProperty(name = "tablestore.type", havingValue = "tablestore", matchIfMissing = true)
public class TableStoreConfig {
    @Bean("tableStoreClient")
    public SyncClient getTableStoreClient() {
        return new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
    }
}

DynamodbConfig

@Configuration
@ConditionalOnProperty(name = "tablestore.type", havingValue = "dynamodb")
public class DynamodbConfig {
    @Bean("dynamodbClient")
    public DynamoDbClient getDynamoDbClient() {
        return DynamoDbClient.builder()
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create(accessKey, accessSecret)))
                .region(Region.of(region))
                .build();
    }
}

Define the interface

public interface TableStoreTemplate {
    // ...
}

Define the corresponding implementation classes

DynamodbComponent

@Slf4j
@Component
@ConditionalOnProperty(name = "tablestore.type", havingValue = "dynamodb")
public class DynamodbComponent implements TableStoreTemplate {

    @Resource(name = "dynamodbClient")
    DynamoDbClient dynamoDbClient;

    // ...
}

TableStoreComponent

@Slf4j
@Component
@ConditionalOnProperty(name = "tablestore.type", havingValue = "tablestore", matchIfMissing = true)
public class TableStoreComponent implements TableStoreTemplate {

    @Resource(name = "tableStoreClient")
    SyncClient tableStoreClient;
    // ...
}

Likewise, just inject the interface wherever it is used.

@Resource
TableStoreTemplate tableStoreTemplate;

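One thing to watch out for: a dynamodb batch write accepts at most 25 items per batch, and a batch query at most 100 items per batch. To stay compatible with the existing interface, we simply split larger requests into batches and run them on multiple threads.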
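
The split-and-parallelise approach could look roughly like the sketch below. It is not the code from our project: BatchRunner, partition, runInBatches and the batchCall callback are hypothetical names, and the callback stands in for whatever actually issues the dynamodb batch request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical helper: splits a large request into chunks and runs them concurrently.
final class BatchRunner {

    // Limits mentioned in the text: 25 items per batch write, 100 per batch query.
    static final int MAX_BATCH_WRITE = 25;
    static final int MAX_BATCH_GET = 100;

    private BatchRunner() {
    }

    /** Splits items into consecutive chunks of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    /** Runs batchCall once per chunk on the executor and waits until all chunks are done. */
    static <T> void runInBatches(List<T> items, int batchSize,
                                 ExecutorService executor, Consumer<List<T>> batchCall) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<T> batch : partition(items, batchSize)) {
            futures.add(CompletableFuture.runAsync(() -> batchCall.accept(batch), executor));
        }
        // join() throws a CompletionException if any batch failed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```

A write of 60 rows would then be issued as runInBatches(rows, BatchRunner.MAX_BATCH_WRITE, executor, batch -> ...), which produces three calls of 25, 25 and 10 items. A real callback also has to deal with the unprocessed items that dynamodb can return from a batch call, which this sketch leaves out.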

starrocks

Because starrocks is not deployed overseas, and the application uses multiple data sources, the initialization of this data source has to be made conditional as well:

@Configuration
@MapperScan(basePackages = "com.xxx.smapper", sqlSessionFactoryRef = "starrocksSqlSessionFactory")
@ConditionalOnProperty(name = "spring.starrocks.enabled", havingValue = "1", matchIfMissing = true)
public class StarRocksConfig {
    // ...
}

However, if this data source is not initialized, its mappers are not initialized either, so the injection has to be made optional:

@Autowired(required = false)
UserGroupDaMapper userGroupDaMapper;

And of course, every place that uses userGroupDaMapper has to check it for null first.
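
A minimal sketch of such a null check follows. Everything in it is illustrative: the interface below is only a stand-in for the real mapper, the method selectGroupIds and the class UserGroupQuery are hypothetical, and returning an empty list when starrocks is disabled is an assumption about what the caller wants.

```java
import java.util.Collections;
import java.util.List;

// Stand-in for the real MyBatis mapper; the method name is an assumption.
interface UserGroupDaMapper {
    List<String> selectGroupIds(String userId);
}

// Hypothetical caller that tolerates a missing mapper.
final class UserGroupQuery {

    // In the application this would be injected with @Autowired(required = false),
    // so it is null when the starrocks data source is disabled.
    private final UserGroupDaMapper userGroupDaMapper;

    UserGroupQuery(UserGroupDaMapper userGroupDaMapper) {
        this.userGroupDaMapper = userGroupDaMapper;
    }

    List<String> groupIdsOf(String userId) {
        if (userGroupDaMapper == null) {
            // No starrocks in this environment: degrade to an empty result
            return Collections.emptyList();
        }
        return userGroupDaMapper.selectGroupIds(userId);
    }
}
```

Keeping the check in one wrapper like this means the rest of the code does not need to know whether starrocks exists in the current environment.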