背景
框架之前完成了多数据源的动态切换及事务的处理,想更近一步提供一个简单的跨库事务处理功能,经过网上的搜索调研,大致有XA事务/SEGA事务/TCC事务等方案,因为业务主要涉及政府及企业且并发量不大,所以采用XA事务,虽然性能有所损失,但是可以保证数据的强一致性
方案设计
针对注册的数据源拷贝一份用于XA事务,使得本地事务和XA全局事务相互独立可选择的使用
Maven配置
引入atomikos第三方组件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
注册XA数据源
使用Druid连接池,需要使用DruidXADataSource数据源对象,再使用AtomikosDataSourceBean进行包装
注册数据源时针对同一个连接注册两份,一份正常数据源,一份用于XA事务的数据源,数据源标识区分并关联
因为spring默认注册了XA事务管理器后,所有事务操作不再走本地事务,我们通过切换不同的数据源决定走本地事务还是XA事务
//主数据源xa模式
@Bean
@Qualifier(\"masterXADataSource\")
public DataSource masterXADataSource() {
DruidXADataSource datasource = new DruidXADataSource();
if(driverClassName.equals(\"com.mysql.cj.jdbc.Driver\")){
if(!dbUrl.contains(\"useOldAliasMetadataBehavior\")){
dbUrl += \"&useOldAliasMetadataBehavior=true\";
}
if(!dbUrl.contains(\"useAffectedRows\")){
dbUrl += \"&useAffectedRows=true\";
}
}
datasource.setUrl(this.dbUrl);
datasource.setUsername(username);
datasource.setPassword(password);
datasource.setDriverClassName(driverClassName);
//configuration
datasource.setInitialSize(1);
datasource.setMinIdle(3);
datasource.setMaxActive(20);
datasource.setMaxWait(60000);
datasource.setTimeBetweenEvictionRunsMillis(60000);
datasource.setMinEvictableIdleTimeMillis(60000);
datasource.setValidationQuery(\"select \'x\'\");
datasource.setTestWhileIdle(true);
datasource.setTestOnBorrow(false);
datasource.setTestOnReturn(false);
datasource.setPoolPreparedStatements(true);
datasource.setMaxPoolPreparedStatementPerConnectionSize(20);
datasource.setLogAbandoned(false); //移除泄露连接发生是是否记录日志
try {
datasource.setFilters(\"stat,slf4j\");
} catch (SQLException e) {
logger.error(\"druid configuration initialization filter\", e);
}
datasource.setConnectionProperties(\"druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000\");//connectionProperties);
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSourceClassName(\"com.alibaba.druid.pool.xa.DruidXADataSource\");
atomikosDataSourceBean.setUniqueResourceName(\"master-xa\");
atomikosDataSourceBean.setXaDataSource(datasource);
atomikosDataSourceBean.setPoolSize(5);
atomikosDataSourceBean.setMaxPoolSize(20);
atomikosDataSourceBean.setTestQuery(\"select 1\");
return atomikosDataSourceBean;
}
注册XA事务管理器
使用spring内置的JtaTransactionManager事务管理器对象,设置AllowCustomIsolationLevels为true,否则指定自定义事务隔离级别会报错
//xa模式全局事务管理器
@Bean(name = \"jtaTransactionManager\")
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, userTransactionManager);
jtaTransactionManager.setAllowCustomIsolationLevels(true);
return jtaTransactionManager;
}
定义XA事务切面
自定义注解@GlobalTransactional并定义对应切面,使用指定注解时在ThreadLocal变量值进行标识,组合
@Transactional注解指定XA事务管理器,在切换数据原时判断当前是否在XA事物中,从而切换不同的数据源
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Transactional(rollbackFor = Exception.class,isolation = Isolation.READ_UNCOMMITTED,transactionManager = \"jtaTransactionManager\")
public @interface GlobalTransactional {
}
@Aspect
@Component
@Order(value = 99)
public class GlobalTransitionAspect {
private static Logger logger = LoggerFactory.getLogger(GlobalTransitionAspect.class);
@Autowired
private DynamicDataSource dynamicDataSource;
/**
* 切面点 指定注解
*/
@Pointcut(\"@annotation(com.code2roc.fastkernel.datasource.GlobalTransactional) \" +
\"|| @within(com.code2roc.fastkernel.datasource.GlobalTransactional)\")
public void dataSourcePointCut() {
}
/**
* 拦截方法指定为 dataSourcePointCut
*/
@Around(\"dataSourcePointCut()\")
public Object around(ProceedingJoinPoint point) throws Throwable {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
GlobalTransactional methodAnnotation = method.getAnnotation(GlobalTransactional.class);
if (methodAnnotation != null) {
DataSourceContextHolder.tagGlobal();
logger.info(\"标记全局事务\");
}
try {
return point.proceed();
} finally {
logger.info(\"清除全局事务\");
DataSourceContextHolder.clearGlobal();
}
}
}
public class DataSourceContextHolder {
private static Logger log = LoggerFactory.getLogger(DataSourceContextHolder.class);
// 对当前线程的操作-线程安全的
private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();
private static final ThreadLocal<String> contextGlobalHolder = new ThreadLocal<String>();
// 调用此方法,切换数据源
public static void setDataSource(String dataSource) {
contextHolder.set(dataSource);
log.debug(\"已切换到数据源:{}\", dataSource);
}
// 获取数据源
public static String getDataSource() {
String value = contextHolder.get();
if (StringUtil.isEmpty(value)) {
value = \"master\";
}
if (!StringUtil.isEmpty(getGlobal())) {
value = value + \"-xa\";
}
return value;
}
// 删除数据源
public static void clearDataSource() {
contextHolder.remove();
log.debug(\"已切换到主数据源\");
}
//====================全局事务标记================
public static void tagGlobal() {
contextGlobalHolder.set(\"1\");
}
public static String getGlobal() {
String value = contextGlobalHolder.get();
return value;
}
public static void clearGlobal() {
contextGlobalHolder.remove();
}
//===================================================
}
配置XA事务日志
通过在resource文件夹下创建transactions.properties文件可以指定XA事务日志的存储路径
com.atomikos.icatch.log_base_dir= tempfiles/transition/
来源:https://www.cnblogs.com/yanpeng19940119/p/16332663.html
本站部分图文来源于网络,如有侵权请联系删除。