Patch against revision 4151. When a client disconnects while its transaction is
in XA_PREPARED state, unlink_thd() keeps the THD in the global xa_prepared_threads
array (guarded by LOCK_xa_prepared_threads) so that another connection can finish
it with XA COMMIT / XA ROLLBACK, or roll it back with KILL.

Index: sql/sql_parse.h =================================================================== --- sql/sql_parse.h (revision 4151) +++ sql/sql_parse.h (working copy) @@ -51,6 +51,7 @@ Object_creation_ctx *creation_ctx); uint kill_one_thread(THD *thd, ulong id, bool only_kill_query); +uint kill_xa_prepared_thread(THD *thd, ulong id, int *found); void free_items(Item *item); void cleanup_items(Item *item); Index: sql/transaction.cc =================================================================== --- sql/transaction.cc (revision 4151) +++ sql/transaction.cc (working copy) @@ -659,16 +659,61 @@ enum xa_states xa_state= thd->transaction.xid_state.xa_state; DBUG_ENTER("trans_xa_commit"); - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + THD *xa_thd= NULL; + THD *thd_old= NULL; + + NET xa_thd_net; + LEX *thd_lex= thd->lex; + + int in_xa_prepared_thread= 0; + ulong i, found_idx= 0; + + /* no XA option given: look for the xid among the disconnected XA_PREPARED threads */ + if (thd_lex->xa_opt == XA_NONE) { - XID_STATE *xs= xid_cache_search(thd->lex->xid); + mysql_mutex_lock(&LOCK_xa_prepared_threads); + + for (i = 0; i < xa_prepared_threads.elements; i++) + { + get_dynamic(&xa_prepared_threads, (uchar*) &xa_thd, i); + + /* commit xa prepared transaction for the disconnected client */ + if (xa_thd->transaction.xid_state.xid.eq(thd_lex->xid)) + { + in_xa_prepared_thread= 1; + + /* backup connection information first. */ + memcpy(&xa_thd_net, &xa_thd->net, sizeof(NET)); + + /* use the current thd's connection information. */ + memcpy(&xa_thd->net, &thd->net, sizeof(NET)); + + /* at the end of trans_xa_commit, old value will be restored. 
*/ + thd_old= thd; + thd= xa_thd; + + xa_state= XA_PREPARED; + found_idx= i; + break; + } + } + + if (in_xa_prepared_thread) + delete_dynamic_element(&xa_prepared_threads, found_idx); + + mysql_mutex_unlock(&LOCK_xa_prepared_threads); + } + + if (!thd->transaction.xid_state.xid.eq(thd_lex->xid)) + { + XID_STATE *xs= xid_cache_search(thd_lex->xid); res= !xs || xs->in_thd; if (res) my_error(ER_XAER_NOTA, MYF(0)); else { res= xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, !res); + ha_commit_or_rollback_by_xid(thd_lex->xid, !res); xid_cache_delete(xs); } DBUG_RETURN(res); @@ -679,13 +724,13 @@ xa_trans_force_rollback(thd); res= thd->is_error(); } - else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) + else if (xa_state == XA_IDLE && thd_lex->xa_opt == XA_ONE_PHASE) { int r= ha_commit_trans(thd, TRUE); if ((res= test(r))) my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); } - else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) + else if (xa_state == XA_PREPARED && thd_lex->xa_opt == XA_NONE) { MDL_request mdl_request; @@ -726,6 +771,19 @@ xid_cache_delete(&thd->transaction.xid_state); thd->transaction.xid_state.xa_state= XA_NOTR; + if (in_xa_prepared_thread) + { + /* restore old value */ + thd= thd_old; + memcpy(&xa_thd->net, &xa_thd_net, sizeof(NET)); + + /* don't forget to unlink the already disconnected client */ + unlink_thd(xa_thd); + + /* unlink_thd() took LOCK_thread_count; release it here */ + mysql_mutex_unlock(&LOCK_thread_count); + } + DBUG_RETURN(res); } @@ -745,15 +803,57 @@ enum xa_states xa_state= thd->transaction.xid_state.xa_state; DBUG_ENTER("trans_xa_rollback"); - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + THD *xa_thd= NULL; + THD *thd_old= NULL; + + NET xa_thd_net; + LEX *thd_lex= thd->lex; + + int in_xa_prepared_thread= 0; + ulong i, found_idx= 0; + + mysql_mutex_lock(&LOCK_xa_prepared_threads); + + for (i = 0; i < xa_prepared_threads.elements; i++) { - XID_STATE *xs= 
xid_cache_search(thd->lex->xid); + get_dynamic(&xa_prepared_threads, (uchar*) &xa_thd, i); + + /* roll back xa prepared transaction for the disconnected client */ + if (xa_thd->transaction.xid_state.xid.eq(thd_lex->xid)) + { + in_xa_prepared_thread= 1; + + /* backup connection information first. */ + memcpy(&xa_thd_net, &xa_thd->net, sizeof(NET)); + + /* use the current thd's connection information. */ + memcpy(&xa_thd->net, &thd->net, sizeof(NET)); + + /* at the end of trans_xa_rollback, old value will be restored. */ + thd_old= thd; + thd= xa_thd; + + xa_state= XA_PREPARED; + found_idx= i; + + break; + } + } + + if (in_xa_prepared_thread) + delete_dynamic_element(&xa_prepared_threads, found_idx); + + mysql_mutex_unlock(&LOCK_xa_prepared_threads); + + if (!thd->transaction.xid_state.xid.eq(thd_lex->xid)) + { + XID_STATE *xs= xid_cache_search(thd_lex->xid); if (!xs || xs->in_thd) my_error(ER_XAER_NOTA, MYF(0)); else { xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, 0); + ha_commit_or_rollback_by_xid(thd_lex->xid, 0); xid_cache_delete(xs); } DBUG_RETURN(thd->stmt_da->is_error()); @@ -773,5 +873,18 @@ xid_cache_delete(&thd->transaction.xid_state); thd->transaction.xid_state.xa_state= XA_NOTR; + if (in_xa_prepared_thread) + { + /* restore old value */ + thd= thd_old; + memcpy(&xa_thd->net, &xa_thd_net, sizeof(NET)); + + /* don't forget to unlink the already disconnected client */ + unlink_thd(xa_thd); + + /* unlink_thd() took LOCK_thread_count; release it here */ + mysql_mutex_unlock(&LOCK_thread_count); + } + DBUG_RETURN(res); } Index: sql/mysqld.cc =================================================================== --- sql/mysqld.cc (revision 4151) +++ sql/mysqld.cc (working copy) @@ -630,6 +630,7 @@ I_List threads; Rpl_filter* rpl_filter; Rpl_filter* binlog_filter; +DYNAMIC_ARRAY xa_prepared_threads; struct system_variables global_system_variables; struct system_variables max_system_variables; @@ -658,6 +659,7 @@ pthread_key(MEM_ROOT**,THR_MALLOC); pthread_key(THD*, 
THR_THD); mysql_mutex_t LOCK_thread_count; +mysql_mutex_t LOCK_xa_prepared_threads; mysql_mutex_t LOCK_status, LOCK_error_log, LOCK_uuid_generator, LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create, @@ -1161,6 +1163,11 @@ } mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list + /* free xa_prepared_threads */ + mysql_mutex_lock(&LOCK_xa_prepared_threads); + delete_dynamic(&xa_prepared_threads); + mysql_mutex_unlock(&LOCK_xa_prepared_threads); + Events::deinit(); end_slave(); @@ -1606,6 +1613,7 @@ { mysql_rwlock_destroy(&LOCK_grant); mysql_mutex_destroy(&LOCK_thread_count); + mysql_mutex_destroy(&LOCK_xa_prepared_threads); mysql_mutex_destroy(&LOCK_status); mysql_mutex_destroy(&LOCK_delayed_insert); mysql_mutex_destroy(&LOCK_delayed_status); @@ -2133,17 +2141,36 @@ { DBUG_ENTER("unlink_thd"); DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd)); + + /* + Keep the *thd* alive when the client disconnects with an XA_PREPARED + transaction, so another client can commit/rollback it and clean up. + LOCK_thread_count is taken on both paths, as before this change. + */ + int thd_xa_prepared= thd->transaction.xid_state.xa_state == XA_PREPARED; - thd_cleanup(thd); - dec_connection_count(); + if (!thd_xa_prepared) + { + thd_cleanup(thd); + dec_connection_count(); + } + else + { + /* add this 'thd' to the global *xa_prepared_threads* */ + mysql_mutex_lock(&LOCK_xa_prepared_threads); + thd->proc_info= "Waiting for xa commit/rollback"; + insert_dynamic(&xa_prepared_threads, (uchar*)&thd); + mysql_mutex_unlock(&LOCK_xa_prepared_threads); + } mysql_mutex_lock(&LOCK_thread_count); /* Used by binlog_reset_master. It would be cleaner to use DEBUG_SYNC here, but that's not possible because the THD's debug sync feature has been shut down at this point. 
*/ DBUG_EXECUTE_IF("sleep_after_lock_thread_count_before_delete_thd", sleep(5);); - delete_thd(thd); + if (!thd_xa_prepared) + delete_thd(thd); DBUG_VOID_RETURN; } @@ -3654,6 +3681,8 @@ static int init_thread_environment() { mysql_mutex_init(key_LOCK_thread_count, &LOCK_thread_count, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_xa_prepared_threads, &LOCK_xa_prepared_threads, + MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_status, &LOCK_status, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_delayed_insert, &LOCK_delayed_insert, MY_MUTEX_INIT_FAST); @@ -7050,6 +7079,7 @@ threads.empty(); thread_cache.empty(); key_caches.empty(); + my_init_dynamic_array(&xa_prepared_threads, sizeof(THD*), 512, 512); if (!(dflt_key_cache= get_or_create_key_cache(default_key_cache_base.str, default_key_cache_base.length))) { @@ -8055,6 +8085,7 @@ key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOG_INFO_lock, key_LOCK_thread_count, + key_LOCK_xa_prepared_threads, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; PSI_mutex_key key_LOCK_wakeup_ready, key_LOCK_group_commit_queue, key_LOCK_commit_ordered; @@ -8119,6 +8150,7 @@ { &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL}, { &key_LOG_INFO_lock, "LOG_INFO::lock", 0}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, + { &key_LOCK_xa_prepared_threads, "LOCK_xa_prepared_threads", PSI_FLAG_GLOBAL}, { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0} }; Index: sql/mysqld.h =================================================================== --- sql/mysqld.h (revision 4151) +++ sql/mysqld.h (working copy) @@ -241,6 +241,7 @@ extern LEX_STRING opt_init_connect, opt_init_slave; extern int bootstrap_error; extern I_List threads; +extern DYNAMIC_ARRAY xa_prepared_threads; extern char err_shared_dir[]; extern TYPELIB thread_handling_typelib; extern my_decimal decimal_zero; @@ 
-286,7 +287,7 @@ key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock, key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, - key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; + key_LOCK_error_messages, key_LOCK_thread_count, key_LOCK_xa_prepared_threads, key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; extern PSI_mutex_key key_LOCK_wakeup_ready, key_LOCK_group_commit_queue, key_LOCK_commit_ordered; @@ -380,7 +381,7 @@ LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count, LOCK_stats, LOCK_global_user_client_stats, LOCK_global_table_stats, LOCK_global_index_stats; -extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; +extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count, LOCK_xa_prepared_threads; #ifdef HAVE_OPENSSL extern mysql_mutex_t LOCK_des_key_file; #endif Index: sql/sql_parse.cc =================================================================== --- sql/sql_parse.cc (revision 4151) +++ sql/sql_parse.cc (working copy) @@ -6639,7 +6639,92 @@ lex->prev_join_using= using_fields; } +/** + Roll back the XA_PREPARED transaction of a disconnected client. + + @param thd Thread that requested the kill + @param id Thread id + @param found [out] whether thread id in *xa_prepared_threads*. 
+ @return 0, ER_KILL_DENIED_ERROR or ER_XAER_RMERR + @note + This function is only called by *kill_one_thread*, and the parameter + 'only_kill_query' is ignored, when the thread to be killed is in + *xa_prepared_threads*, rollback the XA_PREPARED transaction and call + *unlink_thd* to make it disappear from 'SHOW PROCESSLIST;' +*/ +uint kill_xa_prepared_thread(THD *thd, ulong id, int *found) +{ + THD *tmp= NULL; + uint error= 0; + DBUG_ENTER("kill_xa_prepared_thread"); + DBUG_PRINT("enter", ("id=%lu", id)); + + NET tmp_net; + *found= 0; + ulong i= 0, found_idx= 0; + + mysql_mutex_lock(&LOCK_xa_prepared_threads); + + for (i= 0; i < xa_prepared_threads.elements; i++) + { + get_dynamic(&xa_prepared_threads, (uchar*)&tmp, i); + if (tmp->thread_id == id) + { + *found= 1; + found_idx= i; + + mysql_mutex_lock(&tmp->LOCK_thd_data); + break; + } + } + + if (*found) + { + if ((thd->security_ctx->master_access & SUPER_ACL) || + thd->security_ctx->user_matches(tmp->security_ctx)) + { + /* borrow the killer's connection only once the kill is authorized, */ + /* so that a denied kill leaves tmp->net untouched. */ + memcpy(&tmp_net, &tmp->net, sizeof(NET)); + memcpy(&tmp->net, &thd->net, sizeof(NET)); + + /* rollback the XA_PREPARED transaction */ + tmp->transaction.xid_state.rm_error= 0; + if (ha_rollback_trans(tmp, true)) + error= ER_XAER_RMERR; + + /* remove XA_PREPARED thread from *xa_prepared_threads* */ + delete_dynamic_element(&xa_prepared_threads, found_idx); + + tmp->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + tmp->transaction.all.modified_non_trans_table= FALSE; + tmp->server_status&= ~SERVER_STATUS_IN_TRANS; + xid_cache_delete(&tmp->transaction.xid_state); + tmp->transaction.xid_state.xa_state= XA_NOTR; + + /* restore old value */ + memcpy(&tmp->net, &tmp_net, sizeof(NET)); + + mysql_mutex_unlock(&tmp->LOCK_thd_data); + + /* don't forget to unlink the already disconnected client */ + unlink_thd(tmp); + + /* unlink_thd() took LOCK_thread_count; release it here */ + mysql_mutex_unlock(&LOCK_thread_count); + } + else + { + mysql_mutex_unlock(&tmp->LOCK_thd_data); + error= ER_KILL_DENIED_ERROR; + } + } + 
mysql_mutex_unlock(&LOCK_xa_prepared_threads); + + DBUG_RETURN(error); +} + /** kill on thread. @@ -6657,6 +6742,15 @@ uint error=ER_NO_SUCH_THREAD; DBUG_ENTER("kill_one_thread"); DBUG_PRINT("enter", ("id=%lu only_kill=%d", id, only_kill_query)); + + int found_xa_prepared_thread= 0; + error= kill_xa_prepared_thread(thd, id, &found_xa_prepared_thread); + if (found_xa_prepared_thread) + DBUG_RETURN(error); + + /* reset error */ + error=ER_NO_SUCH_THREAD; + mysql_mutex_lock(&LOCK_thread_count); // For unlink from list I_List_iterator it(threads); while ((tmp=it++))