GVKun编程网logo

MySQL8.0的参数event_scheduler默认是ON,请注意一些坑(mysql常用参数设置)

23

在本文中,我们将详细介绍MySQL8.0的参数event_scheduler默认是ON,请注意一些坑的各个方面,并为您提供关于mysql常用参数设置的相关解答,同时,我们也将为您带来关于angular

在本文中,我们将详细介绍MySQL8.0的参数event_scheduler默认是ON,请注意一些坑的各个方面,并为您提供关于mysql常用参数设置的相关解答,同时,我们也将为您带来关于angular2使用PrimeNG-Scheduler实现FullCalendar-Scheduler、cocos2dx 定时器 schedule scheduleUpdate scheduleOnce、com.google.common.util.concurrent.AbstractScheduledService.Scheduler的实例源码、DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战的有用知识。

本文目录一览:

MySQL8.0的参数event_scheduler默认是ON,请注意一些坑(mysql常用参数设置)

MySQL8.0的参数event_scheduler默认是ON,请注意一些坑(mysql常用参数设置)

event_scheduler是什么?

event_scheduler是什么MySQL定时器的开关,类似于windows操作系统的定时任务的概念,指定某个时间点执行一次定时任务,或者每隔一段时间循环执行定时任务。

这个东西有企业在用么?

看了几个企业的开发规范,都没有提及需要禁用event功能,所以event功能是允许开发人员使用的。(不过,我相信java开发人员更喜欢用java定时器。)也许是因为mysql默认就不启用event功能嘛,默认就无法用event,开发规范就不提及这东西啦。况且可以通过用户权限控制来控制开发人员是否可以使用event。所以从我手上的几家公司的开发规范里,并不可以实际看出大家有没有使用event_scheduler这个功能。由于MySQL8.0默认是开启event_scheduler功能了,我认为我们还是有必要讨论一下event到底有没有坑?

我能想到的坑——主从复制的坑

首先这个事情并没有实际发生在我生产环境,是人为想出来的一种场景。

准备:
两个MySQL8.0数据库实例
mysql3308
mysql33081

配置文件不显示地指定默认值:
[root@192-168-199-198 mysql3308]# cat my.cnf |grep event_scheduler

MySQL8.0的默认值即为:
mysql> show variables like ''%event_scheduler%'';
+-----------------+-------+
| Variable_name   | Value |
+-----------------+-------+
| event_scheduler | ON    |
+-----------------+-------+
1 row in set (0.12 sec)

我给大家做了以下的测试。

一、在已有的主从环境下,建立一个重复插入的event。

主库上执行
mysql> show variables like ''%event_scheduler%'';
+-----------------+-------+
| Variable_name   | Value |
+-----------------+-------+
| event_scheduler | ON    |
+-----------------+-------+
1 row in set (0.12 sec)
create database fander;
use fander;
CREATE EVENT IF NOT EXISTS test
ON SCHEDULE EVERY 1 SECOND
ON COMPLETION PRESERVE
DO insert into fander.test values (1);

#这个event表示每一秒都往fander库的test表插入一行值为1的数据。

mysql> show create event test\G
*************************** 1. row ***************************
               Event: test
            sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
           time_zone: SYSTEM
        Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS ''2019-01-23 14:55:19'' ON COMPLETION PRESERVE ENABLE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
  Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.00 sec)

mysql> show tables;
Empty set (0.01 sec)
#我先不建立测试重复插入的test表。
从库上执行
mysql> show variables like ''%event_scheduler%'';
+-----------------+-------+
| Variable_name   | Value |
+-----------------+-------+
| event_scheduler | ON    |
+-----------------+-------+
1 row in set (0.12 sec)
mysql> use fander;
mysql> show create event test\G
*************************** 1. row ***************************
               Event: test
            sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
           time_zone: SYSTEM
        Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS ''2019-01-23 14:55:19'' ON COMPLETION PRESERVE DISABLE ON SLAVE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
  Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.00 sec)

有没有发现问题?

  1. 主从环境的event_scheduler都是开启的。这意味这从库有可能会重复写入数据。一部分数据来源于主库的event的循环插入的复制,一部分数据来源于从库自身的event的循环插入。

  2. 主从库的Create Event语句不一样?
    在create event时,event在主库创建后,复制到从库上是disable的状态!所以MySQL在设计之初就考虑了这个问题。event在从库是disable的,无法执行,从而从库不会发生写入操作而导致数据不一致!

