[pgpool-general: 4826] Re: show pool_nodes returns different result when run on master and on slave

Muhammad Usama m.usama at gmail.com
Wed Jul 27 05:24:50 JST 2016


Hi

Syncing of the backend node status across watchdog nodes was not implemented
in the watchdog. Could you please try the attached patch, which implements it,
and check whether it solves the issue you are facing? With this patch, the
standby pgpool loads the backend node status from the master/coordinator
watchdog node at startup.

Kind regards
Muhammad Usama

On Mon, Jul 25, 2016 at 9:54 PM, Krzysztof Mościcki <stivi at kity.pl> wrote:

> Hi.
> Anybody can help me with this?
> I reported bug: http://www.pgpool.net/mantisbt/view.php?id=218
> I tested in a VirtualBox environment (I wrote about this in the bug report),
> but the result is always the same.
> Maybe it's my mistake, but I don't know where. I don't understand why
> it does not work properly.
> I tested with trusted_servers and delegate_IP, but the same.
>
> Best regards,
> Kris
> _______________________________________________
> pgpool-general mailing list
> pgpool-general at pgpool.net
> http://www.pgpool.net/mailman/listinfo/pgpool-general
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://www.sraoss.jp/pipermail/pgpool-general/attachments/20160727/06b266e7/attachment-0001.html>
-------------- next part --------------
diff --git a/src/include/watchdog/wd_ipc_commands.h b/src/include/watchdog/wd_ipc_commands.h
index 2ef5908..1c4175c 100644
--- a/src/include/watchdog/wd_ipc_commands.h
+++ b/src/include/watchdog/wd_ipc_commands.h
@@ -27,6 +27,7 @@
 #define WD_IPC_COMMANDS_H
 
 #include "watchdog/wd_ipc_defines.h"
+#include "watchdog/wd_json_data.h"
 
 typedef enum WdCommandResult
 {
@@ -64,6 +65,7 @@ extern WdCommandResult wd_degenerate_backend_set(int *node_id_set, int count);
 extern WdCommandResult wd_promote_backend(int node_id);
 extern WDFailoverCMDResults wd_send_failover_sync_command(WDFailoverCMDTypes cmdType, char* syncReqType);
 
+extern WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void);
 
 extern char* wd_get_watchdog_nodes(int nodeID);
 
diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h
index 8ef3929..cd8adf5 100644
--- a/src/include/watchdog/wd_ipc_defines.h
+++ b/src/include/watchdog/wd_ipc_defines.h
@@ -63,7 +63,7 @@ typedef enum WDFailoverCMDResults
 
 #define WD_FUNCTION_COMMAND					'f'
 #define WD_FAILOVER_CMD_SYNC_REQUEST		's'
-
+#define WD_GET_MASTER_DATA_REQUEST			'd'
 
 #define WD_FUNCTION_START_RECOVERY		"START_RECOVERY"
 #define WD_FUNCTION_END_RECOVERY		"END_RECOVERY"
@@ -71,6 +71,8 @@ typedef enum WDFailoverCMDResults
 #define WD_FUNCTION_DEGENERATE_REQUEST	"DEGENERATE_BACKEND_REQUEST"
 #define WD_FUNCTION_PROMOTE_REQUEST		"PROMOTE_BACKEND_REQUEST"
 
+#define WD_DATE_REQ_PG_BACKEND_DATA		"BackendStatus"
+
 #define WD_IPC_AUTH_KEY			"IPCAuthKey"	/* JSON data key for authentication.
 												 * watchdog IPC server use the value for this key
 												 * to authenticate the external IPC clients
diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h
index b3b09b8..e8f662d 100644
--- a/src/include/watchdog/wd_json_data.h
+++ b/src/include/watchdog/wd_json_data.h
@@ -41,6 +41,18 @@ typedef struct WDNodeInfo
 	int	id;
 }WDNodeInfo;
 
+/*
+ * Parsed PG backend node status data fetched from the master watchdog node,
+ * as produced by get_pg_backend_node_status_from_json().
+ */
+typedef struct WDPGBackendStatus
+{
+	int primary_node_id;	/* primary backend node id reported by the sender */
+	int node_count;			/* number of valid backend_status entries; <= 0 is used to mean "this node is the master" */
+	BACKEND_STATUS backend_status[MAX_NUM_BACKENDS];	/* per-backend status, indexed by backend node id */
+	char nodeName[WD_MAX_HOST_NAMELEN];		/* name of the watchdog node that sent the data */
+}WDPGBackendStatus;
+
 extern WatchdogNode* get_watchdog_node_from_json(char* json_data, int data_len, char** authkey);
 extern char* get_watchdog_node_info_json(WatchdogNode* wdNode, char* authkey);
 extern POOL_CONFIG* get_pool_config_from_json(char* json_data, int data_len);
