Index: sql/sql_class.h =================================================================== --- sql/sql_class.h (revision 5163) +++ sql/sql_class.h (working copy) @@ -177,7 +177,165 @@ friend char **thd_query(MYSQL_THD thd); }; +/* + Number of wait slots per concurrency queue. Multiple queues (hard-coded: 8) + reduce lock contention; 8 * 12500 = 100000 slots in total, twice the maximum + srv_max_n_threads (50000, set in innobase_start_or_create_for_mysql). +*/ +#define N_SLOTS_PER_QUEUE 12500 +#define N_THREAD_CONC_QUEUE 8 +typedef struct st_thread_wait_slot +{ + /* used for cond-wait/signal */ + mysql_mutex_t mutex; + mysql_cond_t cond; + bool signaled; /* protected by mutex */ + + /* protected by the owning queue's mutex (Thread_conc_queue::lock()) */ + bool wait_ended; + + struct st_thread_wait_slot *prev, *next; + +} Thread_wait_slot; + +/* FIFO of wait slots, a doubly-linked list whose links live in the slot */ +class Thread_slot_queue +{ +public: + + Thread_slot_queue() {} + ~Thread_slot_queue() {} + + inline void init() + { + first= last= NULL; + elements= 0; + } + + inline void remove(Thread_wait_slot *slot) + { + if (slot->prev != NULL) + slot->prev->next= slot->next; + else + first= slot->next; + if (slot->next != NULL) + slot->next->prev= slot->prev; + else + last= slot->prev; + elements--; + } + + inline void push_back(Thread_wait_slot *slot) + { + slot->next= NULL; + slot->prev= last; + if (last != NULL) + last->next= slot; + if (first == NULL) + first= slot; + last= slot; + elements++; + } + + inline Thread_wait_slot* pop() + { + Thread_wait_slot* slot= first; + if (first) + remove(first); + return slot; + } + + inline Thread_wait_slot* head() {return first;} + +private: + Thread_wait_slot *first, *last; + int elements; +}; + +/* A thread concurrency queue: a waiting-slots FIFO plus a free-slots FIFO. */ +class Thread_conc_queue +{ +public: + + Thread_conc_queue() {} + ~Thread_conc_queue() {} + + inline void init() + { + free_queue.init(); + wait_queue.init(); + + thread_active= thread_wait= 0; + +
mysql_mutex_init(0, &mutex, NULL); + + int n= N_SLOTS_PER_QUEUE * sizeof(Thread_wait_slot); + slots= (Thread_wait_slot*)my_malloc(n, MYF(MY_FAE)); + + for (n= 0; n < N_SLOTS_PER_QUEUE; n++) + { + mysql_mutex_init(0, &slots[n].mutex, NULL); + mysql_cond_init(0, &slots[n].cond, NULL); + + slots[n].wait_ended= false; + slots[n].signaled= false; + slots[n].prev= slots[n].next= NULL; + + free_queue.push_back(&slots[n]); + } + } + + inline void deinit() + { + int n; + for (n= 0; n < N_SLOTS_PER_QUEUE; n++) + { + mysql_mutex_destroy(&slots[n].mutex); + mysql_cond_destroy(&slots[n].cond); + } + my_free(slots); + + mysql_mutex_destroy(&mutex); + } + + inline void lock() { mysql_mutex_lock(&mutex); } + inline void unlock() { mysql_mutex_unlock(&mutex); } + + /* All of the functions below must be called while holding lock(). */ + + inline Thread_wait_slot* pop_free() + { + return free_queue.pop(); + } + + inline void push_back_free(Thread_wait_slot *slot) + { + free_queue.push_back(slot); + } + + inline void remove_wait(Thread_wait_slot *slot) + { + wait_queue.remove(slot); + } + + inline void push_back_wait(Thread_wait_slot *slot) + { + wait_queue.push_back(slot); + } + + Thread_wait_slot *wait_q_head() { return wait_queue.head(); } + +public: + /* These counters must only be accessed while holding lock(). */ + int32 thread_active, thread_wait; + +private: + Thread_slot_queue wait_queue, free_queue; + mysql_mutex_t mutex; + Thread_wait_slot* slots; +}; + #define TC_LOG_PAGE_SIZE 8192 #define TC_LOG_MIN_SIZE (3*TC_LOG_PAGE_SIZE) Index: sql/mysqld.cc =================================================================== --- sql/mysqld.cc (revision 5163) +++ sql/mysqld.cc (working copy) @@ -505,6 +505,9 @@ uint lower_case_table_names; ulong tc_heuristic_recover= 0; int32 num_thread_running; +int32 thread_rejected; +int32 thread_active; +int32 thread_wait; ulong thread_created; ulong back_log, connect_timeout, concurrency, server_id; ulong table_cache_size, table_def_size;
@@ -657,6 +660,15 @@ const char *in_additional_cond= ""; const char *in_having_cond= ""; +/* server-layer array of FIFO thread concurrency queues */ +Thread_conc_queue thread_conc_queues[N_THREAD_CONC_QUEUE]; +ulong thread_running_low_watermark= 0; +ulong thread_running_high_watermark= 0; +ulong thread_running_ctl_mode= 0; +ulong thread_running_wait_timeout= 100; /* 100 ms */ +ulong thread_running_wait_timeout_ns= 100000000; /* 100 ms in nanoseconds */ +ulong queue_tr_low_watermark= 0; + my_decimal decimal_zero; /** Number of connection errors when selecting on the listening port */ ulong connection_errors_select= 0; @@ -1288,6 +1300,21 @@ #ifndef EMBEDDED_LIBRARY + +static void thread_conc_init() +{ + int i= 0; + for (; i < N_THREAD_CONC_QUEUE; i++) + thread_conc_queues[i].init(); +} + +static void thread_conc_destroy() +{ + int i= 0; + for (; i < N_THREAD_CONC_QUEUE; i++) + thread_conc_queues[i].deinit(); +} + /**************************************************************************** ** Code to end mysqld ****************************************************************************/ @@ -1523,6 +1550,8 @@ } mysql_mutex_unlock(&LOCK_thread_count); + thread_conc_destroy(); + close_active_mi(); DBUG_PRINT("quit",("close_connections thread")); DBUG_VOID_RETURN; @@ -3962,6 +3991,19 @@ if (back_log == 0 && (back_log= 50 + max_connections / 5) > 900) back_log= 900; + /* assign max_connections to thread_running_high_watermark if it equals 0.
*/ + if (thread_running_high_watermark == 0) + thread_running_high_watermark= max_connections; + if (thread_running_high_watermark < thread_running_low_watermark) + thread_running_high_watermark= thread_running_low_watermark; + + thread_running_wait_timeout_ns= thread_running_wait_timeout * 1000000; + queue_tr_low_watermark= thread_running_low_watermark + N_THREAD_CONC_QUEUE - 1; + queue_tr_low_watermark /= N_THREAD_CONC_QUEUE; + + /* init the server layer array of waiting slots */ + thread_conc_init(); + unireg_init(opt_specialflag); /* Set up extern variabels */ if (!(my_default_lc_messages= my_locale_by_name(lc_messages))) @@ -8014,6 +8056,9 @@ {"Threads_connected", (char*) &connection_count, SHOW_INT}, {"Threads_created", (char*) &thread_created, SHOW_LONG_NOFLUSH}, {"Threads_running", (char*) &num_thread_running, SHOW_INT}, + {"Threads_rejected", (char*) &thread_rejected, SHOW_INT}, + {"Threads_active", (char*) &thread_active, SHOW_INT}, + {"Threads_wait", (char*) &thread_wait, SHOW_INT}, {"Uptime", (char*) &show_starttime, SHOW_FUNC}, #ifdef ENABLED_PROFILING {"Uptime_since_flush_status",(char*) &show_flushstatustime, SHOW_FUNC}, Index: sql/mysqld.h =================================================================== --- sql/mysqld.h (revision 5163) +++ sql/mysqld.h (working copy) @@ -34,6 +34,9 @@ typedef struct st_mysql_const_lex_string LEX_CSTRING; typedef struct st_mysql_show_var SHOW_VAR; +class Thread_conc_queue; +typedef struct st_thread_wait_slot Thread_wait_slot; + /* This forward declaration is used from C files where the real definition is included before.
Since C does not allow repeated @@ -279,6 +282,15 @@ extern char err_shared_dir[]; extern TYPELIB thread_handling_typelib; extern my_decimal decimal_zero; + +extern Thread_conc_queue thread_conc_queues[]; +extern ulong thread_running_low_watermark; +extern ulong thread_running_high_watermark; +extern ulong thread_running_ctl_mode; +extern ulong thread_running_wait_timeout; +extern ulong thread_running_wait_timeout_ns; +extern ulong queue_tr_low_watermark; + extern ulong connection_errors_select; extern ulong connection_errors_accept; extern ulong connection_errors_tcpwrap; @@ -587,6 +599,9 @@ extern mysql_rwlock_t LOCK_filter_list; extern mysql_cond_t COND_manager; extern int32 thread_running; +extern int32 thread_rejected; +extern int32 thread_active; +extern int32 thread_wait; extern my_atomic_rwlock_t thread_running_lock; extern my_atomic_rwlock_t slave_open_temp_tables_lock; extern my_atomic_rwlock_t opt_binlog_max_flush_queue_time_lock; @@ -738,6 +753,16 @@ return (num_threads-1); } +inline int32 +get_thread_running() +{ + int32 num_threads; + my_atomic_rwlock_wrlock(&thread_running_lock); + num_threads= my_atomic_load32(&num_thread_running); + my_atomic_rwlock_wrunlock(&thread_running_lock); + return num_threads; +} + #if defined(MYSQL_DYNAMIC_PLUGIN) && defined(_WIN32) extern "C" THD *_current_thd_noinline(); #define _current_thd() _current_thd_noinline() Index: sql/share/errmsg-utf8.txt =================================================================== --- sql/share/errmsg-utf8.txt (revision 5163) +++ sql/share/errmsg-utf8.txt (working copy) @@ -7103,3 +7103,6 @@ # Alibaba error numbers starts from 3000 start-error-number 3000 + +ER_SERVER_THREAD_RUNNING_TOO_HIGH + eng "MySQL Server is too busy."
Index: sql/sys_vars.cc =================================================================== --- sql/sys_vars.cc (revision 5163) +++ sql/sys_vars.cc (working copy) @@ -1372,6 +1372,38 @@ } return false; } + +static bool check_tr_low_watermark(sys_var *self, THD *thd, set_var *var) +{ + longlong val = (longlong) var->save_result.ulonglong_value; + if (val > (longlong)thread_running_high_watermark) + return true; + + queue_tr_low_watermark= (val + N_THREAD_CONC_QUEUE - 1) / N_THREAD_CONC_QUEUE; + + return false; +} + +static bool check_tr_high_watermark(sys_var *self, THD *thd, set_var *var) +{ + longlong val = (longlong) var->save_result.ulonglong_value; + if (val == 0) + { + val= (longlong) max_connections; /* 0 means "use max_connections" */ + var->save_result.ulonglong_value= val; + } + + if (val < (longlong)thread_running_low_watermark) + return true; + + return false; +} + +static bool check_tr_wait_timeout(sys_var *self, THD *thd, set_var *var) +{ + thread_running_wait_timeout_ns= var->save_result.ulonglong_value * 1000000; + return false; +} static PolyLock_rwlock PLock_sys_init_connect(&LOCK_sys_init_connect); static Sys_var_lexstring Sys_init_connect( "init_connect", "Command(s) that are executed for each " @@ -1380,6 +1412,37 @@ DEFAULT(""), &PLock_sys_init_connect, NOT_IN_BINLOG, ON_CHECK(check_init_string)); +static Sys_var_ulong Sys_thread_running_low_watermark( + "threads_running_low_watermark", + "When threads_running exceed this limit, " + "query that isn't in active transaction should wait.", + GLOBAL_VAR(thread_running_low_watermark), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 50000), DEFAULT(0), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_tr_low_watermark)); + +static Sys_var_ulong Sys_thread_running_high_watermark( + "threads_running_high_watermark", + "When threads_running exceed this limit, " + "query that isn't in active transaction should quit.", + GLOBAL_VAR(thread_running_high_watermark), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 50000), DEFAULT(0), BLOCK_SIZE(1), +
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_tr_high_watermark)); + +static const char *thread_running_ctl_mode_names[]= {"SELECTS", "ALL", 0}; +static Sys_var_enum Sys_thread_running_ctl_mode( + "threads_running_ctl_mode", + "Control which statements will be affected by threads running control, " + "Values: SELECTS(default), ALL.", + GLOBAL_VAR(thread_running_ctl_mode), CMD_LINE(REQUIRED_ARG), + thread_running_ctl_mode_names, DEFAULT(0)); + +static Sys_var_ulong Sys_thread_running_wait_time( + "threads_running_wait_timeout", + "Max time(ms) waiting in FIFO when thread running control takes effect.", + GLOBAL_VAR(thread_running_wait_timeout), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(1, 60000), DEFAULT(100), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_tr_wait_timeout)); + static Sys_var_charptr Sys_init_file( "init_file", "Read SQL commands from this file at startup", READ_ONLY GLOBAL_VAR(opt_init_file), Index: sql/sql_parse.cc =================================================================== --- sql/sql_parse.cc (revision 5163) +++ sql/sql_parse.cc (working copy) @@ -1113,7 +1113,231 @@ DBUG_RETURN(FALSE); } +static bool has_prefix(const char *str, uint32 len1, + const char *upper_prefix, uint32 len2) +{ + uint32 i= 0; + + if (!str || len1 < len2) + return false; + + while (i < len2) + { + if (my_toupper(system_charset_info, str[i]) != upper_prefix[i]) + return false; + i++; + } + + return true; +} + /** + Avoid too many statements run concurrently, high watermark part. + If *thread_running* exceeds thread_running_high_watermark, reject the stmt. + + @param thd connection handle + @param tr thread_running value returned by inc_thread_running() + @retval + FALSE query continues. + @retval + TRUE query rejected. +*/ +static my_bool thread_running_control_high(THD *thd, int32 tr) +{ + int32 tr_high; + DBUG_ENTER("thread_running_control_high"); + + tr_high= (int32)thread_running_high_watermark; + + /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL.
*/ + if ((!tr_high || tr <= tr_high) || + thd->transaction.is_active() || + thd->get_command() != COM_QUERY || + thd->security_ctx->master_access & SUPER_ACL || + thd->slave_thread) + DBUG_RETURN(FALSE); + + const char *query= thd->query(); + uint32 len= thd->query_length(); + + if ((!has_prefix(query, len, "SELECT", 6) && thread_running_ctl_mode == 0) || + has_prefix(query, len, "COMMIT", 6) || + has_prefix(query, len, "ROLLBACK", 8)) + DBUG_RETURN(FALSE); + + /* re-check against the current thread_running value */ + if (tr > tr_high && get_thread_running() > tr_high) + { + __sync_add_and_fetch(&thread_rejected, 1); + DBUG_RETURN(TRUE); + } + + DBUG_RETURN(FALSE); +} + +/** + Avoid too many statements run concurrently, low watermark part. + + If *thread_active* exceeds thread_running_low_watermark, wait on a condition + variable that will be signaled by another thread when it has finished a + statement; the query is rejected if the wait time exceeds + thread_running_wait_timeout (ms). + + @param thd connection handle + + @retval + TRUE query is rejected. + @retval + FALSE query continues.
+*/ +static my_bool thread_running_control_low_enter(THD *thd) +{ + int res= 0; + int32 tr_low; + my_bool ret= FALSE; + my_bool slept= FALSE; + struct timespec timeout; + Thread_conc_queue *queue; + DBUG_ENTER("thread_running_control_low_enter"); + + /* update global status */ + __sync_add_and_fetch(&thread_active, 1); + + tr_low= (int32)queue_tr_low_watermark; + queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE; + + queue->lock(); + +retry: + + if ((!tr_low || queue->thread_active < tr_low) || + (thd->lex->sql_command != SQLCOM_SELECT && thread_running_ctl_mode == 0) || + (!slept && (thd->transaction.is_active() || + thd->security_ctx->master_access & SUPER_ACL || thd->slave_thread))) + { + queue->thread_active++; + queue->unlock(); + DBUG_RETURN(ret); + } + + if (!slept) + { + queue->unlock(); + + /* sleep for 500 us */ + my_sleep(500); + slept= TRUE; + queue->lock(); + + goto retry; + } + + /* get a free wait-slot */ + Thread_wait_slot *slot= queue->pop_free(); + + /* can't find a free wait slot, must let the query enter */ + if (!slot) + { + queue->thread_active++; + queue->unlock(); + DBUG_RETURN(ret); + } + + slot->signaled= false; + slot->wait_ended= false; + + /* put slot into waiting queue. */ + queue->push_back_wait(slot); + queue->thread_wait++; + + queue->unlock(); + + /* update global status */ + thd_proc_info(thd, "waiting in server fifo"); + __sync_sub_and_fetch(&thread_active, 1); + __sync_add_and_fetch(&thread_wait, 1); + + /* cond-wait for at most thread_running_wait_timeout(ms). */ + set_timespec_nsec(timeout, thread_running_wait_timeout_ns); + + mysql_mutex_lock(&slot->mutex); + while (!slot->signaled) + { + res= mysql_cond_timedwait(&slot->cond, &slot->mutex, &timeout); + /* timeout/error ends the wait (waker need not signal); res == 0 re-checks */ + if (res != 0) slot->signaled= true; + } + mysql_mutex_unlock(&slot->mutex); + + queue->lock(); + queue->thread_wait--; + queue->thread_active++; + + /* remove slot from waiting queue.
*/ + queue->remove_wait(slot); + /* put slot into the free queue for reuse. */ + queue->push_back_free(slot); + + queue->unlock(); + + /* update global status */ + __sync_sub_and_fetch(&thread_wait, 1); + __sync_add_and_fetch(&thread_active, 1); + thd_proc_info(thd, 0); + + if (res == ETIMEDOUT || res == ETIME) + { + ret= TRUE; // indicate that query is rejected. + __sync_add_and_fetch(&thread_rejected, 1); + } + + DBUG_RETURN(ret); +} + +/* Leave the active set and, if below the low watermark, wake one waiter. */ +static void thread_running_control_low_exit(THD *thd) +{ + int32 tr_low; + Thread_conc_queue *queue; + + /* update global status */ + __sync_sub_and_fetch(&thread_active, 1); + + tr_low= (int32)queue_tr_low_watermark; + queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE; + + queue->lock(); + queue->thread_active--; + + if (tr_low && queue->thread_active >= tr_low) + { + queue->unlock(); + return; + } + + /* find the first waiting slot that has not been picked for wake-up yet */ + Thread_wait_slot *slot= queue->wait_q_head(); + while (slot && slot->wait_ended) + slot= slot->next; + if (slot) + slot->wait_ended= true; + + queue->unlock(); + + if (slot) + { + mysql_mutex_lock(&slot->mutex); + /* signaled is already set if the waiter's cond-wait timed out */ + if (!slot->signaled) + { + slot->signaled= true; + mysql_cond_signal(&slot->cond); + } + mysql_mutex_unlock(&slot->mutex); + } +} + +/** Perform one connection-level (COM_XXXX) command.
@param command type of command to perform @@ -1177,7 +1401,7 @@ command= COM_SHUTDOWN; } thd->set_query_id(next_query_id()); - inc_thread_running(); + int32 tr= inc_thread_running(); if (!(server_command_flags[command] & CF_SKIP_QUESTIONS)) statistic_increment(thd->status_var.questions, &LOCK_status); @@ -1209,6 +1433,15 @@ goto done; } + if (command == COM_QUERY && alloc_query(thd, packet, packet_length)) + goto endof_case; // fatal error is set + + if (thread_running_control_high(thd, tr)) + { + my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0)); + goto endof_case; + } + switch (command) { case COM_INIT_DB: { @@ -1311,8 +1544,6 @@ } case COM_QUERY: { - if (alloc_query(thd, packet, packet_length)) - break; // fatal error is set MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""), &thd->security_ctx->priv_user[0], @@ -1751,6 +1982,7 @@ my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0)); break; } +endof_case: done: DBUG_ASSERT(thd->derived_tables == NULL && @@ -2502,12 +2734,37 @@ Opt_trace_array trace_command_steps(&thd->opt_trace, "steps"); DBUG_ASSERT(thd->transaction.stmt.cannot_safely_rollback() == FALSE); + bool count_active= false; if (need_traffic_control(thd, lex->sql_command)) { thd->killed = THD::KILL_QUERY; goto error; } + + switch (lex->sql_command) { + + case SQLCOM_SELECT: + case SQLCOM_UPDATE: + case SQLCOM_UPDATE_MULTI: + case SQLCOM_DELETE: + case SQLCOM_DELETE_MULTI: + case SQLCOM_INSERT: + case SQLCOM_INSERT_SELECT: + case SQLCOM_REPLACE: + case SQLCOM_REPLACE_SELECT: + count_active= true; + break; + default: + break; + } + + if (count_active && thread_running_control_low_enter(thd)) + { + my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0)); + goto error; + } + status_var_increment(thd->status_var.com_stat[lex->sql_command]); switch (gtid_pre_statement_checks(thd)) @@ -4990,6 +5247,9 @@ finish: + if (count_active) + thread_running_control_low_exit(thd); +
DBUG_ASSERT(!thd->in_active_multi_stmt_transaction() || thd->in_multi_stmt_transaction_mode());