In this article we take a detailed look at how MySQL 8.0's event_scheduler parameter now defaults to ON and the pitfalls that come with it, together with notes on commonly used MySQL parameter settings. We also bring you material on implementing FullCalendar-Scheduler in Angular 2 with PrimeNG Scheduler, the cocos2d-x timers schedule / scheduleUpdate / scheduleOnce, example source code for com.google.common.util.concurrent.AbstractScheduledService.Scheduler, and a hands-on guide to incremental MySQL To MySQL data synchronization scheduled by DolphinScheduler and executed by DataX.
Contents:
- MySQL 8.0's event_scheduler parameter defaults to ON, watch out for these pitfalls (commonly used MySQL parameter settings)
- Implementing FullCalendar-Scheduler in Angular 2 with PrimeNG Scheduler
- cocos2d-x timers: schedule, scheduleUpdate, scheduleOnce
- Example source code for com.google.common.util.concurrent.AbstractScheduledService.Scheduler
- DolphinScheduler scheduling DataX: hands-on incremental MySQL To MySQL data synchronization
MySQL 8.0's event_scheduler parameter defaults to ON, watch out for these pitfalls (commonly used MySQL parameter settings)
What is event_scheduler?
event_scheduler is the switch for MySQL's event scheduler, conceptually similar to scheduled tasks in Windows: an event can run once at a specified point in time, or run repeatedly at a fixed interval.
Do companies actually use this?
I have looked at several companies' development guidelines and none of them mention that the event feature must be disabled, so developers are allowed to use events. (That said, I suspect Java developers prefer Java-side schedulers.) Perhaps the guidelines never bothered to mention it because MySQL used not to enable the event scheduler by default, so events simply could not run out of the box; besides, whether developers may use events can be controlled through user privileges. So from the guidelines I have at hand, it is hard to tell whether anyone actually relies on event_scheduler. Since MySQL 8.0 now enables event_scheduler by default, I think it is worth discussing whether events hide any pitfalls.
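For reference, a minimal sketch of how the scheduler and event usage can be controlled from SQL; the database and account names (app_db, app_rw) are placeholders for illustration only.

-- check whether the event scheduler thread is running
SHOW VARIABLES LIKE 'event_scheduler';
-- turn it off (or on) at runtime; not persistent across restarts on its own
SET GLOBAL event_scheduler = OFF;
-- MySQL 8.0 can persist the setting without editing my.cnf
SET PERSIST event_scheduler = OFF;
-- whether a developer may create/alter/drop events is governed by the EVENT privilege
GRANT EVENT ON app_db.* TO 'app_rw'@'%';
REVOKE EVENT ON app_db.* FROM 'app_rw'@'%';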
The pitfall I can think of: replication
To be clear, this has not actually happened in my production environment; it is a scenario I deliberately constructed.
Preparation:
Two MySQL 8.0 instances:
mysql3308
mysql33081
The configuration file does not explicitly set the parameter, so the default applies:
[root@192-168-199-198 mysql3308]# cat my.cnf |grep event_scheduler
The MySQL 8.0 default is:
mysql> show variables like '%event_scheduler%';
+-----------------+-------+
| Variable_name | Value |
+-----------------+-------+
| event_scheduler | ON |
+-----------------+-------+
1 row in set (0.12 sec)
I ran the following tests.
Test 1: In an existing master-slave setup, create an event that inserts repeatedly.
Run on the master:
mysql> show variables like '%event_scheduler%';
+-----------------+-------+
| Variable_name | Value |
+-----------------+-------+
| event_scheduler | ON |
+-----------------+-------+
1 row in set (0.12 sec)
create database fander;
use fander;
CREATE EVENT IF NOT EXISTS test
ON SCHEDULE EVERY 1 SECOND
ON COMPLETION PRESERVE
DO insert into fander.test values (1);
# This event inserts a row with value 1 into fander.test every second.
mysql> show create event test\G
*************************** 1. row ***************************
Event: test
sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
time_zone: SYSTEM
Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS '2019-01-23 14:55:19' ON COMPLETION PRESERVE ENABLE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.00 sec)
mysql> show tables;
Empty set (0.01 sec)
# Note that I have deliberately not created the test table yet.
Run on the slave:
mysql> show variables like '%event_scheduler%';
+-----------------+-------+
| Variable_name | Value |
+-----------------+-------+
| event_scheduler | ON |
+-----------------+-------+
1 row in set (0.12 sec)
mysql> use fander;
mysql> show create event test\G
*************************** 1. row ***************************
Event: test
sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
time_zone: SYSTEM
Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS '2019-01-23 14:55:19' ON COMPLETION PRESERVE DISABLE ON SLAVE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.00 sec)
Did you spot the issues?
- event_scheduler is ON on both the master and the slave. In theory that means the slave could end up with duplicate data: some rows replicated from the master's event, plus rows inserted by the slave running the same event itself.
- The Create Event statements on the master and the slave are not identical.
When the event is created on the master, it is replicated to the slave in a disabled state (DISABLE ON SLAVE). So MySQL anticipated this problem by design: the event cannot run on the slave, the slave writes nothing of its own, and the data stays consistent.
Let's verify that this is really the case.
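Before moving on, one extra check you can run (a hedged sketch; the schema and event names follow the example above): the replicated event should appear with a SLAVESIDE_DISABLED status in information_schema on the slave.

SELECT EVENT_SCHEMA, EVENT_NAME, STATUS
FROM information_schema.EVENTS
WHERE EVENT_SCHEMA = 'fander' AND EVENT_NAME = 'test';
-- expected: STATUS = 'SLAVESIDE_DISABLED' on the slave, 'ENABLED' on the master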
Run on the master:
mysql> create table test (a int);
Query OK, 0 rows affected (0.04 sec)
mysql> select count(1) from test;
+----------+
| count(1) |
+----------+
| 52 |
+----------+
1 row in set (0.01 sec)
Run on the slave:
mysql> select count(1) from test;
+----------+
| count(1) |
+----------+
| 52 |
+----------+
1 row in set (0.01 sec)
# Note: you can also compare GTIDs to confirm that the slave really has no locally generated writes.
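A quick sketch of that GTID check (the idea, not a full procedure): if every transaction in the slave's gtid_executed carries the master's server_uuid, the slave has written nothing of its own.

-- run on both servers and compare
SELECT @@global.server_uuid;
SELECT @@global.gtid_executed;
-- locally generated writes on the slave would show up under the slave's own server_uuid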
Conclusion: in an existing master-slave setup, creating a repeatedly-inserting event does not make the event run on the slave, so no extra rows are written there and consistency is preserved. In this case the enabled event_scheduler causes no trouble.
Test 2: Add a new slave to a master that already has a repeatedly-inserting event.
Master:
mysql> show variables like '%event_scheduler%';
+-----------------+-------+
| Variable_name | Value |
+-----------------+-------+
| event_scheduler | ON |
+-----------------+-------+
1 row in set (0.12 sec)
mysql> show create event test\G
*************************** 1. row ***************************
Event: test
sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
time_zone: SYSTEM
Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS '2019-01-23 14:55:19' ON COMPLETION PRESERVE ENABLE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.06 sec)
Build the slave from a cold (physical) backup.
mysql> show variables like '%event_scheduler%';
+-----------------+-------+
| Variable_name | Value |
+-----------------+-------+
| event_scheduler | ON |
+-----------------+-------+
1 row in set (0.12 sec)
mysql> CHANGE MASTER TO
-> MASTER_HOST='localhost',
-> MASTER_USER='rpl_user',
-> MASTER_PASSWORD='password',
-> MASTER_PORT=3308,
-> master_auto_position=1;
Query OK, 0 rows affected, 2 warnings (0.04 sec)
mysql> start slave;
Query OK, 0 rows affected (0.02 sec)
mysql> use fander
Database changed
mysql> show create event test\G
*************************** 1. row ***************************
Event: test
sql_mode: ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION
time_zone: SYSTEM
Create Event: CREATE DEFINER=`root`@`localhost` EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS '2019-01-23 14:55:19' ON COMPLETION PRESERVE ENABLE DO insert into fander.test values (1)
character_set_client: utf8mb4
collation_connection: utf8mb4_0900_ai_ci
Database Collation: utf8mb4_0900_ai_ci
1 row in set (0.00 sec)
This time the Create Event statements on the master and the slave are exactly the same. Let's continue the test.
Master:
mysql> select count(1) from test;
+----------+
| count(1) |
+----------+
| 14 |
+----------+
1 row in set (0.00 sec)
Slave:
mysql> select count(1) from test;
+----------+
| count(1) |
+----------+
| 33 |
+----------+
1 row in set (0.00 sec)
mysql> show slave status\G
...
Retrieved_Gtid_Set: 07b92486-64b0-11e8-b4cf-000c29c71881:501-561
Executed_Gtid_Set: 07b92486-64b0-11e8-b4cf-000c29c71881:1-561,
59613f3a-1ee0-11e9-a9e7-000c29259487:1-13
...
The test above used a physical cold backup; I also tried building the slave from a logical backup of the master, and the outcome is the same. Conclusion: when you add a slave to a master that already has a repeatedly-inserting event, the slave ends up with the same enabled event schedule and executes the INSERT itself, so the scheduled task effectively runs twice and the data becomes inconsistent.
How to avoid this?
Suggestions:
1. Standardize your my.cnf template and set event_scheduler to OFF instead of relying on the official default of ON.
2. Enable read_only=1 on slaves to strictly prevent writes.
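A minimal sketch of those two suggestions as my.cnf entries (adapt to your own template; super_read_only is an optional extra guard on the slave):

[mysqld]
# keep the event scheduler off unless a DBA enables it deliberately
event_scheduler = OFF

# slave-only settings: block ordinary writes
read_only = ON
super_read_only = ON

At runtime, SET GLOBAL event_scheduler = OFF; and SET GLOBAL read_only = ON; achieve the same without a restart.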
One more note
If your routine backups are taken from the slave in the Test 1 scenario, then after restoring such a backup the event is disabled by default and will not run. See below:
[root@192-168-199-198 ~]# cat all2.sql |grep "EVERY 1 SECOND"
/*!50106 CREATE*/ /*!50117 DEFINER=`root`@`localhost`*/ /*!50106 EVENT `test` ON SCHEDULE EVERY 1 SECOND STARTS '2019-01-23 16:03:16' ON COMPLETION PRESERVE DISABLE ON SLAVE DO insert into fander.test values (1) */ ;;
You have to modify the event by hand before it can run again, which is arguably another pitfall.
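A hedged sketch of that manual fix after importing such a dump (schema and event names follow the example above):

-- re-enable the event on the server that is supposed to run it
ALTER EVENT fander.test ENABLE;
-- and make sure the scheduler itself is on
SET GLOBAL event_scheduler = ON;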
Implementing FullCalendar-Scheduler in Angular 2 with PrimeNG Scheduler
My API uses Observables which I subscribe to in the component. This works fine for events, because the view updates automatically whenever the events change.
However, when supplying 'resources' to FullCalendar through PrimeNG's 'options' property, things do not work as expected: the code that sets the 'options' property runs before the API call has had a chance to return, so the resources are empty.
I am sure of this, because hard-coding the resources makes it work.
I can think of a few ways around this:
> Make the call synchronous (I would like to avoid this)
> Wait for all data to load and then (re)render the view (which makes it almost the same as #1)
> Configure the options.resources property so that the view updates when it changes, just as it does for events (this would be the best option, but I am not sure it is possible)
I would appreciate any help. Thanks.
<p-schedule [events]="events" [businessHours]="businessHours" [options]="optionConfig" > </p-schedule>
My (for now) mock API:
getEvents() {
  return this.http
    .get('assets/api/mockEvents.json')
    .map((response: Response) => <Appointment[]>response.json().data)
    .catch(this.handleError);
}

getResources() {
  return this.http
    .get('assets/api/mockResources.json')
    .map((response: Response) => <Resource[]>response.json().data)
    .catch(this.handleError);
}
The component file:
ngOnInit() {
  this.schedulerService.getEvents()
    .subscribe(events => this.events = events);
  this.schedulerService.getResources()
    .subscribe(resources => this.resources = resources);

  // ***** If the following code is uncommented, resources are displayed in the Schedule view *****
  // this.resources = [
  //   new Resource(1, "Dr. Hibbert", "blue", true, new BusinessHours("08:00", "16:00")),
  //   new Resource(2, "Dr. Simpson", "green", true, new BusinessHours("10:00", "18:00"))
  // ];

  this.optionConfig = {
    "resources": this.resources
  };
}
Edit: one thing that occurred to me is to set the this.resources property only through a setter. That way I know exactly when the value is set, but the question remains: how do I push the new value into the schedule component after it has been initialized?
Solution
You can use Angular 2's async pipe to consume the data in the view:
<p-schedule [events]="events" [businessHours]="businessHours" [options]="optionConfig | async" > </p-schedule>
You could even bind the resources directly through the async pipe instead of wrapping them in optionConfig, if that suits your setup.
This way you neither have to make a synchronous call nor re-render the view after the data has loaded.
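For illustration, a minimal sketch of the component with this approach (SchedulerService and its methods are taken from the question; the file path and the exact FullCalendar options shape are assumptions):

import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/map';
import { SchedulerService } from './scheduler.service'; // the question's service, path assumed

@Component({
  selector: 'app-scheduler',
  template: `
    <p-schedule [events]="events"
                [options]="optionConfig$ | async">
    </p-schedule>`
})
export class SchedulerComponent implements OnInit {
  events: any[] = [];
  optionConfig$: Observable<any>;

  constructor(private schedulerService: SchedulerService) {}

  ngOnInit() {
    this.schedulerService.getEvents()
      .subscribe(events => this.events = events);

    // build the options object only once the resources have arrived;
    // the async pipe in the template subscribes and feeds the value into [options]
    this.optionConfig$ = this.schedulerService.getResources()
      .map(resources => ({ resources: resources }));
  }
}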
Let me know if you still run into problems.
cocos2d-x timers: schedule, scheduleUpdate, scheduleOnce
1. schedule
a. Enable: this->schedule(schedule_selector(HelloWorld::Move), 1.0f);
b. Disable: this->unschedule(schedule_selector(HelloWorld::Move));
c. Callback: void Move(float dt);
void HelloWorld::Move(float dt)
{
CCLOG("Tick! Tick!");
}
2. scheduleUpdate
a. Enable: this->scheduleUpdate();
b. Disable: this->unscheduleUpdate();
c. Callback: void update(float dt);
void HelloWorld::update(float dt) // the callback must be named update; it overrides the node's update method
{
CCLOG("Tick! Tick!");
}
3. scheduleOnce
a. Enable: this->scheduleOnce(schedule_selector(HelloWorld::Move), 1.0f);
b. Disable: not needed; it stops automatically after firing once
4. Stop all timers: this->unscheduleAllSelectors();
Example source code for com.google.common.util.concurrent.AbstractScheduledService.Scheduler
public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
  final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void runOneIteration() throws Exception {}
    @Override protected ScheduledExecutorService executor() {
      executor.set(super.executor());
      return executor.get();
    }
    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
    }
  };
  service.startAsync();
  assertFalse(service.executor().isShutdown());
  service.awaitRunning();
  service.stopAsync();
  service.awaitTerminated();
  assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
}

public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
  final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
  AbstractScheduledService service = new AbstractScheduledService() {
    @Override protected void startUp() throws Exception { throw new Exception("Failed"); }
    @Override protected void runOneIteration() throws Exception {}
    @Override protected ScheduledExecutorService executor() {
      executor.set(super.executor());
      return executor.get();
    }
    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
    }
  };
  try {
    service.startAsync().awaitRunning();
    fail("Expected service to fail during startup");
  } catch (IllegalStateException expected) {}
  assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
}

public void testTimeout() {
  // Create a service whose executor will never run its commands
  Service service = new AbstractScheduledService() {
    @Override protected Scheduler scheduler() {
      return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS);
    }
    @Override protected ScheduledExecutorService executor() {
      return TestingExecutors.noOpScheduledExecutor();
    }
    @Override protected void runOneIteration() throws Exception {}
    @Override protected String serviceName() { return "Foo"; }
  };
  try {
    service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
    fail("Expected timeout");
  } catch (TimeoutException e) {
    assertThat(e).hasMessage("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
  }
}

public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached() throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
    }
  };
  service.startAsync().awaitRunning();
  try {
    service.firstBarrier.await(5, SECONDS);
    fail();
  } catch (TimeoutException expected) {}
  assertEquals(0, service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}

public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached() throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override protected Schedule getNextSchedule() throws Exception {
          return new Schedule(Long.MAX_VALUE, SECONDS);
        }
      };
    }
  };
  service.startAsync().awaitRunning();
  try {
    service.firstBarrier.await(5, SECONDS);
    fail();
  } catch (TimeoutException expected) {}
  assertEquals(0, service.numIterations.get());
  service.stopAsync();
  service.awaitTerminated();
}

public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
  final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
  // This will flakily deadlock, so run it multiple times to increase the flake likelihood
  for (int i = 0; i < 1000; i++) {
    Service service = new AbstractScheduledService() {
      @Override protected void runOneIteration() {}
      @Override protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override protected Schedule getNextSchedule() throws Exception {
            if (state() != State.STARTING) {
              inGetNextSchedule.await();
              Thread.yield();
              throw new RuntimeException("boom");
            }
            return new Schedule(0, TimeUnit.NANOSECONDS);
          }
        };
      }
    };
    service.startAsync().awaitRunning();
    inGetNextSchedule.await();
    service.stopAsync();
  }
}

public void testBig() throws Exception {
  TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    @Override protected Scheduler scheduler() {
      return new AbstractScheduledService.CustomScheduler() {
        @Override protected Schedule getNextSchedule() throws Exception {
          // Explicitly yield to increase the probability of a pathological scheduling.
          Thread.yield();
          return new Schedule(0, TimeUnit.SECONDS);
        }
      };
    }
  };
  service.useBarriers = false;
  service.startAsync().awaitRunning();
  Thread.sleep(50);
  service.useBarriers = true;
  service.firstBarrier.await();
  int numIterations = service.numIterations.get();
  service.stopAsync();
  service.secondBarrier.await();
  service.awaitTerminated();
  assertEquals(numIterations, service.numIterations.get());
}
public Scheduler newScheduler() {
  long initialDelay = initialDelay().isPresent() ? initialDelay().get().toMillis() : 0;
  if (delay().isPresent()) {
    return Scheduler.newFixedDelaySchedule(
        initialDelay, delay().get().toMillis(), TimeUnit.MILLISECONDS);
  }
  return Scheduler.newFixedRateSchedule(
      initialDelay, rate().get().toMillis(), TimeUnit.MILLISECONDS);
}
@Override
protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      return new Schedule(delay, unit);
    }
  };
}

@Override
protected Scheduler scheduler() {
  return new CustomScheduler() {
    @Override
    protected Schedule getNextSchedule() throws Exception {
      if (numIterations.get() > 2) {
        throw new IllegalStateException("Failed");
      }
      return new Schedule(delay, unit);
    }
  };
}
@Override
protected void configure() {
  bind(TaskStatCalculator.class).in(Singleton.class);
  bind(CachedCounters.class).in(Singleton.class);
  bind(MachineResourceProvider.class).to(OfferAdapter.class);
  bind(SlotSizeCounter.class).in(Singleton.class);
  install(new PrivateModule() {
    @Override
    protected void configure() {
      bind(TaskStatUpdaterService.class).in(Singleton.class);
      bind(Scheduler.class).toInstance(
          Scheduler.newFixedRateSchedule(
              TASK_STAT_INTERVAL.get().getValue(),
              TASK_STAT_INTERVAL.get().getValue(),
              TASK_STAT_INTERVAL.get().getUnit().getTimeUnit()));
      expose(TaskStatUpdaterService.class);
    }
  });
  SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
      .to(TaskStatUpdaterService.class);
  install(new PrivateModule() {
    @Override
    protected void configure() {
      bind(SlotSizeCounterService.class).in(Singleton.class);
      bind(Scheduler.class).toInstance(
          Scheduler.newFixedRateSchedule(
              SLOT_STAT_INTERVAL.get().getValue(),
              SLOT_STAT_INTERVAL.get().getValue(),
              SLOT_STAT_INTERVAL.get().getUnit().getTimeUnit()));
      expose(SlotSizeCounterService.class);
    }
  });
  SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
      .to(SlotSizeCounterService.class);
}
DolphinScheduler Scheduling DataX: Hands-on Incremental MySQL To MySQL Data Synchronization
Background
Requirement: incremental data synchronization from MySQL database A to MySQL database B.
Configuring the DataX MySQL To MySQL workflow in DolphinScheduler
Workflow definition
Workflow Definition > Create Workflow > drag in one SHELL task > drag in one DATAX task. SHELL task (articles), script:
echo 'article sync MySQL To MySQL'
DATAX task (t_article), which uses the two plugins mysqlreader^[1] and mysqlwriter^[2]. Choose "custom template":
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/your_database_A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
],
"querySql": [
"select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a where a.update_date >= '${biz_update_dt}';"
]
}
],
"password": "${biz_mysql_password}",
"username": "${biz_mysql_username}"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"`id`",
"`title`",
"`content`",
"`is_delete`",
"`delete_date`",
"`create_date`",
"`update_date`"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/your_database_B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
"table": [
"t_article"
]
}
],
"writeMode": "replace",
"password": "${biz_mysql_password}",
"username": "${biz_mysql_username}"
}
}
}
],
"setting": {
"errorLimit": {
"percentage": 0,
"record": 0
},
"speed": {
"channel": 1,
"record": 1000
}
}
}
}
The column configuration of the reader and the writer must stay consistent.
Custom parameters:
biz_update_dt: ${global_bizdate}
biz_mysql_host: your MySQL IP
biz_mysql_port: 3306
biz_mysql_username: your MySQL username
biz_mysql_password: your MySQL password
# In this article's test environment database A and database B live on the same instance. If they are on different instances, define additional parameters such as biz_mysql_host_b and reference them in the template accordingly.
The custom parameters you configure are automatically substituted for the variables of the same name in the JSON template.
Key configuration in the reader (mysqlreader) plugin: a.update_date >= '${biz_update_dt}'
This predicate is what makes the synchronization incremental.
Key configuration in the writer (mysqlwriter) plugin:
"parameter": {
"writeMode": "replace",
......
}
With writeMode set to replace, writing a row whose primary key id already exists updates that row; the SQL executed is essentially REPLACE INTO.
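For illustration, a hedged sketch of what that write behaviour amounts to on the target side (the values are made up; id is assumed to be the primary key):

-- first run inserts the row
REPLACE INTO t_article (id, title, content, is_delete, delete_date, create_date, update_date)
VALUES (1, 'hello', 'first body', 0, NULL, '2022-06-01 10:00:00', '2022-06-01 10:00:00');
-- a later run with the same id removes the old row and inserts the new one,
-- so re-synchronizing an already-copied row is harmless
REPLACE INTO t_article (id, title, content, is_delete, delete_date, create_date, update_date)
VALUES (1, 'hello', 'updated body', 0, NULL, '2022-06-01 10:00:00', '2022-06-03 08:30:00');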
Save the workflow.
Global variable setting: global_bizdate: $[yyyy-MM-dd 00:00:00-1]
global_bizdate references a DolphinScheduler built-in variable; see the official documentation^[3] for details. Design the rolling time window to match the schedule interval: for a daily increment, subtract one day here, as illustrated in the sketch below.
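A hedged example of how the pieces fit together (the dates are invented): if a workflow instance runs on 2022-06-02, $[yyyy-MM-dd 00:00:00-1] renders global_bizdate as 2022-06-01 00:00:00, biz_update_dt takes that value, and the reader effectively executes:

select a.id, a.title, a.content, a.is_delete, a.delete_date, a.create_date, a.update_date
from t_article a
where a.update_date >= '2022-06-01 00:00:00';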
The final workflow DAG:
Pitfalls encountered
- The DataX release downloaded from the official site does not include the ElasticSearchWriter plugin. Since it is not bundled by default, you have to build the elasticsearchwriter plugin yourself.
git clone https://github.com/alibaba/DataX.git
To speed up the build you can compile only <module>elasticsearchwriter</module>: in the pom.xml at the project root, comment out everything under <!-- reader -->, keep only <module>elasticsearchwriter</module> under <!-- writer --> and comment out the rest, and keep the modules under <!-- common support module -->. A sketch of the build command follows below.
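A hedged sketch of the build, assuming Maven and a JDK are installed; the package command follows the DataX build instructions, and the output path may differ between versions:

# after trimming the module list in pom.xml as described above
cd DataX
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
# the packaged plugin then sits under the assembled target/datax output directory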
If you don't want to build it yourself, or your build fails, search for the "流水理鱼" WeChat official account, or add me on WeChat and I will send you the pre-built plugin package.
by 流水理鱼|wwek
References
1. DataX MysqlReader plugin documentation: https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
2. DataX MysqlWriter plugin documentation: https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
3. Apache DolphinScheduler built-in parameters: https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html
This article was first published on the 流水理鱼 blog; please credit the source when reposting. Follow my WeChat official account 流水理鱼 (liushuiliyu) for full-stack, cloud-native and homelab topics, or visit my blog at www.iamle.com for more.
That concludes today's introduction to MySQL 8.0's event_scheduler defaulting to ON, its pitfalls, and commonly used MySQL parameter settings. Thank you for reading. More on PrimeNG Scheduler with FullCalendar-Scheduler in Angular 2, the cocos2d-x timers schedule / scheduleUpdate / scheduleOnce, example source code for com.google.common.util.concurrent.AbstractScheduledService.Scheduler, and incremental MySQL To MySQL synchronization with DolphinScheduler and DataX can be found by searching this site.