@@ -53,4 +65,11 @@ extern WDNodeInfo* get_WDNodeInfo_from_wd_node_json(json_value* source);
 extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey);
 extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count);
 extern char* get_wd_simple_error_message_json(char* message);
+
+extern WDPGBackendStatus* get_pg_backend_node_status_from_json(char* json_data, int data_len);
+extern char* get_backend_node_status_json(WatchdogNode* wdNode);
+
+extern bool parse_data_request_json(char* json_data, int data_len, char** request_type);
+extern char* get_data_request_json(char* request_type, unsigned int sharedKey, char* authKey);
+
 #endif
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 039d4bc..becf5bc 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -131,6 +131,7 @@ static int find_primary_node_repeatedly(void);
 static void terminate_all_childrens();
 static void system_will_go_down(int code, Datum arg);
 static char* process_name_from_pid(pid_t pid);
+static void initialize_backend_status_from_watchdog(void);
 
 static struct sockaddr_un un_addr;		/* unix domain socket path */
 static struct sockaddr_un pcp_un_addr;  /* unix domain socket path for PCP */
@@ -228,7 +229,6 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
 				(errmsg("failed to allocate memory in startup process")));
 
 	initialize_shared_mem_objects(clear_memcache_oidmaps);
-
 	if (pool_config->use_watchdog)
 	{
 		sigset_t mask;
@@ -275,6 +275,9 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
 		 * initialize the lifecheck process
 		 */
 		wd_lifecheck_pid = initialize_watchdog_lifecheck();
+
+		/* load the backend node status from watchdog cluster */
+		initialize_backend_status_from_watchdog();
 	}
 
 	fds[0] = create_unix_domain_socket(un_addr);
@@ -371,8 +374,14 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
 	ereport(LOG,
 			(errmsg("%s successfully started. version %s (%s)", PACKAGE, VERSION, PGPOOLVERSION)));
 
