MySQL statement timeout特性

这个特性是twitter开发的,点击查看详情,目前已经backport到AliMySQL中,正在测试中,首先简单介绍一下功能

特性描述:

1)statement timemout特性中,SELECT必须是top-level,不能是子查询或者union中的除第一个SELECT之外的SELECT
2)语句级别:为SELECT增加一个hint: max_statement_time指定查询最大执行时间,当执行时间超过max_statement_time后,查询自动中断(连接不中断)
3)session级别:允许超级用户grant其它用户一个指定的max_statement_time,该用户连接后默认使用该值(用户可在session中临时修改),每个SELECT执行时间都不会超过max_statement_time
4)没有grant max_statement_time或grant max_statement_time 0的用户,除非在query中指定max_statement_time hint,否则查询执行时间不受限制

运行效果

mysql> select @@max_statement_time;
+----------------------+
| @@max_statement_time |
+----------------------+
|                 5000 |
+----------------------+
1 row in set (0.00 sec)
 
mysql> select * from load_test_infile, t2;
ERROR 1722 (70101): Query execution was interrupted, max_statement_time exceeded
 
mysql> select MAX_STATEMENT_TIME=1000 * from load_test_infile, t2;
ERROR 1722 (70101): Query execution was interrupted, max_statement_time exceeded
 
mysql> grant select, insert, update, select, create on *.* to test@'%' with max_statement_time 10000;      
Query OK, 0 rows affected (0.00 sec)
 
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
 
mysql> select user, max_statement_time from mysql.user where user='test';
+------+--------------------+
| user | max_statement_time |
+------+--------------------+
| test |              10000 |
+------+--------------------+
1 row in set (0.00 sec)
 
mysql> show status like '%statement_time%';
+-----------------------------+-------+
| Variable_name               | Value |
+-----------------------------+-------+
| Max_statement_time_exceeded | 2     |
| Max_statement_time_failed   | 0     |
| Max_statement_time_set      | 2     |
+-----------------------------+-------+
3 rows in set (0.00 sec)

实现原理:

原理比较简单,但实现细节还是比较复杂,简单概括:
1)为需要使用statement timeout的连接创建一个timer,执行SELECT之前设置超时时间,timer超时后发送MY_TIMER_EVENT_SIGNO信
号,查询结束后执行timer_reset
2)增加一个后台线程,接收来自所有连接tiemr的MY_TIMER_EVENT_SIGNO信号,调用THD::awake(THD::KILL_TIMEOUT),结束查询
3)在mysql.user表中增加一列:max_statement_time保存grant max_statement_time的值,用户登录后初始化为该值

主要代码分析:

主要数据结构st_thd_timer和st_my_timer,前者是对后者的封装和保护,其中的THD是timer超时需要kill的session

struct st_thd_timer
{
  THD *thd;
  my_timer_t timer;
  pthread_mutex_t mutex;
  bool destroy;
};
 
typedef struct st_my_timer my_timer_t;
 
/** Non-copyable timer object. */
struct st_my_timer
{
  /** Timer ID used to identify the timer in timer requests. */
  os_timer_t id; 
 
  /** Timer expiration notification function. */
  void (*notify_function)(my_timer_t *); 
};

为连接创建一个thd_timer,里面封装了一个os_timer_t成员,这个是操作系统提供的timer,此函数被thd_timer_set调用,一句重要代码:
ttp->timer.notify_function= timer_callback

/**
  Allocate and initialize a thread timer object.
 
  @return NULL on failure.
*/
static thd_timer_t *thd_timer_create(void)
{
  thd_timer_t *ttp;
  DBUG_ENTER("thd_timer_create");
 
  ttp= (thd_timer_t *) my_malloc(sizeof(*ttp), MYF(MY_WME | MY_ZEROFILL));
 
  if (ttp == NULL)
    DBUG_RETURN(NULL);
 
  ttp->timer.notify_function= timer_callback;
  pthread_mutex_init(&ttp->mutex, MY_MUTEX_INIT_FAST);
 
  if (! my_timer_create(&ttp->timer))
    DBUG_RETURN(ttp);
 
  pthread_mutex_destroy(&ttp->mutex);
  my_free(ttp);
 
  DBUG_RETURN(NULL);
}

timer_callback实现:my_container_of是一个宏,通过一个结构体中某个成员的地址找到该结构体的起始地址,此处通过thd_timer_t中的timer地址找到thd_timer_t的地址

/**
  Timer expiration notification callback.
 
  @param  timer   Timer (mysys) object.
 
  @note Invoked in a separate thread of control.
*/
static void timer_callback(my_timer_t *timer)
{
  bool destroy;
  thd_timer_t *ttp;
 
  ttp= my_container_of(timer, thd_timer_t, timer);
 
  pthread_mutex_lock(&ttp->mutex);
  destroy= timer_notify(ttp);
  pthread_mutex_unlock(&ttp->mutex);
 
  if (destroy)
    thd_timer_destroy(ttp);
}