我们进一步测试,是不是确实如何此。

主库上执行
mysql> create table test (a int);
Query OK, 0 rows affected (0.04 sec)
mysql> select count(1) from test;
+----------+
| count(1) |
+----------+
|       52 |
+----------+
1 row in set (0.01 sec)
从库上执行
mysql> select count(1) from test;
+----------+
| count(1) |
+----------+
|       52 |
+----------+
1 row in set (0.01 sec)

#注意,也可以用对比gtid的方法查看从库,发现从库确实没有自身数据写入。

结论是,在已有的主从环境下,建立一个重复插入的event,event不会在从库上执行,不会有数据写入,从而保证了数据的一致性。这种情况下event的开启没有带来坑。

二、在已有一个重复插入的event的主库上,扩展建立一个从库。

主库:
mysql> show variables like ''%event_scheduler%'';
+-----------------+-------+
| Variable_name   | Value |
+-----------------+-------+
| event_scheduler | ON    |
+-----------------+-------+
1 row in set (0.12 sec)
mysql> show create event test\G
*************************** 1. row ***************************
               Event: test
            sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
           time_zone: SYSTEM
        Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS ''2019-01-23 14:55:19'' ON COMPLETION PRESERVE ENABLE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
  Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.06 sec)

冷备的方法建立从库。

mysql> show variables like ''%event_scheduler%'';
+-----------------+-------+
| Variable_name   | Value |
+-----------------+-------+
| event_scheduler | ON    |
+-----------------+-------+
1 row in set (0.12 sec)
mysql> CHANGE MASTER TO
    ->   MASTER_HOST=''localhost'',
    ->   MASTER_USER=''rpl_user'',
    ->   MASTER_PASSWORD=''password'',
    ->   MASTER_PORT=3308,
    ->   master_auto_position=1;
Query OK, 0 rows affected, 2 warnings (0.04 sec)

mysql> start slave;
Query OK, 0 rows affected (0.02 sec)

mysql> use fander
Database changed
mysql> show create event test\G
*************************** 1. row ***************************
               Event: test
            sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
           time_zone: SYSTEM
        Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS ''2019-01-23 14:55:19'' ON COMPLETION PRESERVE ENABLE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
  Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.00 sec)

可以发现主从库的Create Event语句这时是一模一样了。 下面接着测试。

主库:
mysql>  select count(1) from test;
+----------+
| count(1) |
+----------+
|       14 |
+----------+
1 row in set (0.00 sec)

从库:
mysql>  select count(1) from test;
+----------+
| count(1) |
+----------+
|       33 |
+----------+
1 row in set (0.00 sec)


mysql> show slave status\G
...
          Retrieved_Gtid_Set: 07b92486-64b0-11e8-b4cf-000c29c71881:501-561
            Executed_Gtid_Set: 07b92486-64b0-11e8-b4cf-000c29c71881:1-561,
59613f3a-1ee0-11e9-a9e7-000c29259487:1-13
...

以上测试用的是物理冷备。我测试在用逻辑备份主库,扩展从库后,情况一样。 结论是,在已有一个重复插入的event的主库上,扩展建立一个从库,会因为从库也有这个event schedule,导致从库重复执行insert语句。等于计划任务重新执行了两次!数据会造成不一致!

如何避免?

建议:
1.规范my.cnf模板,event_scheduler设置为0,而不是采用官方默认值1。
2.从库建议开启read_only=1; 严格防止写入。

扩展

如果日常备份时备份的是文章"一"情况的从库,那么恢复数据库后,event默认是disable的,是跑不了的。具体见下面:

[root@192-168-199-198 ~]# cat all2.sql |grep "EVERY 1 SECOND"
/*!50106 CREATE*/ /*!50117 DEFINER=`root`@`localhost`*/ /*!50106 EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS ''2019-01-23 16:03:16'' ON COMPLETION PRESERVE DISABLE ON SLAVE DO insert into fander.test values (1) */ ;;

你需要人手修改event让其可以跑。这也属于一个坑吧。

angular2使用PrimeNG-Scheduler实现FullCalendar-Scheduler

angular2使用PrimeNG-Scheduler实现FullCalendar-Scheduler