-	/* Save primary node id */
-	Req_info->primary_node_id = find_primary_node();
+	/*
+	 * if the primary node id is not loaded by watchdog, search for it
+	 */
+	if (Req_info->primary_node_id < 0)
+	{
+		/* Save primary node id */
+		Req_info->primary_node_id = find_primary_node();
+	}
 
 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
 	{
@@ -2929,7 +2938,6 @@ static void initialize_shared_mem_objects(bool clear_memcache_oidmaps)
 	ereport(DEBUG1,
 			(errmsg("Request info are: sizeof(POOL_REQUEST_INFO) %zu bytes requested for shared memory",
 					sizeof(POOL_REQUEST_INFO))));
-
 	/*
 	 * Initialize backend status area.
 	 * From now on, VALID_BACKEND macro can be used.
@@ -2946,6 +2954,7 @@ static void initialize_shared_mem_objects(bool clear_memcache_oidmaps)
 	Req_info->conn_counter = 0;
 	Req_info->switching = false;
 	Req_info->request_queue_head = Req_info->request_queue_tail = -1;
+	Req_info->primary_node_id = -2;
 	InRecovery = pool_shared_memory_create(sizeof(int));
 	*InRecovery = RECOVERY_INIT;
 
@@ -3338,3 +3347,73 @@ int pool_frontend_exists(void)
 		return pg_frontend_exists();
 	return -1;
 }
+
+/*
+ * Load the backend node status (and primary node id) from the master
+ * watchdog node so that a standby pgpool starts with the same view.
+ * Falls back to the local status when this node is the master or on error.
+ */
+static void initialize_backend_status_from_watchdog(void)
+{
+	WDPGBackendStatus* backendStatus;
+	int i;
+	int node_count;
+	bool reload_master_node_id = false;
+
+	if (!pool_config->use_watchdog)
+		return;
+	backendStatus = get_pg_backend_status_from_master_wd_node();
+	if (backendStatus == NULL)
+	{
+		ereport(WARNING,
+			(errmsg("failed to get the backend status from the master watchdog node"),
+				 errdetail("using the local backend node status")));
+		return;
+	}
+	if (backendStatus->node_count <= 0)
+	{
+		/* a non-positive node count means this node is the master itself */
+		ereport(LOG,
+				(errmsg("I am the master watchdog node"),
+				 errdetail("using the local backend node status")));
+		pfree(backendStatus);
+		return;
+	}
+	ereport(LOG,
+			(errmsg("master watchdog node \"%s\" returned status for %d backend nodes",backendStatus->nodeName,backendStatus->node_count)));
+	ereport(LOG,
+			(errmsg("primary node on master watchdog node \"%s\" is %d",backendStatus->nodeName,backendStatus->primary_node_id)));
+
+	/* never touch backends that are not configured locally */
+	node_count = backendStatus->node_count;
+	if (node_count > pool_config->backend_desc->num_backends)
+		node_count = pool_config->backend_desc->num_backends;
+
+	/* only accept a primary id that refers to a locally configured backend */
+	if (backendStatus->primary_node_id < node_count)
+		Req_info->primary_node_id = backendStatus->primary_node_id;
+
+	for (i = 0; i < node_count; i++)
+	{
+		BACKEND_STATUS remote = backendStatus->backend_status[i];
+		BACKEND_STATUS wanted;
+		if (remote == CON_DOWN)
+			wanted = CON_DOWN;
+		else if (remote == CON_CONNECT_WAIT || remote == CON_UP)
+			wanted = CON_CONNECT_WAIT;
+		else
+			continue;	/* other states are not synced */
+
+		if (BACKEND_INFO(i).backend_status == wanted)
+			continue;
+		BACKEND_INFO(i).backend_status = wanted;
+		my_backend_status[i] = &(BACKEND_INFO(i).backend_status);
+		reload_master_node_id = true;
+		if (wanted == CON_DOWN)
+			ereport(LOG,
+					(errmsg("backend status from \"%s\" backend:%d is set to down status",backendStatus->nodeName, i)));
+	}
+	if (reload_master_node_id)
+		Req_info->master_node_id = get_next_master_node();
+	pfree(backendStatus);
+}
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c
index fab8630..14f61d7 100644
--- a/src/watchdog/watchdog.c
+++ b/src/watchdog/watchdog.c
@@ -127,6 +127,8 @@ packet_types all_packet_types[] = {
 	{WD_INFORM_I_AM_GOING_DOWN, "INFORM I AM GOING DOWN"},
 	{WD_ASK_FOR_POOL_CONFIG, "ASK FOR POOL CONFIG"},
 	{WD_POOL_CONFIG_DATA, "CONFIG DATA"},
+	{WD_GET_MASTER_DATA_REQUEST, "DATA REQUEST"},
+
 	{WD_NO_MESSAGE,""}
 };
 
@@ -391,6 +393,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDIPCCommandDat
 static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDIPCCommandData* IPCCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_replicate_variable(WDIPCCommandData* IPCCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandData *IPCCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDIPCCommandData *IPCCommand);
 static IPC_CMD_PREOCESS_RES execute_replicate_command(WDIPCCommandData* ipcCommand);
 static bool write_ipc_command_with_result_data(WDIPCCommandData* IPCCommand, char type, char* data, int len);
 
@@ -424,6 +427,7 @@ static bool check_and_report_IPC_authentication(WDIPCCommandData* ipcCommand);
 static void print_received_packet_info(WDPacketData* pkt,WatchdogNode* wdNode);
 static void update_interface_status(void);
 static bool any_interface_available(void);
+static WDPacketData* process_data_request(WatchdogNode* wdNode, WDPacketData* pkt);
 
 /* global variables */
 wd_cluster g_cluster;
@@ -688,7 +692,7 @@ wd_create_recv_socket(int port)
 		}
 		close(sock);
 		ereport(ERROR,
-				(errmsg("failed to create watchdog receive socket"),
+			(errmsg("failed to create watchdog receive socket"),
 				 errdetail("bind on \"%s:%s\" failed with reason: \"%s\"", host, serv, strerror(errno))));
 	}
 	
@@ -1567,6 +1571,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDIPCCommandData* ipcCommand)
 
 	switch(ipcCommand->type)
 	{
+
 		case WD_NODE_STATUS_CHANGE_COMMAND:
 			return process_IPC_nodeStatusChange_command(ipcCommand);
 			break;
@@ -1589,6 +1594,9 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDIPCCommandData* ipcCommand)
 		case WD_FAILOVER_CMD_SYNC_REQUEST:
 			return process_IPC_failover_cmd_synchronise(ipcCommand);
 