timer_notify函数负责调用THD::awake函数kill query

static void timer_notify(thd_timer_t *ttp)
{
  THD *thd= ttp->thd;
  DBUG_ASSERT(!ttp->destroy || !thd);
 
  /*
    Statement might have finished while the timer notification
    was being delivered. If this is the case, the timer object
    was detached (orphaned) and has no associated session (thd).
  */
  if (thd)
  {
    mysql_mutex_lock(&thd->LOCK_thd_data);
    thd->awake(THD::KILL_TIMEOUT);
    mysql_mutex_unlock(&thd->LOCK_thd_data);
  }
 
  /* Mark the object as unreachable. */
  ttp->thd= NULL;
  return ttp->destroy;
}

调用操作系统API创建timer,此函数被thd_timer_create调用,设置timer超时时发送MY_TIMER_EVENT_SIGNO信号,同时通过sigev.sigev_value.sival_ptr= timer保存了指向my_timer_t(其中函数指针notify_function在thd_timer_create中被置为timer_callback)的指针,

/**
  Create a timer object.
 
  @param  timer   Location where the timer ID is returned.
 
  @return On success, 0.
          On error, -1 is returned, and errno is set to indicate the error.
*/
int my_timer_create(my_timer_t *timer)
{
  struct sigevent sigev;
 
  memset(&sigev, 0, sizeof(sigev));
 
  sigev.sigev_value.sival_ptr= timer;
  sigev.sigev_signo= MY_TIMER_EVENT_SIGNO;
  sigev.sigev_notify= SIGEV_SIGNAL | SIGEV_THREAD_ID;
  sigev.sigev_notify_thread_id= thread_id;
 
  return timer_create(CLOCK_MONOTONIC, &sigev, &timer->id);
}

后台kill线程实现,my_timer_init_ext函数会在server startup时调用,创建一个helper_thread,执行一个while循环(当收到MY_TIMER_EVENT_SIGNO信号时,执行回调函数timer_callback)

int my_timer_init_ext(void)
{
  int rc;
  sigset_t set, old_set;
 
  if (sigfillset(&set))
    return -1;
 
  /*
    Temporarily block all signals. New thread will inherit signal
    mask of the current thread.
  */
  if (pthread_sigmask(SIG_BLOCK, &set, &old_set))
    return -1;
 
  /* Create a helper thread. */
  rc= start_helper_thread();
 
  /* Restore the signal mask. */
  pthread_sigmask(SIG_SETMASK, &old_set, NULL);
 
  return rc;
}
 
/**
  Create a helper thread to dispatch timer expiration notifications.
 
  @return On success, 0. On error, -1 is returned.
*/
static int start_helper_thread(void)
{
  pthread_barrier_t barrier;
 
  if (pthread_barrier_init(&barrier, NULL, 2))
    return -1;
 
  if (pthread_create(&thread, NULL, timer_notify_thread, &barrier))
    return -1;
 
  pthread_barrier_wait(&barrier);
  pthread_barrier_destroy(&barrier);
 
  return 0;
}
 
/**
  Timer expiration notification thread.
 
  @param  arg   Barrier object.
*/
static void *timer_notify_thread(void *arg)
{
  sigset_t set;
  siginfo_t info;
  pthread_barrier_t *barrier= arg;
 
  my_thread_init();
 
  sigemptyset(&set);
  sigaddset(&set, MY_TIMER_EVENT_SIGNO);
  sigaddset(&set, MY_TIMER_KILL_SIGNO);
 
  /* Get the thread ID of the current thread. */
  thread_id= (pid_t) syscall(SYS_gettid);
 
  /* Wake up parent thread, thread_id is available. */
  pthread_barrier_wait(barrier);
 
  while (1)
  {
    if (sigwaitinfo(&set, &info) < 0)
      continue;
 
    if (info.si_signo == MY_TIMER_EVENT_SIGNO)
      timer_notify_function(info.si_value);
    else if (info.si_signo == MY_TIMER_KILL_SIGNO)
      break;
  }
  my_thread_end();
  return NULL;
}
 
static void timer_notify_function(sigval_t sigev_value)
{
  my_timer_t *timer= sigev_value.sival_ptr;
  timer->notify_function(timer);
}

timer_notify_thread中sigwaitinfo收到消息的同时,info.si_value会被置为sigev.sigev_value.sival_ptr(my_timer_create中),它指向一个my_timer_t,通过调用其中的notify_function(timer_callback,在thd_timer_create设置),达到kill query的效果

整个流程:

1)server startup: 调用my_timer_init_ext启动后台监听线程
2)SELECT执行过程中,若max_statement_time不为0,在mysql_execute_command开始之前设置超时时间并启动timer,在mysql_execute_command结束之前清除timer,若SELECT由于超时被kill会很会执行到这里

结束语

分析到此结束,若想进一步了解实现详情,请参考:
1)twitter statement timeout github
2)percona 5.6 statement timeout blueprint

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注