Index: sql/mysqld.cc =================================================================== --- sql/mysqld.cc (revision 4490) +++ sql/mysqld.cc (revision 4496) @@ -477,6 +477,7 @@ ulong tc_heuristic_recover= 0; uint volatile thread_count; int32 thread_running; +int32 thread_rejected= 0; ulong thread_created; ulong back_log, connect_timeout, concurrency, server_id; ulong table_cache_size, table_def_size; @@ -498,6 +499,7 @@ query_id_t global_query_id; my_atomic_rwlock_t global_query_id_lock; my_atomic_rwlock_t thread_running_lock; +my_atomic_rwlock_t thread_rejected_lock; ulong aborted_threads, aborted_connects; ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size; ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use; @@ -616,6 +618,10 @@ const char *in_additional_cond= ""; const char *in_having_cond= ""; +ulong thread_running_low_watermark= 0; +ulong thread_running_high_watermark= 0; +ulong thread_running_ctl_mode= 0; + my_decimal decimal_zero; /* @@ -690,6 +696,7 @@ #endif mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave; mysql_rwlock_t LOCK_system_variables_hash; +mysql_rwlock_t LOCK_thread_running_watermark; mysql_rwlock_t LOCK_filter_list; mysql_cond_t COND_thread_count; pthread_t signal_thread; @@ -1575,6 +1582,7 @@ logger.cleanup_end(); my_atomic_rwlock_destroy(&global_query_id_lock); my_atomic_rwlock_destroy(&thread_running_lock); + my_atomic_rwlock_destroy(&thread_rejected_lock); mysql_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("got thread count lock")); ready_to_exit=1; @@ -1651,6 +1659,7 @@ mysql_rwlock_destroy(&LOCK_system_variables_hash); mysql_mutex_destroy(&LOCK_uuid_generator); mysql_mutex_destroy(&LOCK_prepared_stmt_count); + mysql_rwlock_destroy(&LOCK_thread_running_watermark); mysql_mutex_destroy(&LOCK_error_messages); mysql_cond_destroy(&COND_thread_count); mysql_cond_destroy(&COND_thread_cache); @@ -3494,6 +3503,9 @@ } open_files_limit= files; } + /* assign max_connections to thread_running_high_watermark if it equals 0. */ + if (thread_running_high_watermark == 0) + thread_running_high_watermark= max_connections; unireg_init(opt_specialflag); /* Set up extern variabels */ if (!(my_default_lc_messages= my_locale_by_name(lc_messages))) @@ -3718,6 +3730,8 @@ mysql_rwlock_init(key_rwlock_LOCK_sys_init_connect, &LOCK_sys_init_connect); mysql_rwlock_init(key_rwlock_LOCK_sys_init_slave, &LOCK_sys_init_slave); mysql_rwlock_init(key_rwlock_LOCK_grant, &LOCK_grant); + mysql_rwlock_init(key_rwlock_LOCK_thread_running_watermark, + &LOCK_thread_running_watermark); mysql_rwlock_init(key_rwlock_LOCK_filter_list, &LOCK_filter_list); mysql_cond_init(key_COND_thread_count, &COND_thread_count, NULL); mysql_cond_init(key_COND_thread_cache, &COND_thread_cache, NULL); @@ -6876,6 +6890,7 @@ {"Threads_connected", (char*) &connection_count, SHOW_INT}, {"Threads_created", (char*) &thread_created, SHOW_LONG_NOFLUSH}, {"Threads_running", (char*) &thread_running, SHOW_INT}, + {"Threads_rejected", (char*) &thread_rejected, SHOW_INT}, {"Uptime", (char*) &show_starttime, SHOW_FUNC}, #ifdef ENABLED_PROFILING {"Uptime_since_flush_status",(char*) &show_flushstatustime, SHOW_FUNC}, @@ -7081,6 +7096,7 @@ global_query_id= thread_id= 1L; my_atomic_rwlock_init(&global_query_id_lock); my_atomic_rwlock_init(&thread_running_lock); + my_atomic_rwlock_init(&thread_rejected_lock); strmov(server_version, MYSQL_SERVER_VERSION); threads.empty(); thread_cache.empty(); @@ -8160,6 +8176,7 @@ PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock, + key_rwlock_LOCK_thread_running_watermark, key_rwlock_LOCK_filter_list; static PSI_rwlock_info all_server_rwlocks[]= @@ -8173,6 +8190,7 @@ { &key_rwlock_LOCK_sys_init_slave, "LOCK_sys_init_slave", PSI_FLAG_GLOBAL}, { &key_rwlock_LOCK_system_variables_hash, "LOCK_system_variables_hash", PSI_FLAG_GLOBAL}, { &key_rwlock_query_cache_query_lock, "Query_cache_query::lock", 0}, + { &key_rwlock_LOCK_thread_running_watermark, "LOCK_thread_running_watermark", PSI_FLAG_GLOBAL}, { &key_rwlock_LOCK_filter_list, "LOCK_filter_list", PSI_FLAG_GLOBAL} }; Index: sql/mysqld.h =================================================================== --- sql/mysqld.h (revision 4490) +++ sql/mysqld.h (revision 4496) @@ -245,6 +245,10 @@ extern TYPELIB thread_handling_typelib; extern my_decimal decimal_zero; +extern ulong thread_running_low_watermark; +extern ulong thread_running_high_watermark; +extern ulong thread_running_ctl_mode; + /* control whether to use statement timeout, and which kind of statements will be affected, possible values: NONE, SELECTS, ALL. @@ -299,6 +303,7 @@ extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock, + key_rwlock_LOCK_thread_running_watermark, key_rwlock_LOCK_filter_list; #ifdef HAVE_MMAP @@ -399,8 +404,12 @@ extern mysql_cond_t COND_thread_count; extern mysql_cond_t COND_manager; extern int32 thread_running; +extern int32 thread_rejected; extern my_atomic_rwlock_t thread_running_lock; +extern my_atomic_rwlock_t thread_rejected_lock; +extern mysql_rwlock_t LOCK_thread_running_watermark; + extern char *opt_ssl_ca, *opt_ssl_capath, *opt_ssl_cert, *opt_ssl_cipher, *opt_ssl_key; @@ -536,6 +545,16 @@ } inline int32 +inc_thread_rejected() +{ + int32 num_thread_rejected; + my_atomic_rwlock_wrlock(&thread_rejected_lock); + num_thread_rejected= my_atomic_add32(&thread_rejected, 1); + my_atomic_rwlock_wrunlock(&thread_rejected_lock); + return (num_thread_rejected+1); +} + +inline int32 inc_thread_running() { int32 num_thread_running; Index: sql/share/errmsg-utf8.txt =================================================================== --- sql/share/errmsg-utf8.txt (revision 4490) +++ sql/share/errmsg-utf8.txt (revision 4496) @@ -6489,3 +6489,6 @@ ER_QUERY_TIMEOUT eng "Query execution was interrupted, max_statement_time exceeded" + +ER_SERVER_THREAD_RUNNING_TOO_HIGH + eng "MySQL Sever is too buzy." Index: sql/sys_vars.cc =================================================================== --- sql/sys_vars.cc (revision 4490) +++ sql/sys_vars.cc (revision 4496) @@ -1012,6 +1012,34 @@ } return false; } + +static bool check_tr_low_watermark(sys_var *self, THD *thd, set_var *var) +{ + if (check_has_super(self, thd, var)) + return true; + + longlong val = (longlong) var->save_result.ulonglong_value; + if (val > (longlong)thread_running_high_watermark) + return true; + + return false; +} + +static bool check_tr_high_watermark(sys_var *self, THD *thd, set_var *var) +{ + if (check_has_super(self, thd, var)) + return true; + + longlong val = (longlong) var->save_result.ulonglong_value; + if (val < (longlong)thread_running_low_watermark) + return true; + + if (val == 0) + var->save_result.ulonglong_value= max_connections; + + return false; +} + static PolyLock_rwlock PLock_sys_init_connect(&LOCK_sys_init_connect); static Sys_var_lexstring Sys_init_connect( "init_connect", "Command(s) that are executed for each " @@ -1020,6 +1048,34 @@ DEFAULT(""), &PLock_sys_init_connect, NOT_IN_BINLOG, ON_CHECK(check_init_string)); +static PolyLock_rwlock PLock_tr_low_watermark(&LOCK_thread_running_watermark); +static Sys_var_ulong Sys_thread_running_low_watermark( + "threads_running_low_watermark", + "When threads_running exceed this limit, " + "query that no in active transaction should wait.", + GLOBAL_VAR(thread_running_low_watermark), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 50000), DEFAULT(0), BLOCK_SIZE(1), + &PLock_tr_low_watermark, NOT_IN_BINLOG, + ON_CHECK(check_tr_low_watermark)); + +static PolyLock_rwlock PLock_tr_high_watermark(&LOCK_thread_running_watermark); +static Sys_var_ulong Sys_thread_running_high_watermark( + "threads_running_high_watermark", + "When threads_running exceed this limit, " + "query that not in active transaction should quit.", + GLOBAL_VAR(thread_running_high_watermark), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, 50000), DEFAULT(0), BLOCK_SIZE(1), + &PLock_tr_high_watermark, NOT_IN_BINLOG, + ON_CHECK(check_tr_high_watermark)); + +static const char *thread_running_ctl_mode_names[]= {"SELECTS", "ALL", 0}; +static Sys_var_enum Sys_thread_running_ctl_mode( + "threads_running_ctl_mode", + "Control which statements will be affected by threads running control, " + "Values: SELECTS(default), ALL.", + GLOBAL_VAR(thread_running_ctl_mode), CMD_LINE(REQUIRED_ARG), + thread_running_ctl_mode_names, DEFAULT(0)); + static Sys_var_charptr Sys_init_file( "init_file", "Read SQL commands from this file at startup", READ_ONLY GLOBAL_VAR(opt_init_file), Index: sql/sql_parse.cc =================================================================== --- sql/sql_parse.cc (revision 4490) +++ sql/sql_parse.cc (revision 4496) @@ -949,6 +949,116 @@ } /** + Avoid too many statements run concurrently. + + If thread_running exceed thread_running_low_watermark, + wait until thread_running drops down under thread_running_low_watermark or + total wait time exceed 100ms. + + If thread_running exceed thread_running_high_watermark, quit the query. + + @param thd connection handle + @param tr value return from inc_thread_running(). + + @retval + FALSE query maybe delayed at most 100ms. + @retval + TRUE reject the query +*/ +static my_bool thread_running_control(THD *thd, ulong tr) +{ + int slept_cnt= 0; + ulong tr_low, tr_high; + DBUG_ENTER("thread_running_control"); + + /* + Super user/slave thread will not be affected at any time, + transactions that have already started will continue. + */ + if (thd->security_ctx->master_access & SUPER_ACL || + thd->in_active_multi_stmt_transaction() || + thd->slave_thread) + DBUG_RETURN(FALSE); + + /* + To promise that tr_low will never be greater than tr_high, + as values may be changed between these two statements. + eg. + (low, high) = (200, 500) + 1. read low = 200 + 2. other sessions: set low = 20; set high = 80 + 3. read high = 80 + Don't take a lock here to avoid lock contention. + */ + do + { + tr_low= thread_running_low_watermark; + tr_high= thread_running_high_watermark; + + } while (tr_low > tr_high); + +check_buzy: + + /* tr_high is promised to be non-zero.*/ + if ((tr_low == 0 && tr < tr_high) || (tr_low != 0 && tr < tr_low)) + DBUG_RETURN(FALSE); + + if (tr >= tr_high) + { + int can_reject= 1; + + /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */ + if (thread_running_ctl_mode == 0) + { + int query_is_select= 0; + if (thd->query_length() >= 8) + { + char *p= thd->query(); + if (my_toupper(system_charset_info, p[0]) == 'S' && + my_toupper(system_charset_info, p[1]) == 'E' && + my_toupper(system_charset_info, p[2]) == 'L' && + my_toupper(system_charset_info, p[3]) == 'E' && + my_toupper(system_charset_info, p[4]) == 'C' && + my_toupper(system_charset_info, p[5]) == 'T') + + query_is_select= 1; + } + + if (!query_is_select) + can_reject= 0; + } + + if (can_reject) + { + inc_thread_rejected(); + DBUG_RETURN(TRUE); + } + else + DBUG_RETURN(FALSE); + } + + if (tr_low != 0 && tr >= tr_low) + { + /* + If total slept time exceed 100ms and thread running does not + reach high watermark, let it in. + */ + if (slept_cnt >= 20) + DBUG_RETURN(FALSE); + + dec_thread_running() + + /* wait for 5ms. */ + my_sleep(5000UL); + + slept_cnt++; + tr= inc_thread_running() - 1; + + goto check_buzy; + } + + DBUG_RETURN(FALSE); +} + +/** Perform one connection-level (COM_XXXX) command. @param command type of command to perform @@ -1016,7 +1126,8 @@ thd->set_query_id(get_query_id()); if (!(server_command_flags[command] & CF_SKIP_QUERY_ID)) next_query_id(); - inc_thread_running(); + /* remember old value of thread_running for *thread_running_control*. */ + int32 tr= inc_thread_running() - 1; if (!(server_command_flags[command] & CF_SKIP_QUESTIONS)) statistic_increment(thd->status_var.questions, &LOCK_status); @@ -1129,6 +1240,13 @@ { if (alloc_query(thd, packet, packet_length)) break; // fatal error is set + + if (thread_running_control(thd, (ulong)tr)) + { + my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0)); + break; + } + MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""), &thd->security_ctx->priv_user[0],