+		case WD_GET_MASTER_DATA_REQUEST:
+			return process_IPC_data_request_from_master(ipcCommand);
+
 		default:
 		{
 			char* error_json;
@@ -1775,6 +1783,69 @@ static IPC_CMD_PREOCESS_RES process_IPC_replicate_variable(WDIPCCommandData* IPC
 	return IPC_CMD_ERROR;
 }
 
+/*
+ * Serve a WD_GET_MASTER_DATA_REQUEST command received on the IPC socket.
+ *
+ * standby:     forward the request to the master watchdog node and return
+ *              IPC_CMD_PROCESSING without writing a result yet.
+ * coordinator: reply at once with an empty WD_IPC_CMD_RESULT_OK, which tells
+ *              the client that this node is itself the master.
+ * otherwise:   reply with WD_IPC_CMD_CLUSTER_IN_TRAN.
+ * A failure to forward the request is reported as WD_IPC_CMD_RESULT_BAD.
+ * IPC_CMD_COMPLETE is returned once the result has been written.
+ */
+static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDIPCCommandData *IPCCommand)
+{
+	char res_type = WD_IPC_CMD_RESULT_BAD;
+	WDPacketData wdPacket;
+
+	ereport(LOG,
+			(errmsg("processing master node data request from IPC socket")));
+
+	IPCCommand->type = WD_GET_MASTER_DATA_REQUEST;
+
+	switch (get_local_node_state())
+	{
+		case WD_STANDBY:
+			/* wrap the client's request verbatim in a cluster packet */
+			init_wd_packet(&wdPacket);
+			set_message_type(&wdPacket, WD_GET_MASTER_DATA_REQUEST);
+			set_next_commandID_in_message(&wdPacket);
+			set_message_data(&wdPacket, IPCCommand->data_buf , IPCCommand->data_len);
+			/* remember the command id so the master's reply can be matched */
+			IPCCommand->internal_command_id = wdPacket.command_id;
+
+			if (send_message(g_cluster.masterNode, &wdPacket) > 0)
+			{
+				ereport(LOG,
+					(errmsg("data request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
+						 errdetail("waiting for the reply from master node...")));
+				/* wait for the master's reply before completing the command */
+				return IPC_CMD_PROCESSING;
+			}
+
+			ereport(LOG,
+				(errmsg("failed to process master node data request from IPC socket"),
+					 errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
+			res_type = WD_IPC_CMD_RESULT_BAD;
+			break;
+
+		case WD_COORDINATOR:
+			/* we are the master ourselves: answer with an empty OK result */
+			res_type = WD_IPC_CMD_RESULT_OK;
+			break;
+
+		default:
+			/* not in any stable state at the moment */
+			res_type = WD_IPC_CMD_CLUSTER_IN_TRAN;
+			break;
+	}
+
+	/* report the outcome to the IPC client */
+	if (!write_ipc_command_with_result_data(IPCCommand, res_type, NULL, 0))
+		return IPC_CMD_ERROR;
+	return IPC_CMD_COMPLETE;
+}
 
 static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandData *IPCCommand)
 {
@@ -2792,24 +2863,79 @@ static void cleanUpIPCCommand(WDIPCCommandData* ipcCommand)
 	MemoryContextDelete(ipcCommand->memoryContext);
 }
 
+/*
+ * Build the reply to a WD_GET_MASTER_DATA_REQUEST received from the remote
+ * watchdog node wdNode: a WD_DATA_MESSAGE carrying the requested JSON data,
+ * or a WD_ERROR_MESSAGE when the request is empty, unparsable or of an
+ * unknown type. The caller sends and then frees the returned packet.
+ */
+static WDPacketData* process_data_request(WatchdogNode* wdNode, WDPacketData* pkt)
+{
+	char* request_type = NULL;
+	char* data = NULL;
+	WDPacketData* replyPkt = NULL;
+
+	if (pkt->data == NULL || pkt->len <= 0)
+	{
+		ereport(WARNING,
+			(errmsg("invalid data request packet from watchdog node \"%s\"",wdNode->nodeName),
+				 errdetail("no data found in the packet")));
+		return get_minimum_message(WD_ERROR_MESSAGE,pkt);
+	}
+
+	if (!parse_data_request_json(pkt->data,pkt->len, &request_type))
+	{
+		ereport(WARNING,
+			(errmsg("invalid data request packet from watchdog node \"%s\"",wdNode->nodeName),
+				 errdetail("unable to parse the data request json")));
+		return get_minimum_message(WD_ERROR_MESSAGE,pkt);
+	}
+
+	if (strcasecmp(request_type, WD_DATE_REQ_PG_BACKEND_DATA) == 0)
+		data = get_backend_node_status_json(g_cluster.localNode);
+	else
+		ereport(LOG,
+			(errmsg("unknown data request type \"%s\" from watchdog node \"%s\"",request_type,wdNode->nodeName)));
+	pfree(request_type);	/* pstrdup'd by parse_data_request_json() */
+
+	if (data == NULL)
+		return get_minimum_message(WD_ERROR_MESSAGE,pkt);
+
+	/* reuse the request's command id so the requester can match the reply */
+	replyPkt = get_empty_packet();
+	set_message_type(replyPkt, WD_DATA_MESSAGE);
+	set_message_commandID(replyPkt, pkt->command_id);
+	set_message_data(replyPkt, data , strlen(data));
+	return replyPkt;
+}
+
 static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 {
 	WDPacketData* replyPkt = NULL;
 	switch (pkt->type)
 	{
+		case WD_GET_MASTER_DATA_REQUEST:
+		{
+			replyPkt = process_data_request(wdNode, pkt);
+		}
+			break;
+
 		case WD_ASK_FOR_POOL_CONFIG:
 		{
 			char* config_data = get_pool_config_json();
 			
-			if (config_data == NULL)
-				reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
-			else
+			if (config_data)
 			{
 				replyPkt = get_empty_packet();
 				set_message_type(replyPkt, WD_POOL_CONFIG_DATA);
 				set_message_commandID(replyPkt, pkt->command_id);
 				set_message_data(replyPkt, config_data , strlen(config_data));
 			}
+			else
+			{
+				replyPkt = get_minimum_message(WD_ERROR_MESSAGE,pkt);
+
+			}
 		}
 			break;
 			
@@ -2900,9 +3026,13 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 			 * otherwise reject
 			 */
 			if (g_cluster.localNode == g_cluster.masterNode)
-				reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt);
+			{
+				replyPkt = get_minimum_message(WD_ACCEPT_MESSAGE,pkt);
+			}
 			else
-				reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt);
+			{
+				replyPkt = get_minimum_message(WD_REJECT_MESSAGE,pkt);
+			}
 		}
 			break;
 			
@@ -2916,10 +3046,12 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 			{
 				ereport(NOTICE,
 						(errmsg("cluster is in split brain")));
-				reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
+				replyPkt = get_minimum_message(WD_ERROR_MESSAGE,pkt);
 			}
 			else
+			{
 				replyPkt = get_mynode_info_message(pkt);
+			}
 		}
 			break;
 
