1. 首页
  2. sharding-jdbc源码分析

10. sharding-jdbc源码之异步送达JOB

阿飞Javaer,转载请注明原创出处,谢谢!

最大努力送达型异步JOB任务

当最大努力送达型监听器多次失败尝试后,把任务交给最大努力送达型异步JOB任务处理,异步多次尝试处理;核心源码在模块sharding-jdbc-transaction-async-job中。该模块是一个独立异步处理模块,使用者决定是否需要启用,源码比较少,大概看一下源码结构:

dbcyuanmazhiyibusongdajob_1.png

源码结构

resouces目录下的脚本和dubbo非常相似(作者应该也看过dubbo源码,哈),start.sh&stop.sh分别是服务启动脚本和服务停止脚本;根据start.sh脚本可知,该模块的主方法是BestEffortsDeliveryJobMain

  CONTAINER_MAIN=com.dangdang.ddframe.rdb.transaction.soft.bed.BestEffortsDeliveryJobMain
    nohup java -classpath $CONF_DIR:$LIB_DIR:. $CONTAINER_MAIN >/dev/null 2>&1 &

Main方法的核心源码如下:

  public final class BestEffortsDeliveryJobMain {

        public static void main(final String[] args) throws Exception {
            try (InputStreamReader inputStreamReader = new InputStreamReader(BestEffortsDeliveryJobMain.class.getResourceAsStream("/conf/config.yaml"), "UTF-8")) {
                BestEffortsDeliveryConfiguration config = new Yaml(new Constructor(BestEffortsDeliveryConfiguration.class)).loadAs(inputStreamReader, BestEffortsDeliveryConfiguration.class);
                new BestEffortsDeliveryJobFactory(config).init();
            }
        }
    }

由源码可知,主配置文件是config.yaml;将该文件解析为BestEffortsDeliveryConfiguration,然后调用new BestEffortsDeliveryJobFactory(config).init()

config.yaml配置文件中job相关配置内容如下:

  jobConfig:
      #作业名称
      name: bestEffortsDeliveryJob

      #触发作业的cron表达式--每5s重试一次
      cron: 0/5 * * * * ?

      #每次作业获取的事务日志最大数量
      transactionLogFetchDataCount: 100

      #事务送达的最大尝试次数.
      maxDeliveryTryTimes: 3

      #执行送达事务的延迟毫秒数,早于此间隔时间的入库事务才会被作业执行,其SQL为 where *** AND `creation_time`< (now() - maxDeliveryTryDelayMillis),即至少60000ms,即一分钟前入库的事务日志才会被拉取出来;
      maxDeliveryTryDelayMillis: 60000

maxDeliveryTryDelayMillis: 60000这个配置也可以理解为60s内的transaction_log不处理;

BestEffortsDeliveryJobFactory核心源码:

  @RequiredArgsConstructor
    public final class BestEffortsDeliveryJobFactory {

        // 这个属性赋值通过有参构造方法进行赋值--new BestEffortsDeliveryJobFactory(config),就是通过`config.yaml`配置的属性
        private final BestEffortsDeliveryConfiguration bedConfig;

        /**
         * BestEffortsDeliveryJobMain中调用该init()方法,初始化最大努力尝试型异步JOB,该JOB基于elastic-job;
         * Initialize best efforts delivery job.
         */
        public void init() {
            // 根据config.yaml中配置的zkConfig节点,得到协调调度中心CoordinatorRegistryCenter
            CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(createZookeeperConfiguration(bedConfig));
            // 调度中心初始化
            regCenter.init();
            // 构造elastic-job调度任务
            JobScheduler jobScheduler = new JobScheduler(regCenter, createBedJobConfiguration(bedConfig));
            jobScheduler.setField("bedConfig", bedConfig);
            jobScheduler.setField("transactionLogStorage", TransactionLogStorageFactory.createTransactionLogStorage(new RdbTransactionLogDataSource(bedConfig.getDefaultTransactionLogDataSource())));
            jobScheduler.init();
        }

        // 根据该方法可知,创建的是BestEffortsDeliveryJob
        private JobConfiguration createBedJobConfiguration(final BestEffortsDeliveryConfiguration bedJobConfig) {
            // 根据config.yaml中配置的jobConfig节点得到job配置信息,且指定job类型为BestEffortsDeliveryJob
            JobConfiguration result = new JobConfiguration(bedJobConfig.getJobConfig().getName(), BestEffortsDeliveryJob.class, 1, bedJobConfig.getJobConfig().getCron());
            result.setFetchDataCount(bedJobConfig.getJobConfig().getTransactionLogFetchDataCount());
            result.setOverwrite(true);
            return result;
        }

BestEffortsDeliveryJob核心源码:

  @Slf4j
    public class BestEffortsDeliveryJob extends AbstractIndividualThroughputDataFlowElasticJob<TransactionLog> {

        @Setter
        private BestEffortsDeliveryConfiguration bedConfig;

        @Setter
        private TransactionLogStorage transactionLogStorage;

        @Override
        public List<TransactionLog> fetchData(final JobExecutionMultipleShardingContext context) {
            // 从transaction_log表中抓取最多100条事务日志(相关参数都在config.yaml中jobConfig节点下)
            return transactionLogStorage.findEligibleTransactionLogs(context.getFetchDataCount(), 
                bedConfig.getJobConfig().getMaxDeliveryTryTimes(), bedConfig.getJobConfig().getMaxDeliveryTryDelayMillis());
        }

        @Override
        public boolean processData(final JobExecutionMultipleShardingContext context, final TransactionLog data) {
            try (
                Connection conn = bedConfig.getTargetDataSource(data.getDataSource()).getConnection()) {
                // 调用事务日志存储器的processData()进行处理
                transactionLogStorage.processData(conn, data, bedConfig.getJobConfig().getMaxDeliveryTryTimes());
            } catch (final SQLException | TransactionCompensationException ex) {
                log.error(String.format("Async delivery times %s error, max try times is %s, exception is %s", data.getAsyncDeliveryTryTimes() + 1, 
                    bedConfig.getJobConfig().getMaxDeliveryTryTimes(), ex.getMessage()));
                return false;
            }
            return true;
        }
    }

作者:阿飞的博客

来源:https://www.jianshu.com/p/0f1a938c9017


看完两件小事

如果你觉得这篇文章对你挺有启发,我想请你帮我两个小忙:

  1. 关注我们的 GitHub 博客,让我们成为长期关系
  2. 把这篇文章分享给你的朋友 / 交流群,让更多的人看到,一起进步,一起成长!
  3. 关注公众号 「方志朋」,公众号后台回复「666」 免费领取我精心整理的进阶资源教程
  4. JS中文网,Javascriptc中文网是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,是给开发者用的 Hacker News,技术文章由为你筛选出最优质的干货,其中包括:Android、iOS、前端、后端等方面的内容。目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。

    本文著作权归作者所有,如若转载,请注明出处

    转载请注明:文章转载自「 Java极客技术学习 」https://www.javajike.com

    标题:10. sharding-jdbc源码之异步送达JOB

    链接:https://www.javajike.com/article/1868.html

« 11. sharding-jdbc集成–基于ssm
9. sharding-jdbc源码之最大努力型事务»

相关推荐

QR code