这个特性是twitter开发的,点击查看详情,目前已经backport到AliMySQL中,正在测试中,首先简单介绍一下功能
特性描述:
1)statement timemout特性中,SELECT必须是top-level,不能是子查询或者union中的除第一个SELECT之外的SELECT
2)语句级别:为SELECT增加一个hint: max_statement_time指定查询最大执行时间,当执行时间超过max_statement_time后,查询自动中断(连接不中断)
3)session级别:允许超级用户grant其它用户一个指定的max_statement_time,该用户连接后默认使用该值(用户可在session中临时修改),每个SELECT执行时间都不会超过max_statement_time
4)没有grant max_statement_time或grant max_statement_time 0的用户,除非在query中指定max_statement_time hint,否则查询执行时间不受限制
运行效果
mysql> select @@max_statement_time; +----------------------+ | @@max_statement_time | +----------------------+ | 5000 | +----------------------+ 1 row in set (0.00 sec) mysql> select * from load_test_infile, t2; ERROR 1722 (70101): Query execution was interrupted, max_statement_time exceeded mysql> select MAX_STATEMENT_TIME=1000 * from load_test_infile, t2; ERROR 1722 (70101): Query execution was interrupted, max_statement_time exceeded mysql> grant select, insert, update, select, create on *.* to test@'%' with max_statement_time 10000; Query OK, 0 rows affected (0.00 sec) mysql> flush privileges; Query OK, 0 rows affected (0.00 sec) mysql> select user, max_statement_time from mysql.user where user='test'; +------+--------------------+ | user | max_statement_time | +------+--------------------+ | test | 10000 | +------+--------------------+ 1 row in set (0.00 sec) mysql> show status like '%statement_time%'; +-----------------------------+-------+ | Variable_name | Value | +-----------------------------+-------+ | Max_statement_time_exceeded | 2 | | Max_statement_time_failed | 0 | | Max_statement_time_set | 2 | +-----------------------------+-------+ 3 rows in set (0.00 sec) |
实现原理:
原理比较简单,但实现细节还是比较复杂,简单概括:
1)为需要使用statement timeout的连接创建一个timer,执行SELECT之前设置超时时间,timer超时后发送MY_TIMER_EVENT_SIGNO信
号,查询结束后执行timer_reset
2)增加一个后台线程,接收来自所有连接tiemr的MY_TIMER_EVENT_SIGNO信号,调用THD::awake(THD::KILL_TIMEOUT),结束查询
3)在mysql.user表中增加一列:max_statement_time保存grant max_statement_time的值,用户登录后初始化为该值
主要代码分析:
主要数据结构st_thd_timer和st_my_timer,前者是对后者的封装和保护,其中的THD是timer超时需要kill的session
struct st_thd_timer { THD *thd; my_timer_t timer; pthread_mutex_t mutex; bool destroy; }; typedef struct st_my_timer my_timer_t; /** Non-copyable timer object. */ struct st_my_timer { /** Timer ID used to identify the timer in timer requests. */ os_timer_t id; /** Timer expiration notification function. */ void (*notify_function)(my_timer_t *); }; |
为连接创建一个thd_timer,里面封装了一个os_timer_t成员,这个是操作系统提供的timer,此函数被thd_timer_set调用,一句重要代码:
ttp->timer.notify_function= timer_callback
/** Allocate and initialize a thread timer object. @return NULL on failure. */ static thd_timer_t *thd_timer_create(void) { thd_timer_t *ttp; DBUG_ENTER("thd_timer_create"); ttp= (thd_timer_t *) my_malloc(sizeof(*ttp), MYF(MY_WME | MY_ZEROFILL)); if (ttp == NULL) DBUG_RETURN(NULL); ttp->timer.notify_function= timer_callback; pthread_mutex_init(&ttp->mutex, MY_MUTEX_INIT_FAST); if (! my_timer_create(&ttp->timer)) DBUG_RETURN(ttp); pthread_mutex_destroy(&ttp->mutex); my_free(ttp); DBUG_RETURN(NULL); } |
timer_callback实现:my_container_of是一个宏,通过一个结构体中某个成员的地址找到该结构体的起始地址,此处通过thd_timer_t中的timer地址找到thd_timer_t的地址
/** Timer expiration notification callback. @param timer Timer (mysys) object. @note Invoked in a separate thread of control. */ static void timer_callback(my_timer_t *timer) { bool destroy; thd_timer_t *ttp; ttp= my_container_of(timer, thd_timer_t, timer); pthread_mutex_lock(&ttp->mutex); destroy= timer_notify(ttp); pthread_mutex_unlock(&ttp->mutex); if (destroy) thd_timer_destroy(ttp); } |
timer_notify函数负责调用THD::awake函数kill query
static void timer_notify(thd_timer_t *ttp) { THD *thd= ttp->thd; DBUG_ASSERT(!ttp->destroy || !thd); /* Statement might have finished while the timer notification was being delivered. If this is the case, the timer object was detached (orphaned) and has no associated session (thd). */ if (thd) { mysql_mutex_lock(&thd->LOCK_thd_data); thd->awake(THD::KILL_TIMEOUT); mysql_mutex_unlock(&thd->LOCK_thd_data); } /* Mark the object as unreachable. */ ttp->thd= NULL; return ttp->destroy; } |
调用操作系统API创建timer,此函数被thd_timer_create调用,设置timer超时时发送MY_TIMER_EVENT_SIGNO信号,同时通过sigev.sigev_value.sival_ptr= timer保存了指向my_timer_t(其中函数指针notify_function在thd_timer_create中被置为timer_callback)的指针,
/** Create a timer object. @param timer Location where the timer ID is returned. @return On success, 0. On error, -1 is returned, and errno is set to indicate the error. */ int my_timer_create(my_timer_t *timer) { struct sigevent sigev; memset(&sigev, 0, sizeof(sigev)); sigev.sigev_value.sival_ptr= timer; sigev.sigev_signo= MY_TIMER_EVENT_SIGNO; sigev.sigev_notify= SIGEV_SIGNAL | SIGEV_THREAD_ID; sigev.sigev_notify_thread_id= thread_id; return timer_create(CLOCK_MONOTONIC, &sigev, &timer->id); } |
后台kill线程实现,my_timer_init_ext函数会在server startup时调用,创建一个helper_thread,执行一个while循环(当收到MY_TIMER_EVENT_SIGNO信号时,执行回调函数timer_callback)
int my_timer_init_ext(void) { int rc; sigset_t set, old_set; if (sigfillset(&set)) return -1; /* Temporarily block all signals. New thread will inherit signal mask of the current thread. */ if (pthread_sigmask(SIG_BLOCK, &set, &old_set)) return -1; /* Create a helper thread. */ rc= start_helper_thread(); /* Restore the signal mask. */ pthread_sigmask(SIG_SETMASK, &old_set, NULL); return rc; } /** Create a helper thread to dispatch timer expiration notifications. @return On success, 0. On error, -1 is returned. */ static int start_helper_thread(void) { pthread_barrier_t barrier; if (pthread_barrier_init(&barrier, NULL, 2)) return -1; if (pthread_create(&thread, NULL, timer_notify_thread, &barrier)) return -1; pthread_barrier_wait(&barrier); pthread_barrier_destroy(&barrier); return 0; } /** Timer expiration notification thread. @param arg Barrier object. */ static void *timer_notify_thread(void *arg) { sigset_t set; siginfo_t info; pthread_barrier_t *barrier= arg; my_thread_init(); sigemptyset(&set); sigaddset(&set, MY_TIMER_EVENT_SIGNO); sigaddset(&set, MY_TIMER_KILL_SIGNO); /* Get the thread ID of the current thread. */ thread_id= (pid_t) syscall(SYS_gettid); /* Wake up parent thread, thread_id is available. */ pthread_barrier_wait(barrier); while (1) { if (sigwaitinfo(&set, &info) < 0) continue; if (info.si_signo == MY_TIMER_EVENT_SIGNO) timer_notify_function(info.si_value); else if (info.si_signo == MY_TIMER_KILL_SIGNO) break; } my_thread_end(); return NULL; } static void timer_notify_function(sigval_t sigev_value) { my_timer_t *timer= sigev_value.sival_ptr; timer->notify_function(timer); } |
timer_notify_thread中sigwaitinfo收到消息的同时,info.si_value会被置为sigev.sigev_value.sival_ptr(my_timer_create中),它指向一个my_timer_t,通过调用其中的notify_function(timer_callback,在thd_timer_create设置),达到kill query的效果
整个流程:
1)server startup: 调用my_timer_init_ext启动后台监听线程
2)SELECT执行过程中,若max_statement_time不为0,在mysql_execute_command开始之前设置超时时间并启动timer,在mysql_execute_command结束之前清除timer,若SELECT由于超时被kill会很会执行到这里
结束语
分析到此结束,若想进一步了解实现详情,请参考:
1)twitter statement timeout github
2)percona 5.6 statement timeout blueprint