@@ -2931,8 +3063,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 		if (send_message_to_node(wdNode,replyPkt) == false)
 			ereport(LOG,
 				(errmsg("sending packet to node \"%s\" failed", wdNode->nodeName)));
-
-		pfree(replyPkt);
+		free_packet(replyPkt);
 	}
 	return 1;
 }
@@ -3583,14 +3714,19 @@ static int watchdog_state_machine(WD_EVENTS event, WatchdogNode* wdNode, WDPacke
 		gettimeofday(&wdNode->last_rcv_time, NULL);
 
 		if (pkt->type == WD_INFO_MESSAGE)
+		{
 			standard_packet_processor(wdNode, pkt);
+		}
+
 		if (pkt->type == WD_INFORM_I_AM_GOING_DOWN)		/* TODO do it better way */
 		{
 			wdNode->state = WD_SHUTDOWN;
 			return watchdog_state_machine(WD_EVENT_REMOTE_NODE_LOST, wdNode, NULL);
 		}
 		if (watchdog_internal_command_packet_processor(wdNode,pkt) == true)
+		{
 			return 0;
+		}
 	}
 	else if (event == WD_EVENT_NEW_OUTBOUND_CONNECTION)
 	{
@@ -5422,6 +5558,7 @@ static bool check_and_report_IPC_authentication(WDIPCCommandData* ipcCommand)
 
 		case WD_FUNCTION_COMMAND:
 		case WD_FAILOVER_CMD_SYNC_REQUEST:
+		case WD_GET_MASTER_DATA_REQUEST:
 			/* only allowed internaly.*/
 			internal_client_only = true;
 			break;
diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c
index f017aff..c1ec5a6 100644
--- a/src/watchdog/wd_commands.c
+++ b/src/watchdog/wd_commands.c
@@ -59,6 +59,7 @@
 #define PROMOTE_REQUEST_NODE_MASK		0x04
 
 static void sleep_in_waiting(void);
+static void FreeCmdResult(WDIPCCmdResult* res);
 static WDFailoverCMDResults wd_issue_failover_lock_command(WDFailoverCMDTypes cmdType, char* syncReqType);
 
 
@@ -263,6 +264,70 @@ issue_command_to_watchdog(char type, int timeout_sec, char* data, int data_len,
 	return result;
 }
 
+/*
+ * Fetch the PG backend status of all attached nodes from the master watchdog
+ * node. Returns a palloc'd WDPGBackendStatus (node_count is -1 when this node
+ * itself is the master) or NULL on failure.
+ */
+WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void)
+{
+	unsigned int *shared_key = get_ipc_shared_key();
+	char *data = get_data_request_json(WD_DATE_REQ_PG_BACKEND_DATA,
+									   shared_key?*shared_key:0,pool_config->wd_authkey);
+	WDIPCCmdResult *result = issue_command_to_watchdog(WD_GET_MASTER_DATA_REQUEST,
+													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
+													   data, strlen(data), true);
+	pfree(data);
+
+	if (result == NULL)
+	{
+		ereport(WARNING,
+			(errmsg("get backend node status from master watchdog failed"),
+				 errdetail("issue command to watchdog returned NULL")));
+		return NULL;
+	}
+	if (result->type == WD_IPC_CMD_CLUSTER_IN_TRAN)
+	{
+		ereport(WARNING,
+			(errmsg("get backend node status from master watchdog failed"),
+				 errdetail("watchdog cluster is not in stable state"),
+					errhint("try again when the cluster is fully initialized")));
+		FreeCmdResult(result);
+		return NULL;
+	}
+	else if (result->type == WD_IPC_CMD_TIMEOUT)
+	{
+		ereport(WARNING,
+				(errmsg("get backend node status from master watchdog failed"),
+				 errdetail("ipc command timeout")));
+		FreeCmdResult(result);
+		return NULL;
+	}
+	else if (result->type == WD_IPC_CMD_RESULT_OK)
+	{
+		WDPGBackendStatus* backendStatus;
+
+		/* zero length data: this node itself is the master watchdog node */
+		if (result->length <= 0)
+		{
+			backendStatus = palloc0(sizeof(WDPGBackendStatus));
+			backendStatus->node_count = -1;
+		}
+		else	/* parse the reply exactly once */
+			backendStatus = get_pg_backend_node_status_from_json(result->data, result->length);
+		if (backendStatus == NULL)
+			ereport(WARNING,
+				(errmsg("get backend node status from master watchdog failed"),
+					 errdetail("unable to parse the backend status json data")));
+		FreeCmdResult(result);
+		return backendStatus;
+	}
+
+	ereport(WARNING,
+		(errmsg("get backend node status from master watchdog failed")));
+	FreeCmdResult(result);
+	return NULL;
+}
 
 WdCommandResult
 wd_start_recovery(void)