FullCalendar有一个名为Scheduler的附加组件,我试图与PrimeNG-Schedule组件一起使用.查看PrimeNG文档,我可以使用“选项”属性向FullCalendar发送任意信息.这确实有效,但是当我将数据检索连接到异步API时,会导致问题.

API使用Observables然后我在组件中订阅.这适用于事件,因为视图在事件更改时自动更新.

但是,当通过PrimeNG’选项’属性向FullCalendar提供’资源’时,事情不能按预期工作,因为设置’options’属性的代码在API调用有机会返回之前运行,因此空.

我确信这一点,因为如果我对资源进行硬编码,就会有所作为.

我可以想出几种方法来解决这个问题:

>使调用同步(希望避免这种情况)
>等待加载所有数据,然后(重新)渲染视图(使其与#1几乎相同)
>配置options.resources属性,以便在更改时,视图会更新,就像它对事件一样(这是最好的选择,但不确定它是否可能)

我将不胜感激任何帮助.谢谢.

<p-schedule 
    [events]="events" 
    [businessHours]="businessHours"
    [options]="optionConfig"
    >
</p-schedule>

我的(现在)虚拟API

getEvents() {
    return this.http
    .get('assets/api/mockEvents.json')
    .map((response : Response) => <Appointment[]>response.json().data)
    .catch(this.handleError);
  }

  getResources() {
    return this.http
    .get('assets/api/mockResources.json')
    .map((response : Response) => <Resource[]>response.json().data)
    .catch(this.handleError);
  }

组件文件

ngOnInit() {

  this.schedulerService.getEvents()
      .subscribe(events=> this.events = events);

      this.schedulerService.getResources()
      .subscribe(resources => this.resources = resources);
      // ***** If the following code is uncommented,resources are displayed in Schedule view ****
    // this.resources = [
    //     new Resource(1,"Dr. Hibbert","blue",true,new BusinessHours("08:00","16:00")),//     new Resource(2,"Dr. Simpson","green",new BusinessHours("10:00","18:00"))
    // ];
    this.optionConfig = {
      "resources": this.resources
      }
}

编辑:我想到的一件事是,只能通过它的setter方法设置this.resources属性.这样,我确切地知道何时设置了值,但问题仍然存在,如何在初始化之后将新值推送到调度组件.

解决方法

PS:我无法重现你的问题所以建议你没有经过测试

你可以使用angular2的异步管道来获取视图部分中的数据

<p-schedule 
    [events]="events" 
    [businessHours]="businessHours"
    [options]="optionConfig | async"
    >
</p-schedule>

甚至你可以使用异步管道直接分配资源,而不是包装到optionConfig中,如果合适的话.

通过这样做,您既不需要进行同步调用,也不需要在加载数据后重新加入视图.

如果还有问题,请告诉我.

cocos2dx 定时器 schedule scheduleUpdate scheduleOnce

cocos2dx 定时器 schedule scheduleUpdate scheduleOnce

1. schedule a. 启用: this-schedule(schedule_selector(HelloWorld::Move),1.0f); b. 停用: this-unschedule(schedule_selector(HelloWorld::Move)); c.timeProc: void Move(float dt); void HelloWorld::Move(floatdt) { CCLOG(Tick! Tick!); } 2. sche

1. schedule

a. 启用:this->schedule(schedule_selector(HelloWorld::Move),1.0f);

b. 停用:this->unschedule(schedule_selector(HelloWorld::Move));

c.timeProc:void Move(float dt);

void HelloWorld::Move(floatdt)

{

    CCLOG("Tick! Tick!");

}


2. scheduleUpdate

a. 启用:this->scheduleUpdate(); 

b. 停用:this->unscheduleUpdate();

c.timeProc:void Update(float dt);

voidHelloWorld::Update(float dt) // 函数名必须为Update,规定

{

    CCLOG("Tick! Tick!");

}


3. scheduleOnce

a. 启用:this->scheduleOnce(schedule_selector(HelloWorld::Move),1.0f);

b. 停用:响应一次之后自动停止


4.停止所有定时器:this->unscheduleAllSelectors();



com.google.common.util.concurrent.AbstractScheduledService.Scheduler的实例源码

com.google.common.util.concurrent.AbstractScheduledService.Scheduler的实例源码

项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceIsstopped() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void runOneIteration() throws Exception {}

    @Override protected scheduledexecutorservice executor() {
      executor.set(super.executor());
      return executor.get();
    }

    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(0,1,TimeUnit.MILLISECONDS);
    }
  };

  service.startAsync();
  assertFalse(service.executor().isShutdown());
  service.awaitRunning();
  service.stopAsync();
  service.awaitTerminated();
  assertTrue(executor.get().awaitTermination(100,TimeUnit.MILLISECONDS));
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void startUp() throws Exception {
      throw new Exception("Failed");
    }

    @Override protected void runOneIteration() throws Exception {}

    @Override protected scheduledexecutorservice executor() {
      executor.set(super.executor());
      return executor.get();
    }

    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(0,TimeUnit.MILLISECONDS);
    }
  };

  try {
    service.startAsync().awaitRunning();
    fail("Expected service to fail during startup");
  } catch (IllegalStateException expected) {}

  assertTrue(executor.get().awaitTermination(100,TimeUnit.MILLISECONDS));
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testTimeout() {
  // Create a service whose executor will never run its commands
  Service service = new AbstractScheduledService() {
    @Override protected Scheduler scheduler() {
      return Scheduler.newFixedDelaySchedule(0,TimeUnit.NANOSECONDS);
    }

    @Override protected scheduledexecutorservice executor() {
      return TestingExecutors.noOpScheduledExecutor();
    }

    @Override protected void runOneIteration() throws Exception {}

    @Override protected String serviceName() {
      return "Foo";
    }
  };
  try {
    service.startAsync().awaitRunning(1,TimeUnit.MILLISECONDS);
    fail("Expected timeout");
  } catch (TimeoutException e) {
    assertthat(e).hasMessage("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
  }
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(Long.MAX_VALUE,Long.MAX_VALUE,SECONDS);
    }
  };
  service.startAsync().awaitRunning();
  try {
    service.firstBarrier.await(5,SECONDS);
    fail();
  } catch (TimeoutException expected) {
  }
  assertEquals(0,service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          return new Schedule(Long.MAX_VALUE,SECONDS);
        }
      };
    }
  };
  service.startAsync().awaitRunning();
  try {
    service.firstBarrier.await(5,service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testCustomScheduler_deadlock() throws InterruptedException,brokenBarrierException {
  final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
  // This will flakily deadlock,so run it multiple times to increase the flake likelihood
  for (int i = 0; i < 1000; i++) {
    Service service = new AbstractScheduledService() {
      @Override protected void runOneIteration() {}
      @Override protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override protected Schedule getNextSchedule() throws Exception {
            if (state() != State.STARTING) {
              inGetNextSchedule.await();
              Thread.yield();
              throw new RuntimeException("boom");
            }
            return new Schedule(0,TimeUnit.NANOSECONDS);
          }
        };
      }
    };
    service.startAsync().awaitRunning();
    inGetNextSchedule.await();
    service.stopAsync();
  }
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
public void testBig() throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          // Explicitly yield to increase the probability of a pathological scheduling.
          Thread.yield();
          return new Schedule(0,TimeUnit.SECONDS);
        }
      };
    }
  };
  service.useBarriers = false;
  service.startAsync().awaitRunning();
  Thread.sleep(50);
  service.useBarriers = true;
  service.firstBarrier.await();
  int numIterations = service.numIterations.get();
  service.stopAsync();
  service.secondBarrier.await();
  service.awaitTerminated();
  assertEquals(numIterations,service.numIterations.get());
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceIsstopped() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void runOneIteration() throws Exception {}

    @Override protected scheduledexecutorservice executor() {
      executor.set(super.executor());
      return executor.get();
    }

    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(0,TimeUnit.MILLISECONDS));
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void startUp() throws Exception {
      throw new Exception("Failed");
    }

    @Override protected void runOneIteration() throws Exception {}

    @Override protected scheduledexecutorservice executor() {
      executor.set(super.executor());
      return executor.get();
    }

    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(0,TimeUnit.MILLISECONDS));
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testTimeout() {
  // Create a service whose executor will never run its commands
  Service service = new AbstractScheduledService() {
    @Override protected Scheduler scheduler() {
      return Scheduler.newFixedDelaySchedule(0,TimeUnit.MILLISECONDS);
    fail("Expected timeout");
  } catch (TimeoutException e) {
    assertthat(e).hasMessage("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
  }
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(Long.MAX_VALUE,service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          return new Schedule(Long.MAX_VALUE,service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testCustomScheduler_deadlock() throws InterruptedException,TimeUnit.NANOSECONDS);
          }
        };
      }
    };
    service.startAsync().awaitRunning();
    inGetNextSchedule.await();
    service.stopAsync();
  }
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
public void testBig() throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          // Explicitly yield to increase the probability of a pathological scheduling.
          Thread.yield();
          return new Schedule(0,service.numIterations.get());
}
项目:miscellaneous    文件:ScheduleConfiguration.java   
public Scheduler newScheduler() {
  long initialDelay =
      initialDelay().isPresent()
          ? initialDelay().get().toMillis()
          : 0;

  if (delay().isPresent()) {
    return Scheduler.newFixedDelaySchedule(
        initialDelay,delay().get().toMillis(),TimeUnit.MILLISECONDS);
  }

  return Scheduler.newFixedrateSchedule(
      initialDelay,rate().get().toMillis(),TimeUnit.MILLISECONDS);
}
项目:guava-libraries    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceIsstopped() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void runOneIteration() throws Exception {}

    @Override protected scheduledexecutorservice executor() {
      executor.set(super.executor());
      return executor.get();
    }

    @Override protected Scheduler scheduler() {
      return Scheduler.newFixedDelaySchedule(0,TimeUnit.MILLISECONDS));
}
项目:guava-libraries    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void startUp() throws Exception {
      throw new Exception("Failed");
    }

    @Override protected void runOneIteration() throws Exception {}

    @Override protected scheduledexecutorservice executor() {
      executor.set(super.executor());
      return executor.get();
    }

    @Override protected Scheduler scheduler() {
      return Scheduler.newFixedDelaySchedule(0,TimeUnit.MILLISECONDS));
}
项目:guava-libraries    文件:AbstractScheduledServiceTest.java   
public void testBig() throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override
        protected Schedule getNextSchedule() throws Exception {
          // Explicitly yield to increase the probability of a pathological scheduling.
          Thread.yield();
          return new Schedule(0,service.numIterations.get());
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceIsstopped() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service =
      new AbstractScheduledService() {
        @Override
        protected void runOneIteration() throws Exception {}

        @Override
        protected scheduledexecutorservice executor() {
          executor.set(super.executor());
          return executor.get();
        }

        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(0,TimeUnit.MILLISECONDS);
        }
      };

  service.startAsync();
  assertFalse(service.executor().isShutdown());
  service.awaitRunning();
  service.stopAsync();
  service.awaitTerminated();
  assertTrue(executor.get().awaitTermination(100,TimeUnit.MILLISECONDS));
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService service =
      new TestAbstractScheduledCustomService() {
        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(Long.MAX_VALUE,SECONDS);
        }
      };
  service.startAsync().awaitRunning();
  try {
    service.firstBarrier.await(5,service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
    throws Exception {
  TestAbstractScheduledCustomService service =
      new TestAbstractScheduledCustomService() {
        @Override
        protected Scheduler scheduler() {
          return new AbstractScheduledService.CustomScheduler() {
            @Override
            protected Schedule getNextSchedule() throws Exception {
              return new Schedule(Long.MAX_VALUE,SECONDS);
            }
          };
        }
      };
  service.startAsync().awaitRunning();
  try {
    service.firstBarrier.await(5,service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testBig() throws Exception {
  TestAbstractScheduledCustomService service =
      new TestAbstractScheduledCustomService() {
        @Override
        protected Scheduler scheduler() {
          return new AbstractScheduledService.CustomScheduler() {
            @Override
            protected Schedule getNextSchedule() throws Exception {
              // Explicitly yield to increase the probability of a pathological scheduling.
              Thread.yield();
              return new Schedule(0,TimeUnit.SECONDS);
            }
          };
        }
      };
  service.useBarriers = false;
  service.startAsync().awaitRunning();
  Thread.sleep(50);
  service.useBarriers = true;
  service.firstBarrier.await();
  int numIterations = service.numIterations.get();
  service.stopAsync();
  service.secondBarrier.await();
  service.awaitTerminated();
  assertEquals(numIterations,service.numIterations.get());
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
@Override protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      return new Schedule(delay,unit);
    }};
}
项目:guava-mock    文件:AbstractScheduledServiceTest.java   
@Override protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      if (numIterations.get() > 2) {
        throw new IllegalStateException("Failed");
      }
      return new Schedule(delay,unit);
    }};
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
@Override protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      return new Schedule(delay,unit);
    }};
}
项目:googles-monorepo-demo    文件:AbstractScheduledServiceTest.java   
@Override protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      if (numIterations.get() > 2) {
        throw new IllegalStateException("Failed");
      }
      return new Schedule(delay,unit);
    }};
}
项目:Mastering-Mesos    文件:AsyncStatsModule.java   
@Override
protected void configure() {
  bind(TaskStatCalculator.class).in(Singleton.class);
  bind(CachedCounters.class).in(Singleton.class);
  bind(MachineResourceProvider.class).to(OfferAdapter.class);
  bind(SlotSizeCounter.class).in(Singleton.class);

  install(new PrivateModule() {
    @Override
    protected void configure() {
      bind(TaskStatUpdaterService.class).in(Singleton.class);
      bind(Scheduler.class).toInstance(
          Scheduler.newFixedrateSchedule(
              TASK_STAT_INTERVAL.get().getValue(),TASK_STAT_INTERVAL.get().getValue(),TASK_STAT_INTERVAL.get().getUnit().getTimeUnit()));
      expose(TaskStatUpdaterService.class);
    }
  });
  SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
      .to(TaskStatUpdaterService.class);

  install(new PrivateModule() {
    @Override
    protected void configure() {
      bind(SlotSizeCounterService.class).in(Singleton.class);
      bind(Scheduler.class).toInstance(
          Scheduler.newFixedrateSchedule(
              SLOT_STAT_INTERVAL.get().getValue(),SLOT_STAT_INTERVAL.get().getValue(),SLOT_STAT_INTERVAL.get().getUnit().getTimeUnit()));
      expose(SlotSizeCounterService.class);
    }
  });
  SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
      .to(SlotSizeCounterService.class);
}
项目:guava-libraries    文件:AbstractScheduledServiceTest.java   
@Override protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      return new Schedule(delay,unit);
    }};
}
项目:guava-libraries    文件:AbstractScheduledServiceTest.java   
@Override protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      if (numIterations.get() > 2) {
        throw new IllegalStateException("Failed");
      }
      return new Schedule(delay,unit);
    }};
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
  final atomicreference<scheduledexecutorservice> executor = Atomics.newReference();
  AbstractScheduledService service =
      new AbstractScheduledService() {
        @Override
        protected void startUp() throws Exception {
          throw new Exception("Failed");
        }

        @Override
        protected void runOneIteration() throws Exception {}

        @Override
        protected scheduledexecutorservice executor() {
          executor.set(super.executor());
          return executor.get();
        }

        @Override
        protected Scheduler scheduler() {
          return newFixedDelaySchedule(0,TimeUnit.MILLISECONDS);
        }
      };

  try {
    service.startAsync().awaitRunning();
    fail("Expected service to fail during startup");
  } catch (IllegalStateException expected) {
  }

  assertTrue(executor.get().awaitTermination(100,TimeUnit.MILLISECONDS));
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testTimeout() {
  // Create a service whose executor will never run its commands
  Service service =
      new AbstractScheduledService() {
        @Override
        protected Scheduler scheduler() {
          return Scheduler.newFixedDelaySchedule(0,TimeUnit.NANOSECONDS);
        }

        @Override
        protected scheduledexecutorservice executor() {
          return TestingExecutors.noOpScheduledExecutor();
        }

        @Override
        protected void runOneIteration() throws Exception {}

        @Override
        protected String serviceName() {
          return "Foo";
        }
      };
  try {
    service.startAsync().awaitRunning(1,TimeUnit.MILLISECONDS);
    fail("Expected timeout");
  } catch (TimeoutException e) {
    assertthat(e).hasMessage("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
  }
}
项目:guava    文件:AbstractScheduledServiceTest.java   
public void testCustomScheduler_deadlock() throws InterruptedException,so run it multiple times to increase the flake likelihood
  for (int i = 0; i < 1000; i++) {
    Service service =
        new AbstractScheduledService() {
          @Override
          protected void runOneIteration() {}

          @Override
          protected Scheduler scheduler() {
            return new CustomScheduler() {
              @Override
              protected Schedule getNextSchedule() throws Exception {
                if (state() != State.STARTING) {
                  inGetNextSchedule.await();
                  Thread.yield();
                  throw new RuntimeException("boom");
                }
                return new Schedule(0,TimeUnit.NANOSECONDS);
              }
            };
          }
        };
    service.startAsync().awaitRunning();
    inGetNextSchedule.await();
    service.stopAsync();
  }
}
项目:guava    文件:AbstractScheduledServiceTest.java   
@Override
protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      return new Schedule(delay,unit);
    }
  };
}
项目:guava    文件:AbstractScheduledServiceTest.java   
@Override
protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      if (numIterations.get() > 2) {
        throw new IllegalStateException("Failed");
      }
      return new Schedule(delay,unit);
    }
  };
}

DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战

DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战

背景

MySQL库A 到 MySQL库B的增量数据同步需求

DolphinScheduler中配置DataX MySQL To MySQL工作流

工作流定义

工作流定义 > 创建工作流 > 拖入1个SHELL组件 > 拖入1个DATAX组件 SHELL组件(文章) 脚本

echo ''文章同步 MySQL To MySQL''

DATAX组件(t_article) 用到2个插件mysqlreader^[1]、mysqlwriter^[2] 选 自定义模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= ''${biz_update_dt}'';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "`id`",
                            "`title`",
                            "`content`",
                            "`is_delete`",
                            "`delete_date`",
                            "`create_date`",
                            "`update_date`"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的数据库B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
                                "table": [
                                    "t_article"
                                ]
                            }
                        ],
                        "writeMode": "replace",
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定义参数:

biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql账号
biz_mysql_password: 你的mysql密码

# 本文实验环境A库和B库用的同一个实例,如果MySQL是多个实例,可以再新增加参数定义例如 biz_mysql_host_b,在模板中对应引用即可

配置的自定义参数将会自动替换json模板中的同名变量

reader mysqlreader插件中关键配置: a.update_date >= ''${biz_update_dt}'' 就是实现增量同步的关键配置 writer mysqlwriter插件中关键配置: ``

