[pgpool-hackers: 2489] New Feature with patch: Quorum and Consensus for backend failover

Muhammad Usama m.usama at gmail.com
Tue Aug 22 04:18:27 JST 2017


Hi

I have been working on a new feature to make backend node failover quorum
aware, and halfway through the implementation I also added a majority
consensus feature for the same.

Please find attached the first version of the patch for review. It makes
backend node failover consider the watchdog cluster's quorum status and seek
a majority consensus before performing the failover.

*Changes in the failover mechanism with watchdog.*
For this new feature I have modified Pgpool-II's existing failover
mechanism with watchdog.
Previously, as you know, when Pgpool-II needed to perform a node operation
(failover, failback, promote-node) with the watchdog, the watchdog
propagated the failover request to all Pgpool-II nodes in the watchdog
cluster. As soon as a node received the request it initiated a local
failover, and that failover was synchronised across all nodes using
distributed locks.

*Now only the master node performs the failover.*
The attached patch changes the synchronised failover mechanism: now only
the Pgpool-II on the master watchdog node performs the failover, and all
other standby nodes sync the backend statuses after the master Pgpool-II
has finished the failover.

*Overview of the new failover mechanism.*
-- When a failover request is received by a standby watchdog node (from the
local Pgpool-II), the request is forwarded to the master watchdog and the
Pgpool-II main process gets the FAILOVER_RES_WILL_BE_DONE return code. Upon
receiving FAILOVER_RES_WILL_BE_DONE from the watchdog for the failover
request, the requesting Pgpool-II moves on without doing anything further
for that particular failover command.

-- When the master watchdog receives a failover request from a standby
node, it validates the request, applies the consensus rules, and then
triggers the failover on its local Pgpool-II.

-- When the master watchdog node receives a failover request from its local
Pgpool-II (over the IPC channel), the watchdog process tells the requesting
Pgpool-II process to proceed with the failover (provided all failover rules
are satisfied).

-- After the failover is finished on the master Pgpool-II, the failover
function calls *wd_failover_end*(), which sends a backend-sync-required
message to all standby watchdogs.

-- Upon receiving the sync-required message from the master watchdog node,
every standby Pgpool-II syncs the new status of each backend node from the
master watchdog.
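The routing logic in the steps above can be sketched as follows. This is a
minimal illustration only; route_failover_request and the FailoverAction
enum are hypothetical names I am using for the sketch, not symbols from the
patch:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
	ACTION_FORWARD_TO_MASTER,	/* standby: forward request, caller gets WILL_BE_DONE */
	ACTION_EXECUTE_FAILOVER,	/* master: run the failover locally */
	ACTION_SYNC_FROM_MASTER		/* standby: sync backend statuses after master finishes */
} FailoverAction;

/* Decide what a node does with an incoming message, based on its role
 * and whether the message is a failover request or a sync-required
 * notification from the master. */
FailoverAction route_failover_request(bool i_am_master, bool is_sync_message)
{
	if (is_sync_message)
		return ACTION_SYNC_FROM_MASTER;		/* "sync required" from master */
	if (i_am_master)
		return ACTION_EXECUTE_FAILOVER;		/* only the master performs failover */
	return ACTION_FORWARD_TO_MASTER;		/* standby forwards to the master */
}
```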

*No more failover locks*
Since the new failover mechanism no longer requires synchronisation or
guards against the execution of failover commands by multiple Pgpool-II
nodes, the patch removes all the distributed locks from the failover
function. This makes the failover simpler and faster.

*New kind of failover operation: NODE_QUARANTINE_REQUEST*
The patch adds a new kind of backend node operation, NODE_QUARANTINE, which
is effectively the same as NODE_DOWN except that node quarantine does not
trigger the failover_command.
A NODE_DOWN_REQUEST is automatically converted to a NODE_QUARANTINE_REQUEST
when failover is requested on a backend node but the watchdog cluster does
not hold the quorum.
This means that in the absence of quorum the failed backend nodes are
quarantined, and when the quorum becomes available again Pgpool-II performs
a failback operation on all quarantined nodes.
Likewise, when failback is performed on a quarantined backend node, the
failover function does not trigger the failback_command.
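The downgrade rule can be summarised in a small C sketch. The helper names
(adjust_request_for_quorum, should_run_failover_command) are hypothetical;
only the request-kind names come from the patch:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { NODE_DOWN_REQUEST, NODE_QUARANTINE_REQUEST } RequestKind;

/* Without quorum, a NODE_DOWN request is downgraded to NODE_QUARANTINE. */
RequestKind adjust_request_for_quorum(RequestKind kind, bool have_quorum)
{
	if (kind == NODE_DOWN_REQUEST && !have_quorum)
		return NODE_QUARANTINE_REQUEST;	/* quarantine instead of failing over */
	return kind;
}

/* Quarantine requests never trigger the failover_command. */
bool should_run_failover_command(RequestKind kind)
{
	return kind != NODE_QUARANTINE_REQUEST;
}
```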

*Controlling the failover behaviour.*
The patch adds three new configuration parameters to let users configure
the failover behaviour.

*failover_when_quorum_exists*
When enabled, the failover command is only executed when the watchdog
cluster holds the quorum. When the quorum is absent and
failover_when_quorum_exists is enabled, failed backend nodes are
quarantined until the quorum becomes available again.
Disabling it restores the old behaviour of failover commands.


*failover_require_consensus*
This new configuration parameter can be used to make sure we get a majority
vote before performing the failover on a node. When
*failover_require_consensus* is enabled, the failover is only performed
after receiving failover requests from a majority of Pgpool-II nodes.
For example, in a three-node cluster the failover will not be performed
until at least two nodes ask for a failover on the particular backend node.

It is also worth mentioning that *failover_require_consensus* only works
when failover_when_quorum_exists is enabled.
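To make the majority rule concrete, here is a minimal C sketch. The helper
names are hypothetical, not from the patch; "majority" means strictly more
than half of the cluster (2 of 3, 3 of 4, 3 of 5):

```c
#include <assert.h>
#include <stdbool.h>

/* Smallest number of votes forming a strict majority of cluster_size. */
int votes_needed_for_consensus(int cluster_size)
{
	return cluster_size / 2 + 1;
}

/* True once enough distinct failover requests have arrived. */
bool consensus_reached(int votes, int cluster_size)
{
	return votes >= votes_needed_for_consensus(cluster_size);
}
```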


*enable_multiple_failover_requests_from_node*
This parameter works in conjunction with *failover_require_consensus*.
When enabled, a single Pgpool-II node can vote for failover multiple times.
For example, in a three-node cluster, if one Pgpool-II node sends the
failover request for a particular node twice, that counts as two votes in
favour of the failover, and the failover will be performed even if we do
not get a vote from the other two nodes.

When *enable_multiple_failover_requests_from_node* is disabled, only the
first vote from each Pgpool-II is accepted; all subsequent votes are marked
duplicate and rejected.
In that case a majority of votes from distinct nodes is required to execute
the failover.
Again, *enable_multiple_failover_requests_from_node* only becomes effective
when both *failover_when_quorum_exists* and *failover_require_consensus*
are enabled.
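The duplicate-vote handling could look roughly like this sketch. VoteBox
and its functions are hypothetical names for illustration, not the patch's
actual data structures:

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_NODES 8

typedef struct {
	bool voted[MAX_NODES];	/* which node ids have already voted */
	int  votes;				/* accepted vote count */
	bool allow_duplicates;	/* enable_multiple_failover_requests_from_node */
} VoteBox;

void votebox_init(VoteBox *vb, bool allow_duplicates)
{
	memset(vb, 0, sizeof(*vb));
	vb->allow_duplicates = allow_duplicates;
}

/* Returns true if the vote was accepted, false if rejected as duplicate.
 * node_id must be in [0, MAX_NODES). */
bool votebox_cast(VoteBox *vb, int node_id)
{
	if (!vb->allow_duplicates && vb->voted[node_id])
		return false;		/* second vote from the same node: reject */
	vb->voted[node_id] = true;
	vb->votes++;
	return true;
}
```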


*Controlling the failover: the coding perspective.*
Although the failover functions are now quorum and consensus aware, there
is still a way to bypass the quorum conditions and the requirement of
consensus.

For this, the patch uses the existing request_details flags in
POOL_REQUEST_NODE to control the failover behaviour.

Here are the newly added flag values.

*REQ_DETAIL_WATCHDOG*:
Setting this flag while issuing the failover command prevents the failover
request from being sent to the watchdog. This flag may not be useful
anywhere other than where it is already used: it mainly keeps a failover
command that originated from the watchdog from being sent back to the
watchdog, otherwise we could end up in an infinite loop.

*REQ_DETAIL_CONFIRMED*:
Setting this flag bypasses the *failover_require_consensus* configuration
and performs the failover immediately if quorum is present. This flag can
be used for failover requests originating from a PCP command.

*REQ_DETAIL_UPDATE*:
This flag is used for the command that fails back quarantined nodes.
Setting this flag prevents the failback_command from being triggered.

*Some conditional flags used:*
I was not sure about the configuration of each type of failover operation.
We have three main failover operations: NODE_UP_REQUEST, NODE_DOWN_REQUEST,
and PROMOTE_NODE_REQUEST.
So I was wondering whether we need to give users a configuration option to
enable/disable quorum checking and consensus for each individual failover
operation type.
For example: is it a practical configuration for a user to require quorum
while performing a NODE_DOWN operation but not for NODE_UP?
So in this patch I use three compile-time defines to enable/disable the
individual failover operations, until we decide on the best solution.

NODE_UP_REQUIRE_CONSENSUS: defining it enables the quorum checking feature
for NODE_UP_REQUESTs

NODE_DOWN_REQUIRE_CONSENSUS: defining it enables the quorum checking
feature for NODE_DOWN_REQUESTs

NODE_PROMOTE_REQUIRE_CONSENSUS: defining it enables the quorum checking
feature for PROMOTE_NODE_REQUESTs
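The effect of the three defines can be illustrated like this. The define
names come from the patch; the lookup function and enum wrapper are
hypothetical, and here all three defines are enabled for the sketch:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { NODE_UP_REQUEST, NODE_DOWN_REQUEST, PROMOTE_NODE_REQUEST } ReqKind;

/* compile-time switches from the patch; all enabled for this sketch */
#define NODE_UP_REQUIRE_CONSENSUS
#define NODE_DOWN_REQUIRE_CONSENSUS
#define NODE_PROMOTE_REQUIRE_CONSENSUS

/* Whether quorum/consensus checking applies to a given request kind
 * is fixed at compile time by the defines above. */
bool request_kind_requires_consensus(ReqKind kind)
{
	switch (kind)
	{
#ifdef NODE_UP_REQUIRE_CONSENSUS
		case NODE_UP_REQUEST: return true;
#endif
#ifdef NODE_DOWN_REQUIRE_CONSENSUS
		case NODE_DOWN_REQUEST: return true;
#endif
#ifdef NODE_PROMOTE_REQUIRE_CONSENSUS
		case PROMOTE_NODE_REQUEST: return true;
#endif
		default: return false;
	}
}
```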

*Some points for discussion:*

*Do we really need to check the Req_info->switching flag before enqueuing a
failover request?*
While working on the patch I was wondering why we disallow enqueuing a
failover command when a failover is already in progress. For example, in
*pcp_process_command*(), if we see the *Req_info->switching* flag set we
bail out with an error instead of enqueuing the command. Is this really
necessary?

*Do we need more granular control over each failover operation?*
As described in the section "Some conditional flags used", I want opinions
on whether we need configuration parameters in pgpool.conf to enable/disable
quorum and consensus checking for individual failover types.

*Which failover requests should be marked as confirmed?*
As described in the REQ_DETAIL_CONFIRMED section above, we can mark a
failover request as not needing consensus; currently the requests from PCP
commands are fired with this flag. But I was wondering whether there are
more places where we need to use the flag.
For example, I currently use the same confirmed flag when failover is
triggered because of *replication_stop_on_mismatch*.

I think we should consider this flag for each place failover is triggered:
because of health_check failure
because of replication mismatch
because of backend_error
etc.

*Node quarantine behaviour.*
What do you think about the node quarantine mechanism used by this patch?
Can you think of any problems it could cause?

*What should the default values be for each newly added config parameter?*



*TODOs*

-- Updating the documentation is still to do; I will do that once every
aspect of the feature is finalised.
-- Some code warnings and cleanups are still pending.
-- I am still a little short on testing.
-- Regression test cases for the feature.


Thoughts and suggestions are most welcome.

Thanks
Best regards
Muhammad Usama
-------------- next part --------------
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index f44ef412..29617c41 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -231,7 +231,33 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL,								/* check func */
 		NULL								/* show hook */
 	},
-
+	{
+		{"failover_when_quorum_exists", CFGCXT_INIT, FAILOVER_CONFIG,
+			"Do failover only when cluster has the quorum.",
+			CONFIG_VAR_TYPE_BOOL,false, 0
+		},
+		&g_pool_config.failover_when_quorum_exists,
+		false,
+		NULL, NULL,NULL
+	},
+	{
+		{"failover_require_consensus", CFGCXT_INIT, FAILOVER_CONFIG,
+			"Only do failover when majority aggrees.",
+			CONFIG_VAR_TYPE_BOOL,false, 0
+		},
+		&g_pool_config.failover_require_consensus,
+		false,
+		NULL, NULL,NULL
+	},
+	{
+		{"enable_multiple_failover_requests_from_node", CFGCXT_INIT, FAILOVER_CONFIG,
+			"A Pgpool-II node can send multiple failover requests to build consensus.",
+			CONFIG_VAR_TYPE_BOOL,false, 0
+		},
+		&g_pool_config.enable_multiple_failover_requests_from_node,
+		false,
+		NULL, NULL,NULL
+	},
 	{
 		{"log_connections", CFGCXT_RELOAD, LOGING_CONFIG,
 			"Logs each successful connection.",
diff --git a/src/include/pcp/libpcp_ext.h b/src/include/pcp/libpcp_ext.h
index 705ebf15..654dd7e0 100644
--- a/src/include/pcp/libpcp_ext.h
+++ b/src/include/pcp/libpcp_ext.h
@@ -74,6 +74,7 @@ typedef struct {
 	double unnormalized_weight; /* descripted parameter */
 	char backend_data_directory[MAX_PATH_LENGTH];
 	unsigned short flag;		/* various flags */
+	bool quarantine;			/* true if node is CON_DOWN because of quarantine */
 	uint64 standby_delay;		/* The replication delay against the primary */
 } BackendInfo;
 
diff --git a/src/include/pool.h b/src/include/pool.h
index b2f10ca6..d179f7d0 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -404,15 +404,18 @@ typedef enum {
 	NODE_DOWN_REQUEST,
 	NODE_RECOVERY_REQUEST,
 	CLOSE_IDLE_REQUEST,
-	PROMOTE_NODE_REQUEST
+	PROMOTE_NODE_REQUEST,
+	NODE_QUARANTINE_REQUEST
 } POOL_REQUEST_KIND;
 
 #define REQ_DETAIL_SWITCHOVER	0x00000001		/* failover due to switch over */
+#define REQ_DETAIL_WATCHDOG		0x00000002		/* failover req from watchdog */
+#define REQ_DETAIL_CONFIRMED	0x00000004		/* failover req that does not require majority vote */
+#define REQ_DETAIL_UPDATE		0x00000008		/* failover req is just and update node status request */
 
 typedef struct {
 	POOL_REQUEST_KIND	kind;		/* request kind */
 	unsigned char request_details;	/* option flags kind */
-	unsigned int wd_failover_id;	/* watchdog ID for this failover operation */
 	int node_id[MAX_NUM_BACKENDS];	/* request node id */
 	int count;						/* request node ids count */
 }POOL_REQUEST_NODE;
@@ -520,8 +523,10 @@ extern char remote_port[];	/* client port */
 /*
  * public functions
  */
+extern void register_watchdog_quorum_change_interupt(void);
 extern void register_watchdog_state_change_interupt(void);
-extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
+extern void register_backend_state_sync_req_interupt(void);
+extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, unsigned char flags);
 extern char *get_config_file_name(void);
 extern char *get_hba_file_name(void);
 extern void do_child(int *fds);
@@ -551,11 +556,11 @@ extern POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
 extern void NoticeResponse(POOL_CONNECTION *frontend,
 								  POOL_CONNECTION_POOL *backend);
 
-extern void notice_backend_error(int node_id, bool switch_over);
-extern bool degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
-extern bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only, bool switch_over, unsigned int wd_failover_id);
-extern bool promote_backend(int node_id, unsigned int wd_failover_id);
-extern bool send_failback_request(int node_id, bool throw_error, unsigned int wd_failover_id);
+extern void notice_backend_error(int node_id, unsigned char flags);
+extern bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags);
+extern bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only);
+extern bool promote_backend(int node_id, unsigned char flags);
+extern bool send_failback_request(int node_id, bool throw_error, unsigned char flags);
 
 
 extern void pool_set_timeout(int timeoutval);
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index aeb63082..dbbe2043 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -300,6 +300,11 @@ typedef struct {
 	 * add for watchdog
 	 */
 	bool use_watchdog;						/* Enables watchdog */
+	bool failover_when_quorum_exists;		/* Failover only when cluster has the quorum */
+	bool failover_require_consensus;		/* Only do failover when majority aggrees */
+	bool enable_multiple_failover_requests_from_node; /* A Pgpool-II node can send multiple
+													   * failover requests to build consensus
+													   */
 	WdLifeCheckMethod wd_lifecheck_method;	/* method of lifecheck. 'heartbeat' or 'query' */
 	bool clear_memqcache_on_escalation;		/* Clear query cache on shmem when escalating ?*/
 	char *wd_escalation_command;			/* Executes this command at escalation on new active pgpool.*/
diff --git a/src/include/watchdog/wd_ipc_commands.h b/src/include/watchdog/wd_ipc_commands.h
index 723f9e96..dbcaa4a9 100644
--- a/src/include/watchdog/wd_ipc_commands.h
+++ b/src/include/watchdog/wd_ipc_commands.h
@@ -70,13 +70,14 @@ extern bool get_watchdog_node_escalation_state(void);
 
 extern WdCommandResult wd_start_recovery(void);
 extern WdCommandResult wd_end_recovery(void);
-extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned int *wd_failover_id);
-extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id);
-extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned int *wd_failover_id);
+extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned char flags);
+extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags);
+extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned char flags);
 
 extern WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void);
 extern WDGenericData *get_wd_runtime_variable_value(char *varName);
 extern WD_STATES get_watchdog_local_node_state(void);
+extern int get_watchdog_quorum_state(void);
 
 extern char* wd_get_watchdog_nodes(int nodeID);
 
@@ -84,11 +85,8 @@ extern WDIPCCmdResult* issue_command_to_watchdog(char type, int timeout_sec, cha
 
 
 /* functions for failover commands interlocking */
-extern WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id);
-extern void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id);
+extern WDFailoverCMDResults wd_failover_end(void);
+extern WDFailoverCMDResults wd_failover_start(void);
 
 
 
diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h
index 0dc648b5..846344f2 100644
--- a/src/include/watchdog/wd_ipc_defines.h
+++ b/src/include/watchdog/wd_ipc_defines.h
@@ -53,11 +53,13 @@ typedef enum WDFailoverCMDResults
 										  * standby node is advanced in the procedure
 										  */
 	FAILOVER_RES_PROCEED,
+	FAILOVER_RES_NO_QUORUM,
 	FAILOVER_RES_WILL_BE_DONE,
 	FAILOVER_RES_NOT_ALLOWED,
 	FAILOVER_RES_INVALID_FUNCTION,
 	FAILOVER_RES_ALREADY_ISSUED,
 	FAILOVER_RES_MASTER_REJECTED,
+	FAILOVER_RES_BUILDING_CONSENSUS,
 	FAILOVER_RES_TIMEOUT
 }WDFailoverCMDResults;
 
@@ -84,6 +86,7 @@ typedef enum WDValueDataType
 #define WD_FAILOVER_LOCKING_REQUEST			's'
 #define WD_GET_MASTER_DATA_REQUEST			'd'
 #define WD_GET_RUNTIME_VARIABLE_VALUE		'v'
+#define WD_FAILOVER_INDICATION				'i'
 
 #define WD_FUNCTION_START_RECOVERY		"START_RECOVERY"
 #define WD_FUNCTION_END_RECOVERY		"END_RECOVERY"
diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h
index 1d6a04aa..648aad35 100644
--- a/src/include/watchdog/wd_json_data.h
+++ b/src/include/watchdog/wd_json_data.h
@@ -70,8 +70,8 @@ extern bool parse_beacon_message_json(char* json_data, int data_len, int* state,
 								bool* escalated);
 extern char* get_beacon_message_json(WatchdogNode* wdNode);
 
-extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey);
-extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count);
+extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned char flags, unsigned int sharedKey, char* authKey);
+extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count, unsigned char *flags);
 extern char* get_wd_simple_message_json(char* message);
 
 extern WDPGBackendStatus* get_pg_backend_node_status_from_json(char* json_data, int data_len);
diff --git a/src/main/health_check.c b/src/main/health_check.c
index 14468f24..b7556593 100644
--- a/src/main/health_check.c
+++ b/src/main/health_check.c
@@ -186,7 +186,7 @@ void do_health_check_child(int *node_id)
 
 					/* trigger failover */
 					partial = health_check_timer_expired?false:true;
-					degenerate_backend_set(node_id, 1, partial, 0);
+					degenerate_backend_set(node_id, 1, partial?REQ_DETAIL_SWITCHOVER:0);
 				}
 			}
 
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 8b45c9e2..5c1f581d 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -69,6 +69,8 @@ typedef enum
 {
 	SIG_FAILOVER_INTERRUPT,		/* signal main to start failover */
 	SIG_WATCHDOG_STATE_CHANGED,	/* notify main about local watchdog node state changed */
+	SIG_BACKEND_SYNC_REQUIRED,	/* notify main about local backend state sync required */
+	SIG_WATCHDOG_QUORUM_CHANGED,/* notify main about cluster quorum change of watchdog cluster */
 	MAX_INTERUPTS				/* Must be last! */
 } User1SignalReason;
 
@@ -142,6 +144,7 @@ static void terminate_all_childrens();
 static void system_will_go_down(int code, Datum arg);
 static char* process_name_from_pid(pid_t pid);
 static void sync_backend_from_watchdog(void);
+static void update_backend_quarantine_status(void);
 
 static struct sockaddr_un un_addr;		/* unix domain socket path */
 static struct sockaddr_un pcp_un_addr;  /* unix domain socket path for PCP */
@@ -459,12 +462,11 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
  * This function enqueues the failover/failback requests, and fires the failover() if the function
  * is not already executing
  */
-bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over , unsigned int wd_failover_id)
+bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, unsigned char flags)
 {
 	bool failover_in_progress;
 	pool_sigset_t oldmask;
 	int index;
-	unsigned char request_details = 0;
 
 	/*
 	 * if the queue is already full
@@ -485,12 +487,8 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
 	Req_info->request_queue_tail++;
 	index = Req_info->request_queue_tail % MAX_REQUEST_QUEUE_SIZE;
 	Req_info->request[index].kind = kind;
-	Req_info->request[index].wd_failover_id = wd_failover_id;
 
-	/* Set switch over flag if requested */
-	if (switch_over)
-		request_details |= REQ_DETAIL_SWITCHOVER;
-	Req_info->request[index].request_details = request_details;
+	Req_info->request[index].request_details = flags;
 
 	if(count > 0)
 		memcpy(Req_info->request[index].node_id, node_id_set, (sizeof(int) * count));
@@ -501,16 +499,29 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
 	POOL_SETMASK(&oldmask);
 	if(failover_in_progress == false)
 	{
-		signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
+		if(processType == PT_MAIN)
+			failover();
+		else
+			signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
 	}
 
 	return true;
 }
 
+void register_watchdog_quorum_change_interupt(void)
+{
+	signal_user1_to_parent_with_reason(SIG_WATCHDOG_QUORUM_CHANGED);
+}
+
 void register_watchdog_state_change_interupt(void)
 {
 	signal_user1_to_parent_with_reason(SIG_WATCHDOG_STATE_CHANGED);
 }
+void register_backend_state_sync_req_interupt(void)
+{
+	signal_user1_to_parent_with_reason(SIG_BACKEND_SYNC_REQUIRED);
+}
+
 static void signal_user1_to_parent_with_reason(User1SignalReason reason)
 {
 	user1SignalSlot->signalFlags[reason] = true;
@@ -956,7 +967,7 @@ static void terminate_all_childrens()
  * Reuest failover. If "switch_over" is false, request all existing sessions
  * restarting.
  */
-void notice_backend_error(int node_id, bool switch_over)
+void notice_backend_error(int node_id, unsigned char flags)
 {
 	int n = node_id;
 
@@ -967,7 +978,7 @@ void notice_backend_error(int node_id, bool switch_over)
 	}
 	else
 	{
-		degenerate_backend_set(&n, 1, switch_over, 0);
+		degenerate_backend_set(&n, 1, flags);
 	}
 }
 
@@ -990,8 +1001,7 @@ void notice_backend_error(int node_id, bool switch_over)
  *
  * wd_failover_id: The watchdog internal ID for this failover
  */
-bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only,
-							   bool switch_over, unsigned int wd_failover_id)
+bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only)
 {
 	int i;
 	int node_id[MAX_NUM_BACKENDS];
@@ -1046,12 +1056,12 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
 		if(test_only)
 			return true;
 
-		if (pool_config->use_watchdog && wd_failover_id == 0)
+		if (!(flags & REQ_DETAIL_WATCHDOG))
 		{
 			int x;
 			for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 			{
-				res = wd_degenerate_backend_set(node_id_set, count, &wd_failover_id);
+				res = wd_degenerate_backend_set(node_id_set, count, flags);
 				if (res != FAILOVER_RES_TRANSITION)
 					break;
 				sleep(1);
@@ -1069,10 +1079,26 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
 		}
 		if (res == FAILOVER_RES_PROCEED)
 		{
-			register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, switch_over, wd_failover_id);
+			register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_NO_QUORUM)
+		{
+			ereport(LOG,
+					(errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog"
+							, node_count, getpid()),
+					 errdetail("watchdog does not holds the quorum")));
+
+			register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+		{
+			ereport(LOG,
+					(errmsg("degenerate backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+							,*node_id, getpid())));
 		}
 		else if (res == FAILOVER_RES_WILL_BE_DONE)
 		{
+			/* we will receive a sync request from master watchdog node */
 			ereport(LOG,
 					(errmsg("degenerate backend request for %d node(s) from pid [%d], will be handled by watchdog"
 							, node_count, getpid())));
@@ -1092,13 +1118,13 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
  * wrapper over degenerate_backend_set_ex function to register
  * NODE down operation request
  */
-bool degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id)
+bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
 {
-	return degenerate_backend_set_ex(node_id_set, count, false, false, switch_over, wd_failover_id);
+	return degenerate_backend_set_ex(node_id_set, count, flags, false, false);
 }
 
 /* send promote node request using SIGUSR1 */
-bool promote_backend(int node_id, unsigned int wd_failover_id)
+bool promote_backend(int node_id, unsigned char flags)
 {
 	WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
 	bool ret = false;
@@ -1125,12 +1151,12 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 					node_id, getpid())));
 
 	/* If this was only a test. Inform the caller without doing anything */
-	if (pool_config->use_watchdog && wd_failover_id == 0)
+	if (!(flags & REQ_DETAIL_WATCHDOG))
 	{
 		int x;
 		for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 		{
-			res = wd_promote_backend(node_id, &wd_failover_id);
+			res = wd_promote_backend(node_id, flags);
 			if (res != FAILOVER_RES_TRANSITION)
 				break;
 			sleep(1);
@@ -1149,7 +1175,7 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 
 	if (res == FAILOVER_RES_PROCEED)
 	{
-		ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, false, wd_failover_id);
+		ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, flags);
 	}
 	else if (res == FAILOVER_RES_WILL_BE_DONE)
 	{
@@ -1157,6 +1183,18 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 				(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog"
 						, node_id, getpid())));
 	}
+	else if (res == FAILOVER_RES_NO_QUORUM)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], is canceled because watchdog does not hold quorum"
+						, node_id, getpid())));
+	}
+	else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+						, node_id, getpid())));
+	}
 	else
 	{
 		ereport(LOG,
@@ -1167,7 +1205,7 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 }
 
 /* send failback request using SIGUSR1 */
-bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failover_id)
+bool send_failback_request(int node_id,bool throw_error, unsigned char flags)
 {
 	WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
 	bool ret = false;
@@ -1187,16 +1225,16 @@ bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failove
 	}
 
 	ereport(LOG,
-			(errmsg("received failback request for node_id: %d from pid [%d] wd_failover_id [%d]",
-					node_id, getpid(),wd_failover_id)));
+			(errmsg("received failback request for node_id: %d from pid [%d]",
+					node_id, getpid())));
 
 	/* If this was only a test. Inform the caller without doing anything */
-	if (pool_config->use_watchdog && wd_failover_id == 0)
+	if (!(flags & REQ_DETAIL_WATCHDOG))
 	{
 		int x;
 		for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 		{
-			res = wd_send_failback_request(node_id, &wd_failover_id);
+			res = wd_send_failback_request(node_id, flags);
 			if (res != FAILOVER_RES_TRANSITION)
 				break;
 			sleep(1);
@@ -1215,7 +1253,7 @@ bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failove
 
 	if (res == FAILOVER_RES_PROCEED)
 	{
-		ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, false, wd_failover_id);
+		ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, flags);
 	}
 	else if (res == FAILOVER_RES_WILL_BE_DONE)
 	{
@@ -1401,10 +1439,40 @@ static void sigusr1_interupt_processor(void)
 	ereport(DEBUG1,
 			(errmsg("Pgpool-II parent process received SIGUSR1")));
 
+	if (user1SignalSlot->signalFlags[SIG_WATCHDOG_QUORUM_CHANGED])
+	{
+		ereport(LOG,
+				(errmsg("Pgpool-II parent process received watchdog quorum change signal from watchdog")));
+		
+		user1SignalSlot->signalFlags[SIG_WATCHDOG_QUORUM_CHANGED] = false;
+		if (get_watchdog_quorum_state() >= 0)
+		{
+			ereport(LOG,
+					(errmsg("watchdog cluster now holds the quorum"),
+					 errdetail("updating the state of quarantine backend nodes")));
+			update_backend_quarantine_status();
+		}
+	}
+
+	if (user1SignalSlot->signalFlags[SIG_BACKEND_SYNC_REQUIRED])
+	{
+		ereport(LOG,
+				(errmsg("Pgpool-II parent process received sync backend signal from watchdog")));
+		
+		user1SignalSlot->signalFlags[SIG_BACKEND_SYNC_REQUIRED] = false;
+		if (get_watchdog_local_node_state() == WD_STANDBY)
+		{
+			ereport(LOG,
+					(errmsg("master watchdog has performed failover"),
+					 errdetail("syncing the backend states from the MASTER watchdog node")));
+			sync_backend_from_watchdog();
+		}
+	}
+
 	if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
 	{
 		ereport(DEBUG1,
-				(errmsg("Pgpool-II parent process received SIGUSR1 from watchdog")));
+				(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
 
 		user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
 		if (get_watchdog_local_node_state() == WD_STANDBY)
@@ -1468,6 +1536,7 @@ static void failover(void)
 	int sts;
 	bool need_to_restart_pcp = false;
 	bool all_backend_down = true;
+	bool sync_required = false;
 
 	ereport(DEBUG1,
 		(errmsg("failover handler called")));
@@ -1517,8 +1586,6 @@ static void failover(void)
 		int node_id_set[MAX_NUM_BACKENDS];
 		int node_count;
 		unsigned char request_details;
-		unsigned int wd_failover_id;
-		WDFailoverCMDResults wdInterlockingRes;
 
 		pool_semaphore_lock(REQUEST_INFO_SEM);
 
@@ -1537,7 +1604,6 @@ static void failover(void)
 		reqkind = Req_info->request[queue_index].kind;
 		request_details = Req_info->request[queue_index].request_details;
 		node_count = Req_info->request[queue_index].count;
-		wd_failover_id = Req_info->request[queue_index].wd_failover_id;
 		pool_semaphore_unlock(REQUEST_INFO_SEM);
 
 		ereport(DEBUG1,
@@ -1550,8 +1616,8 @@ static void failover(void)
 			continue;
 		}
 
-		/* start watchdog interlocking */
-		wdInterlockingRes = wd_start_failover_interlocking(wd_failover_id);
+		/* inform all remote watchdog nodes that we are starting the failover */
+		wd_failover_start();
 
 		/*
 		 * if not in replication mode/master slave mode, we treat this a restart request.
@@ -1577,9 +1643,6 @@ static void failover(void)
 					ereport(LOG,
 							(errmsg("invalid failback request, status: [%d] of node id : %d is invalid for failback",BACKEND_INFO(node_id).backend_status,node_id)));
 
-				if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-					wd_end_failover_interlocking(wd_failover_id);
-
 				continue;
 			}
 
@@ -1592,24 +1655,18 @@ static void failover(void)
 			all_backend_down = check_all_backend_down();
 
 			BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT;	/* unset down status */
-			(void)write_status_file();
-
-			/* Aquire failback start command lock */
-			if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+			if (!(request_details & REQ_DETAIL_UPDATE))
 			{
+				/* The request is a proper failback request
+				 * and not an update of a quarantined node's status
+				 */
+				(void)write_status_file();
+				
 				trigger_failover_command(node_id, pool_config->failback_command,
 											MASTER_NODE_ID, get_next_master_node(), PRIMARY_NODE_ID);
-				wd_failover_lock_release(FAILBACK_LOCK, wd_failover_id);
-			}
-			else
-			{
-				/*
-				 * Okay we are not allowed to execute the failover command
-				 * so we need to wait till the one who is executing the command
-				 * finish with it.
-				 */
-				wd_wait_until_command_complete_or_timeout(FAILBACK_LOCK,wd_failover_id);
 			}
+
+			sync_required = true;
 		}
 		else if (reqkind == PROMOTE_NODE_REQUEST)
 		{
@@ -1624,12 +1681,10 @@ static void failover(void)
 			{
 				ereport(LOG,
 						(errmsg("failover: no backends are promoted")));
-				if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-					wd_end_failover_interlocking(wd_failover_id);
 				continue;
 			}
 		}
-		else	/* NODE_DOWN_REQUEST */
+		else	/* NODE_DOWN_REQUEST || NODE_QUARANTINE_REQUEST */
 		{
 			int cnt = 0;
 
@@ -1640,12 +1695,17 @@ static void failover(void)
 					 VALID_BACKEND(node_id_set[i])))
 				{
 					ereport(LOG,
-							(errmsg("starting degeneration. shutdown host %s(%d)",
+							(errmsg("starting %s. shutdown host %s(%d)",
+							(reqkind == NODE_QUARANTINE_REQUEST)?"quarantine":"degeneration",
 							 BACKEND_INFO(node_id_set[i]).backend_hostname,
 							 BACKEND_INFO(node_id_set[i]).backend_port)));
 
 					BACKEND_INFO(node_id_set[i]).backend_status = CON_DOWN;	/* set down status */
-					(void)write_status_file();
+
+					if (reqkind == NODE_QUARANTINE_REQUEST)
+						BACKEND_INFO(node_id_set[i]).quarantine = true;
+					else
+						(void)write_status_file();
 
 					/* save down node */
 					nodes[node_id_set[i]] = 1;
@@ -1657,10 +1717,6 @@ static void failover(void)
 			{
 				ereport(LOG,
 						(errmsg("failover: no backends are degenerated")));
-
-				if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-					wd_end_failover_interlocking(wd_failover_id);
-
 				continue;
 			}
 		}
@@ -1710,7 +1766,7 @@ static void failover(void)
 		 * NODE_DOWN_REQUEST and it's actually a switch over request, we don't
 		 * need to restart all children, except the node is primary.
 		 */
-		else if (STREAM && reqkind == NODE_DOWN_REQUEST &&
+		else if (STREAM && (reqkind == NODE_DOWN_REQUEST || reqkind == NODE_QUARANTINE_REQUEST) &&
 				 request_details & REQ_DETAIL_SWITCHOVER && node_id != PRIMARY_NODE_ID)
 		{
 			ereport(LOG,
@@ -1776,36 +1832,36 @@ static void failover(void)
 			need_to_restart_children = true;
 			partial_restart = false;
 		}
-		if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+		/* Exec failover_command if needed.
+		 * The failover command is not executed for quarantine requests.
+		 */
+		if (reqkind == NODE_DOWN_REQUEST)
 		{
-			/* Exec failover_command if needed */
 			for (i = 0; i < pool_config->backend_desc->num_backends; i++)
 			{
 				if (nodes[i])
+				{
 					trigger_failover_command(i, pool_config->failover_command,
 												MASTER_NODE_ID, new_master, PRIMARY_NODE_ID);
+					sync_required = true;
+				}
 			}
-			wd_failover_lock_release(FAILOVER_LOCK, wd_failover_id);
-		}
-		else
-		{
-			wd_wait_until_command_complete_or_timeout(FAILOVER_LOCK, wd_failover_id);
 		}
 
-	/* no need to wait since it will be done in reap_handler */
-#ifdef NOT_USED
-		while (wait(NULL) > 0)
-			;
-
-		if (errno != ECHILD)
-			ereport(LOG,
-				(errmsg("failover_handler: wait() failed. reason:%s", strerror(errno))));
-
-#endif
-
 		if (reqkind == PROMOTE_NODE_REQUEST && VALID_BACKEND(node_id))
+		{
 			new_primary = node_id;
-
+		}
+		else if (reqkind == NODE_QUARANTINE_REQUEST)
+		{
+			/* If the quarantined node was the primary node,
+			 * set new_primary to -1 (invalid)
+			 */
+			if (Req_info->primary_node_id == node_id)
+				new_primary = -1;
+			else
+				new_primary =  find_primary_node_repeatedly();
+		}
 		/*
 		 * If the down node was a standby node in streaming replication
 		 * mode, we can avoid calling find_primary_node_repeatedly() and
@@ -1821,8 +1877,9 @@ static void failover(void)
 				new_primary =  find_primary_node_repeatedly();
 		}
 		else
+		{
 			new_primary =  find_primary_node_repeatedly();
-
+		}
 		/*
 		 * If follow_master_command is provided and in master/slave
 		 * streaming replication mode, we start degenerating all backends
@@ -1874,22 +1931,10 @@ static void failover(void)
 			}
 		}
 
-		/*
-		 * follow master command also uses the same locks used by trigring command
-		 */
-		if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+		if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
 		{
-			if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
-			{
-				follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
-											Req_info->primary_node_id);
-			}
-			wd_failover_lock_release(FOLLOW_MASTER_LOCK, wd_failover_id);
-		}
-		else
-		{
-			wd_wait_until_command_complete_or_timeout(FOLLOW_MASTER_LOCK, wd_failover_id);
-
+			follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
+										Req_info->primary_node_id);
 		}
 
 		/* Save primary node id */
@@ -1900,6 +1945,7 @@ static void failover(void)
 		if (new_master >= 0)
 		{
 			Req_info->master_node_id = new_master;
+			sync_required = true;
 			ereport(LOG,
 					(errmsg("failover: set new master node: %d", Req_info->master_node_id)));
 		}
@@ -1977,8 +2023,8 @@ static void failover(void)
 		 */
 		kill(worker_pid, SIGUSR1);
 
-		if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-			wd_end_failover_interlocking(wd_failover_id);
+		if (sync_required)
+			wd_failover_end();
 
 		if (reqkind == NODE_UP_REQUEST)
 		{
@@ -1998,12 +2044,14 @@ static void failover(void)
 		else
 		{
 			/* Temporary black magic. Without this regression 055 does not finish */
-			fprintf(stderr, "failover done. shutdown host %s(%d)",
+			fprintf(stderr, "%s done. shutdown host %s(%d)",
+					(reqkind == NODE_DOWN_REQUEST)?"failover":"quarantine",
 					 BACKEND_INFO(node_id).backend_hostname,
 					BACKEND_INFO(node_id).backend_port);
 
 			ereport(LOG,
-					(errmsg("failover done. shutdown host %s(%d)",
+					(errmsg("%s done. shutdown host %s(%d)",
+					(reqkind == NODE_DOWN_REQUEST)?"failover":"quarantine",
 					 BACKEND_INFO(node_id).backend_hostname,
 					 BACKEND_INFO(node_id).backend_port)));
 		}
@@ -3396,6 +3444,29 @@ int pool_frontend_exists(void)
 	return -1;
 }
 
+static void update_backend_quarantine_status(void)
+{
+	/* Reset the quarantine flag on each quarantined backend and
+	 * send a failback request to bring it back to waiting state
+	 */
+	int i;
+	WD_STATES wd_state = get_watchdog_local_node_state();
+
+	for (i = 0; i < NUM_BACKENDS; i++)
+	{
+		if (BACKEND_INFO(i).quarantine && BACKEND_INFO(i).backend_status == CON_DOWN)
+		{
+			BACKEND_INFO(i).quarantine = false;
+			/* Send the failback request for the node.
+			 * We also set the watchdog flag because we eventually send
+			 * the sync message to all standby nodes
+			 */
+			if (wd_state == WD_COORDINATOR)
+				send_failback_request(i,false, REQ_DETAIL_UPDATE | REQ_DETAIL_WATCHDOG);
+		}
+	}
+}
+
 /*
  * The function fetch the current status of all configured backend
  * nodes from the MASTER/COORDINATOR watchdog Pgpool-II and synchronize the
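As an aside for reviewers, the majority check the patch relies on (via `get_mimimum_nodes_required_for_quorum()`) can be modelled roughly as follows. This is an illustrative Python sketch of the arithmetic, not code from the patch, and assumes the threshold is "strictly more than half of all configured watchdog nodes":

```python
def minimum_nodes_required_for_quorum(total_nodes: int) -> int:
    """Quorum/consensus threshold: a majority must exceed half of all nodes.

    For a 3-node watchdog cluster this returns 1, so 2 votes (> 1)
    form a majority; for 5 nodes it returns 2, so 3 votes are needed.
    """
    return total_nodes // 2


def have_consensus(votes: int, total_nodes: int) -> bool:
    # Mirrors the patch's check in compute_failover_consensus():
    # proceed only when the vote count strictly exceeds the threshold.
    return votes > minimum_nodes_required_for_quorum(total_nodes)
```

With this model, a 3-node cluster reaches consensus once 2 distinct Pgpool-II nodes report the same backend failure.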
diff --git a/src/pcp_con/pcp_worker.c b/src/pcp_con/pcp_worker.c
index 54219a72..658b461e 100644
--- a/src/pcp_con/pcp_worker.c
+++ b/src/pcp_con/pcp_worker.c
@@ -502,14 +502,14 @@ static int pool_detach_node(int node_id, bool gracefully)
 {
 	if (!gracefully)
 	{
-		degenerate_backend_set_ex(&node_id, 1, true, false, true, 0);
+		degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
 		return 0;
 	}
 
 	/* Check if the NODE DOWN can be executed on
 	 * the given node id.
 	 */
-	degenerate_backend_set_ex(&node_id, 1, true, true, true, 0);
+	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
 
 	/*
 	 * Wait until all frontends exit
@@ -529,7 +529,7 @@ static int pool_detach_node(int node_id, bool gracefully)
 	/*
 	 * Now all frontends have gone. Let's do failover.
 	 */
-	degenerate_backend_set_ex(&node_id, 1, true, false, true, 0);
+	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, true);
 
 	/*
 	 * Wait for failover completed.
@@ -556,7 +556,7 @@ static int pool_promote_node(int node_id, bool gracefully)
 {
 	if (!gracefully)
 	{
-		promote_backend(node_id, false);	/* send promote request */
+		promote_backend(node_id, REQ_DETAIL_CONFIRMED);	/* send promote request */
 		return 0;
 	}
 
@@ -576,7 +576,7 @@ static int pool_promote_node(int node_id, bool gracefully)
 	/*
 	 * Now all frontends have gone. Let's do failover.
 	 */
-	promote_backend(node_id, false);		/* send promote request */
+	promote_backend(node_id, REQ_DETAIL_CONFIRMED);		/* send promote request */
 
 	/*
 	 * Wait for failover completed.
@@ -910,7 +910,7 @@ process_attach_node(PCP_CONNECTION *frontend,char *buf)
 			(errmsg("PCP: processing attach node"),
 			 errdetail("attaching Node ID %d", node_id)));
 
-	send_failback_request(node_id,true, false);
+	send_failback_request(node_id,true, REQ_DETAIL_CONFIRMED);
 
 	pcp_write(frontend, "c", 1);
 	wsize = htonl(sizeof(code) + sizeof(int));
diff --git a/src/pcp_con/recovery.c b/src/pcp_con/recovery.c
index f2f57f56..ead5eeaf 100644
--- a/src/pcp_con/recovery.c
+++ b/src/pcp_con/recovery.c
@@ -147,7 +147,7 @@ void start_recovery(int recovery_node)
 		pcp_worker_wakeup_request = 0;
 
 		/* send failback request to pgpool parent */
-		send_failback_request(recovery_node,false, false);
+		send_failback_request(recovery_node,false, REQ_DETAIL_CONFIRMED);
 
 		/* wait for failback */
 		failback_wait_count = 0;
diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c
index 2bbcae24..a84134cd 100644
--- a/src/protocol/pool_connection_pool.c
+++ b/src/protocol/pool_connection_pool.c
@@ -854,7 +854,7 @@ static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p)
 			 */
 			if (pool_config->fail_over_on_backend_error)
 			{
-				notice_backend_error(i, true);
+				notice_backend_error(i, REQ_DETAIL_SWITCHOVER);
 				ereport(FATAL,
 					(errmsg("failed to create a backend connection"),
 						 errdetail("executing failover on backend")));
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index 9631a5fb..473c1988 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -3595,7 +3595,7 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 
 		if (pool_config->replication_stop_on_mismatch)
 		{
-			degenerate_backend_set(degenerate_node, degenerate_node_num, false, 0);
+			degenerate_backend_set(degenerate_node, degenerate_node_num, REQ_DETAIL_CONFIRMED);
             retcode = 1;
 		}
         ereport(FATAL,
@@ -4673,7 +4673,7 @@ pool_config->client_idle_limit)));
 						}
 						else
 						{
-							notice_backend_error(i, true);
+							notice_backend_error(i, REQ_DETAIL_SWITCHOVER);
 							sleep(5);
 						}
 						break;
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 19166832..a4bfe13e 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -504,7 +504,7 @@ POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
 
 			ereport(DEBUG1,
 					(errmsg("Query: sending SIGUSR1 signal to parent")));
-			register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, false, 0);
+			register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, 0);
 
 			/* we need to loop over here since we will get USR1 signal while sleeping */
 			while (stime > 0)
@@ -1741,7 +1741,7 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 
 				free_string(msg);
 
-				degenerate_backend_set(victim_nodes, number_of_nodes, true, 0);
+				degenerate_backend_set(victim_nodes, number_of_nodes, REQ_DETAIL_CONFIRMED|REQ_DETAIL_SWITCHOVER);
 				child_exit(POOL_EXIT_AND_RESTART);
 			}
 			else
diff --git a/src/sample/pgpool.conf.sample b/src/sample/pgpool.conf.sample
index 5c5c360d..c66d5e98 100644
--- a/src/sample/pgpool.conf.sample
+++ b/src/sample/pgpool.conf.sample
@@ -551,6 +551,23 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
 # - Lifecheck Setting -
 
 # -- common --
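For reviewers, the way the three new configuration parameters interact can be sketched as a small decision function. This is an illustrative model of `compute_failover_consensus()` in the patch, not the patch's code; the string return values stand in for the `FAILOVER_RES_*` result codes:

```python
def failover_decision(quorum_present: bool, votes: int, total_nodes: int,
                      failover_when_quorum_exists: bool = True,
                      failover_require_consensus: bool = True,
                      request_confirmed: bool = False) -> str:
    """Rough model of the patch's failover decision on the master watchdog."""
    if not failover_when_quorum_exists:
        return "PROCEED"             # quorum is not required at all
    if request_confirmed:
        return "PROCEED"             # REQ_DETAIL_CONFIRMED bypasses the checks
    if not quorum_present:
        return "NO_QUORUM"           # reject until the quorum is restored
    if failover_require_consensus and votes <= total_nodes // 2:
        return "BUILDING_CONSENSUS"  # note the vote, wait for a majority
    return "PROCEED"
```

For example, with the defaults above a single failure report in a 3-node cluster is only noted, and failover proceeds once a second node casts the same vote.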
diff --git a/src/sample/pgpool.conf.sample-master-slave b/src/sample/pgpool.conf.sample-master-slave
index 24757d2d..360cba1f 100644
--- a/src/sample/pgpool.conf.sample-master-slave
+++ b/src/sample/pgpool.conf.sample-master-slave
@@ -549,6 +549,21 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/sample/pgpool.conf.sample-replication b/src/sample/pgpool.conf.sample-replication
index 3318753a..5b37626c 100644
--- a/src/sample/pgpool.conf.sample-replication
+++ b/src/sample/pgpool.conf.sample-replication
@@ -549,6 +549,23 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# agrees on the backend node status change
+
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index a4effb68..f5832c72 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -550,6 +550,21 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# agrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/utils/pool_ssl.c b/src/utils/pool_ssl.c
index 32f755f1..1f6e9574 100644
--- a/src/utils/pool_ssl.c
+++ b/src/utils/pool_ssl.c
@@ -350,7 +350,7 @@ void pool_ssl_close(POOL_CONNECTION *cp) { return; }
 int pool_ssl_read(POOL_CONNECTION *cp, void *buf, int size) {
 	ereport(WARNING,
 			(errmsg("pool_ssl: SSL i/o called but SSL support is not available")));
-	notice_backend_error(cp->db_node_id, true);
+	notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 	child_exit(POOL_EXIT_AND_RESTART);
 	return -1; /* never reached */
 }
@@ -358,7 +358,7 @@ int pool_ssl_read(POOL_CONNECTION *cp, void *buf, int size) {
 int pool_ssl_write(POOL_CONNECTION *cp, const void *buf, int size) {
 	ereport(WARNING,
 			(errmsg("pool_ssl: SSL i/o called but SSL support is not available")));
-	notice_backend_error(cp->db_node_id, true);
+	notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 	child_exit(POOL_EXIT_AND_RESTART);
 	return -1; /* never reached */
 }
diff --git a/src/utils/pool_stream.c b/src/utils/pool_stream.c
index f0f31d31..f586d9bf 100644
--- a/src/utils/pool_stream.c
+++ b/src/utils/pool_stream.c
@@ -218,7 +218,7 @@ int pool_read(POOL_CONNECTION *cp, void *buf, int len)
 				/* if fail_over_on_backend_error is true, then trigger failover */
 				if (pool_config->fail_over_on_backend_error)
 				{
-					notice_backend_error(cp->db_node_id, true);
+					notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 
                     /* If we are in the main process, we will not exit */
 					child_exit(POOL_EXIT_AND_RESTART);
@@ -365,7 +365,7 @@ char *pool_read2(POOL_CONNECTION *cp, int len)
 				/* if fail_over_on_backend_error is true, then trigger failover */
 				if (pool_config->fail_over_on_backend_error)
 				{
-					notice_backend_error(cp->db_node_id, true);
+					notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 					child_exit(POOL_EXIT_AND_RESTART);
                     /* we are in main process */
                     ereport(ERROR,
@@ -590,7 +590,7 @@ int pool_flush(POOL_CONNECTION *cp)
 			/* if fail_over_on_backend_error is true, then trigger failover */
 			if (pool_config->fail_over_on_backend_error)
 			{
-				notice_backend_error(cp->db_node_id, true);
+				notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 				ereport(LOG,
 					(errmsg("unable to flush data to backend"),
 						 errdetail("do not failover because I am the main process")));
@@ -645,7 +645,7 @@ int pool_flush_noerror(POOL_CONNECTION *cp)
             /* if fail_over_on_backend_erro is true, then trigger failover */
             if (pool_config->fail_over_on_backend_error)
             {
-                notice_backend_error(cp->db_node_id, true);
+                notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
                 child_exit(POOL_EXIT_AND_RESTART);
 				ereport(LOG,
 					(errmsg("unable to flush data to backend"),
@@ -813,7 +813,7 @@ char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
 							 errdetail("pg_terminate_backend was called on the backend")));
 				}
 
-				notice_backend_error(cp->db_node_id, true);
+				notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 				child_exit(POOL_EXIT_AND_RESTART);
                 ereport(ERROR,
                         (errmsg("unable to read data from frontend"),
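Before the watchdog.c changes below, here is an illustrative sketch of the vote bookkeeping that `add_failover()` implements, including the `enable_multiple_failover_requests_from_node` behavior. The function and key names here are hypothetical Python stand-ins, not the patch's C structures:

```python
def record_vote(failovers: dict, request_key: str, node_name: str,
                allow_multiple: bool = False) -> int:
    """Rough model of add_failover(): count one vote per watchdog node
    for a given (request kind, node list) key, unless multiple votes
    from the same node are explicitly allowed."""
    obj = failovers.setdefault(request_key, {"count": 0, "nodes": set()})
    if node_name in obj["nodes"]:
        if allow_multiple:
            obj["count"] += 1    # same node may cast multiple votes
        return obj["count"]      # otherwise the duplicate is ignored
    obj["nodes"].add(node_name)
    obj["count"] += 1
    return obj["count"]
```

The returned count is what the master compares against the quorum threshold to decide between waiting for consensus and proceeding with the failover.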
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c
index 093aa636..cb70f616 100644
--- a/src/watchdog/watchdog.c
+++ b/src/watchdog/watchdog.c
@@ -58,6 +58,14 @@
 #include "watchdog/wd_ipc_commands.h"
 #include "parser/stringinfo.h"
 
+/* These defines enable the consensus building feature
+ * in watchdog for node failover operations.
+ * They could also be moved to the configure script.
+ */
+#define NODE_UP_REQUIRE_CONSENSUS
+#define NODE_DOWN_REQUIRE_CONSENSUS
+#define NODE_PROMOTE_REQUIRE_CONSENSUS
+
 typedef enum IPC_CMD_PREOCESS_RES
 {
 	IPC_CMD_COMPLETE,
@@ -108,6 +116,9 @@ typedef enum IPC_CMD_PREOCESS_RES
 #define WD_CMD_REPLY_IN_DATA				'-'
 #define WD_CLUSTER_SERVICE_MESSAGE			'#'
 
+#define WD_FAILOVER_START					'F'
+#define WD_FAILOVER_END						'H'
+
 /*Cluster Service Message Types */
 #define CLUSTER_QUORUM_LOST					'L'
 #define CLUSTER_QUORUM_FOUND				'F'
@@ -150,6 +161,7 @@ packet_types all_packet_types[] = {
 	{WD_GET_RUNTIME_VARIABLE_VALUE, "GET WD RUNTIME VARIABLE VALUE"},
 	{WD_CMD_REPLY_IN_DATA, "COMMAND REPLY IN DATA"},
 	{WD_FAILOVER_LOCKING_REQUEST,"FAILOVER LOCKING REQUEST"},
+	{WD_FAILOVER_INDICATION,"FAILOVER INDICATION"},
 	{WD_CLUSTER_SERVICE_MESSAGE, "CLUSTER SERVICE MESSAGE"},
 	{WD_REGISTER_FOR_NOTIFICATION, "REGISTER FOR NOTIFICATION"},
 	{WD_NODE_STATUS_CHANGE_COMMAND, "NODE STATUS CHANGE"},
@@ -323,6 +335,7 @@ typedef struct wd_cluster
 	int				network_monitor_sock;
 	bool			clusterInitialized;
 	bool			ipc_auth_needed;
+	int				current_failover_id;
 	List			*unidentified_socks;
 	List			*notify_clients;
 	List			*ipc_command_socks;
@@ -340,7 +353,8 @@ typedef struct WDFailoverObject
 	int nodesCount;
 	unsigned int failoverID;
 	int *nodeList;
-	WatchdogNode* wdRequestingNode;
+	List* requestingNodes;
+	int request_count;
 	struct timeval	startTime;
 	int state;
 }WDFailoverObject;
@@ -354,6 +368,9 @@ static bool remove_failover_object_by_id(unsigned int failoverID);
 static void remove_failovers_from_node(WatchdogNode* wdNode);
 static void remove_failover_object(WDFailoverObject* failoverObj);
 static void service_expired_failovers(void);
+static int add_failover(POOL_REQUEST_KIND reqKind, int *node_id_list, int node_count, WatchdogNode *wdNode);
+static WDFailoverCMDResults compute_failover_consensus(POOL_REQUEST_KIND reqKind,int *node_id_list, int node_count,
+													   unsigned char flags, WatchdogNode *wdNode);
 
 static int send_command_packet_to_remote_nodes(WDCommandData* ipcCommand, bool source_included);
 static void wd_command_is_complete(WDCommandData* ipcCommand);
@@ -477,10 +494,9 @@ static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData*
 static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData *ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand);
 
 static bool write_ipc_command_with_result_data(WDCommandData* ipcCommand, char type, char* data, int len);
@@ -492,6 +508,9 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
 static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacketData* pkt);
 static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacketData* pkt);
 
+static WDFailoverCMDResults failover_end_indication(WDCommandData* ipcCommand);
+static WDFailoverCMDResults failover_start_indication(WDCommandData* ipcCommand);
+
 
 static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCommandData* ipcCommand);
 static WDFailoverCMDResults node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID);
@@ -1823,8 +1842,8 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand)
 			return process_IPC_online_recovery(ipcCommand);
 			break;
 
-		case WD_FAILOVER_LOCKING_REQUEST:
-			return process_IPC_failover_locking_cmd(ipcCommand);
+		case WD_FAILOVER_INDICATION:
+			return process_IPC_failover_indication(ipcCommand);
 
 		case WD_GET_MASTER_DATA_REQUEST:
 			return process_IPC_data_request_from_master(ipcCommand);
@@ -1879,7 +1898,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCom
 	else if (strcasecmp(WD_RUNTIME_VAR_QUORUM_STATE, requestVarName) == 0)
 	{
 		jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
-		jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.quorum_status);
+		jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, WD_MASTER_NODE ? WD_MASTER_NODE->quorum_status : -2);
 	}
 	else if (strcasecmp(WD_RUNTIME_VAR_ESCALATION_STATE, requestVarName) == 0)
 	{
@@ -2043,8 +2062,9 @@ static WDFailoverObject* get_failover_object_by_id(unsigned int failoverID)
 static void remove_failover_object(WDFailoverObject* failoverObj)
 {
 	ereport(DEBUG1,
-			(errmsg("removing failover object from \"%s\" with ID:%d", failoverObj->wdRequestingNode->nodeName,failoverObj->failoverID)));
+			(errmsg("removing failover request from %d nodes with ID:%d", failoverObj->request_count,failoverObj->failoverID)));
 	g_cluster.wdCurrentFailovers = list_delete_ptr(g_cluster.wdCurrentFailovers,failoverObj);
+	list_free(failoverObj->requestingNodes);
 	pfree(failoverObj->nodeList);
 	pfree(failoverObj);
 }
@@ -2063,28 +2083,29 @@ static bool remove_failover_object_by_id(unsigned int failoverID)
 /* if the wdNode is NULL. The function removes all failover objects */
 static void remove_failovers_from_node(WatchdogNode* wdNode)
 {
-	ListCell *lc;
-	List *failovers_to_del = NULL;
-
-	foreach(lc, g_cluster.wdCurrentFailovers)
-	{
-		WDFailoverObject* failoverObj = lfirst(lc);
-		if (failoverObj)
-		{
-			if (wdNode == NULL || failoverObj->wdRequestingNode == wdNode)
-			{
-				failovers_to_del = lappend(failovers_to_del,failoverObj);
-			}
-		}
-	}
-
-	/* delete the failover objects */
-
-	foreach(lc, failovers_to_del)
-	{
-		WDFailoverObject* failoverObj = lfirst(lc);
-		remove_failover_object(failoverObj);
-	}
+	/* This function is currently a no-op in this patch;
+	 * the old per-node cleanup code has been removed.
+	 */
+	return;
 }
 
 /* Remove the over stayed failover objects */
@@ -2108,7 +2129,7 @@ static void service_expired_failovers(void)
 			{
 				failovers_to_del = lappend(failovers_to_del,failoverObj);
 				ereport(DEBUG1,
-					(errmsg("failover object from \"%s\" with ID:%d is timeout", failoverObj->wdRequestingNode->nodeName,failoverObj->failoverID),
+					(errmsg("failover request from %d nodes with ID:%d is timeout", failoverObj->request_count,failoverObj->failoverID),
 						 errdetail("adding the failover object for removal")));
 
 			}
@@ -2204,18 +2225,6 @@ static void process_remote_failover_command_on_coordinator(WatchdogNode* wdNode,
 	}
 }
 
-static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand)
-{
-	if (get_local_node_state() != WD_COORDINATOR)
-		return IPC_CMD_ERROR; /* should never hapen*/
-
-	ereport(LOG,
-			(errmsg("watchdog received the failover command from local pgpool-II on IPC interface")));
-
-	return process_failover_command_on_coordinator(ipcCommand);
-}
-
-
 static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDResults cmdResult, unsigned int failoverID)
 {
 	bool ret = false;
@@ -2246,7 +2255,139 @@ static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDRes
 }
 
 /*
- * The Function forwards the failover command to all standby nodes.
+ * This function processes the failover command and decides
+ * whether the failover should be executed.
+ */
+
+static WDFailoverCMDResults compute_failover_consensus(POOL_REQUEST_KIND reqKind,int *node_id_list, int node_count, unsigned char flags, WatchdogNode *wdNode)
+{
+#ifndef NODE_UP_REQUIRE_CONSENSUS
+	if (reqKind == NODE_UP_REQUEST)
+		return FAILOVER_RES_PROCEED;
+#endif
+#ifndef NODE_DOWN_REQUIRE_CONSENSUS
+	if (reqKind == NODE_DOWN_REQUEST)
+		return FAILOVER_RES_PROCEED;
+#endif
+#ifndef NODE_PROMOTE_REQUIRE_CONSENSUS
+	if (reqKind == PROMOTE_NODE_REQUEST)
+		return FAILOVER_RES_PROCEED;
+#endif
+
+	if (pool_config->failover_when_quorum_exists == false)
+	{
+		/* No need for any calculation; a quorum is not required for failover */
+		ereport(LOG,(
+				errmsg("quorum is not required to hold to proceed with failover"),
+					 errdetail("proceeding with the failover"),
+					 errhint("failover_when_quorum_exists is set to false")));
+
+		return FAILOVER_RES_PROCEED;
+	}
+	if (flags & REQ_DETAIL_CONFIRMED)
+	{
+		/* The request flags ask us to bypass the quorum check */
+		ereport(LOG,(
+			errmsg("the failover request does not require the quorum to hold"),
+					 errdetail("proceeding with the failover"),
+					 errhint("REQ_DETAIL_CONFIRMED flag is set")));
+		return FAILOVER_RES_PROCEED;
+	}
+	update_quorum_status();
+	if (g_cluster.quorum_status < 0)
+	{
+		/* The quorum is required but is not present at the moment */
+		ereport(LOG,(
+				errmsg("failover requires the quorum to hold, which is not present at the moment"),
+					 errdetail("rejecting the failover request")));
+		return FAILOVER_RES_NO_QUORUM;
+	}
+
+	/* Reaching here means the quorum is present.
+	 * Now comes the difficult part: ensuring the consensus.
+	 *
+	 * Record the failover request.
+	 */
+	if (pool_config->failover_require_consensus == true)
+	{
+		int cnt = add_failover(reqKind, node_id_list, node_count, wdNode);
+		if (cnt <= get_mimimum_nodes_required_for_quorum())
+		{
+			ereport(LOG,(
+					errmsg("failover requires the majority vote, waiting for consensus"),
+						 errdetail("failover request noted")));
+			return FAILOVER_RES_BUILDING_CONSENSUS;
+		}
+		ereport(LOG,(
+				errmsg("majority of the nodes have agreed on the failover"),
+					errdetail("proceeding with the failover")));
+		return FAILOVER_RES_PROCEED;
+	}
+	ereport(LOG,(
+			errmsg("we do not require majority votes to proceed with failover"),
+				errdetail("proceeding with the failover"),
+				 errhint("failover_require_consensus is set to false")));
+
+	return FAILOVER_RES_PROCEED;
+}
+
+static int add_failover(POOL_REQUEST_KIND reqKind, int *node_id_list, int node_count, WatchdogNode *wdNode)
+{
+	MemoryContext oldCxt;
+	/* Find the failover */
+	WDFailoverObject *failoverObj = get_failover_object(reqKind, node_count, node_id_list);
+	if (failoverObj)
+	{
+		ListCell *lc;
+		/* search the node if it is a duplicate request */
+		foreach(lc, failoverObj->requestingNodes)
+		{
+			WatchdogNode* reqWdNode = lfirst(lc);
+			if (wdNode == reqWdNode)
+			{
+				/* The failover request is a duplicate */
+				if (pool_config->enable_multiple_failover_requests_from_node)
+				{
+					failoverObj->request_count++;
+					ereport(LOG,(
+							errmsg("Duplicate failover request from \"%s\" node",wdNode->nodeName),
+								 errdetail("Pgpool-II can send multiple failover requests for the same node"),
+								 errhint("enable_multiple_failover_requests_from_node is enabled")));
+				}
+				else
+				{
+					ereport(LOG,(
+							errmsg("Duplicate failover request from \"%s\" node",wdNode->nodeName),
+								 errdetail("request ignored")));
+				}
+				return failoverObj->request_count;
+			}
+		}
+	}
+	else
+	{
+		oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+		failoverObj = palloc0(sizeof(WDFailoverObject));
+		failoverObj->reqKind = reqKind;
+		failoverObj->requestingNodes = NULL;
+		failoverObj->nodesCount = node_count;
+		failoverObj->request_count = 0;
+		if (node_count > 0)
+		{
+			failoverObj->nodeList = palloc(sizeof(int) * node_count);
+			memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
+		}
+		failoverObj->failoverID = get_next_commandID();
+		gettimeofday(&failoverObj->startTime, NULL);
+		g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
+		MemoryContextSwitchTo(oldCxt);
+	}
+	
+	failoverObj->request_count++;
+	oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+	failoverObj->requestingNodes = lappend(failoverObj->requestingNodes,wdNode);
+	MemoryContextSwitchTo(oldCxt);
+	return failoverObj->request_count;
+}
+
+/*
+ * The function processes all failover commands on master node
  */
 static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand)
 {
@@ -2254,22 +2395,15 @@ static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandDat
 	int node_count = 0;
 	int *node_id_list = NULL;
 	bool ret = false;
-	WDFailoverObject* failoverObj;
+	unsigned char flags;
 	POOL_REQUEST_KIND reqKind;
-
+	WDFailoverCMDResults res;
+	
 	if (get_local_node_state() != WD_COORDINATOR)
 		return IPC_CMD_ERROR; /* should never happen*/
 
-	/*
-	 * The coordinator node
-	 * Forward this command to all standby nodes.
-	 * Ask the caller to proceed with failover
-	 * but first check if this failover is already requested
-	 * by some other node.
-	 */
-
 	ret = parse_wd_node_function_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len,
-									  &func_name, &node_id_list, &node_count);
+									  &func_name, &node_id_list, &node_count, &flags);
 	if (ret == false)
 	{
 		ereport(LOG,(
@@ -2297,129 +2431,64 @@ static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandDat
 					ipcCommand->commandSource == COMMAND_SOURCE_IPC?
 					"local pgpool-II on IPC interface":ipcCommand->sourceWdNode->nodeName)));
 
-	if (get_cluster_node_count() == 0)
+	res = compute_failover_consensus(reqKind, node_id_list, node_count, flags, ipcCommand->sourceWdNode);
+	if (res == FAILOVER_RES_PROCEED)
 	{
-		/*
-		 * Since I am the only node in the cluster so nothing
-		 * we need to do here
+		/* If the command originated from a remote node,
+		 * request the local node to start the failover.
+		 */
-		ereport(LOG,(
-			errmsg("I am the only pgpool-II node in the watchdog cluster"),
-				errdetail("no need to propagate the failover command [%s]",func_name)));
-		reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
-		return IPC_CMD_COMPLETE;
-	}
+		if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE)
+		{
+			/* Mark that the failover request originated from the watchdog */
+			flags |= REQ_DETAIL_WATCHDOG;
 
-	if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE  && Req_info->switching)
-	{
-		/*
-		 * check if the failover is allowed before doing anything
-		 */
-		ereport(LOG,
-			(errmsg("failover command [%s] request from pgpool-II node \"%s\" is rejected because of switching",
-					func_name,
-					ipcCommand->sourceWdNode->nodeName)));
-		reply_to_failove_command(ipcCommand, FAILOVER_RES_NOT_ALLOWED, 0);
+			if (reqKind == NODE_DOWN_REQUEST)
+				ret = degenerate_backend_set(node_id_list, node_count, flags);
+			else if (reqKind == NODE_UP_REQUEST)
+				ret = send_failback_request(node_id_list[0],false, REQ_DETAIL_WATCHDOG);
+			else if (reqKind == PROMOTE_NODE_REQUEST)
+				ret = promote_backend(node_id_list[0], flags);
+			
+			if (ret == true)
+				reply_to_failove_command(ipcCommand, FAILOVER_RES_WILL_BE_DONE, 0);
+			else
+				reply_to_failove_command(ipcCommand, FAILOVER_RES_ERROR, 0);
+		}
+		else
+		{
+			/* Just reply the caller to get on with the failover */
+			reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
+		}
 		return IPC_CMD_COMPLETE;
 	}
-
-	/*
-	 * check if the same failover is already issued to the main
-	 * process
-	 */
-	failoverObj = get_failover_object(reqKind, node_count, node_id_list);
-	if (failoverObj)
+	else if (res == FAILOVER_RES_NO_QUORUM)
 	{
 		ereport(LOG,
-			(errmsg("failover command [%s] from %s is ignored",
+				(errmsg("failover command [%s] request from pgpool-II node \"%s\" is rejected because we do not have the quorum",
 						func_name,
-						ipcCommand->commandSource == COMMAND_SOURCE_IPC?
-						"local pgpool-II on IPC interface":ipcCommand->sourceWdNode->nodeName),
-			 errdetail("similar failover with ID:%d is already in progress",failoverObj->failoverID)));
-
-		/* Same failover is already in progress */
-		reply_to_failove_command(ipcCommand, FAILOVER_RES_ALREADY_ISSUED, 0);
+						ipcCommand->sourceWdNode->nodeName)));
+		reply_to_failove_command(ipcCommand, FAILOVER_RES_NO_QUORUM, 0);
 		return IPC_CMD_COMPLETE;
 	}
-	else
+	else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
 	{
-		/* No similar failover is in progress */
-		MemoryContext oldCxt;
-		ereport(DEBUG1,
-				(errmsg("proceeding with the failover command [%s] request from %s",
-						func_name,
-						ipcCommand->commandSource == COMMAND_SOURCE_IPC?
-						"local pgpool-II":ipcCommand->sourceWdNode->nodeName),
-				 errdetail("no similar failover is in progress")));
-		/*
-		 * okay now ask all nodes to start failover
-		 */
-		wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
-		ipcCommand->commandPacket.type = WD_REMOTE_FAILOVER_REQUEST;
-		ipcCommand->sendToNode = NULL; /* command needs to be sent to all nodes */
-		set_next_commandID_in_message(&ipcCommand->commandPacket);
-
-		if (ipcCommand->commandSource != COMMAND_SOURCE_IPC)
-		{
-			if (process_wd_command_function(ipcCommand->sourceWdNode, &ipcCommand->sourcePacket,
-										func_name, node_count, node_id_list, ipcCommand->commandPacket.command_id) == false)
-			{
-				return IPC_CMD_COMPLETE;
-			}
-		}
-
-		/* send to all alive nodes */
 		ereport(LOG,
-			(errmsg("forwarding the failover request [%s] to all alive nodes",func_name),
-				 errdetail("watchdog cluster currently has %d connected remote nodes",get_cluster_node_count())));
-		send_command_packet_to_remote_nodes(ipcCommand, false);
-
-		/* create a failover object. to make sure we know the node failovers
-		 * is already in progress
-		 */
-		oldCxt = MemoryContextSwitchTo(TopMemoryContext);
-		failoverObj = palloc0(sizeof(WDFailoverObject));
-		failoverObj->reqKind = reqKind;
-		failoverObj->nodesCount = node_count;
-		if (node_count > 0)
-		{
-			failoverObj->nodeList = palloc(sizeof(int) * node_count);
-			memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
-		}
-		failoverObj->failoverID = ipcCommand->commandPacket.command_id; /* use command id as failover id */
-		gettimeofday(&failoverObj->startTime, NULL);
-		failoverObj->wdRequestingNode = ipcCommand->sourceWdNode;
-		g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
-
-		MemoryContextSwitchTo(oldCxt);
-
-		/* For a moment just think it is successfully sent to all nodes.*/
-		if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
-		{
-			reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, failoverObj->failoverID);
-			return IPC_CMD_COMPLETE;
-		}
-		else
-		{
-			if (get_cluster_node_count() <= 1)
-			{
-				/* Since its just 2 nodes cluster, and the only other
-				 * node is the one that actually issued the failover
-				 * so the command actually completes here
-				 */
-				return IPC_CMD_COMPLETE;
-			}
-		}
+				(errmsg("failover command [%s] request from pgpool-II node \"%s\" is queued, waiting for confirmation from other nodes",
+						func_name,
+						ipcCommand->sourceWdNode->nodeName)));
+		reply_to_failove_command(ipcCommand, FAILOVER_RES_BUILDING_CONSENSUS, 0);
+		return IPC_CMD_COMPLETE;
 	}
-
-	return IPC_CMD_PROCESSING;
+	return IPC_CMD_COMPLETE;
 }
 
 static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand)
 {
 	if (get_local_node_state() == WD_COORDINATOR)
 	{
-		return process_IPC_failover_command_on_coordinator(ipcCommand);
+		ereport(LOG,
+				(errmsg("watchdog received the failover command from local pgpool-II on IPC interface")));
+		return process_failover_command_on_coordinator(ipcCommand);
 	}
 	else if (get_local_node_state() == WD_STANDBY)
 	{
@@ -2548,54 +2617,66 @@ static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *
 	return IPC_CMD_TRY_AGAIN;
 }
 
-static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand)
+static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData *ipcCommand)
 {
+	WDFailoverCMDResults res = FAILOVER_RES_NOT_ALLOWED;
 	/*
 	 * if cluster or myself is not in stable state
 	 * just return cluster in transaction
 	 */
 	ereport(LOG,
-			(errmsg("received the failover command lock request from local pgpool-II on IPC interface")));
-	if (get_local_node_state() == WD_STANDBY)
-	{
-		/* I am a standby node, Just forward the request to coordinator */
-		wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
-		set_next_commandID_in_message(&ipcCommand->commandPacket);
+			(errmsg("received the failover indication from Pgpool-II on IPC interface")));
 
-		ipcCommand->sendToNode = WD_MASTER_NODE;
-		if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
+	if (get_local_node_state() == WD_COORDINATOR)
+	{
+		json_value* root;
+		int failoverState = -1;
+		if (ipcCommand->sourcePacket.data == NULL || ipcCommand->sourcePacket.len <= 0)
 		{
 			ereport(LOG,
-				(errmsg("unable to process the failover command lock request received on IPC interface"),
-					 errdetail("failed to forward the request to the master watchdog node \"%s\"",WD_MASTER_NODE->nodeName)));
-			return IPC_CMD_ERROR;
+					(errmsg("watchdog unable to process failover indication"),
+					 errdetail("invalid command packet")));
+			reply_to_failove_command(ipcCommand, FAILOVER_RES_INVALID_FUNCTION, 0);
+			return IPC_CMD_COMPLETE;
+		}
+		root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
+		if (root && root->type == json_object)
+		{
+			json_get_int_value_for_key(root, "FailoverFuncState", &failoverState);
 		}
 		else
 		{
-			/*
-			 * wait for the result
-			 */
 			ereport(LOG,
-					(errmsg("failover command lock request from local pgpool-II node received on IPC interface is forwarded to master watchdog node \"%s\"",
-							WD_MASTER_NODE->nodeName),
-					 errdetail("waiting for the reply...")));
-			return IPC_CMD_PROCESSING;
+					(errmsg("unable to process failover indication"),
+					 errdetail("invalid json data in command packet")));
+			res = FAILOVER_RES_INVALID_FUNCTION;
+		}
+		if (root)
+			json_value_free(root);
+
+		if (failoverState < 0 )
+		{
+			ereport(LOG,
+				(errmsg("unable to process failover indication"),
+					 errdetail("invalid json data in command packet")));
+			res = FAILOVER_RES_INVALID_FUNCTION;
+		}
+		else if (failoverState == 0) /* start */
+		{
+			res = failover_start_indication(ipcCommand);
+		}
+		else
+		{
+			res = failover_end_indication(ipcCommand);
 		}
 	}
-	else if (get_local_node_state() == WD_COORDINATOR)
+	else
 	{
-		/*
-		 * If I am coordinator, Just process the request locally
-		 */
-		return process_failover_locking_requests_on_cordinator(ipcCommand);
+		ereport(LOG,
+				(errmsg("received the failover indication from Pgpool-II on IPC interface, but only the master node can process failover")));
 	}
+	reply_to_failove_command(ipcCommand, res, 0);
 
-	/* we are not in any stable state at the moment */
-	ereport(LOG,
-		(errmsg("unable to process the failover command lock request received on IPC interface"),
-			 errdetail("this watchdog node has not joined the cluster yet"),
-				errhint("try again in few seconds")));
-	return IPC_CMD_TRY_AGAIN;
+	return IPC_CMD_COMPLETE;
 }
 
 static void process_remote_failover_locking_request(WatchdogNode* wdNode, WDPacketData* pkt)
@@ -2726,6 +2807,86 @@ static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCo
 	return IPC_CMD_COMPLETE;
 }
 
+/*
+ * Failover start does nothing fancy; it just sets the failover_in_progress
+ * flag and informs all the nodes that a failover is in progress.
+ *
+ * Only the local node that is the master can start the failover.
+ */
+static WDFailoverCMDResults
+failover_start_indication(WDCommandData* ipcCommand)
+{
+	ereport(LOG,
+		(errmsg("watchdog is informed of failover start")));
+
+	/* only coordinator(master) node is allowed to process failover */
+	if (get_local_node_state() == WD_COORDINATOR)
+	{
+		/* inform all the nodes about the failover start */
+		send_message_of_type(NULL, WD_FAILOVER_START, NULL);
+		return FAILOVER_RES_PROCEED;
+	}
+	else if (get_local_node_state() == WD_STANDBY)
+	{
+		ereport(LOG,
+			(errmsg("failed to process failover start request, I am not the master node"),
+				 errdetail("I am standby node and request can only be processed by master watchdog node")));
+		return FAILOVER_RES_ERROR;
+	}
+	else
+	{
+		ereport(LOG,
+				(errmsg("failed to process failover start request, I am not in stable state")));
+	}
+	return FAILOVER_RES_TRANSITION;
+}
+
+static WDFailoverCMDResults
+failover_end_indication(WDCommandData* ipcCommand)
+{
+	ereport(LOG,
+			(errmsg("watchdog is informed of failover end")));
+
+	/* only coordinator(master) node is allowed to process failover */
+	if (get_local_node_state() == WD_COORDINATOR)
+	{
+		send_message_of_type(NULL, WD_FAILOVER_END, NULL);
+		return FAILOVER_RES_PROCEED;
+	}
+	else if (get_local_node_state() == WD_STANDBY)
+	{
+		ereport(LOG,
+				(errmsg("failed to process failover end request, I am not the master node"),
+				 errdetail("I am standby node and request can only be processed by master watchdog node")));
+		return FAILOVER_RES_ERROR;
+	}
+	else
+	{
+		ereport(LOG,
+				(errmsg("failed to process failover end request, I am not in stable state")));
+	}
+	return FAILOVER_RES_TRANSITION;
+}
+
+
 /*
  * node_is_asking_for_failover_start()
  * the function process the lock holding requests. If the lock holding node
@@ -3940,6 +4101,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 		case WD_INFO_MESSAGE:
 		{
 			char *authkey = NULL;
+			int oldQuorumStatus;
 			WD_STATES oldNodeState;
 			WatchdogNode* tempNode = parse_node_info_message(pkt, &authkey);
 			if (tempNode == NULL)
@@ -3949,6 +4111,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 				send_cluster_service_message(wdNode,pkt,CLUSTER_NODE_INVALID_VERSION);
 				break;
 			}
+			oldQuorumStatus = wdNode->quorum_status;
 			oldNodeState = wdNode->state;
 			wdNode->state = tempNode->state;
 			wdNode->startup_time.tv_sec = tempNode->startup_time.tv_sec;
@@ -3998,6 +4161,11 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 						handle_split_brain(wdNode, pkt);
 					}
 				}
+				else if (WD_MASTER_NODE == wdNode && oldQuorumStatus != wdNode->quorum_status)
+				{
+					/* inform Pgpool man about quorum status changes */
+					register_watchdog_quorum_change_interupt();
+				}
 			}
 
 			/* if the info message is from master node. Make sure we are in sync
@@ -4676,6 +4844,9 @@ static WDPacketData* get_message_of_type(char type, WDPacketData* replyFor)
 		case WD_IAM_COORDINATOR_MESSAGE:
 			pkt = get_beacon_message(WD_IAM_COORDINATOR_MESSAGE,replyFor);
 			break;
+
+		case WD_FAILOVER_START:
+		case WD_FAILOVER_END:
 		case WD_REQ_INFO_MESSAGE:
 		case WD_STAND_FOR_COORDINATOR_MESSAGE:
 		case WD_DECLARE_COORDINATOR_MESSAGE:
@@ -5598,6 +5769,7 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
 				}
 				/* inform to the cluster about the new quorum status */
 				send_message_of_type(NULL, WD_INFO_MESSAGE,NULL);
+				register_watchdog_quorum_change_interupt();
 			}
 		}
 			break;
@@ -6189,6 +6361,9 @@ static int watchdog_state_machine_standby(WD_EVENTS event, WatchdogNode* wdNode,
 		case WD_EVENT_PACKET_RCV:
 			switch (pkt->type)
 		{
+			case WD_FAILOVER_END:
+				register_backend_state_sync_req_interupt();
+				break;
 			case WD_STAND_FOR_COORDINATOR_MESSAGE:
 			{
 				if (WD_MASTER_NODE == NULL)
@@ -6394,6 +6569,7 @@ static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacke
 	char* func_name;
 	int node_count = 0;
 	int *node_id_list = NULL;
+	unsigned char flags;
 
 	if (pkt->data == NULL || pkt->len == 0)
 	{
@@ -6412,7 +6588,7 @@ static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacke
 		reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
 		return;
 	}
-	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count, &flags))
 	{
 		ereport(LOG,
 			(errmsg("watchdog received the failover command from \"%s\"",wdNode->nodeName)));
@@ -6437,6 +6613,7 @@ static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacke
 	char* func_name;
 	int node_count = 0;
 	int *node_id_list = NULL;
+	unsigned char flags;
 
 	if (pkt->data == NULL || pkt->len == 0)
 	{
@@ -6450,7 +6627,7 @@ static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacke
 	ereport(LOG,
 		(errmsg("watchdog received online recovery request from \"%s\"",wdNode->nodeName)));
 
-	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count, &flags))
 	{
 		if (strcasecmp(WD_FUNCTION_START_RECOVERY, func_name) == 0)
 		{
@@ -6530,7 +6707,7 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
 		}
 		else
 		{
-			ret = send_failback_request(node_id_list[0],false, failover_id);
+			ret = send_failback_request(node_id_list[0],false, REQ_DETAIL_WATCHDOG);
 		}
 	}
 	
@@ -6544,7 +6721,7 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
 		}
 		else
 		{
-			ret = degenerate_backend_set(node_id_list, node_count, false, failover_id);
+//			ret = degenerate_backend_set(node_id_list, node_count, false, fla);
 		}
 	}
 
diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c
index 42c77705..ac26f49e 100644
--- a/src/watchdog/wd_commands.c
+++ b/src/watchdog/wd_commands.c
@@ -54,12 +54,9 @@
 #define WD_INTERLOCK_TIMEOUT_SEC	10
 #define WD_INTERLOCK_WAIT_COUNT ((int) ((WD_INTERLOCK_TIMEOUT_SEC * 1000)/WD_INTERLOCK_WAIT_MSEC))
 
-static void sleep_in_waiting(void);
 static void FreeCmdResult(WDIPCCmdResult* res);
 
-static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
-static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
-static WDFailoverCMDResults wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
+static char* get_wd_failover_state_json(bool start);
 
 static int open_wd_command_sock(bool throw_error);
 static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id);
@@ -135,6 +132,30 @@ WD_STATES get_watchdog_local_node_state(void)
 	return ret;
 }
 
+int get_watchdog_quorum_state(void)
+{
+	WD_STATES ret = WD_DEAD;
+	WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_QUORUM_STATE);
+	if (state == NULL)
+	{
+		ereport(LOG,
+				(errmsg("failed to get quorum state of watchdog cluster"),
+				 errdetail("get runtime variable value from watchdog returned no data")));
+		return WD_DEAD;
+	}
+	if (state->valueType != VALUE_DATA_TYPE_INT)
+	{
+		ereport(LOG,
+				(errmsg("failed to get quorum state of watchdog cluster"),
+				 errdetail("get runtime variable value from watchdog returned invalid value type")));
+		pfree(state);
+		return WD_DEAD;
+	}
+	ret = (WD_STATES)state->data.intVal;
+	pfree(state);
+	return ret;
+}
+
 char* get_watchdog_ipc_address(void)
 {
 	return watchdog_ipc_address;
@@ -539,7 +560,7 @@ wd_start_recovery(void)
 	char type;
 	unsigned int *shared_key = get_ipc_shared_key();
 
-	char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0,
+	char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0, 0,
 										   shared_key?*shared_key:0,pool_config->wd_authkey);
 
 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
@@ -585,7 +606,7 @@ wd_end_recovery(void)
 	char type;
 	unsigned int *shared_key = get_ipc_shared_key();
 
-	char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0,
+	char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0, 0,
 										   shared_key?*shared_key:0,pool_config->wd_authkey);
 
 	
@@ -629,14 +650,15 @@ wd_end_recovery(void)
 
 
 WDFailoverCMDResults
-wd_send_failback_request(int node_id, unsigned int *wd_failover_id)
+wd_send_failback_request(int node_id, unsigned char flags)
 {
 	int n = node_id;
 	char* func;
+	unsigned int wd_failover_id;
 	unsigned int *shared_key = get_ipc_shared_key();
 	WDFailoverCMDResults res;
 
-	func = get_wd_node_function_json(WD_FUNCTION_FAILBACK_REQUEST,&n, 1,
+	func = get_wd_node_function_json(WD_FUNCTION_FAILBACK_REQUEST,&n, 1, flags,
 									 shared_key?*shared_key:0,pool_config->wd_authkey);
 
 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
@@ -644,24 +666,22 @@ wd_send_failback_request(int node_id, unsigned int *wd_failover_id)
 													   func, strlen(func), true);
 	pfree(func);
 
-	res = wd_get_failover_result_from_data(result, wd_failover_id);
+	res = wd_get_failover_result_from_data(result, &wd_failover_id);
 	FreeCmdResult(result);
 	return res;
 }
 
-static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
+static char* get_wd_failover_state_json(bool start)
 {
 	char* json_str;
 	JsonNode* jNode = jw_create_with_object(true);
 	unsigned int *shared_key = get_ipc_shared_key();
-
+	
 	jw_put_int(jNode, WD_IPC_SHARED_KEY, shared_key?*shared_key:0); /* put the shared key*/
 	if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
 		jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /*  put the auth key*/
 
-	jw_put_string(jNode, "SyncRequestType", reqType);
-	jw_put_int(jNode, "FailoverLockID", lockID);
-	jw_put_int(jNode, "WDFailoverID", wd_failover_id);
+	jw_put_int(jNode, "FailoverFuncState", start?0:1);
 	jw_finish_document(jNode);
 	json_str = pstrdup(jw_get_json_string(jNode));
 	jw_destroy(jNode);
@@ -669,14 +689,14 @@ static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks l
 }
 
 static WDFailoverCMDResults
-wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
+wd_send_failover_func_status_command(bool start)
 {
 	WDFailoverCMDResults res;
 	unsigned int failover_id;
 
-	char* json_data = get_wd_failover_cmd_type_json(syncReqType, lockID, wd_failover_id);
+	char* json_data = get_wd_failover_state_json(start);
 
-	WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_LOCKING_REQUEST
+	WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_INDICATION
 													   ,pool_config->recovery_timeout,
 													   json_data, strlen(json_data), true);
 
@@ -743,41 +763,55 @@ static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *res
 	return FAILOVER_RES_ERROR;
 }
 
+/*
+ * Send the degenerate backend request to the watchdog.
+ * The watchdog can respond to the request in the following ways:
+ *
+ * 1 - It can tell the caller to proceed with the failover. This
+ * happens when the current node is the master watchdog node and
+ * the consensus requirements are already satisfied.
+ *
+ * 2 - It can tell the caller that the failover is not allowed.
+ * This happens when the cluster does not hold the quorum.
+ *
+ * 3 - It can tell the caller that the request is registered and the
+ * watchdog is waiting for votes from other nodes to build consensus.
+ */
 WDFailoverCMDResults
-wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id)
+wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
 {
 	WDFailoverCMDResults res;
 	char* func;
 	unsigned int *shared_key = get_ipc_shared_key();
+	unsigned int wd_failover_id;
 	
-	func = get_wd_node_function_json(WD_FUNCTION_DEGENERATE_REQUEST,node_id_set, count,
+	func = get_wd_node_function_json(WD_FUNCTION_DEGENERATE_REQUEST,node_id_set, count, flags,
 									 shared_key?*shared_key:0,pool_config->wd_authkey);
 
 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND ,
 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
 													   func, strlen(func), true);
 	pfree(func);
-	res = wd_get_failover_result_from_data(result, wd_failover_id);
+	res = wd_get_failover_result_from_data(result, &wd_failover_id);
 	FreeCmdResult(result);
 	return res;
 }
 
 WDFailoverCMDResults
-wd_promote_backend(int node_id, unsigned int *wd_failover_id)
+wd_promote_backend(int node_id, unsigned char flags)
 {
 	WDFailoverCMDResults res;
 	int n = node_id;
 	char* func;
 	WDIPCCmdResult *result;
+	unsigned int wd_failover_id;
 	unsigned int *shared_key = get_ipc_shared_key();
 	
-	func = get_wd_node_function_json(WD_FUNCTION_PROMOTE_REQUEST,&n, 1,
+	func = get_wd_node_function_json(WD_FUNCTION_PROMOTE_REQUEST,&n, 1, flags,
 									 shared_key?*shared_key:0,pool_config->wd_authkey);
 	result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
 									   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
 									   func, strlen(func), true);
 	pfree(func);
-	res = wd_get_failover_result_from_data(result, wd_failover_id);
+	res = wd_get_failover_result_from_data(result, &wd_failover_id);
 	FreeCmdResult(result);
 	return res;
 }
@@ -878,86 +912,20 @@ open_wd_command_sock(bool throw_error)
 	return sock;
 }
 
-WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id)
+WDFailoverCMDResults wd_failover_start(void)
 {
 	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_START, 0, wd_failover_id);
-	return FAILOVER_RES_I_AM_LOCK_HOLDER;
+		return wd_send_failover_func_status_command(0);
+	return FAILOVER_RES_PROCEED;
 }
 
-WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id)
+WDFailoverCMDResults wd_failover_end(void)
 {
 	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_END, 0, wd_failover_id);
-	return FAILOVER_RES_SUCCESS;
+		return wd_send_failover_func_status_command(1);
+	return FAILOVER_RES_PROCEED;
 }
 
-WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id)
-{
-	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_RELEASE_LOCK, lock, wd_failover_id);
-	return FAILOVER_RES_SUCCESS;
-}
-
-WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id)
-{
-	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_LOCK_STATUS, lock, wd_failover_id);
-	return FAILOVER_RES_UNLOCKED;
-}
-
-void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id)
-{
-	WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
-	int	count = WD_INTERLOCK_WAIT_COUNT;
-
-	while (pool_config->use_watchdog)
-	{
-		res = wd_failover_lock_status(lock, wd_failover_id);
-		if (res == FAILOVER_RES_UNLOCKED ||
-			res == FAILOVER_RES_NO_LOCKHOLDER)
-		{
-			/* we have the permision */
-			return;
-		}
-		sleep_in_waiting();
-		if (--count < 0)
-		{
-			ereport(WARNING,
-					(errmsg("timeout wating for unlock")));
-			break;
-		}
-	}
-}
-
-/*
- * This is just a wrapper over wd_send_failover_sync_command()
- * but try to wait for WD_INTERLOCK_TIMEOUT_SEC amount of time
- * if watchdog is in transition state
- */
-
-static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
-{
-	WDFailoverCMDResults res;
-	int x;
-	for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION/2; x++)
-	{
-		res = wd_send_failover_sync_command(syncReqType, lockID, wd_failover_id);
-		if (res != FAILOVER_RES_TRANSITION)
-			break;
-		sleep(2);
-	}
-	return res;
-}
-
-static void
-sleep_in_waiting(void)
-{
-	struct timeval t = {0, WD_INTERLOCK_WAIT_MSEC * 1000};
-	select(0, NULL, NULL, NULL, &t);
-}
-
-
 
 static void FreeCmdResult(WDIPCCmdResult* res)
 {
diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c
index cc95e28f..36889829 100644
--- a/src/watchdog/wd_json_data.c
+++ b/src/watchdog/wd_json_data.c
@@ -690,7 +690,7 @@ WDNodeInfo* get_WDNodeInfo_from_wd_node_json(json_value* source)
 	
 }
 
-char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey)
+char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned char flags, unsigned int sharedKey, char* authKey)
 {
 	char* json_str;
 	int  i;
@@ -702,6 +702,7 @@ char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, un
 		jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey); /*  put the auth key*/
 
 	jw_put_string(jNode, "Function", func_name);
+	jw_put_int(jNode, "Flags", (int)flags);
 	jw_put_int(jNode, "NodeCount", count);
 	if (count > 0)
 	{
@@ -717,13 +718,13 @@ char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, un
 	return json_str;
 }
 
-bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count)
+bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count, unsigned char *flags)
 {
 	json_value *root, *value;
 	char* ptr;
 	int node_count = 0;
 	int i;
-
+	
 	*node_id_set = NULL;
 	*func_name = NULL;
 	*count = 0;
@@ -750,6 +751,12 @@ bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name
 	}
 	*func_name = pstrdup(ptr);
 	/* If it is a node function ?*/
+	{
+		/* "Flags" is new in this version; a request from an older
+		 * Pgpool-II may not contain it, so default to zero */
+		int temp_flags = 0;
+		if (json_get_int_value_for_key(root, "Flags", &temp_flags))
+			*flags = 0;
+		else
+			*flags = (unsigned char)temp_flags;
+	}
 	if (json_get_int_value_for_key(root, "NodeCount", &node_count))
 	{
 		/*node count not found, But we don't care much about this*/

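For reviewers, the vote-counting rule used by compute_failover_consensus() in the patch can be summarized with this standalone sketch. The helper names below are illustrative only (the patch's actual helper is get_mimimum_nodes_required_for_quorum(), not shown here); the point is the "vote count must exceed the quorum minimum" comparison:

```c
#include <stdbool.h>

/* Minimum number of remote nodes required for quorum in a cluster of
 * total_nodes watchdog nodes (self excluded, strict majority). */
int minimum_nodes_required_for_quorum(int total_nodes)
{
	return total_nodes / 2;
}

/* Consensus is reached when the number of nodes that reported the same
 * failure exceeds the quorum minimum; this mirrors the
 * "cnt <= get_mimimum_nodes_required_for_quorum()" check in the patch,
 * which keeps waiting while the count is at or below the minimum. */
bool consensus_reached(int vote_count, int total_nodes)
{
	return vote_count > minimum_nodes_required_for_quorum(total_nodes);
}
```

So in a 3-node cluster a single vote keeps the request queued (FAILOVER_RES_BUILDING_CONSENSUS), while a second vote lets the master proceed with the failover.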

More information about the pgpool-hackers mailing list