@@ -287,7 +352,7 @@ wd_start_recovery(void)
 	}
 
 	type = result->type;
-	pfree(result);
+	FreeCmdResult(result);
 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
 	{
 		ereport(WARNING,
@@ -334,7 +399,8 @@ wd_end_recovery(void)
 	}
 	
 	type = result->type;
-	pfree(result);
+	FreeCmdResult(result);
+
 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
 	{
 		ereport(WARNING,
@@ -387,7 +453,7 @@ wd_send_failback_request(int node_id)
 	}
 	
 	type = result->type;
-	pfree(result);
+	FreeCmdResult(result);
 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
 	{
 		ereport(WARNING,
@@ -457,6 +523,7 @@ wd_send_failover_sync_command(WDFailoverCMDTypes cmdType, char* syncReqType)
 		ereport(WARNING,
 			(errmsg("watchdog failed to send failover command"),
 				 errdetail("ipc command timeout")));
+		FreeCmdResult(result);
 		return FAILOVER_RES_ERROR;
 	}
 	if (result->length <= 0)
@@ -464,6 +531,7 @@ wd_send_failover_sync_command(WDFailoverCMDTypes cmdType, char* syncReqType)
 		ereport(WARNING,
 			(errmsg("watchdog failed to send failover command"),
 				 errdetail("issue command to watchdog returned no data")));
+		FreeCmdResult(result);
 		return FAILOVER_RES_ERROR;
 	}
 
@@ -473,21 +541,24 @@ wd_send_failover_sync_command(WDFailoverCMDTypes cmdType, char* syncReqType)
 	{
 		ereport(NOTICE,
 				(errmsg("unable to parse json data from replicate command")));
+		FreeCmdResult(result);
 		return FAILOVER_RES_ERROR;
 	}
 	
 	if (json_get_int_value_for_key(root, "FailoverCMDType", &failoverResCmdType))
 	{
 		json_value_free(root);
+		FreeCmdResult(result);
 		return FAILOVER_RES_ERROR;
 	}
 	if (root && json_get_int_value_for_key(root, "InterlockingResult", &interlockingResult))
 	{
 		json_value_free(root);
+		FreeCmdResult(result);
 		return FAILOVER_RES_ERROR;
 	}
 	json_value_free(root);
-	pfree(result);
+	FreeCmdResult(result);
 	
 	if (failoverResCmdType != cmdType)
 		return FAILOVER_RES_ERROR;
@@ -526,7 +597,7 @@ wd_degenerate_backend_set(int *node_id_set, int count)
 	}
 	
 	type = result->type;
-	pfree(result);
+	FreeCmdResult(result);
 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
 	{
 		ereport(WARNING,
@@ -579,7 +650,7 @@ wd_promote_backend(int node_id)
 	}
 	
 	type = result->type;
-	pfree(result);
+	FreeCmdResult(result);
 	if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
 	{
 		ereport(WARNING,
@@ -644,7 +715,7 @@ char* wd_get_watchdog_nodes(int nodeID)
 			(errmsg("get watchdog nodes command failed"),
 				 errdetail("watchdog cluster is not in stable state"),
 					errhint("try again when the cluster is fully initialized")));
-		pfree(result);
+		FreeCmdResult(result);
 		return NULL;
 	}
 	else if (result->type == WD_IPC_CMD_TIMEOUT)
@@ -652,16 +723,17 @@ char* wd_get_watchdog_nodes(int nodeID)
 		ereport(WARNING,
 			(errmsg("get watchdog nodes command failed"),
 				 errdetail("ipc command timeout")));
-		pfree(result);
+		FreeCmdResult(result);
 		return NULL;
 	}
 	else if (result->type == WD_IPC_CMD_RESULT_OK)
 	{
 		char* data = result->data;
+		/* do not free the result->data, Save the data copy */
 		pfree(result);
 		return data;
 	}
-	pfree(result);
+	FreeCmdResult(result);
 	return NULL;
 }
 