"parameter": {
    "writeMode": "replace",
    ......
}

writeMode为replace,相同主键id重复写入数据,就会更新数据。sql本质上执行的是 replace into

保存工作流

全局变量设置 global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的变量为 DolphinScheduler 内置变量,具体参考官网文档^[3] 结合调度时间设计好时间滚动的窗口时长,比如按1天增量,那么这里时间就是减1天

最终的工作流DAG图为:

爬坑记录

  • 官网下载的DataX不包含ElasticSearchWriter写插件 默认不带该插件,需要自己编译ElasticSearchWriter插件。
git clone https://github.com/alibaba/DataX.git

为了加快编译速度,可以只编译<module>elasticsearchwriter</module> 项目根目录的pom.xml <!-- reader --> 全注释掉,<!-- writer -->下只保留<module>elasticsearchwriter</module>其他注释掉,另外<!-- common support module -->也需要保留

如果自己不想编译或者编译失败请搜索 "流水理鱼"微信公众号,或者加我私人微信我给你已经编译好的插件包

by 流水理鱼|wwek

参考

1. DataX MysqlReader 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 2. DataX MysqlWriter 插件文档 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md 3. Apache DolphinScheduler 内置参数 https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html 本文首发于流水理鱼博客,如要转载请注明出处。 欢迎关注我的公众号:流水理鱼(liushuiliyu),全栈、云原生、Homelab交流。 如果您对相关文章感兴趣,也可以关注我的博客:www.iamle.com 上面有更多内容

今天关于MySQL8.0的参数event_scheduler默认是ON,请注意一些坑mysql常用参数设置的介绍到此结束,谢谢您的阅读,有关angular2使用PrimeNG-Scheduler实现FullCalendar-Scheduler、cocos2dx 定时器 schedule scheduleUpdate scheduleOnce、com.google.common.util.concurrent.AbstractScheduledService.Scheduler的实例源码、DolphinScheduler 调度 DataX 实现 MySQL To MySQL 增量数据同步实战等更多相关知识的信息可以在本站进行查询。

本文标签: