浏览代码

【重构】执行器注册逻辑重构,降低多调度中心地址时并发注册问题;注册表“xxl_job_registry”新增唯一索引,避免冗余注册信息存储;

xuxueli 11 月之前
父节点
当前提交
f352ded10d

+ 2 - 1
doc/XXL-JOB官方文档.md

@@ -2403,7 +2403,8 @@ public void execute() {
 ### 7.36 版本 v2.5.0 Release Notes[规划中]
 - 1、【优化】框架基础守护线程异常处理逻辑优化,避免极端情况下因Error导致调度终止问题;
 - 2、【优化】部分系统日志优化,提升可读性;
-- 3、【优化】调度线程任务信息更新逻辑优化,避免极端情况下已关闭任务被启动问题;
+- 3、【重构】调度线程任务信息更新逻辑优化,避免极端情况下已关闭任务被启动问题;
+- 5、【重构】执行器注册逻辑重构,降低多调度中心地址时并发注册问题;注册表“xxl_job_registry”新增唯一索引,避免冗余注册信息存储;
 - 4、[规划中]升级springboot3.x,解决2.x老版本漏洞类问题。注意,springboot3.x依赖jdk17;
 - 5、[规划中]安全功能增强,通讯加密参数改用加密数据避免AccessToken明文, 降低token泄漏风险;
 - 6、[规划中]登陆态Token声称逻辑优化,混淆登陆时间属性,降低token泄漏风险;

+ 1 - 1
doc/db/tables_xxl_job.sql

@@ -85,7 +85,7 @@ CREATE TABLE `xxl_job_registry` (
   `registry_value` varchar(255) NOT NULL,
   `update_time` datetime DEFAULT NULL,
   PRIMARY KEY (`id`),
-  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
+  UNIQUE KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
 CREATE TABLE `xxl_job_group` (

+ 9 - 3
xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java

@@ -159,13 +159,19 @@ public class JobRegistryHelper {
 		registryOrRemoveThreadPool.execute(new Runnable() {
 			@Override
 			public void run() {
-				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
+				// 0-fail; 1-save suc; 2-update suc;
+				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySaveOrUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
+				if (ret == 1) {
+					// fresh (add)
+					freshGroupRegistryInfo(registryParam);
+				}
+				/*int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
 				if (ret < 1) {
 					XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
 
 					// fresh
 					freshGroupRegistryInfo(registryParam);
-				}
+				}*/
 			}
 		});
 
@@ -187,7 +193,7 @@ public class JobRegistryHelper {
 			public void run() {
 				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
 				if (ret > 0) {
-					// fresh
+					// fresh (delete)
 					freshGroupRegistryInfo(registryParam);
 				}
 			}

+ 7 - 2
xxl-job-admin/src/main/java/com/xxl/job/admin/dao/XxlJobRegistryDao.java

@@ -21,7 +21,12 @@ public interface XxlJobRegistryDao {
     public List<XxlJobRegistry> findAll(@Param("timeout") int timeout,
                                         @Param("nowTime") Date nowTime);
 
-    public int registryUpdate(@Param("registryGroup") String registryGroup,
+    public int registrySaveOrUpdate(@Param("registryGroup") String registryGroup,
+                            @Param("registryKey") String registryKey,
+                            @Param("registryValue") String registryValue,
+                            @Param("updateTime") Date updateTime);
+
+    /*public int registryUpdate(@Param("registryGroup") String registryGroup,
                               @Param("registryKey") String registryKey,
                               @Param("registryValue") String registryValue,
                               @Param("updateTime") Date updateTime);
@@ -29,7 +34,7 @@ public interface XxlJobRegistryDao {
     public int registrySave(@Param("registryGroup") String registryGroup,
                             @Param("registryKey") String registryKey,
                             @Param("registryValue") String registryValue,
-                            @Param("updateTime") Date updateTime);
+                            @Param("updateTime") Date updateTime);*/
 
     public int registryDelete(@Param("registryGroup") String registryGroup,
                           @Param("registryKey") String registryKey,

+ 9 - 0
xxl-job-admin/src/main/resources/mybatis-mapper/XxlJobRegistryMapper.xml

@@ -39,6 +39,14 @@
 		WHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)
 	</select>
 
+	<insert id="registrySaveOrUpdate" >
+		INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
+		VALUES( #{registryGroup}  , #{registryKey} , #{registryValue}, #{updateTime})
+		ON DUPLICATE KEY UPDATE
+			`update_time` = #{updateTime}
+	</insert>
+
+	<!--
     <update id="registryUpdate" >
         UPDATE xxl_job_registry
         SET `update_time` = #{updateTime}
@@ -51,6 +59,7 @@
         INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
         VALUES( #{registryGroup}  , #{registryKey} , #{registryValue}, #{updateTime})
     </insert>
+	-->
 
 	<delete id="registryDelete" >
 		DELETE FROM xxl_job_registry

+ 21 - 2
xxl-job-admin/src/test/java/com/xxl/job/admin/dao/XxlJobRegistryDaoTest.java

@@ -8,6 +8,7 @@ import javax.annotation.Resource;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
 public class XxlJobRegistryDaoTest {
@@ -17,14 +18,32 @@ public class XxlJobRegistryDaoTest {
 
     @Test
     public void test(){
-        int ret = xxlJobRegistryDao.registryUpdate("g1", "k1", "v1", new Date());
+        int ret = xxlJobRegistryDao.registrySaveOrUpdate("g1", "k1", "v1", new Date());
+        /*int ret = xxlJobRegistryDao.registryUpdate("g1", "k1", "v1", new Date());
         if (ret < 1) {
             ret = xxlJobRegistryDao.registrySave("g1", "k1", "v1", new Date());
-        }
+        }*/
 
         List<XxlJobRegistry> list = xxlJobRegistryDao.findAll(1, new Date());
 
         int ret2 = xxlJobRegistryDao.removeDead(Arrays.asList(1));
     }
 
+    @Test
+    public void test2() throws InterruptedException {
+        for (int i = 0; i < 100; i++) {
+            new Thread(()->{
+                int ret = xxlJobRegistryDao.registrySaveOrUpdate("g1", "k1", "v1", new Date());
+                System.out.println(ret);
+
+                /*int ret = xxlJobRegistryDao.registryUpdate("g1", "k1", "v1", new Date());
+                if (ret < 1) {
+                    ret = xxlJobRegistryDao.registrySave("g1", "k1", "v1", new Date());
+                }*/
+            }).start();
+        }
+
+        TimeUnit.SECONDS.sleep(10);
+    }
+
 }