@@ -761,7 +833,7 @@ static WDFailoverCMDResults wd_issue_failover_lock_command(WDFailoverCMDTypes cm
 	int x;
 	for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 	{
-		res = wd_send_failover_sync_command(NODE_FAILBACK_CMD, syncReqType);
+		res = wd_send_failover_sync_command(cmdType, syncReqType);
 		if (res != FAILOVER_RES_TRANSITION)
 			break;
 		sleep(1);
@@ -847,3 +919,9 @@ wd_chk_node_mask_for_promote_req(int *node_id_set, int count)
 	return wd_chk_node_mask (PROMOTE_REQUEST_NODE_MASK, node_id_set, count);
 }
 
+/* Release an IPC command result together with its data buffer (if any). */
+static void FreeCmdResult(WDIPCCmdResult* res)
+{
+	if (res->data != NULL) pfree(res->data);
+	pfree(res);
+}
diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c
index a0710f8..36b563d 100644
--- a/src/watchdog/wd_json_data.c
+++ b/src/watchdog/wd_json_data.c
@@ -30,6 +30,9 @@
 #include "watchdog/watchdog.h"
 #include "watchdog/wd_json_data.h"
 #include "watchdog/wd_ipc_defines.h"
+#include "pool.h"
+
+#define WD_JSON_KEY_DATA_REQ_TYPE	"DataRequestType"
 
 
 POOL_CONFIG* get_pool_config_from_json(char* json_data, int data_len)
@@ -224,6 +227,133 @@ char* get_pool_config_json(void)
 	return json_str;
 }
 
+/*
+ * Build the JSON body of a data request of type request_type, carrying the
+ * IPC shared key and, when non-empty, the auth key. Returns a palloc'd string.
+ */
+char* get_data_request_json(char* request_type, unsigned int sharedKey, char* authKey)
+{
+	char* result;
+	JsonNode* jNode = jw_create_with_object(true);
+	jw_put_int(jNode, WD_IPC_SHARED_KEY, sharedKey);
+	if (authKey != NULL && *authKey != '\0')
+		jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey);
+	jw_put_string(jNode, WD_JSON_KEY_DATA_REQ_TYPE, request_type);
+	jw_finish_document(jNode);
+	result = pstrdup(jw_get_json_string(jNode));
+	jw_destroy(jNode);
+	return result;
+}
+
+/*
+ * Parse a data request JSON. On success *request_type receives a palloc'd
+ * copy of the request type and true is returned; otherwise it is NULL.
+ * json_data need not be NUL-terminated, so it is only logged via "%.*s".
+ */
+bool parse_data_request_json(char* json_data, int data_len, char** request_type)
+{
+	json_value *root = json_parse(json_data,data_len);
+	char* ptr;
+	*request_type = NULL;
+	if (root == NULL || root->type != json_object)	/* root node must be an object */
+	{
+		json_value_free(root);
+		ereport(LOG,
+			(errmsg("watchdog is unable to parse data request json"),
+				 errdetail("invalid json data \"%.*s\"",data_len,json_data)));
+		return false;
+	}
+	ptr = json_get_string_value_for_key(root, WD_JSON_KEY_DATA_REQ_TYPE);
+	if (ptr == NULL)
+	{
+		json_value_free(root);
+		ereport(LOG,
+			(errmsg("watchdog is unable to parse data request json"),
+				 errdetail("request name node not found in json data \"%.*s\"",data_len,json_data)));
+		return false;
+	}
+	*request_type = pstrdup(ptr);
+	json_value_free(root);
+	return true;
+}
+
+
+/*
+ * Serialize the backend node status held in shared memory, together with
+ * the primary node id and wdNode's name, into a palloc'd JSON string.
+ */
+char* get_backend_node_status_json(WatchdogNode* wdNode)
+{
+	int node_id;
+	char* json_str;
+	JsonNode* jNode = jw_create_with_object(true);
+
+	/* one integer status per configured backend, in backend id order */
+	jw_start_array(jNode, "BackendNodeStatusList");
+	for (node_id = 0; node_id < pool_config->backend_desc->num_backends; node_id++)
+		jw_put_int_value(jNode, pool_config->backend_desc->backend_info[node_id].backend_status);
+	jw_end_element(jNode);
+
+	/* primary node id as currently known to this pgpool */
+	jw_put_int(jNode, "PrimaryNodeId", Req_info->primary_node_id);
+	/* the receiver reports this name in its log messages */
+	jw_put_string(jNode, "NodeName",wdNode->nodeName);
+
+	jw_finish_document(jNode);
+	json_str = pstrdup(jw_get_json_string(jNode));
+	jw_destroy(jNode);
+	return json_str;
+}
+
+/*
+ * Parse the JSON produced by get_backend_node_status_json().
+ * Returns a palloc'd WDPGBackendStatus, or NULL when the data is malformed.
+ */
+WDPGBackendStatus* get_pg_backend_node_status_from_json(char* json_data, int data_len)
+{
+	json_value *root, *value;
+	char *ptr;
+	int i;
+	WDPGBackendStatus* backendStatus = NULL;
+
+	root = json_parse(json_data,data_len);
+	if (root == NULL || root->type != json_object)	/* root node must be an object */
+		goto fail;
+
+	/* backend status array: one integer per backend, 1..MAX_NUM_BACKENDS entries */
+	value = json_get_value_for_key(root,"BackendNodeStatusList");
+	if (value == NULL || value->type != json_array ||
+		value->u.array.length == 0 || value->u.array.length > MAX_NUM_BACKENDS)
+		goto fail;
+
+	/* zeroed, so unused slots are initialized and nodeName stays NUL-terminated */
+	backendStatus = palloc0(sizeof(WDPGBackendStatus));
+	backendStatus->node_count = value->u.array.length;
+	for (i = 0; i < backendStatus->node_count; i++)
+	{
+		if (value->u.array.values[i]->type != json_integer)
+			goto fail;
+		backendStatus->backend_status[i] = value->u.array.values[i]->u.integer;
+	}
+	if (json_get_int_value_for_key(root, "PrimaryNodeId", &backendStatus->primary_node_id))
+	{
+		ereport(LOG,
+			(errmsg("invalid json data"),
+				 errdetail("unable to find primary node id")));
+		goto fail;
+	}
+	ptr = json_get_string_value_for_key(root, "NodeName");
+	if (ptr)
+		strncpy(backendStatus->nodeName, ptr, sizeof(backendStatus->nodeName) -1);
+	json_value_free(root);
+	return backendStatus;
+fail:
+	if (backendStatus)
+		pfree(backendStatus);
+	json_value_free(root);
+	return NULL;
+}
+
 
 char* get_watchdog_node_info_json(WatchdogNode* wdNode, char* authkey)
 {


More information about the pgpool-general mailing list