diff --git a/src/include/pcp/libpcp_ext.h b/src/include/pcp/libpcp_ext.h index caf05bbd..99330678 100644 --- a/src/include/pcp/libpcp_ext.h +++ b/src/include/pcp/libpcp_ext.h @@ -313,7 +313,7 @@ struct WdInfo; extern PCPConnInfo * pcp_connect(char *hostname, int port, char *username, char *password, FILE *Pfdebug); extern void pcp_disconnect(PCPConnInfo * pcpConn); -extern PCPResultInfo * pcp_terminate_pgpool(PCPConnInfo * pcpCon, char mode); +extern PCPResultInfo * pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode, bool cluster_mode); extern PCPResultInfo * pcp_node_count(PCPConnInfo * pcpCon); extern PCPResultInfo * pcp_node_info(PCPConnInfo * pcpCon, int nid); extern PCPResultInfo * pcp_health_check_stats(PCPConnInfo * pcpCon, int nid); diff --git a/src/include/watchdog/wd_internal_commands.h b/src/include/watchdog/wd_internal_commands.h index e2240183..8abb55b3 100644 --- a/src/include/watchdog/wd_internal_commands.h +++ b/src/include/watchdog/wd_internal_commands.h @@ -37,6 +37,8 @@ extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned char extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags); extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned char flags); +extern WdCommandResult wd_execute_cluster_command(const char *clusterCommand); + extern WDPGBackendStatus * get_pg_backend_status_from_master_wd_node(void); extern WD_STATES wd_internal_get_watchdog_local_node_state(void); @@ -57,5 +59,4 @@ extern void set_watchdog_node_escalated(void); extern void reset_watchdog_node_escalated(void); extern bool get_watchdog_node_escalation_state(void); - #endif /* WD_INTERNAL_COMMANDS_H */ diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h index 0bda9c1b..58691075 100644 --- a/src/include/watchdog/wd_ipc_defines.h +++ b/src/include/watchdog/wd_ipc_defines.h @@ -62,13 +62,20 @@ typedef enum WDValueDataType #define WD_IPC_CMD_RESULT_OK '7' 
#define WD_IPC_CMD_TIMEOUT '8' +#define WD_EXECUTE_CLUSTER_COMMAND 'c' #define WD_IPC_FAILOVER_COMMAND 'f' -#define WD_IPC_ONLINE_RECOVERY_COMMAND 'r' -#define WD_FAILOVER_LOCKING_REQUEST 's' +#define WD_IPC_ONLINE_RECOVERY_COMMAND 'r' +#define WD_FAILOVER_LOCKING_REQUEST 's' #define WD_GET_MASTER_DATA_REQUEST 'd' #define WD_GET_RUNTIME_VARIABLE_VALUE 'v' #define WD_FAILOVER_INDICATION 'i' + +#define WD_COMMAND_RESTART_CLUSTER "RESTART_CLUSTER" +#define WD_COMMAND_REELECT_MASTER "REELECT_MASTER" +#define WD_COMMAND_SHUTDOWN_CLUSTER "SHUTDOWN_CLUSTER" + + #define WD_FUNCTION_START_RECOVERY "START_RECOVERY" #define WD_FUNCTION_END_RECOVERY "END_RECOVERY" #define WD_FUNCTION_FAILBACK_REQUEST "FAILBACK_REQUEST" @@ -112,7 +119,7 @@ typedef enum WDValueDataType #define WD_RUNTIME_VAR_ESCALATION_STATE "Escalated" /* Use to inform node new node status by lifecheck */ -#define WD_LIFECHECK_NODE_STATUS_DEAD 1 +#define WD_LIFECHECK_NODE_STATUS_DEAD 1 #define WD_LIFECHECK_NODE_STATUS_ALIVE 2 diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h index 0080a217..9e480e9d 100644 --- a/src/include/watchdog/wd_json_data.h +++ b/src/include/watchdog/wd_json_data.h @@ -69,4 +69,10 @@ extern char *get_simple_request_json(char *key, char *value, unsigned int shared extern bool parse_data_request_json(char *json_data, int data_len, char **request_type); extern char *get_data_request_json(char *request_type, unsigned int sharedKey, char *authKey); +extern bool +parse_wd_exec_cluster_command_json(char *json_data, int data_len, + char **clusterCommand, unsigned char *flags); +extern char * +get_wd_exec_cluster_command_json(char *clusterCommand, unsigned char flags, unsigned int sharedKey, char *authKey); + #endif diff --git a/src/libs/pcp/Makefile.am b/src/libs/pcp/Makefile.am index feecdf49..9127c782 100644 --- a/src/libs/pcp/Makefile.am +++ b/src/libs/pcp/Makefile.am @@ -1,7 +1,7 @@ AM_CPPFLAGS = -D_GNU_SOURCE -DPOOL_PRIVATE -I 
@PGSQL_INCLUDE_DIR@ 
lib_LTLIBRARIES = libpcp.la -libpcp_la_LDFLAGS = -version-info 1:0:0 +libpcp_la_LDFLAGS = -version-info 2:0:0 dist_libpcp_la_SOURCES = pcp.c \ ../../utils/pool_path.c \ ../../tools/fe_port.c \ diff --git a/src/libs/pcp/pcp.c b/src/libs/pcp/pcp.c index 788d7224..fd10c278 100644 --- a/src/libs/pcp/pcp.c +++ b/src/libs/pcp/pcp.c @@ -608,7 +608,7 @@ pcp_disconnect(PCPConnInfo * pcpConn) * -------------------------------- */ PCPResultInfo * -pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode) +pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode, bool cluster_mode) { int wsize; @@ -617,7 +617,10 @@ pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode) pcp_internal_error(pcpConn, "invalid PCP connection"); return NULL; } - pcp_write(pcpConn->pcpConn, "T", 1); + if (cluster_mode == false) + pcp_write(pcpConn->pcpConn, "T", 1); + else + pcp_write(pcpConn->pcpConn, "t", 1); wsize = htonl(sizeof(int) + sizeof(char)); pcp_write(pcpConn->pcpConn, &wsize, sizeof(int)); pcp_write(pcpConn->pcpConn, &mode, sizeof(char)); diff --git a/src/pcp_con/pcp_worker.c b/src/pcp_con/pcp_worker.c index 35109e6a..2aa79e6a 100644 --- a/src/pcp_con/pcp_worker.c +++ b/src/pcp_con/pcp_worker.c @@ -80,7 +80,7 @@ static void process_attach_node(PCP_CONNECTION * frontend, char *buf); static void process_recovery_request(PCP_CONNECTION * frontend, char *buf); static void process_status_request(PCP_CONNECTION * frontend); static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos); -static void process_shutown_request(PCP_CONNECTION * frontend, char mode); +static void process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos); static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len); static void pcp_worker_will_go_down(int code, Datum arg); @@ -297,8 +297,9 @@ pcp_process_command(char tos, char *buf, int buf_len) break; case 'T': + case 't': set_ps_display("PCP: processing shutdown request", false); - 
process_shutown_request(pcp_frontend, buf[0]); + process_shutown_request(pcp_frontend, buf[0], tos); break; case 'O': /* recovery request */ @@ -1260,7 +1261,7 @@ send_md5salt(PCP_CONNECTION * frontend, char *salt) } static void -process_shutown_request(PCP_CONNECTION * frontend, char mode) +process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos) { char code[] = "CommandComplete"; pid_t ppid = getppid(); @@ -1295,12 +1296,23 @@ process_shutown_request(PCP_CONNECTION * frontend, char mode) errdetail("invalid shutdown mode \"%c\"", mode))); } + if (tos == 't' && pool_config->use_watchdog) + { + ereport(LOG, + (errmsg("PCP: sending command to watchdog to shutdown cluster"))); + if (wd_execute_cluster_command(WD_COMMAND_SHUTDOWN_CLUSTER) != COMMAND_OK) + ereport(ERROR, + (errmsg("PCP: error while processing shutdown cluster request"), + errdetail("failed to propagate shutdown command through watchdog"))); + } + pcp_write(frontend, "t", 1); len = htonl(sizeof(code) + sizeof(int)); pcp_write(frontend, &len, sizeof(int)); pcp_write(frontend, code, sizeof(code)); do_pcp_flush(frontend); + pool_signal_parent(sig); } diff --git a/src/tools/pcp/pcp_frontend_client.c b/src/tools/pcp/pcp_frontend_client.c index b89c5030..13d9f1f9 100644 --- a/src/tools/pcp/pcp_frontend_client.c +++ b/src/tools/pcp/pcp_frontend_client.c @@ -90,7 +90,7 @@ struct AppTypes AllAppTypes[] = {"pcp_proc_info", PCP_PROC_INFO, "h:p:P:U:awWvd", "display a pgpool-II child process' information"}, {"pcp_promote_node", PCP_PROMOTE_NODE, "n:h:p:U:gwWvd", "promote a node as new master from pgpool-II"}, {"pcp_recovery_node", PCP_RECOVERY_NODE, "n:h:p:U:wWvd", "recover a node"}, - {"pcp_stop_pgpool", PCP_STOP_PGPOOL, "m:h:p:U:wWvd", "terminate pgpool-II"}, + {"pcp_stop_pgpool", PCP_STOP_PGPOOL, "m:h:p:U:wWvda", "terminate pgpool-II"}, {"pcp_watchdog_info", PCP_WATCHDOG_INFO, "n:h:p:U:wWvd", "display a pgpool-II watchdog's information"}, {NULL, UNKNOWN, NULL, NULL}, }; @@ -404,7 +404,7 @@ main(int 
argc, char **argv) else if (current_app_type->app_type == PCP_STOP_PGPOOL) { - pcpResInfo = pcp_terminate_pgpool(pcpConn, shutdown_mode); + pcpResInfo = pcp_terminate_pgpool(pcpConn, shutdown_mode, all); } else if (current_app_type->app_type == PCP_WATCHDOG_INFO) diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c index 5176f158..00b59546 100644 --- a/src/watchdog/watchdog.c +++ b/src/watchdog/watchdog.c @@ -145,6 +145,12 @@ typedef enum IPC_CMD_PREOCESS_RES #define CLUSTER_NODE_APPEARING_LOST 'Y' #define CLUSTER_NODE_APPEARING_FOUND 'Z' +/* cluster commands sent in the payload of a cluster service message */ +#define CLUSTER_DO_SHUTDOWN '1' +#define CLUSTER_DO_RESTART '2' +#define CLUSTER_DO_RE_ELECTION '3' + + #define WD_MASTER_NODE getMasterWatchdogNode() typedef struct packet_types @@ -169,6 +175,7 @@ packet_types all_packet_types[] = { {WD_STAND_FOR_COORDINATOR_MESSAGE, "STAND FOR COORDINATOR"}, {WD_REMOTE_FAILOVER_REQUEST, "REPLICATE FAILOVER REQUEST"}, {WD_IPC_ONLINE_RECOVERY_COMMAND, "ONLINE RECOVERY REQUEST"}, + {WD_EXECUTE_CLUSTER_COMMAND, "EXECUTE CLUSTER COMMAND"}, {WD_IPC_FAILOVER_COMMAND, "FAILOVER FUNCTION COMMAND"}, {WD_INFORM_I_AM_GOING_DOWN, "INFORM I AM GOING DOWN"}, {WD_ASK_FOR_POOL_CONFIG, "ASK FOR POOL CONFIG"}, @@ -541,6 +548,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData * ipcC static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData * ipcCommand); static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData * ipcCommand); static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData * ipcCommand); +static IPC_CMD_PREOCESS_RES process_IPC_execute_cluster_command(WDCommandData * ipcCommand); static bool write_ipc_command_with_result_data(WDCommandData * ipcCommand, char type, char *data, int len); @@ -2001,12 +2009,20 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData * ipcCommand) case WD_FAILOVER_INDICATION: return 
process_IPC_failover_indication(ipcCommand); + break; case WD_GET_MASTER_DATA_REQUEST: return process_IPC_data_request_from_master(ipcCommand); + break; case WD_GET_RUNTIME_VARIABLE_VALUE: return process_IPC_get_runtime_variable_value_request(ipcCommand); + break; + + case WD_EXECUTE_CLUSTER_COMMAND: + return process_IPC_execute_cluster_command(ipcCommand); + break; + default: ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext, "unknown IPC command type"); break; @@ -2014,6 +2030,47 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData * ipcCommand) return IPC_CMD_ERROR; } +static IPC_CMD_PREOCESS_RES process_IPC_execute_cluster_command(WDCommandData * ipcCommand) +{ + /* cluster command name parsed out of the request json (pstrdup'd, freed below) */ + char *clusterCommand = NULL; + unsigned char flags; + IPC_CMD_PREOCESS_RES res = IPC_CMD_OK; + + if (ipcCommand->sourcePacket.len <= 0 || ipcCommand->sourcePacket.data == NULL) + return IPC_CMD_ERROR; + + if (!parse_wd_exec_cluster_command_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len, + &clusterCommand, &flags)) + return IPC_CMD_ERROR; + + if (strcasecmp(WD_COMMAND_RESTART_CLUSTER, clusterCommand) == 0) + { + send_cluster_service_message(NULL, NULL, CLUSTER_DO_RESTART); + } + else if (strcasecmp(WD_COMMAND_REELECT_MASTER, clusterCommand) == 0) + { + send_cluster_service_message(NULL, NULL, CLUSTER_DO_RE_ELECTION); + } + else if (strcasecmp(WD_COMMAND_SHUTDOWN_CLUSTER, clusterCommand) == 0) + { + /* + * Just broadcast the cluster service message asking each node to + * shut itself down; shutting down the local node is the + * responsibility of the caller process. + */ + send_cluster_service_message(NULL, NULL, CLUSTER_DO_SHUTDOWN); + } + else + { + ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext, + "unknown cluster command requested"); + res = IPC_CMD_ERROR; + } + + pfree(clusterCommand); + return res; +} static IPC_CMD_PREOCESS_RES 
process_IPC_get_runtime_variable_value_request(WDCommandData * ipcCommand) { @@ -3781,6 +3838,14 @@ 
cluster_service_message_processor(WatchdogNode * wdNode, WDPacketData * pkt) switch (pkt->data[0]) { + case CLUSTER_DO_SHUTDOWN: + { + ereport(LOG, + (errmsg("shutdown command from remote node \"%s\"", wdNode->nodeName))); + pool_signal_parent(SIGINT); /* NOTE(review): requested shutdown mode is not propagated; always SIGINT */ + } + break; + case CLUSTER_IAM_TRUE_MASTER: { /* @@ -7420,6 +7485,7 @@ check_and_report_IPC_authentication(WDCommandData * ipcCommand) case WD_IPC_FAILOVER_COMMAND: case WD_IPC_ONLINE_RECOVERY_COMMAND: + case WD_EXECUTE_CLUSTER_COMMAND: case WD_GET_MASTER_DATA_REQUEST: /* only allowed internaly. */ internal_client_only = true; diff --git a/src/watchdog/wd_internal_commands.c b/src/watchdog/wd_internal_commands.c index e87d467d..80f046e3 100644 --- a/src/watchdog/wd_internal_commands.c +++ b/src/watchdog/wd_internal_commands.c @@ -264,6 +264,55 @@ wd_end_recovery(void) return COMMAND_FAILED; } +WdCommandResult +wd_execute_cluster_command(const char *clusterCommand) +{ + char type; + unsigned int *shared_key = get_ipc_shared_key(); + + char *func = get_wd_exec_cluster_command_json((char *) clusterCommand, 0, + shared_key ? 
*shared_key : 0, pool_config->wd_authkey); + + WDIPCCmdResult *result = issue_command_to_watchdog(WD_EXECUTE_CLUSTER_COMMAND, + WD_DEFAULT_IPC_COMMAND_TIMEOUT, + func, strlen(func), true); + + pfree(func); + + if (result == NULL) + { + ereport(WARNING, + (errmsg("execute cluster command failed"), + errdetail("issue command to watchdog returned NULL"))); + return COMMAND_FAILED; + } + + type = result->type; + FreeCmdResult(result); + + if (type == WD_IPC_CMD_CLUSTER_IN_TRAN) + { + ereport(WARNING, + (errmsg("execute cluster command failed"), + errdetail("watchdog cluster is not in stable state"), + errhint("try again when the cluster is fully initialized"))); + return CLUSTER_IN_TRANSATIONING; + } + else if (type == WD_IPC_CMD_TIMEOUT) + { + ereport(WARNING, + (errmsg("execute cluster command failed"), + errdetail("ipc command timeout"))); + return COMMAND_TIMEOUT; + } + else if (type == WD_IPC_CMD_RESULT_OK) + { + return COMMAND_OK; + } + return COMMAND_FAILED; +} + + static char * get_wd_failover_state_json(bool start) { diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c index 61db6d71..5e3edd24 100644 --- a/src/watchdog/wd_json_data.c +++ b/src/watchdog/wd_json_data.c @@ -805,3 +805,67 @@ get_wd_simple_message_json(char *message) jw_destroy(jNode); return json_str; } + +char * +get_wd_exec_cluster_command_json(char *clusterCommand, unsigned char flags, unsigned int sharedKey, char *authKey) +{ + char *json_str; + JsonNode *jNode = jw_create_with_object(true); + + jw_put_int(jNode, WD_IPC_SHARED_KEY, sharedKey); /* put the shared key */ + + if (authKey != NULL && strlen(authKey) > 0) + jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey); /* put the auth key */ + + jw_put_string(jNode, "Command", clusterCommand); + jw_put_int(jNode, "Flags", (int) flags); + jw_finish_document(jNode); + json_str = pstrdup(jw_get_json_string(jNode)); + jw_destroy(jNode); + return json_str; +} + +bool +parse_wd_exec_cluster_command_json(char *json_data, int data_len, + 
char **clusterCommand, unsigned char *flags) +{ + json_value *root; + int tmpflags = 0; + char *ptr = NULL; + + root = json_parse(json_data, data_len); + + /* The root node must be object */ + if (root == NULL || root->type != json_object) + { + json_value_free(root); + ereport(LOG, + (errmsg("watchdog is unable to parse exec cluster command json"), + errdetail("invalid json data \"%.*s\"", data_len, json_data))); + return false; + } + ptr = json_get_string_value_for_key(root, "Command"); + if (ptr == NULL) + { + json_value_free(root); + ereport(LOG, + (errmsg("watchdog is unable to parse exec cluster command json"), + errdetail("\"Command\" key not found in json data \"%.*s\"", data_len, json_data))); + return false; + } + *clusterCommand = pstrdup(ptr); + + if (json_get_int_value_for_key(root, "Flags", &tmpflags)) + { + /* "Flags" not found: not fatal, fall back to no flags */ + *flags = 0; + /* (the request may come from an older version) */ + } + else + { + *flags = (unsigned char)tmpflags; + } + + json_value_free(root); + return true; +}