[pgpool-hackers: 2505] Re: New Feature with patch: Quorum and Consensus for backend failover

Muhammad Usama m.usama at gmail.com
Fri Aug 25 03:32:49 JST 2017


Hi Ishii-San

Please fine the updated patch, It fixes the regression issue you were
facing and also another bug which I encountered during my testing.

-- Adding Yugo to the thread,
Hi Yugo,

Since you are an expert of watchdog feature, So I thought you might have
something to say especially regarding the discussion points mentioned in
the initial mail.


Thanks
Best Regards
Muhammad Usama


On Thu, Aug 24, 2017 at 11:25 AM, Muhammad Usama <m.usama at gmail.com> wrote:

>
>
> On Thu, Aug 24, 2017 at 4:34 AM, Tatsuo Ishii <ishii at sraoss.co.jp> wrote:
>
>> After applying the patch, many of regression tests fail. It seems
>> pgpool.conf.sample has bogus comment which causes the pgpool.conf
>> parser to complain parse error.
>>
>> 2017-08-24 08:22:36: pid 6017: FATAL:  syntex error in configuration file
>> "/home/t-ishii/work/pgpool-II/current/pgpool2/src/test/regre
>> ssion/tests/004.watchdog/standby/etc/pgpool.conf"
>> 2017-08-24 08:22:36: pid 6017: DETAIL:  parse error at line 568 '*' token
>> = 8
>>
>
> Really sorry, Somehow I overlooked the sample config file changes I made
> at the last minute.
> Will send you the updated version.
>
> Thanks
> Best Regards
> Muhammad Usama
>
>>
>> Best regards,
>> --
>> Tatsuo Ishii
>> SRA OSS, Inc. Japan
>> English: http://www.sraoss.co.jp/index_en.php
>> Japanese:http://www.sraoss.co.jp
>>
>> > Usama,
>> >
>> > Thanks for the patch. I am going to review it.
>> >
>> > In the mean time when I apply your patch, I got some trailing
>> > whitespace errors. Can you please fix them?
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:470: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:485: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:564: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:1428: trailing whitespace.
>> >
>> > /home/t-ishii/quorum_aware_failover.diff:1450: trailing whitespace.
>> >
>> > warning: squelched 3 whitespace errors
>> > warning: 8 lines add whitespace errors.
>> >
>> > Best regards,
>> > --
>> > Tatsuo Ishii
>> > SRA OSS, Inc. Japan
>> > English: http://www.sraoss.co.jp/index_en.php
>> > Japanese:http://www.sraoss.co.jp
>> >
>> >> Hi
>> >>
>> >> I was working on the new feature to make the backend node failover
>> quorum
>> >> aware and on the half way through the implementation I also added the
>> >> majority consensus feature for the same.
>> >>
>> >> So please find the first version of the patch for review that makes the
>> >> backend node failover consider the watchdog cluster quorum status and
>> seek
>> >> the majority consensus before performing failover.
>> >>
>> >> *Changes in the Failover mechanism with watchdog.*
>> >> For this new feature I have modified the Pgpool-II's existing failover
>> >> mechanism with watchdog.
>> >> Previously as you know when the Pgpool-II require to perform a node
>> >> operation (failover, failback, promote-node) with the watchdog. The
>> >> watchdog used to propagated the failover request to all the Pgpool-II
>> nodes
>> >> in the watchdog cluster and as soon as the request was received by the
>> >> node, it used to initiate the local failover and that failover was
>> >> synchronised on all nodes using the distributed locks.
>> >>
>> >> *Now Only the Master node performs the failover.*
>> >> The attached patch changes the mechanism of synchronised failover, and
>> now
>> >> only the Pgpool-II of master watchdog node performs the failover, and
>> all
>> >> other standby nodes sync the backend statuses after the master
>> Pgpool-II is
>> >> finished with the failover.
>> >>
>> >> *Overview of new failover mechanism.*
>> >> -- If the failover request is received to the standby watchdog
>> node(from
>> >> local Pgpool-II), That request is forwarded to the master watchdog and
>> the
>> >> Pgpool-II main process is returned with the FAILOVER_RES_WILL_BE_DONE
>> >> return code. And upon receiving the FAILOVER_RES_WILL_BE_DONE from the
>> >> watchdog for the failover request the requesting Pgpool-II moves
>> forward
>> >> without doing anything further for the particular failover command.
>> >>
>> >> -- Now when the failover request from standby node is received by the
>> >> master watchdog, after performing the validation, applying the
>> consensus
>> >> rules the failover request is triggered on the local Pgpool-II .
>> >>
>> >> -- When the failover request is received to the master watchdog node
>> from
>> >> the local Pgpool-II (On the IPC channel) the watchdog process inform
>> the
>> >> Pgpool-II requesting process to proceed with failover (provided all
>> >> failover rules are satisfied).
>> >>
>> >> -- After the failover is finished on the master Pgpool-II, the failover
>> >> function calls the *wd_failover_end*() which sends the backend sync
>> >> required message to all standby watchdogs.
>> >>
>> >> -- Upon receiving the sync required message from master watchdog node
>> all
>> >> Pgpool-II sync the new statuses of each backend node from the master
>> >> watchdog.
>> >>
>> >> *No More Failover locks*
>> >> Since with this new failover mechanism we do not require any
>> >> synchronisation and guards against the execution of failover_commands
>> by
>> >> multiple Pgpool-II nodes, So the patch removes all the distributed
>> locks
>> >> from failover function, This makes the failover simpler and faster.
>> >>
>> >> *New kind of Failover operation NODE_QUARANTINE_REQUEST*
>> >> The patch adds the new kind of backend node operation NODE_QUARANTINE
>> which
>> >> is effectively same as the NODE_DOWN, but with node_quarantine the
>> >> failover_command is not triggered.
>> >> The NODE_DOWN_REQUEST is automatically converted to the
>> >> NODE_QUARANTINE_REQUEST when the failover is requested on the backend
>> node
>> >> but watchdog cluster does not holds the quorum.
>> >> This means in the absence of quorum the failed backend nodes are
>> >> quarantined and when the quorum becomes available again the Pgpool-II
>> >> performs the failback operation on all quarantine nodes.
>> >> And again when the failback is performed on the quarantine backend
>> node the
>> >> failover function does not trigger the failback_command.
>> >>
>> >> *Controlling the Failover behaviour.*
>> >> The patch adds three new configuration parameters to configure the
>> failover
>> >> behaviour from user side.
>> >>
>> >> *failover_when_quorum_exists*
>> >> When enabled the failover command will only be executed when the
>> watchdog
>> >> cluster holds the quorum. And when the quorum is absent and
>> >> failover_when_quorum_exists is enabled the failed backend nodes will
>> get
>> >> quarantine until the quorum becomes available again.
>> >> disabling it will enable the old behaviour of failover commands.
>> >>
>> >>
>> >> *failover_require_consensus*This new configuration parameter can be
>> used to
>> >> make sure we get the majority vote before performing the failover on
>> the
>> >> node. When *failover_require_consensus* is enabled then the failover is
>> >> only performed after receiving the failover request from the majority
>> or
>> >> Pgpool-II nodes.
>> >> For example in three nodes cluster the failover will not be performed
>> until
>> >> at least two nodes ask for performing the failover on the particular
>> >> backend node.
>> >>
>> >> It is also worthwhile to mention here that *failover_require_consensus*
>> >> only works when failover_when_quorum_exists is enables.
>> >>
>> >>
>> >> *enable_multiple_failover_requests_from_node*
>> >> This parameter works in connection with *failover_require_consensus*
>> >> config. When enabled a single Pgpool-II node can vote for failover
>> multiple
>> >> times.
>> >> For example in the three nodes cluster if one Pgpool-II node sends the
>> >> failover request of particular node twice that would be counted as two
>> >> votes in favour of failover and the failover will be performed even if
>> we
>> >> do not get a vote from other two nodes.
>> >>
>> >> And when *enable_multiple_failover_requests_from_node* is disabled,
>> Only
>> >> the first vote from each Pgpool-II will be accepted and all other
>> >> subsequent votes will be marked duplicate and rejected.
>> >> So in that case we will require a majority votes from distinct nodes to
>> >> execute the failover.
>> >> Again this *enable_multiple_failover_requests_from_node* only becomes
>> >> effective when both *failover_when_quorum_exists* and
>> >> *failover_require_consensus* are enabled.
>> >>
>> >>
>> >> *Controlling the failover: The Coding perspective.*
>> >> Although the failover functions are made quorum and consensus aware but
>> >> there is still a way to bypass the quorum conditions, and requirement
>> of
>> >> consensus.
>> >>
>> >> For this the patch uses the existing request_details flags in
>> >> POOL_REQUEST_NODE to control the behaviour of failover.
>> >>
>> >> Here are the newly added flags values.
>> >>
>> >> *REQ_DETAIL_WATCHDOG*:
>> >> Setting this flag while issuing the failover command will not send the
>> >> failover request to the watchdog. But this flag may not be useful in
>> any
>> >> other place than where it is already used.
>> >> Mostly this flag can be used to avoid the failover command from going
>> to
>> >> watchdog that is already originated from watchdog. Otherwise we can
>> end up
>> >> in infinite loop.
>> >>
>> >> *REQ_DETAIL_CONFIRMED*:
>> >> Setting this flag will bypass the *failover_require_consensus*
>> >> configuration and immediately perform the failover if quorum is
>> present.
>> >> This flag can be used to issue the failover request originated from PCP
>> >> command.
>> >>
>> >> *REQ_DETAIL_UPDATE*:
>> >> This flag is used for the command where we are failing back the
>> quarantine
>> >> nodes. Setting this flag will not trigger the failback_command.
>> >>
>> >> *Some conditional flags used:*
>> >> I was not sure about the configuration of each type of failover
>> operation.
>> >> As we have three main failover operations NODE_UP_REQUEST,
>> >> NODE_DOWN_REQUEST, and PROMOTE_NODE_REQUEST
>> >> So I was thinking do we need to give the configuration option to the
>> users,
>> >> if they want to enable/disable quorum checking and consensus for
>> individual
>> >> failover operation type.
>> >> For example: is it a practical configuration where a user would want to
>> >> ensure quorum while preforming NODE_DOWN operation while does not want
>> it
>> >> for NODE_UP.
>> >> So in this patch I use three compile time defines to enable disable the
>> >> individual failover operation, while we can decide on the best
>> solution.
>> >>
>> >> NODE_UP_REQUIRE_CONSENSUS: defining it will enable quorum checking
>> feature
>> >> for NODE_UP_REQUESTs
>> >>
>> >> NODE_DOWN_REQUIRE_CONSENSUS: defining it will enable quorum checking
>> >> feature for NODE_DOWN_REQUESTs
>> >>
>> >> NODE_PROMOTE_REQUIRE_CONSENSUS: defining it will enable quorum
>> checking
>> >> feature for PROMOTE_NODE_REQUESTs
>> >>
>> >> *Some Point for Discussion:*
>> >>
>> >> *Do we really need to check ReqInfo->switching flag before enqueuing
>> >> failover request.*
>> >> While working on the patch I was wondering why do we disallow
>> enqueuing the
>> >> failover command when the failover is already in progress? For example
>> in
>> >> *pcp_process_command*() function if we see the *Req_info->switching*
>> flag
>> >> set we bailout with the error instead of enqueuing the command. Is is
>> >> really necessary?
>> >>
>> >> *Do we need more granule control over each failover operation:*
>> >> As described in section "Some conditional flags used" I want the
>> opinion on
>> >> do we need configuration parameters in pgpool.conf to enable disable
>> quorum
>> >> and consensus checking on individual failover types.
>> >>
>> >> *Which failover should be mark as Confirmed:*
>> >> As defined in the above section of REQ_DETAIL_CONFIRMED, We can mark
>> the
>> >> failover request to not need consensus, currently the requests from
>> the PCP
>> >> commands are fired with this flag. But I was wondering there may be
>> more
>> >> places where we many need to use the flag.
>> >> For example I currently use the same confirmed flag when failover is
>> >> triggered because of *replication_stop_on_mismatch*.
>> >>
>> >> I think we should think this flag for each place of failover, like
>> when the
>> >> failover is triggered
>> >> because of health_check failure.
>> >> because of replication mismatch
>> >> because of backend_error
>> >> e.t.c
>> >>
>> >> *Node Quarantine behaviour.*
>> >> What do you think about the node quarantine used by this patch. Can you
>> >> think of some problem which can be caused by this?
>> >>
>> >> *What should be the default values for each newly added config
>> parameters.*
>> >>
>> >>
>> >>
>> >> *TODOs*
>> >>
>> >> -- Updating the documentation is still todo. Will do that once every
>> aspect
>> >> of the feature will be finalised.
>> >> -- Some code warnings and cleanups are still not done.
>> >> -- I am still little short on testing
>> >> -- Regression test cases for the feature
>> >>
>> >>
>> >> Thoughts and suggestions are most welcome.
>> >>
>> >> Thanks
>> >> Best regards
>> >> Muhammad Usama
>> > _______________________________________________
>> > pgpool-hackers mailing list
>> > pgpool-hackers at pgpool.net
>> > http://www.pgpool.net/mailman/listinfo/pgpool-hackers
>>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://www.sraoss.jp/pipermail/pgpool-hackers/attachments/20170824/6032cdd2/attachment-0001.html>
-------------- next part --------------
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index f44ef412..29617c41 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -231,7 +231,33 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL,								/* check func */
 		NULL								/* show hook */
 	},
-
+	{
+		{"failover_when_quorum_exists", CFGCXT_INIT, FAILOVER_CONFIG,
+			"Do failover only when cluster has the quorum.",
+			CONFIG_VAR_TYPE_BOOL,false, 0
+		},
+		&g_pool_config.failover_when_quorum_exists,
+		false,
+		NULL, NULL,NULL
+	},
+	{
+		{"failover_require_consensus", CFGCXT_INIT, FAILOVER_CONFIG,
+			"Only do failover when majority aggrees.",
+			CONFIG_VAR_TYPE_BOOL,false, 0
+		},
+		&g_pool_config.failover_require_consensus,
+		false,
+		NULL, NULL,NULL
+	},
+	{
+		{"enable_multiple_failover_requests_from_node", CFGCXT_INIT, FAILOVER_CONFIG,
+			"A Pgpool-II node can send multiple failover requests to build consensus.",
+			CONFIG_VAR_TYPE_BOOL,false, 0
+		},
+		&g_pool_config.enable_multiple_failover_requests_from_node,
+		false,
+		NULL, NULL,NULL
+	},
 	{
 		{"log_connections", CFGCXT_RELOAD, LOGING_CONFIG,
 			"Logs each successful connection.",
diff --git a/src/include/pcp/libpcp_ext.h b/src/include/pcp/libpcp_ext.h
index 705ebf15..654dd7e0 100644
--- a/src/include/pcp/libpcp_ext.h
+++ b/src/include/pcp/libpcp_ext.h
@@ -74,6 +74,7 @@ typedef struct {
 	double unnormalized_weight; /* descripted parameter */
 	char backend_data_directory[MAX_PATH_LENGTH];
 	unsigned short flag;		/* various flags */
+	bool quarantine;			/* true if node is CON_DOWN because of quarantine */
 	uint64 standby_delay;		/* The replication delay against the primary */
 } BackendInfo;
 
diff --git a/src/include/pool.h b/src/include/pool.h
index b2f10ca6..d179f7d0 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -404,15 +404,18 @@ typedef enum {
 	NODE_DOWN_REQUEST,
 	NODE_RECOVERY_REQUEST,
 	CLOSE_IDLE_REQUEST,
-	PROMOTE_NODE_REQUEST
+	PROMOTE_NODE_REQUEST,
+	NODE_QUARANTINE_REQUEST
 } POOL_REQUEST_KIND;
 
 #define REQ_DETAIL_SWITCHOVER	0x00000001		/* failover due to switch over */
+#define REQ_DETAIL_WATCHDOG		0x00000002		/* failover req from watchdog */
+#define REQ_DETAIL_CONFIRMED	0x00000004		/* failover req that does not require majority vote */
+#define REQ_DETAIL_UPDATE		0x00000008		/* failover req is just and update node status request */
 
 typedef struct {
 	POOL_REQUEST_KIND	kind;		/* request kind */
 	unsigned char request_details;	/* option flags kind */
-	unsigned int wd_failover_id;	/* watchdog ID for this failover operation */
 	int node_id[MAX_NUM_BACKENDS];	/* request node id */
 	int count;						/* request node ids count */
 }POOL_REQUEST_NODE;
@@ -520,8 +523,10 @@ extern char remote_port[];	/* client port */
 /*
  * public functions
  */
+extern void register_watchdog_quorum_change_interupt(void);
 extern void register_watchdog_state_change_interupt(void);
-extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
+extern void register_backend_state_sync_req_interupt(void);
+extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, unsigned char flags);
 extern char *get_config_file_name(void);
 extern char *get_hba_file_name(void);
 extern void do_child(int *fds);
@@ -551,11 +556,11 @@ extern POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend,
 extern void NoticeResponse(POOL_CONNECTION *frontend,
 								  POOL_CONNECTION_POOL *backend);
 
-extern void notice_backend_error(int node_id, bool switch_over);
-extern bool degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id);
-extern bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only, bool switch_over, unsigned int wd_failover_id);
-extern bool promote_backend(int node_id, unsigned int wd_failover_id);
-extern bool send_failback_request(int node_id, bool throw_error, unsigned int wd_failover_id);
+extern void notice_backend_error(int node_id, unsigned char flags);
+extern bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags);
+extern bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only);
+extern bool promote_backend(int node_id, unsigned char flags);
+extern bool send_failback_request(int node_id, bool throw_error, unsigned char flags);
 
 
 extern void pool_set_timeout(int timeoutval);
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index aeb63082..dbbe2043 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -300,6 +300,11 @@ typedef struct {
 	 * add for watchdog
 	 */
 	bool use_watchdog;						/* Enables watchdog */
+	bool failover_when_quorum_exists;		/* Failover only when cluster has the quorum */
+	bool failover_require_consensus;		/* Only do failover when majority aggrees */
+	bool enable_multiple_failover_requests_from_node; /* A Pgpool-II node can send multiple
+													   * failover requests to build consensus
+													   */
 	WdLifeCheckMethod wd_lifecheck_method;	/* method of lifecheck. 'heartbeat' or 'query' */
 	bool clear_memqcache_on_escalation;		/* Clear query cache on shmem when escalating ?*/
 	char *wd_escalation_command;			/* Executes this command at escalation on new active pgpool.*/
diff --git a/src/include/watchdog/wd_ipc_commands.h b/src/include/watchdog/wd_ipc_commands.h
index 723f9e96..dbcaa4a9 100644
--- a/src/include/watchdog/wd_ipc_commands.h
+++ b/src/include/watchdog/wd_ipc_commands.h
@@ -70,13 +70,14 @@ extern bool get_watchdog_node_escalation_state(void);
 
 extern WdCommandResult wd_start_recovery(void);
 extern WdCommandResult wd_end_recovery(void);
-extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned int *wd_failover_id);
-extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id);
-extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned int *wd_failover_id);
+extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned char flags);
+extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags);
+extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned char flags);
 
 extern WDPGBackendStatus* get_pg_backend_status_from_master_wd_node(void);
 extern WDGenericData *get_wd_runtime_variable_value(char *varName);
 extern WD_STATES get_watchdog_local_node_state(void);
+extern int get_watchdog_quorum_state(void);
 
 extern char* wd_get_watchdog_nodes(int nodeID);
 
@@ -84,11 +85,8 @@ extern WDIPCCmdResult* issue_command_to_watchdog(char type, int timeout_sec, cha
 
 
 /* functions for failover commands interlocking */
-extern WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id);
-extern WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id);
-extern void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id);
+extern WDFailoverCMDResults wd_failover_end(void);
+extern WDFailoverCMDResults wd_failover_start(void);
 
 
 
diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h
index 0dc648b5..846344f2 100644
--- a/src/include/watchdog/wd_ipc_defines.h
+++ b/src/include/watchdog/wd_ipc_defines.h
@@ -53,11 +53,13 @@ typedef enum WDFailoverCMDResults
 										  * standby node is advanced in the procedure
 										  */
 	FAILOVER_RES_PROCEED,
+	FAILOVER_RES_NO_QUORUM,
 	FAILOVER_RES_WILL_BE_DONE,
 	FAILOVER_RES_NOT_ALLOWED,
 	FAILOVER_RES_INVALID_FUNCTION,
 	FAILOVER_RES_ALREADY_ISSUED,
 	FAILOVER_RES_MASTER_REJECTED,
+	FAILOVER_RES_BUILDING_CONSENSUS,
 	FAILOVER_RES_TIMEOUT
 }WDFailoverCMDResults;
 
@@ -84,6 +86,7 @@ typedef enum WDValueDataType
 #define WD_FAILOVER_LOCKING_REQUEST			's'
 #define WD_GET_MASTER_DATA_REQUEST			'd'
 #define WD_GET_RUNTIME_VARIABLE_VALUE		'v'
+#define WD_FAILOVER_INDICATION				'i'
 
 #define WD_FUNCTION_START_RECOVERY		"START_RECOVERY"
 #define WD_FUNCTION_END_RECOVERY		"END_RECOVERY"
diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h
index 1d6a04aa..648aad35 100644
--- a/src/include/watchdog/wd_json_data.h
+++ b/src/include/watchdog/wd_json_data.h
@@ -70,8 +70,8 @@ extern bool parse_beacon_message_json(char* json_data, int data_len, int* state,
 								bool* escalated);
 extern char* get_beacon_message_json(WatchdogNode* wdNode);
 
-extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey);
-extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count);
+extern char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned char flags, unsigned int sharedKey, char* authKey);
+extern bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count, unsigned char *flags);
 extern char* get_wd_simple_message_json(char* message);
 
 extern WDPGBackendStatus* get_pg_backend_node_status_from_json(char* json_data, int data_len);
diff --git a/src/main/health_check.c b/src/main/health_check.c
index 14468f24..b7556593 100644
--- a/src/main/health_check.c
+++ b/src/main/health_check.c
@@ -186,7 +186,7 @@ void do_health_check_child(int *node_id)
 
 					/* trigger failover */
 					partial = health_check_timer_expired?false:true;
-					degenerate_backend_set(node_id, 1, partial, 0);
+					degenerate_backend_set(node_id, 1, partial?REQ_DETAIL_SWITCHOVER:0);
 				}
 			}
 
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 8b45c9e2..d57f4a8f 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -69,6 +69,8 @@ typedef enum
 {
 	SIG_FAILOVER_INTERRUPT,		/* signal main to start failover */
 	SIG_WATCHDOG_STATE_CHANGED,	/* notify main about local watchdog node state changed */
+	SIG_BACKEND_SYNC_REQUIRED,	/* notify main about local backend state sync required */
+	SIG_WATCHDOG_QUORUM_CHANGED,/* notify main about cluster quorum change of watchdog cluster */
 	MAX_INTERUPTS				/* Must be last! */
 } User1SignalReason;
 
@@ -142,6 +144,7 @@ static void terminate_all_childrens();
 static void system_will_go_down(int code, Datum arg);
 static char* process_name_from_pid(pid_t pid);
 static void sync_backend_from_watchdog(void);
+static void update_backend_quarantine_status(void);
 
 static struct sockaddr_un un_addr;		/* unix domain socket path */
 static struct sockaddr_un pcp_un_addr;  /* unix domain socket path for PCP */
@@ -459,12 +462,11 @@ int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps)
  * This function enqueues the failover/failback requests, and fires the failover() if the function
  * is not already executing
  */
-bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, bool switch_over , unsigned int wd_failover_id)
+bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, int count, unsigned char flags)
 {
 	bool failover_in_progress;
 	pool_sigset_t oldmask;
 	int index;
-	unsigned char request_details = 0;
 
 	/*
 	 * if the queue is already full
@@ -485,12 +487,8 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
 	Req_info->request_queue_tail++;
 	index = Req_info->request_queue_tail % MAX_REQUEST_QUEUE_SIZE;
 	Req_info->request[index].kind = kind;
-	Req_info->request[index].wd_failover_id = wd_failover_id;
 
-	/* Set switch over flag if requested */
-	if (switch_over)
-		request_details |= REQ_DETAIL_SWITCHOVER;
-	Req_info->request[index].request_details = request_details;
+	Req_info->request[index].request_details = flags;
 
 	if(count > 0)
 		memcpy(Req_info->request[index].node_id, node_id_set, (sizeof(int) * count));
@@ -501,16 +499,29 @@ bool register_node_operation_request(POOL_REQUEST_KIND kind, int* node_id_set, i
 	POOL_SETMASK(&oldmask);
 	if(failover_in_progress == false)
 	{
-		signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
+		if(processType == PT_MAIN)
+			failover();
+		else
+			signal_user1_to_parent_with_reason(SIG_FAILOVER_INTERRUPT);
 	}
 
 	return true;
 }
 
+void register_watchdog_quorum_change_interupt(void)
+{
+	signal_user1_to_parent_with_reason(SIG_WATCHDOG_QUORUM_CHANGED);
+}
+
 void register_watchdog_state_change_interupt(void)
 {
 	signal_user1_to_parent_with_reason(SIG_WATCHDOG_STATE_CHANGED);
 }
+void register_backend_state_sync_req_interupt(void)
+{
+	signal_user1_to_parent_with_reason(SIG_BACKEND_SYNC_REQUIRED);
+}
+
 static void signal_user1_to_parent_with_reason(User1SignalReason reason)
 {
 	user1SignalSlot->signalFlags[reason] = true;
@@ -956,7 +967,7 @@ static void terminate_all_childrens()
  * Reuest failover. If "switch_over" is false, request all existing sessions
  * restarting.
  */
-void notice_backend_error(int node_id, bool switch_over)
+void notice_backend_error(int node_id, unsigned char flags)
 {
 	int n = node_id;
 
@@ -967,7 +978,7 @@ void notice_backend_error(int node_id, bool switch_over)
 	}
 	else
 	{
-		degenerate_backend_set(&n, 1, switch_over, 0);
+		degenerate_backend_set(&n, 1, flags);
 	}
 }
 
@@ -990,8 +1001,7 @@ void notice_backend_error(int node_id, bool switch_over)
  *
  * wd_failover_id: The watchdog internal ID for this failover
  */
-bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool test_only,
-							   bool switch_over, unsigned int wd_failover_id)
+bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only)
 {
 	int i;
 	int node_id[MAX_NUM_BACKENDS];
@@ -1046,12 +1056,12 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
 		if(test_only)
 			return true;
 
-		if (pool_config->use_watchdog && wd_failover_id == 0)
+		if (!(flags & REQ_DETAIL_WATCHDOG))
 		{
 			int x;
 			for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 			{
-				res = wd_degenerate_backend_set(node_id_set, count, &wd_failover_id);
+				res = wd_degenerate_backend_set(node_id_set, count, flags);
 				if (res != FAILOVER_RES_TRANSITION)
 					break;
 				sleep(1);
@@ -1069,10 +1079,26 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
 		}
 		if (res == FAILOVER_RES_PROCEED)
 		{
-			register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, switch_over, wd_failover_id);
+			register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_NO_QUORUM)
+		{
+			ereport(LOG,
+					(errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog"
+							, node_count, getpid()),
+					 errdetail("watchdog does not holds the quorum")));
+
+			register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+		{
+			ereport(LOG,
+					(errmsg("degenerate backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+							,*node_id, getpid())));
 		}
 		else if (res == FAILOVER_RES_WILL_BE_DONE)
 		{
+			/* we will receive a sync request from master watchdog node */
 			ereport(LOG,
 					(errmsg("degenerate backend request for %d node(s) from pid [%d], will be handled by watchdog"
 							, node_count, getpid())));
@@ -1092,13 +1118,13 @@ bool degenerate_backend_set_ex(int *node_id_set, int count, bool error, bool tes
  * wrapper over degenerate_backend_set_ex function to register
  * NODE down operation request
  */
-bool degenerate_backend_set(int *node_id_set, int count, bool switch_over, unsigned int wd_failover_id)
+bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
 {
-	return degenerate_backend_set_ex(node_id_set, count, false, false, switch_over, wd_failover_id);
+	return degenerate_backend_set_ex(node_id_set, count, flags, false, false);
 }
 
 /* send promote node request using SIGUSR1 */
-bool promote_backend(int node_id, unsigned int wd_failover_id)
+bool promote_backend(int node_id, unsigned char flags)
 {
 	WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
 	bool ret = false;
@@ -1125,12 +1151,12 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 					node_id, getpid())));
 
 	/* If this was only a test. Inform the caller without doing anything */
-	if (pool_config->use_watchdog && wd_failover_id == 0)
+	if (!(flags & REQ_DETAIL_WATCHDOG))
 	{
 		int x;
 		for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 		{
-			res = wd_promote_backend(node_id, &wd_failover_id);
+			res = wd_promote_backend(node_id, flags);
 			if (res != FAILOVER_RES_TRANSITION)
 				break;
 			sleep(1);
@@ -1149,7 +1175,7 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 
 	if (res == FAILOVER_RES_PROCEED)
 	{
-		ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, false, wd_failover_id);
+		ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, flags);
 	}
 	else if (res == FAILOVER_RES_WILL_BE_DONE)
 	{
@@ -1157,6 +1183,18 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 				(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog"
 						, node_id, getpid())));
 	}
+	else if (res == FAILOVER_RES_NO_QUORUM)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], is canceled because watchdog does not hold quorum"
+						, node_id, getpid())));
+	}
+	else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+						, node_id, getpid())));
+	}
 	else
 	{
 		ereport(LOG,
@@ -1167,7 +1205,7 @@ bool promote_backend(int node_id, unsigned int wd_failover_id)
 }
 
 /* send failback request using SIGUSR1 */
-bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failover_id)
+bool send_failback_request(int node_id,bool throw_error, unsigned char flags)
 {
 	WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
 	bool ret = false;
@@ -1187,16 +1225,16 @@ bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failove
 	}
 
 	ereport(LOG,
-			(errmsg("received failback request for node_id: %d from pid [%d] wd_failover_id [%d]",
-					node_id, getpid(),wd_failover_id)));
+			(errmsg("received failback request for node_id: %d from pid [%d]",
+					node_id, getpid())));
 
 	/* If this was only a test. Inform the caller without doing anything */
-	if (pool_config->use_watchdog && wd_failover_id == 0)
+	if (!(flags & REQ_DETAIL_WATCHDOG))
 	{
 		int x;
 		for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
 		{
-			res = wd_send_failback_request(node_id, &wd_failover_id);
+			res = wd_send_failback_request(node_id, flags);
 			if (res != FAILOVER_RES_TRANSITION)
 				break;
 			sleep(1);
@@ -1215,7 +1253,7 @@ bool send_failback_request(int node_id,bool throw_error, unsigned int wd_failove
 
 	if (res == FAILOVER_RES_PROCEED)
 	{
-		ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, false, wd_failover_id);
+		ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, flags);
 	}
 	else if (res == FAILOVER_RES_WILL_BE_DONE)
 	{
@@ -1401,10 +1439,40 @@ static void sigusr1_interupt_processor(void)
 	ereport(DEBUG1,
 			(errmsg("Pgpool-II parent process received SIGUSR1")));
 
+	if (user1SignalSlot->signalFlags[SIG_WATCHDOG_QUORUM_CHANGED])
+	{
+		ereport(LOG,
+				(errmsg("Pgpool-II parent process received watchdog quorum change signal from watchdog")));
+
+		user1SignalSlot->signalFlags[SIG_WATCHDOG_QUORUM_CHANGED] = false;
+		if (get_watchdog_quorum_state() >= 0)
+		{
+			ereport(LOG,
+					(errmsg("watchdog cluster now holds the quorum"),
+					 errdetail("updating the state of quarantine backend nodes")));
+			update_backend_quarantine_status();
+		}
+	}
+
+	if (user1SignalSlot->signalFlags[SIG_BACKEND_SYNC_REQUIRED])
+	{
+		ereport(LOG,
+				(errmsg("Pgpool-II parent process received sync backend signal from watchdog")));
+
+		user1SignalSlot->signalFlags[SIG_BACKEND_SYNC_REQUIRED] = false;
+		if (get_watchdog_local_node_state() == WD_STANDBY)
+		{
+			ereport(LOG,
+					(errmsg("master watchdog has performed failover"),
+					 errdetail("syncing the backend states from the MASTER watchdog node")));
+			sync_backend_from_watchdog();
+		}
+	}
+
 	if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
 	{
 		ereport(DEBUG1,
-				(errmsg("Pgpool-II parent process received SIGUSR1 from watchdog")));
+				(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
 
 		user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
 		if (get_watchdog_local_node_state() == WD_STANDBY)
@@ -1468,6 +1536,7 @@ static void failover(void)
 	int sts;
 	bool need_to_restart_pcp = false;
 	bool all_backend_down = true;
+	bool sync_required = false;
 
 	ereport(DEBUG1,
 		(errmsg("failover handler called")));
@@ -1517,8 +1586,6 @@ static void failover(void)
 		int node_id_set[MAX_NUM_BACKENDS];
 		int node_count;
 		unsigned char request_details;
-		unsigned int wd_failover_id;
-		WDFailoverCMDResults wdInterlockingRes;
 
 		pool_semaphore_lock(REQUEST_INFO_SEM);
 
@@ -1537,7 +1604,6 @@ static void failover(void)
 		reqkind = Req_info->request[queue_index].kind;
 		request_details = Req_info->request[queue_index].request_details;
 		node_count = Req_info->request[queue_index].count;
-		wd_failover_id = Req_info->request[queue_index].wd_failover_id;
 		pool_semaphore_unlock(REQUEST_INFO_SEM);
 
 		ereport(DEBUG1,
@@ -1550,8 +1616,8 @@ static void failover(void)
 			continue;
 		}
 
-		/* start watchdog interlocking */
-		wdInterlockingRes = wd_start_failover_interlocking(wd_failover_id);
+		/* inform all remote watchdog nodes that we are starting the failover */
+		wd_failover_start();
 
 		/*
 		 * if not in replication mode/master slave mode, we treat this a restart request.
@@ -1577,9 +1643,6 @@ static void failover(void)
 					ereport(LOG,
 							(errmsg("invalid failback request, status: [%d] of node id : %d is invalid for failback",BACKEND_INFO(node_id).backend_status,node_id)));
 
-				if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-					wd_end_failover_interlocking(wd_failover_id);
-
 				continue;
 			}
 
@@ -1592,24 +1655,18 @@ static void failover(void)
 			all_backend_down = check_all_backend_down();
 
 			BACKEND_INFO(node_id).backend_status = CON_CONNECT_WAIT;	/* unset down status */
-			(void)write_status_file();
-
-			/* Aquire failback start command lock */
-			if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+			if (!(request_details & REQ_DETAIL_UPDATE))
 			{
+				/* The request is a proper failbak request
+				 * and not because of the update status of quarantined node
+				 */
+				(void)write_status_file();
+
 				trigger_failover_command(node_id, pool_config->failback_command,
 											MASTER_NODE_ID, get_next_master_node(), PRIMARY_NODE_ID);
-				wd_failover_lock_release(FAILBACK_LOCK, wd_failover_id);
-			}
-			else
-			{
-				/*
-				 * Okay we are not allowed to execute the failover command
-				 * so we need to wait till the one who is executing the command
-				 * finish with it.
-				 */
-				wd_wait_until_command_complete_or_timeout(FAILBACK_LOCK,wd_failover_id);
 			}
+
+			sync_required = true;
 		}
 		else if (reqkind == PROMOTE_NODE_REQUEST)
 		{
@@ -1624,12 +1681,10 @@ static void failover(void)
 			{
 				ereport(LOG,
 						(errmsg("failover: no backends are promoted")));
-				if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-					wd_end_failover_interlocking(wd_failover_id);
 				continue;
 			}
 		}
-		else	/* NODE_DOWN_REQUEST */
+		else	/* NODE_DOWN_REQUEST && NODE_QUARANTINE_REQUEST*/
 		{
 			int cnt = 0;
 
@@ -1640,12 +1695,17 @@ static void failover(void)
 					 VALID_BACKEND(node_id_set[i])))
 				{
 					ereport(LOG,
-							(errmsg("starting degeneration. shutdown host %s(%d)",
+							(errmsg("starting %s. shutdown host %s(%d)",
+							(reqkind == NODE_QUARANTINE_REQUEST)?"quarantine":"degeneration",
 							 BACKEND_INFO(node_id_set[i]).backend_hostname,
 							 BACKEND_INFO(node_id_set[i]).backend_port)));
 
 					BACKEND_INFO(node_id_set[i]).backend_status = CON_DOWN;	/* set down status */
-					(void)write_status_file();
+
+					if (reqkind == NODE_QUARANTINE_REQUEST)
+						BACKEND_INFO(node_id_set[i]).quarantine = true;
+					else
+						(void)write_status_file();
 
 					/* save down node */
 					nodes[node_id_set[i]] = 1;
@@ -1657,10 +1717,6 @@ static void failover(void)
 			{
 				ereport(LOG,
 						(errmsg("failover: no backends are degenerated")));
-
-				if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-					wd_end_failover_interlocking(wd_failover_id);
-
 				continue;
 			}
 		}
@@ -1710,7 +1766,7 @@ static void failover(void)
 		 * NODE_DOWN_REQUEST and it's actually a switch over request, we don't
 		 * need to restart all children, except the node is primary.
 		 */
-		else if (STREAM && reqkind == NODE_DOWN_REQUEST &&
+		else if (STREAM && (reqkind == NODE_DOWN_REQUEST || reqkind == NODE_QUARANTINE_REQUEST) &&
 				 request_details & REQ_DETAIL_SWITCHOVER && node_id != PRIMARY_NODE_ID)
 		{
 			ereport(LOG,
@@ -1776,36 +1832,36 @@ static void failover(void)
 			need_to_restart_children = true;
 			partial_restart = false;
 		}
-		if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+		/* Exec failover_command if needed
+		 * We do not execute failover when request is quarantine type
+		 */
+		if (reqkind == NODE_DOWN_REQUEST)
 		{
-			/* Exec failover_command if needed */
 			for (i = 0; i < pool_config->backend_desc->num_backends; i++)
 			{
 				if (nodes[i])
+				{
 					trigger_failover_command(i, pool_config->failover_command,
 												MASTER_NODE_ID, new_master, PRIMARY_NODE_ID);
+					sync_required = true;
+				}
 			}
-			wd_failover_lock_release(FAILOVER_LOCK, wd_failover_id);
-		}
-		else
-		{
-			wd_wait_until_command_complete_or_timeout(FAILOVER_LOCK, wd_failover_id);
 		}
 
-	/* no need to wait since it will be done in reap_handler */
-#ifdef NOT_USED
-		while (wait(NULL) > 0)
-			;
-
-		if (errno != ECHILD)
-			ereport(LOG,
-				(errmsg("failover_handler: wait() failed. reason:%s", strerror(errno))));
-
-#endif
-
 		if (reqkind == PROMOTE_NODE_REQUEST && VALID_BACKEND(node_id))
+		{
 			new_primary = node_id;
-
+		}
+		else if (reqkind == NODE_QUARANTINE_REQUEST)
+		{
+			/* if the quarantine node was the primary node
+			 * set the newprimary to -1 (invalid)
+			 */
+			if (Req_info->primary_node_id == node_id)
+				new_primary = -1;
+			else
+				new_primary =  find_primary_node_repeatedly();
+		}
 		/*
 		 * If the down node was a standby node in streaming replication
 		 * mode, we can avoid calling find_primary_node_repeatedly() and
@@ -1821,8 +1877,9 @@ static void failover(void)
 				new_primary =  find_primary_node_repeatedly();
 		}
 		else
+		{
 			new_primary =  find_primary_node_repeatedly();
-
+		}
 		/*
 		 * If follow_master_command is provided and in master/slave
 		 * streaming replication mode, we start degenerating all backends
@@ -1874,22 +1931,10 @@ static void failover(void)
 			}
 		}
 
-		/*
-		 * follow master command also uses the same locks used by trigring command
-		 */
-		if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
+		if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
 		{
-			if ((follow_cnt > 0) && (*pool_config->follow_master_command != '\0'))
-			{
-				follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
-											Req_info->primary_node_id);
-			}
-			wd_failover_lock_release(FOLLOW_MASTER_LOCK, wd_failover_id);
-		}
-		else
-		{
-			wd_wait_until_command_complete_or_timeout(FOLLOW_MASTER_LOCK, wd_failover_id);
-
+			follow_pid = fork_follow_child(Req_info->master_node_id, new_primary,
+										Req_info->primary_node_id);
 		}
 
 		/* Save primary node id */
@@ -1900,6 +1945,7 @@ static void failover(void)
 		if (new_master >= 0)
 		{
 			Req_info->master_node_id = new_master;
+			sync_required = true;
 			ereport(LOG,
 					(errmsg("failover: set new master node: %d", Req_info->master_node_id)));
 		}
@@ -1977,8 +2023,8 @@ static void failover(void)
 		 */
 		kill(worker_pid, SIGUSR1);
 
-		if (wdInterlockingRes == FAILOVER_RES_I_AM_LOCK_HOLDER)
-			wd_end_failover_interlocking(wd_failover_id);
+		if ( sync_required)
+			wd_failover_end();
 
 		if (reqkind == NODE_UP_REQUEST)
 		{
@@ -1998,12 +2044,14 @@ static void failover(void)
 		else
 		{
 			/* Temporary black magic. Without this regression 055 does not finish */
-			fprintf(stderr, "failover done. shutdown host %s(%d)",
+			fprintf(stderr, "%s done. shutdown host %s(%d)",
+					(reqkind == NODE_DOWN_REQUEST)?"failover":"quarantine",
 					 BACKEND_INFO(node_id).backend_hostname,
 					BACKEND_INFO(node_id).backend_port);
 
 			ereport(LOG,
-					(errmsg("failover done. shutdown host %s(%d)",
+					(errmsg("%s done. shutdown host %s(%d)",
+					(reqkind == NODE_DOWN_REQUEST)?"failover":"quarantine",
 					 BACKEND_INFO(node_id).backend_hostname,
 					 BACKEND_INFO(node_id).backend_port)));
 		}
@@ -3396,6 +3444,29 @@ int pool_frontend_exists(void)
 	return -1;
 }
 
+static void update_backend_quarantine_status(void)
+{
+	/* Reset the quarantine flag from each backend and
+	 * set it to con_wait
+	 */
+	int i;
+	WD_STATES wd_state = get_watchdog_local_node_state();
+
+	for (i=0;i<NUM_BACKENDS;i++)
+	{
+		if (BACKEND_INFO(i).quarantine && BACKEND_INFO(i).backend_status == CON_DOWN)
+		{
+			BACKEND_INFO(i).quarantine = false;
+			/* send the failback request for the node
+			 * we also set the watchdog flag because we we eventually send the sync
+			 * message to all standby nodes
+			 */
+			if (wd_state == WD_COORDINATOR)
+				send_failback_request(i,false, REQ_DETAIL_UPDATE | REQ_DETAIL_WATCHDOG);
+		}
+	}
+}
+
 /*
  * The function fetch the current status of all configured backend
  * nodes from the MASTER/COORDINATOR watchdog Pgpool-II and synchronize the
diff --git a/src/pcp_con/pcp_worker.c b/src/pcp_con/pcp_worker.c
index 54219a72..658b461e 100644
--- a/src/pcp_con/pcp_worker.c
+++ b/src/pcp_con/pcp_worker.c
@@ -502,14 +502,14 @@ static int pool_detach_node(int node_id, bool gracefully)
 {
 	if (!gracefully)
 	{
-		degenerate_backend_set_ex(&node_id, 1, true, false, true, 0);
+		degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, false);
 		return 0;
 	}
 
 	/* Check if the NODE DOWN can be executed on
 	 * the given node id.
 	 */
-	degenerate_backend_set_ex(&node_id, 1, true, true, true, 0);
+	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, true, true);
 
 	/*
 	 * Wait until all frontends exit
@@ -529,7 +529,7 @@ static int pool_detach_node(int node_id, bool gracefully)
 	/*
 	 * Now all frontends have gone. Let's do failover.
 	 */
-	degenerate_backend_set_ex(&node_id, 1, true, false, true, 0);
+	degenerate_backend_set_ex(&node_id, 1, REQ_DETAIL_SWITCHOVER | REQ_DETAIL_CONFIRMED, false, true);
 
 	/*
 	 * Wait for failover completed.
@@ -556,7 +556,7 @@ static int pool_promote_node(int node_id, bool gracefully)
 {
 	if (!gracefully)
 	{
-		promote_backend(node_id, false);	/* send promote request */
+		promote_backend(node_id, REQ_DETAIL_CONFIRMED);	/* send promote request */
 		return 0;
 	}
 
@@ -576,7 +576,7 @@ static int pool_promote_node(int node_id, bool gracefully)
 	/*
 	 * Now all frontends have gone. Let's do failover.
 	 */
-	promote_backend(node_id, false);		/* send promote request */
+	promote_backend(node_id, REQ_DETAIL_CONFIRMED);		/* send promote request */
 
 	/*
 	 * Wait for failover completed.
@@ -910,7 +910,7 @@ process_attach_node(PCP_CONNECTION *frontend,char *buf)
 			(errmsg("PCP: processing attach node"),
 			 errdetail("attaching Node ID %d", node_id)));
 
-	send_failback_request(node_id,true, false);
+	send_failback_request(node_id,true, REQ_DETAIL_CONFIRMED);
 
 	pcp_write(frontend, "c", 1);
 	wsize = htonl(sizeof(code) + sizeof(int));
diff --git a/src/pcp_con/recovery.c b/src/pcp_con/recovery.c
index f2f57f56..ead5eeaf 100644
--- a/src/pcp_con/recovery.c
+++ b/src/pcp_con/recovery.c
@@ -147,7 +147,7 @@ void start_recovery(int recovery_node)
 		pcp_worker_wakeup_request = 0;
 
 		/* send failback request to pgpool parent */
-		send_failback_request(recovery_node,false, false);
+		send_failback_request(recovery_node,false, REQ_DETAIL_CONFIRMED);
 
 		/* wait for failback */
 		failback_wait_count = 0;
diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c
index 2bbcae24..a84134cd 100644
--- a/src/protocol/pool_connection_pool.c
+++ b/src/protocol/pool_connection_pool.c
@@ -854,7 +854,7 @@ static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p)
 			 */
 			if (pool_config->fail_over_on_backend_error)
 			{
-				notice_backend_error(i, true);
+				notice_backend_error(i, REQ_DETAIL_SWITCHOVER);
 				ereport(FATAL,
 					(errmsg("failed to create a backend connection"),
 						 errdetail("executing failover on backend")));
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index 9631a5fb..473c1988 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -3595,7 +3595,7 @@ void read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *bac
 
 		if (pool_config->replication_stop_on_mismatch)
 		{
-			degenerate_backend_set(degenerate_node, degenerate_node_num, false, 0);
+			degenerate_backend_set(degenerate_node, degenerate_node_num, REQ_DETAIL_CONFIRMED);
             retcode = 1;
 		}
         ereport(FATAL,
@@ -4673,7 +4673,7 @@ pool_config->client_idle_limit)));
 						}
 						else
 						{
-							notice_backend_error(i, true);
+							notice_backend_error(i, REQ_DETAIL_SWITCHOVER);
 							sleep(5);
 						}
 						break;
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 19166832..a4bfe13e 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -504,7 +504,7 @@ POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
 
 			ereport(DEBUG1,
 					(errmsg("Query: sending SIGUSR1 signal to parent")));
-			register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, false, 0);
+			register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, 0);
 
 			/* we need to loop over here since we will get USR1 signal while sleeping */
 			while (stime > 0)
@@ -1741,7 +1741,7 @@ POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
 
 				free_string(msg);
 
-				degenerate_backend_set(victim_nodes, number_of_nodes, true, 0);
+				degenerate_backend_set(victim_nodes, number_of_nodes, REQ_DETAIL_CONFIRMED|REQ_DETAIL_SWITCHOVER);
 				child_exit(POOL_EXIT_AND_RESTART);
 			}
 			else
diff --git a/src/sample/pgpool.conf.sample b/src/sample/pgpool.conf.sample
index 5c5c360d..c48d5878 100644
--- a/src/sample/pgpool.conf.sample
+++ b/src/sample/pgpool.conf.sample
@@ -551,6 +551,20 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# aggrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/sample/pgpool.conf.sample-logical b/src/sample/pgpool.conf.sample-logical
index 8e8fa391..8be64bca 100644
--- a/src/sample/pgpool.conf.sample-logical
+++ b/src/sample/pgpool.conf.sample-logical
@@ -549,6 +549,19 @@ wd_escalation_command = ''
 wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# aggrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
 
 # - Lifecheck Setting -
 
diff --git a/src/sample/pgpool.conf.sample-master-slave b/src/sample/pgpool.conf.sample-master-slave
index 24757d2d..360cba1f 100644
--- a/src/sample/pgpool.conf.sample-master-slave
+++ b/src/sample/pgpool.conf.sample-master-slave
@@ -549,6 +549,21 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# aggrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/sample/pgpool.conf.sample-replication b/src/sample/pgpool.conf.sample-replication
index 3318753a..5b37626c 100644
--- a/src/sample/pgpool.conf.sample-replication
+++ b/src/sample/pgpool.conf.sample-replication
@@ -549,6 +549,23 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# aggrees on the backend node status change
+
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index a4effb68..f5832c72 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -550,6 +550,21 @@ wd_de_escalation_command = ''
 									# Executes this command when master pgpool resigns from being master.
 									# (change requires restart)
 
+# - Watchdog consensus settings for failover -
+
+failover_when_quorum_exists = true
+									# Only perform backend node failover
+									# when the watchdog cluster holds the quorum
+
+failover_require_consensus = true
+									# Perform failover when majority of Pgpool-II nodes
+									# aggrees on the backend node status change
+
+enable_multiple_failover_requests_from_node = false
+									# A Pgpool-II node can cast multiple votes
+									# for building the consensus on failover
+
+
 # - Lifecheck Setting -
 
 # -- common --
diff --git a/src/utils/pool_ssl.c b/src/utils/pool_ssl.c
index 32f755f1..1f6e9574 100644
--- a/src/utils/pool_ssl.c
+++ b/src/utils/pool_ssl.c
@@ -350,7 +350,7 @@ void pool_ssl_close(POOL_CONNECTION *cp) { return; }
 int pool_ssl_read(POOL_CONNECTION *cp, void *buf, int size) {
 	ereport(WARNING,
 			(errmsg("pool_ssl: SSL i/o called but SSL support is not available")));
-	notice_backend_error(cp->db_node_id, true);
+	notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 	child_exit(POOL_EXIT_AND_RESTART);
 	return -1; /* never reached */
 }
@@ -358,7 +358,7 @@ int pool_ssl_read(POOL_CONNECTION *cp, void *buf, int size) {
 int pool_ssl_write(POOL_CONNECTION *cp, const void *buf, int size) {
 	ereport(WARNING,
 			(errmsg("pool_ssl: SSL i/o called but SSL support is not available")));
-	notice_backend_error(cp->db_node_id, true);
+	notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 	child_exit(POOL_EXIT_AND_RESTART);
 	return -1; /* never reached */
 }
diff --git a/src/utils/pool_stream.c b/src/utils/pool_stream.c
index f0f31d31..f586d9bf 100644
--- a/src/utils/pool_stream.c
+++ b/src/utils/pool_stream.c
@@ -218,7 +218,7 @@ int pool_read(POOL_CONNECTION *cp, void *buf, int len)
 				/* if fail_over_on_backend_error is true, then trigger failover */
 				if (pool_config->fail_over_on_backend_error)
 				{
-					notice_backend_error(cp->db_node_id, true);
+					notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 
                     /* If we are in the main process, we will not exit */
 					child_exit(POOL_EXIT_AND_RESTART);
@@ -365,7 +365,7 @@ char *pool_read2(POOL_CONNECTION *cp, int len)
 				/* if fail_over_on_backend_error is true, then trigger failover */
 				if (pool_config->fail_over_on_backend_error)
 				{
-					notice_backend_error(cp->db_node_id, true);
+					notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 					child_exit(POOL_EXIT_AND_RESTART);
                     /* we are in main process */
                     ereport(ERROR,
@@ -590,7 +590,7 @@ int pool_flush(POOL_CONNECTION *cp)
 			/* if fail_over_on_backend_error is true, then trigger failover */
 			if (pool_config->fail_over_on_backend_error)
 			{
-				notice_backend_error(cp->db_node_id, true);
+				notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 				ereport(LOG,
 					(errmsg("unable to flush data to backend"),
 						 errdetail("do not failover because I am the main process")));
@@ -645,7 +645,7 @@ int pool_flush_noerror(POOL_CONNECTION *cp)
             /* if fail_over_on_backend_erro is true, then trigger failover */
             if (pool_config->fail_over_on_backend_error)
             {
-                notice_backend_error(cp->db_node_id, true);
+                notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
                 child_exit(POOL_EXIT_AND_RESTART);
 				ereport(LOG,
 					(errmsg("unable to flush data to backend"),
@@ -813,7 +813,7 @@ char *pool_read_string(POOL_CONNECTION *cp, int *len, int line)
 							 errdetail("pg_terminate_backend was called on the backend")));
 				}
 
-				notice_backend_error(cp->db_node_id, true);
+				notice_backend_error(cp->db_node_id, REQ_DETAIL_SWITCHOVER);
 				child_exit(POOL_EXIT_AND_RESTART);
                 ereport(ERROR,
                         (errmsg("unable to read data from frontend"),
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c
index 093aa636..0e3ddc39 100644
--- a/src/watchdog/watchdog.c
+++ b/src/watchdog/watchdog.c
@@ -58,6 +58,14 @@
 #include "watchdog/wd_ipc_commands.h"
 #include "parser/stringinfo.h"
 
+/* These defines enables the consensus building feature
+ * in watchdog for node failover operations
+ * We can also take these to the configure script
+ */
+#define NODE_UP_REQUIRE_CONSENSUS
+#define NODE_DOWN_REQUIRE_CONSENSUS
+#define NODE_PROMOTE_REQUIRE_CONSENSUS
+
 typedef enum IPC_CMD_PREOCESS_RES
 {
 	IPC_CMD_COMPLETE,
@@ -108,6 +116,9 @@ typedef enum IPC_CMD_PREOCESS_RES
 #define WD_CMD_REPLY_IN_DATA				'-'
 #define WD_CLUSTER_SERVICE_MESSAGE			'#'
 
+#define WD_FAILOVER_START					'F'
+#define WD_FAILOVER_END						'H'
+
 /*Cluster Service Message Types */
 #define CLUSTER_QUORUM_LOST					'L'
 #define CLUSTER_QUORUM_FOUND				'F'
@@ -150,6 +161,7 @@ packet_types all_packet_types[] = {
 	{WD_GET_RUNTIME_VARIABLE_VALUE, "GET WD RUNTIME VARIABLE VALUE"},
 	{WD_CMD_REPLY_IN_DATA, "COMMAND REPLY IN DATA"},
 	{WD_FAILOVER_LOCKING_REQUEST,"FAILOVER LOCKING REQUEST"},
+	{WD_FAILOVER_INDICATION,"FAILOVER INDICATION"},
 	{WD_CLUSTER_SERVICE_MESSAGE, "CLUSTER SERVICE MESSAGE"},
 	{WD_REGISTER_FOR_NOTIFICATION, "REGISTER FOR NOTIFICATION"},
 	{WD_NODE_STATUS_CHANGE_COMMAND, "NODE STATUS CHANGE"},
@@ -323,6 +335,7 @@ typedef struct wd_cluster
 	int				network_monitor_sock;
 	bool			clusterInitialized;
 	bool			ipc_auth_needed;
+	int				current_failover_id;
 	List			*unidentified_socks;
 	List			*notify_clients;
 	List			*ipc_command_socks;
@@ -340,7 +353,8 @@ typedef struct WDFailoverObject
 	int nodesCount;
 	unsigned int failoverID;
 	int *nodeList;
-	WatchdogNode* wdRequestingNode;
+	List* requestingNodes;
+	int request_count;
 	struct timeval	startTime;
 	int state;
 }WDFailoverObject;
@@ -354,6 +368,9 @@ static bool remove_failover_object_by_id(unsigned int failoverID);
 static void remove_failovers_from_node(WatchdogNode* wdNode);
 static void remove_failover_object(WDFailoverObject* failoverObj);
 static void service_expired_failovers(void);
+static int add_failover(POOL_REQUEST_KIND reqKind, int *node_id_list, int node_count, WatchdogNode *wdNode);
+static WDFailoverCMDResults compute_failover_consensus(POOL_REQUEST_KIND reqKind,int *node_id_list, int node_count,
+													   unsigned char flags, WatchdogNode *wdNode);
 
 static int send_command_packet_to_remote_nodes(WDCommandData* ipcCommand, bool source_included);
 static void wd_command_is_complete(WDCommandData* ipcCommand);
@@ -477,10 +494,9 @@ static IPC_CMD_PREOCESS_RES process_IPC_nodeStatusChange_command(WDCommandData*
 static IPC_CMD_PREOCESS_RES process_IPC_nodeList_command(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_online_recovery(WDCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand);
+static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData *ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *ipcCommand);
 static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand);
-static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand);
 static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand);
 
 static bool write_ipc_command_with_result_data(WDCommandData* ipcCommand, char type, char* data, int len);
@@ -492,6 +508,9 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
 static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacketData* pkt);
 static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacketData* pkt);
 
+static WDFailoverCMDResults failover_end_indication(WDCommandData* ipcCommand);
+static WDFailoverCMDResults failover_start_indication(WDCommandData* ipcCommand);
+
 
 static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCommandData* ipcCommand);
 static WDFailoverCMDResults node_is_asking_for_failover_end(WatchdogNode* wdNode, WDPacketData* pkt, unsigned int failoverID);
@@ -1823,8 +1842,8 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData* ipcCommand)
 			return process_IPC_online_recovery(ipcCommand);
 			break;
 
-		case WD_FAILOVER_LOCKING_REQUEST:
-			return process_IPC_failover_locking_cmd(ipcCommand);
+		case WD_FAILOVER_INDICATION:
+			return process_IPC_failover_indication(ipcCommand);
 
 		case WD_GET_MASTER_DATA_REQUEST:
 			return process_IPC_data_request_from_master(ipcCommand);
@@ -1879,7 +1898,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCom
 	else if (strcasecmp(WD_RUNTIME_VAR_QUORUM_STATE, requestVarName) == 0)
 	{
 		jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA_TYPE, VALUE_DATA_TYPE_INT);
-		jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, g_cluster.quorum_status);
+		jw_put_int(jNode, WD_JSON_KEY_VALUE_DATA, WD_MASTER_NODE?WD_MASTER_NODE->quorum_status:-2);
 	}
 	else if (strcasecmp(WD_RUNTIME_VAR_ESCALATION_STATE, requestVarName) == 0)
 	{
@@ -2043,8 +2062,9 @@ static WDFailoverObject* get_failover_object_by_id(unsigned int failoverID)
 static void remove_failover_object(WDFailoverObject* failoverObj)
 {
 	ereport(DEBUG1,
-			(errmsg("removing failover object from \"%s\" with ID:%d", failoverObj->wdRequestingNode->nodeName,failoverObj->failoverID)));
+			(errmsg("removing failover request from %d nodes with ID:%d", failoverObj->request_count,failoverObj->failoverID)));
 	g_cluster.wdCurrentFailovers = list_delete_ptr(g_cluster.wdCurrentFailovers,failoverObj);
+	list_free(failoverObj->requestingNodes);
 	pfree(failoverObj->nodeList);
 	pfree(failoverObj);
 }
@@ -2063,28 +2083,29 @@ static bool remove_failover_object_by_id(unsigned int failoverID)
 /* if the wdNode is NULL. The function removes all failover objects */
 static void remove_failovers_from_node(WatchdogNode* wdNode)
 {
-	ListCell *lc;
-	List *failovers_to_del = NULL;
-
-	foreach(lc, g_cluster.wdCurrentFailovers)
-	{
-		WDFailoverObject* failoverObj = lfirst(lc);
-		if (failoverObj)
-		{
-			if (wdNode == NULL || failoverObj->wdRequestingNode == wdNode)
-			{
-				failovers_to_del = lappend(failovers_to_del,failoverObj);
-			}
-		}
-	}
-
-	/* delete the failover objects */
-
-	foreach(lc, failovers_to_del)
-	{
-		WDFailoverObject* failoverObj = lfirst(lc);
-		remove_failover_object(failoverObj);
-	}
+	return;
+//	ListCell *lc;
+//	List *failovers_to_del = NULL;
+
+//	foreach(lc, g_cluster.wdCurrentFailovers)
+//	{
+//		WDFailoverObject* failoverObj = lfirst(lc);
+//		if (failoverObj)
+//		{
+//			if (wdNode == NULL || failoverObj->wdRequestingNode == wdNode)
+//			{
+//				failovers_to_del = lappend(failovers_to_del,failoverObj);
+//			}
+//		}
+//	}
+//
+//	/* delete the failover objects */
+//
+//	foreach(lc, failovers_to_del)
+//	{
+//		WDFailoverObject* failoverObj = lfirst(lc);
+//		remove_failover_object(failoverObj);
+//	}
 }
 
 /* Remove the over stayed failover objects */
@@ -2108,7 +2129,7 @@ static void service_expired_failovers(void)
 			{
 				failovers_to_del = lappend(failovers_to_del,failoverObj);
 				ereport(DEBUG1,
-					(errmsg("failover object from \"%s\" with ID:%d is timeout", failoverObj->wdRequestingNode->nodeName,failoverObj->failoverID),
+					(errmsg("failover request from %d nodes with ID:%d is timeout", failoverObj->request_count,failoverObj->failoverID),
 						 errdetail("adding the failover object for removal")));
 
 			}
@@ -2204,18 +2225,6 @@ static void process_remote_failover_command_on_coordinator(WatchdogNode* wdNode,
 	}
 }
 
-static IPC_CMD_PREOCESS_RES process_IPC_failover_command_on_coordinator(WDCommandData* ipcCommand)
-{
-	if (get_local_node_state() != WD_COORDINATOR)
-		return IPC_CMD_ERROR; /* should never hapen*/
-
-	ereport(LOG,
-			(errmsg("watchdog received the failover command from local pgpool-II on IPC interface")));
-
-	return process_failover_command_on_coordinator(ipcCommand);
-}
-
-
 static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDResults cmdResult, unsigned int failoverID)
 {
 	bool ret = false;
@@ -2246,7 +2255,139 @@ static bool reply_to_failove_command(WDCommandData* ipcCommand, WDFailoverCMDRes
 }
 
 /*
- * The Function forwards the failover command to all standby nodes.
+ * This function process the failover command and decides
+ * about the execution of failover command.
+ */
+
+static WDFailoverCMDResults compute_failover_consensus(POOL_REQUEST_KIND reqKind,int *node_id_list, int node_count, unsigned char flags, WatchdogNode *wdNode)
+{
+#ifndef NODE_UP_REQUIRE_CONSENSUS
+	if (reqKind == NODE_UP_REQUEST)
+		return FAILOVER_RES_PROCEED;
+#endif
+#ifndef NODE_DOWN_REQUIRE_CONSENSUS
+	if (reqKind == NODE_DOWN_REQUEST)
+		return FAILOVER_RES_PROCEED;
+#endif
+#ifndef NODE_PROMOTE_REQUIRE_CONSENSUS
+	if (reqKind == PROMOTE_NODE_REQUEST)
+		return FAILOVER_RES_PROCEED;
+#endif
+
+	if (pool_config->failover_when_quorum_exists == false)
+	{
+		/* No need for any calculation, We do not need a quorum for failover */
+		ereport(LOG,(
+				errmsg("we do not need quorum to hold to proceed with failover"),
+					 errdetail("proceeding with the failover"),
+					 errhint("failover_when_quorum_exists is set to false")));
+
+		return FAILOVER_RES_PROCEED;
+	}
+	if (flags & REQ_DETAIL_CONFIRMED)
+	{
+		/* Check the request flags, If it asks to bypass quorum  */
+		ereport(LOG,(
+			errmsg("The failover request does not need quorum to hold"),
+					 errdetail("proceeding with the failover"),
+					 errhint("REQ_DETAIL_CONFIRMED")));
+		return FAILOVER_RES_PROCEED;
+	}
+	update_quorum_status();
+	if (g_cluster.quorum_status < 0)
+	{
+		/* quorum is must and it is not present at the moment */
+		ereport(LOG,(
+				errmsg("failover requires the quorum to hold, which is not present at the moment"),
+					 errdetail("Rejecting the failover request")));
+		return FAILOVER_RES_NO_QUORUM;
+	}
+
+	/* So we reached here means quorum is present
+	 * Now come to dificult part, enusre the consensus
+	 *
+	 * Record the failover.
+	 */
+	if (pool_config->failover_require_consensus == true)
+	{
+		int cnt = add_failover(reqKind, node_id_list, node_count, wdNode);
+		if (cnt <= get_mimimum_nodes_required_for_quorum())
+		{
+			ereport(LOG,(
+					errmsg("failover requires the majority vote, waiting for consensus"),
+						 errdetail("failover request noted")));
+			return FAILOVER_RES_BUILDING_CONSENSUS;
+		}
+	}
+	ereport(LOG,(
+			errmsg("we do not require majority votes to proceed with failover"),
+				errdetail("proceeding with the failover"),
+				 errhint("failover_require_consensus is set to false")));
+
+	return FAILOVER_RES_PROCEED;
+}
+
+static int add_failover(POOL_REQUEST_KIND reqKind, int *node_id_list, int node_count, WatchdogNode *wdNode)
+{
+	MemoryContext oldCxt;
+	/* Find the failover */
+	WDFailoverObject *failoverObj = get_failover_object(reqKind, node_count, node_id_list);
+	if (failoverObj)
+	{
+		ListCell *lc;
+		/* search the node if it is a duplicate request */
+		foreach(lc, failoverObj->requestingNodes)
+		{
+			WatchdogNode* reqWdNode = lfirst(lc);
+			if (wdNode == reqWdNode)
+			{
+				/* The failover request is duplicate */
+				if (pool_config->enable_multiple_failover_requests_from_node)
+				{
+					failoverObj->request_count++;
+					ereport(LOG,(
+							errmsg("Duplicate failover request from \"%s\" node",wdNode->nodeName),
+								 errdetail("Pgpool-II can send multiple failover requests for same node"),
+								 errhint("enable_multiple_failover_requests_from_node is enabled")));
+				}
+				else
+				{
+					ereport(LOG,(
+							errmsg("Duplicate failover request from \"%s\" node",wdNode->nodeName),
+								 errdetail("request ignored")));
+				}
+				return failoverObj->request_count;
+			}
+		}
+	}
+	else
+	{
+		oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+		failoverObj = palloc0(sizeof(WDFailoverObject));
+		failoverObj->reqKind = reqKind;
+		failoverObj->requestingNodes = NULL;
+		failoverObj->nodesCount = node_count;
+		failoverObj->request_count = 0;
+		if (node_count > 0)
+		{
+			failoverObj->nodeList = palloc(sizeof(int) * node_count);
+			memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
+		}
+		failoverObj->failoverID = get_next_commandID();
+		gettimeofday(&failoverObj->startTime, NULL);
+		g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
+		MemoryContextSwitchTo(oldCxt);
+	}
+
+	failoverObj->request_count++;
+	oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+	failoverObj->requestingNodes = lappend(failoverObj->requestingNodes,wdNode);
+	MemoryContextSwitchTo(oldCxt);
+	return failoverObj->request_count;
+}
+
+/*
+ * The function processes all failover commands on master node
  */
 static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData* ipcCommand)
 {
@@ -2254,22 +2395,15 @@ static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandDat
 	int node_count = 0;
 	int *node_id_list = NULL;
 	bool ret = false;
-	WDFailoverObject* failoverObj;
+	unsigned char flags;
 	POOL_REQUEST_KIND reqKind;
+	WDFailoverCMDResults res;
 
 	if (get_local_node_state() != WD_COORDINATOR)
 		return IPC_CMD_ERROR; /* should never happen*/
 
-	/*
-	 * The coordinator node
-	 * Forward this command to all standby nodes.
-	 * Ask the caller to proceed with failover
-	 * but first check if this failover is already requested
-	 * by some other node.
-	 */
-
 	ret = parse_wd_node_function_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len,
-									  &func_name, &node_id_list, &node_count);
+									  &func_name, &node_id_list, &node_count, &flags);
 	if (ret == false)
 	{
 		ereport(LOG,(
@@ -2297,129 +2431,64 @@ static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandDat
 					ipcCommand->commandSource == COMMAND_SOURCE_IPC?
 					"local pgpool-II on IPC interface":ipcCommand->sourceWdNode->nodeName)));
 
-	if (get_cluster_node_count() == 0)
+	res = compute_failover_consensus(reqKind, node_id_list, node_count, flags, ipcCommand->sourceWdNode);
+	if (res == FAILOVER_RES_PROCEED)
 	{
-		/*
-		 * Since I am the only node in the cluster so nothing
-		 * we need to do here
+		/* if the command is originated from remote node.
+		 * request the local node to start failover
 		 */
-		ereport(LOG,(
-			errmsg("I am the only pgpool-II node in the watchdog cluster"),
-				errdetail("no need to propagate the failover command [%s]",func_name)));
-		reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
-		return IPC_CMD_COMPLETE;
-	}
+		if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE)
+		{
+			/* Set the flag that failover is request is originated from watchdog */
+			flags |= REQ_DETAIL_WATCHDOG;
 
-	if (ipcCommand->commandSource == COMMAND_SOURCE_REMOTE  && Req_info->switching)
-	{
-		/*
-		 * check if the failover is allowed before doing anything
-		 */
-		ereport(LOG,
-			(errmsg("failover command [%s] request from pgpool-II node \"%s\" is rejected because of switching",
-					func_name,
-					ipcCommand->sourceWdNode->nodeName)));
-		reply_to_failove_command(ipcCommand, FAILOVER_RES_NOT_ALLOWED, 0);
+			if (reqKind == NODE_DOWN_REQUEST)
+				ret = degenerate_backend_set(node_id_list, node_count, flags);
+			else if (reqKind == NODE_UP_REQUEST)
+				ret = send_failback_request(node_id_list[0],false, REQ_DETAIL_WATCHDOG);
+			else if (reqKind == PROMOTE_NODE_REQUEST)
+				ret = promote_backend(node_id_list[0], flags);
+
+			if (ret == true)
+				reply_to_failove_command(ipcCommand, FAILOVER_RES_WILL_BE_DONE, 0);
+			else
+				reply_to_failove_command(ipcCommand, FAILOVER_RES_ERROR, 0);
+		}
+		else
+		{
+			/* Just reply the caller to get on with the failover */
+			reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, 0);
+		}
 		return IPC_CMD_COMPLETE;
 	}
-
-	/*
-	 * check if the same failover is already issued to the main
-	 * process
-	 */
-	failoverObj = get_failover_object(reqKind, node_count, node_id_list);
-	if (failoverObj)
+	else if (res == FAILOVER_RES_NO_QUORUM)
 	{
 		ereport(LOG,
-			(errmsg("failover command [%s] from %s is ignored",
+				(errmsg("failover command [%s] request from pgpool-II node \"%s\" is rejected because we do not have the quorum",
 						func_name,
-						ipcCommand->commandSource == COMMAND_SOURCE_IPC?
-						"local pgpool-II on IPC interface":ipcCommand->sourceWdNode->nodeName),
-			 errdetail("similar failover with ID:%d is already in progress",failoverObj->failoverID)));
-
-		/* Same failover is already in progress */
-		reply_to_failove_command(ipcCommand, FAILOVER_RES_ALREADY_ISSUED, 0);
+						ipcCommand->sourceWdNode->nodeName)));
+		reply_to_failove_command(ipcCommand, FAILOVER_RES_NO_QUORUM, 0);
 		return IPC_CMD_COMPLETE;
 	}
-	else
+	else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
 	{
-		/* No similar failover is in progress */
-		MemoryContext oldCxt;
-		ereport(DEBUG1,
-				(errmsg("proceeding with the failover command [%s] request from %s",
-						func_name,
-						ipcCommand->commandSource == COMMAND_SOURCE_IPC?
-						"local pgpool-II":ipcCommand->sourceWdNode->nodeName),
-				 errdetail("no similar failover is in progress")));
-		/*
-		 * okay now ask all nodes to start failover
-		 */
-		wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
-		ipcCommand->commandPacket.type = WD_REMOTE_FAILOVER_REQUEST;
-		ipcCommand->sendToNode = NULL; /* command needs to be sent to all nodes */
-		set_next_commandID_in_message(&ipcCommand->commandPacket);
-
-		if (ipcCommand->commandSource != COMMAND_SOURCE_IPC)
-		{
-			if (process_wd_command_function(ipcCommand->sourceWdNode, &ipcCommand->sourcePacket,
-										func_name, node_count, node_id_list, ipcCommand->commandPacket.command_id) == false)
-			{
-				return IPC_CMD_COMPLETE;
-			}
-		}
-
-		/* send to all alive nodes */
 		ereport(LOG,
-			(errmsg("forwarding the failover request [%s] to all alive nodes",func_name),
-				 errdetail("watchdog cluster currently has %d connected remote nodes",get_cluster_node_count())));
-		send_command_packet_to_remote_nodes(ipcCommand, false);
-
-		/* create a failover object. to make sure we know the node failovers
-		 * is already in progress
-		 */
-		oldCxt = MemoryContextSwitchTo(TopMemoryContext);
-		failoverObj = palloc0(sizeof(WDFailoverObject));
-		failoverObj->reqKind = reqKind;
-		failoverObj->nodesCount = node_count;
-		if (node_count > 0)
-		{
-			failoverObj->nodeList = palloc(sizeof(int) * node_count);
-			memcpy(failoverObj->nodeList, node_id_list, sizeof(int) * node_count);
-		}
-		failoverObj->failoverID = ipcCommand->commandPacket.command_id; /* use command id as failover id */
-		gettimeofday(&failoverObj->startTime, NULL);
-		failoverObj->wdRequestingNode = ipcCommand->sourceWdNode;
-		g_cluster.wdCurrentFailovers = lappend(g_cluster.wdCurrentFailovers,failoverObj);
-
-		MemoryContextSwitchTo(oldCxt);
-
-		/* For a moment just think it is successfully sent to all nodes.*/
-		if (ipcCommand->commandSource == COMMAND_SOURCE_IPC)
-		{
-			reply_to_failove_command(ipcCommand, FAILOVER_RES_PROCEED, failoverObj->failoverID);
-			return IPC_CMD_COMPLETE;
-		}
-		else
-		{
-			if (get_cluster_node_count() <= 1)
-			{
-				/* Since its just 2 nodes cluster, and the only other
-				 * node is the one that actually issued the failover
-				 * so the command actually completes here
-				 */
-				return IPC_CMD_COMPLETE;
-			}
-		}
+				(errmsg("failover command [%s] request from pgpool-II node \"%s\" is queued, waiting for more nodes confirmation",
+						func_name,
+						ipcCommand->sourceWdNode->nodeName)));
+		reply_to_failove_command(ipcCommand, FAILOVER_RES_BUILDING_CONSENSUS, 0);
+		return IPC_CMD_COMPLETE;
 	}
-
-	return IPC_CMD_PROCESSING;
+	return IPC_CMD_COMPLETE;
 }
 
 static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData* ipcCommand)
 {
 	if (get_local_node_state() == WD_COORDINATOR)
 	{
-		return process_IPC_failover_command_on_coordinator(ipcCommand);
+		ereport(LOG,
+				(errmsg("watchdog received the failover command from local pgpool-II on IPC interface")));
+		return process_failover_command_on_coordinator(ipcCommand);
 	}
 	else if (get_local_node_state() == WD_STANDBY)
 	{
@@ -2548,54 +2617,66 @@ static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData *
 	return IPC_CMD_TRY_AGAIN;
 }
 
-static IPC_CMD_PREOCESS_RES process_IPC_failover_locking_cmd(WDCommandData *ipcCommand)
+static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData *ipcCommand)
 {
+	WDFailoverCMDResults res = FAILOVER_RES_NOT_ALLOWED;
 	/*
 	 * if cluster or myself is not in stable state
 	 * just return cluster in transaction
 	 */
 	ereport(LOG,
-			(errmsg("received the failover command lock request from local pgpool-II on IPC interface")));
-	if (get_local_node_state() == WD_STANDBY)
-	{
-		/* I am a standby node, Just forward the request to coordinator */
-		wd_packet_shallow_copy(&ipcCommand->sourcePacket, &ipcCommand->commandPacket);
-		set_next_commandID_in_message(&ipcCommand->commandPacket);
+			(errmsg("received the failover indication from Pgpool-II on IPC interface")));
 
-		ipcCommand->sendToNode = WD_MASTER_NODE;
-		if (send_command_packet_to_remote_nodes(ipcCommand, true) <= 0)
+	if (get_local_node_state() == WD_COORDINATOR)
+	{
+		json_value* root;
+		int failoverState = -1;
+		if (ipcCommand->sourcePacket.data == NULL || ipcCommand->sourcePacket.len <= 0)
 		{
 			ereport(LOG,
-				(errmsg("unable to process the failover command lock request received on IPC interface"),
-					 errdetail("failed to forward the request to the master watchdog node \"%s\"",WD_MASTER_NODE->nodeName)));
-			return IPC_CMD_ERROR;
+					(errmsg("watchdog unable to process failover indication"),
+					 errdetail("invalid command packet")));
+			res = FAILOVER_RES_INVALID_FUNCTION;
+		}
+		root = json_parse(ipcCommand->sourcePacket.data,ipcCommand->sourcePacket.len);
+		if (root && root->type == json_object)
+		{
+			json_get_int_value_for_key(root, "FailoverFuncState", &failoverState);
 		}
 		else
 		{
-			/*
-			 * wait for the result
-			 */
 			ereport(LOG,
-					(errmsg("failover command lock request from local pgpool-II node received on IPC interface is forwarded to master watchdog node \"%s\"",
-							WD_MASTER_NODE->nodeName),
-					 errdetail("waiting for the reply...")));
-			return IPC_CMD_PROCESSING;
+					(errmsg("unable to process failover indication"),
+					 errdetail("invalid json data in command packet")));
+			res = FAILOVER_RES_INVALID_FUNCTION;
+		}
+		if (root)
+			json_value_free(root);
+
+		if (failoverState < 0 )
+		{
+			ereport(LOG,
+				(errmsg("unable to process failover indication"),
+					 errdetail("invalid json data in command packet")));
+			res = FAILOVER_RES_INVALID_FUNCTION;
+		}
+		else if (failoverState == 0) /* start */
+		{
+			res = failover_start_indication(ipcCommand);
+		}
+		else
+		{
+			res = failover_end_indication(ipcCommand);
 		}
 	}
-	else if (get_local_node_state() == WD_COORDINATOR)
+	else
 	{
-		/*
-		 * If I am coordinator, Just process the request locally
-		 */
-		return process_failover_locking_requests_on_cordinator(ipcCommand);
+		ereport(LOG,
+				(errmsg("received the failover indication from Pgpool-II on IPC interface, but only master can do failover")));
 	}
+	reply_to_failove_command(ipcCommand, res, 0);
 
-	/* we are not in any stable state at the moment */
-	ereport(LOG,
-		(errmsg("unable to process the failover command lock request received on IPC interface"),
-			 errdetail("this watchdog node has not joined the cluster yet"),
-				errhint("try again in few seconds")));
-	return IPC_CMD_TRY_AGAIN;
+	return IPC_CMD_COMPLETE;
 }
 
 static void process_remote_failover_locking_request(WatchdogNode* wdNode, WDPacketData* pkt)
@@ -2726,6 +2807,86 @@ static IPC_CMD_PREOCESS_RES process_failover_locking_requests_on_cordinator(WDCo
 	return IPC_CMD_COMPLETE;
 }
 
+/* Failover start basically does nothing fency, It just sets the failover_in_progress
+ * flag and inform all nodes that the failover is in progress.
+ *
+ * only the local node that is a master can start the failover.
+ */
+static WDFailoverCMDResults
+failover_start_indication(WDCommandData* ipcCommand)
+{
+	ereport(LOG,
+		(errmsg("watchdog is informed of failover start")));
+
+	/* only coordinator(master) node is allowed to process failover */
+	if (get_local_node_state() == WD_COORDINATOR)
+	{
+//		if (g_cluster.current_failover_id > 0)
+//		{
+//			/* we do allow multiple calls to failover_start but
+//			 * still it's a warning
+//			 */
+//			if (g_cluster.current_failover_id != failoverID)
+//			{
+//				ereport(LOG,
+//					(errmsg("watchdog is informed of new failover start, while failover with ID: %d was already in progress",
+//								g_cluster.current_failover_id)));
+//			}
+//			else
+//			{
+//				ereport(LOG,
+//					(errmsg("watchdog is informed of failover start while it is already in progress")));
+//			}
+//		}
+		/* Okay not save the current failoverID */
+//		g_cluster.current_failover_id = failoverID;
+		/* inform to all nodes about failover start */
+		send_message_of_type(NULL, WD_FAILOVER_START, NULL);
+		return FAILOVER_RES_PROCEED;
+	}
+	else if (get_local_node_state() == WD_STANDBY)
+	{
+		ereport(LOG,
+			(errmsg("failed to process failover start request, I am not the master node"),
+				 errdetail("I am standby node and request can only be processed by master watchdog node")));
+		return FAILOVER_RES_ERROR;
+	}
+	else
+	{
+		ereport(LOG,
+				(errmsg("failed to process failover start request, I am not in stable state")));
+	}
+	return FAILOVER_RES_TRANSITION;
+}
+
+static WDFailoverCMDResults
+failover_end_indication(WDCommandData* ipcCommand)
+{
+	ereport(LOG,
+			(errmsg("watchdog is informed of failover end")));
+
+	/* only coordinator(master) node is allowed to process failover */
+	if (get_local_node_state() == WD_COORDINATOR)
+	{
+		send_message_of_type(NULL, WD_FAILOVER_END, NULL);
+		return FAILOVER_RES_PROCEED;
+	}
+	else if (get_local_node_state() == WD_STANDBY)
+	{
+		ereport(LOG,
+				(errmsg("failed to process failover start request, I am not the master node"),
+				 errdetail("I am standby node and request can only be processed by master watchdog node")));
+		return FAILOVER_RES_ERROR;
+	}
+	else
+	{
+		ereport(LOG,
+				(errmsg("failed to process failover start request, I am not in stable state")));
+	}
+	return FAILOVER_RES_TRANSITION;
+}
+
+
 /*
  * node_is_asking_for_failover_start()
  * the function process the lock holding requests. If the lock holding node
@@ -3940,6 +4101,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 		case WD_INFO_MESSAGE:
 		{
 			char *authkey = NULL;
+			int oldQuorumStatus;
 			WD_STATES oldNodeState;
 			WatchdogNode* tempNode = parse_node_info_message(pkt, &authkey);
 			if (tempNode == NULL)
@@ -3949,6 +4111,7 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 				send_cluster_service_message(wdNode,pkt,CLUSTER_NODE_INVALID_VERSION);
 				break;
 			}
+			oldQuorumStatus = wdNode->quorum_status;
 			oldNodeState = wdNode->state;
 			wdNode->state = tempNode->state;
 			wdNode->startup_time.tv_sec = tempNode->startup_time.tv_sec;
@@ -3998,6 +4161,11 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
 						handle_split_brain(wdNode, pkt);
 					}
 				}
+				else if (WD_MASTER_NODE == wdNode && oldQuorumStatus != wdNode->quorum_status)
+				{
+					/* inform Pgpool man about quorum status changes */
+					register_watchdog_quorum_change_interupt();
+				}
 			}
 
 			/* if the info message is from master node. Make sure we are in sync
@@ -4676,6 +4844,9 @@ static WDPacketData* get_message_of_type(char type, WDPacketData* replyFor)
 		case WD_IAM_COORDINATOR_MESSAGE:
 			pkt = get_beacon_message(WD_IAM_COORDINATOR_MESSAGE,replyFor);
 			break;
+
+		case WD_FAILOVER_START:
+		case WD_FAILOVER_END:
 		case WD_REQ_INFO_MESSAGE:
 		case WD_STAND_FOR_COORDINATOR_MESSAGE:
 		case WD_DECLARE_COORDINATOR_MESSAGE:
@@ -5598,6 +5769,7 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
 				}
 				/* inform to the cluster about the new quorum status */
 				send_message_of_type(NULL, WD_INFO_MESSAGE,NULL);
+				register_watchdog_quorum_change_interupt();
 			}
 		}
 			break;
@@ -6189,6 +6361,9 @@ static int watchdog_state_machine_standby(WD_EVENTS event, WatchdogNode* wdNode,
 		case WD_EVENT_PACKET_RCV:
 			switch (pkt->type)
 		{
+			case WD_FAILOVER_END:
+				register_backend_state_sync_req_interupt();
+				break;
 			case WD_STAND_FOR_COORDINATOR_MESSAGE:
 			{
 				if (WD_MASTER_NODE == NULL)
@@ -6394,6 +6569,7 @@ static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacke
 	char* func_name;
 	int node_count = 0;
 	int *node_id_list = NULL;
+	unsigned char flags;
 
 	if (pkt->data == NULL || pkt->len == 0)
 	{
@@ -6412,7 +6588,7 @@ static void process_pgpool_remote_failover_command(WatchdogNode* wdNode, WDPacke
 		reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
 		return;
 	}
-	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count, &flags))
 	{
 		ereport(LOG,
 			(errmsg("watchdog received the failover command from \"%s\"",wdNode->nodeName)));
@@ -6437,6 +6613,7 @@ static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacke
 	char* func_name;
 	int node_count = 0;
 	int *node_id_list = NULL;
+	unsigned char flags;
 
 	if (pkt->data == NULL || pkt->len == 0)
 	{
@@ -6450,7 +6627,7 @@ static void process_remote_online_recovery_command(WatchdogNode* wdNode, WDPacke
 	ereport(LOG,
 		(errmsg("watchdog received online recovery request from \"%s\"",wdNode->nodeName)));
 
-	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count))
+	if (parse_wd_node_function_json(pkt->data, pkt->len, &func_name, &node_id_list, &node_count, &flags))
 	{
 		if (strcasecmp(WD_FUNCTION_START_RECOVERY, func_name) == 0)
 		{
@@ -6530,7 +6707,7 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
 		}
 		else
 		{
-			ret = send_failback_request(node_id_list[0],false, failover_id);
+			ret = send_failback_request(node_id_list[0],false, REQ_DETAIL_WATCHDOG);
 		}
 	}
 	
@@ -6544,7 +6721,7 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
 		}
 		else
 		{
-			ret = degenerate_backend_set(node_id_list, node_count, false, failover_id);
+//			ret = degenerate_backend_set(node_id_list, node_count, false, fla);
 		}
 	}
 
diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c
index 42c77705..a089f53e 100644
--- a/src/watchdog/wd_commands.c
+++ b/src/watchdog/wd_commands.c
@@ -54,16 +54,12 @@
 #define WD_INTERLOCK_TIMEOUT_SEC	10
 #define WD_INTERLOCK_WAIT_COUNT ((int) ((WD_INTERLOCK_TIMEOUT_SEC * 1000)/WD_INTERLOCK_WAIT_MSEC))
 
-static void sleep_in_waiting(void);
 static void FreeCmdResult(WDIPCCmdResult* res);
-
-static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
-static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
-static WDFailoverCMDResults wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id);
+static char* get_wd_failover_state_json(bool start);
 
 static int open_wd_command_sock(bool throw_error);
 static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *result, unsigned int *wd_failover_id);
-
+static WDFailoverCMDResults wd_issue_failover_command(char* func_name, int *node_id_set, int count, unsigned char flags);
 /* shared memory variables */
 char *watchdog_ipc_address = NULL;
 bool *watchdog_require_cleanup = NULL;	/* shared memory variable set to true
@@ -135,6 +131,30 @@ WD_STATES get_watchdog_local_node_state(void)
 	return ret;
 }
 
+int get_watchdog_quorum_state(void)
+{
+	WD_STATES ret = WD_DEAD;
+	WDGenericData *state = get_wd_runtime_variable_value(WD_RUNTIME_VAR_QUORUM_STATE);
+	if (state == NULL)
+	{
+		ereport(LOG,
+				(errmsg("failed to get quorum state of watchdog cluster"),
+				 errdetail("get runtime variable value from watchdog returned no data")));
+		return WD_DEAD;
+	}
+	if (state->valueType != VALUE_DATA_TYPE_INT)
+	{
+		ereport(LOG,
+				(errmsg("failed to get quorum state of watchdog cluster"),
+				 errdetail("get runtime variable value from watchdog returned invalid value type")));
+		pfree(state);
+		return WD_DEAD;
+	}
+	ret = (WD_STATES)state->data.intVal;
+	pfree(state);
+	return ret;
+}
+
 char* get_watchdog_ipc_address(void)
 {
 	return watchdog_ipc_address;
@@ -539,7 +559,7 @@ wd_start_recovery(void)
 	char type;
 	unsigned int *shared_key = get_ipc_shared_key();
 
-	char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0,
+	char* func = get_wd_node_function_json(WD_FUNCTION_START_RECOVERY, NULL,0, 0,
 										   shared_key?*shared_key:0,pool_config->wd_authkey);
 
 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_ONLINE_RECOVERY_COMMAND,
@@ -585,7 +605,7 @@ wd_end_recovery(void)
 	char type;
 	unsigned int *shared_key = get_ipc_shared_key();
 
-	char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0,
+	char* func = get_wd_node_function_json(WD_FUNCTION_END_RECOVERY, NULL, 0, 0,
 										   shared_key?*shared_key:0,pool_config->wd_authkey);
 
 	
@@ -627,29 +647,7 @@ wd_end_recovery(void)
 	return COMMAND_FAILED;
 }
 
-
-WDFailoverCMDResults
-wd_send_failback_request(int node_id, unsigned int *wd_failover_id)
-{
-	int n = node_id;
-	char* func;
-	unsigned int *shared_key = get_ipc_shared_key();
-	WDFailoverCMDResults res;
-
-	func = get_wd_node_function_json(WD_FUNCTION_FAILBACK_REQUEST,&n, 1,
-									 shared_key?*shared_key:0,pool_config->wd_authkey);
-
-	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
-													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
-													   func, strlen(func), true);
-	pfree(func);
-
-	res = wd_get_failover_result_from_data(result, wd_failover_id);
-	FreeCmdResult(result);
-	return res;
-}
-
-static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
+static char* get_wd_failover_state_json(bool start)
 {
 	char* json_str;
 	JsonNode* jNode = jw_create_with_object(true);
@@ -659,9 +657,7 @@ static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks l
 	if (pool_config->wd_authkey != NULL && strlen(pool_config->wd_authkey) > 0)
 		jw_put_string(jNode, WD_IPC_AUTH_KEY, pool_config->wd_authkey); /*  put the auth key*/
 
-	jw_put_string(jNode, "SyncRequestType", reqType);
-	jw_put_int(jNode, "FailoverLockID", lockID);
-	jw_put_int(jNode, "WDFailoverID", wd_failover_id);
+	jw_put_int(jNode, "FailoverFuncState", start?0:1);
 	jw_finish_document(jNode);
 	json_str = pstrdup(jw_get_json_string(jNode));
 	jw_destroy(jNode);
@@ -669,14 +665,14 @@ static char* get_wd_failover_cmd_type_json(char* reqType, enum WDFailoverLocks l
 }
 
 static WDFailoverCMDResults
-wd_send_failover_sync_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
+wd_send_failover_func_status_command(bool start)
 {
 	WDFailoverCMDResults res;
 	unsigned int failover_id;
 
-	char* json_data = get_wd_failover_cmd_type_json(syncReqType, lockID, wd_failover_id);
+	char* json_data = get_wd_failover_state_json(start);
 
-	WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_LOCKING_REQUEST
+	WDIPCCmdResult *result = issue_command_to_watchdog(WD_FAILOVER_INDICATION
 													   ,pool_config->recovery_timeout,
 													   json_data, strlen(json_data), true);
 
@@ -743,43 +739,59 @@ static WDFailoverCMDResults wd_get_failover_result_from_data(WDIPCCmdResult *res
 	return FAILOVER_RES_ERROR;
 }
 
-WDFailoverCMDResults
-wd_degenerate_backend_set(int *node_id_set, int count, unsigned int *wd_failover_id)
+static WDFailoverCMDResults
+wd_issue_failover_command(char* func_name, int *node_id_set, int count, unsigned char flags)
 {
 	WDFailoverCMDResults res;
 	char* func;
 	unsigned int *shared_key = get_ipc_shared_key();
+	unsigned int wd_failover_id;
 	
-	func = get_wd_node_function_json(WD_FUNCTION_DEGENERATE_REQUEST,node_id_set, count,
+	func = get_wd_node_function_json(func_name,node_id_set, count, flags,
 									 shared_key?*shared_key:0,pool_config->wd_authkey);
-
+
 	WDIPCCmdResult *result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND ,
 													   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
 													   func, strlen(func), true);
 	pfree(func);
-	res = wd_get_failover_result_from_data(result, wd_failover_id);
+	res = wd_get_failover_result_from_data(result, &wd_failover_id);
 	FreeCmdResult(result);
 	return res;
 }
 
+/*
+ * send the degenerate backend request to watchdog.
+ * now watchdog can respond to the request in following ways.
+ *
+ * 1 - It can tell the caller to procees with failover. This
+ * happens when the current node is the master watchdog node.
+ *
+ * 2 - It can tell the caller to failover not allowed
+ * this happens when either cluster does not have the quorum
+ *
+ */
 WDFailoverCMDResults
-wd_promote_backend(int node_id, unsigned int *wd_failover_id)
+wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
 {
-	WDFailoverCMDResults res;
-	int n = node_id;
-	char* func;
-	WDIPCCmdResult *result;
-	unsigned int *shared_key = get_ipc_shared_key();
-	
-	func = get_wd_node_function_json(WD_FUNCTION_PROMOTE_REQUEST,&n, 1,
-									 shared_key?*shared_key:0,pool_config->wd_authkey);
-	result = issue_command_to_watchdog(WD_IPC_FAILOVER_COMMAND,
-									   WD_DEFAULT_IPC_COMMAND_TIMEOUT,
-									   func, strlen(func), true);
-	pfree(func);
-	res = wd_get_failover_result_from_data(result, wd_failover_id);
-	FreeCmdResult(result);
-	return res;
+	if (pool_config->use_watchdog)
+		return wd_issue_failover_command(WD_FUNCTION_DEGENERATE_REQUEST, node_id_set, count, flags);
+	return FAILOVER_RES_PROCEED;
+}
+
+WDFailoverCMDResults
+wd_promote_backend(int node_id, unsigned char flags)
+{
+	if (pool_config->use_watchdog)
+		return wd_issue_failover_command(WD_FUNCTION_PROMOTE_REQUEST, &node_id, 1, flags);
+	return FAILOVER_RES_PROCEED;
+}
+
+WDFailoverCMDResults
+wd_send_failback_request(int node_id, unsigned char flags)
+{
+	if (pool_config->use_watchdog)
+		return wd_issue_failover_command(WD_FUNCTION_FAILBACK_REQUEST, &node_id, 1, flags);
+	return FAILOVER_RES_PROCEED;
 }
 
 /*
@@ -878,86 +890,20 @@ open_wd_command_sock(bool throw_error)
 	return sock;
 }
 
-WDFailoverCMDResults wd_start_failover_interlocking(unsigned int wd_failover_id)
-{
-	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_START, 0, wd_failover_id);
-	return FAILOVER_RES_I_AM_LOCK_HOLDER;
-}
-
-WDFailoverCMDResults wd_end_failover_interlocking(unsigned int wd_failover_id)
-{
-	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_END, 0, wd_failover_id);
-	return FAILOVER_RES_SUCCESS;
-}
-
-WDFailoverCMDResults wd_failover_lock_release(enum WDFailoverLocks lock, unsigned int wd_failover_id)
+WDFailoverCMDResults wd_failover_start(void)
 {
 	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_RELEASE_LOCK, lock, wd_failover_id);
-	return FAILOVER_RES_SUCCESS;
+		return wd_send_failover_func_status_command(0);
+	return FAILOVER_RES_PROCEED;
 }
 
-WDFailoverCMDResults wd_failover_lock_status(enum WDFailoverLocks lock, unsigned int wd_failover_id)
+WDFailoverCMDResults wd_failover_end(void)
 {
 	if (pool_config->use_watchdog)
-		return wd_issue_failover_lock_command(WD_REQ_FAILOVER_LOCK_STATUS, lock, wd_failover_id);
-	return FAILOVER_RES_UNLOCKED;
+		return wd_send_failover_func_status_command(1);
+	return FAILOVER_RES_PROCEED;
 }
 
-void wd_wait_until_command_complete_or_timeout(enum WDFailoverLocks lock, unsigned int wd_failover_id)
-{
-	WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
-	int	count = WD_INTERLOCK_WAIT_COUNT;
-
-	while (pool_config->use_watchdog)
-	{
-		res = wd_failover_lock_status(lock, wd_failover_id);
-		if (res == FAILOVER_RES_UNLOCKED ||
-			res == FAILOVER_RES_NO_LOCKHOLDER)
-		{
-			/* we have the permision */
-			return;
-		}
-		sleep_in_waiting();
-		if (--count < 0)
-		{
-			ereport(WARNING,
-					(errmsg("timeout wating for unlock")));
-			break;
-		}
-	}
-}
-
-/*
- * This is just a wrapper over wd_send_failover_sync_command()
- * but try to wait for WD_INTERLOCK_TIMEOUT_SEC amount of time
- * if watchdog is in transition state
- */
-
-static WDFailoverCMDResults wd_issue_failover_lock_command(char* syncReqType, enum WDFailoverLocks lockID, unsigned int wd_failover_id)
-{
-	WDFailoverCMDResults res;
-	int x;
-	for (x=0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION/2; x++)
-	{
-		res = wd_send_failover_sync_command(syncReqType, lockID, wd_failover_id);
-		if (res != FAILOVER_RES_TRANSITION)
-			break;
-		sleep(2);
-	}
-	return res;
-}
-
-static void
-sleep_in_waiting(void)
-{
-	struct timeval t = {0, WD_INTERLOCK_WAIT_MSEC * 1000};
-	select(0, NULL, NULL, NULL, &t);
-}
-
-
 
 static void FreeCmdResult(WDIPCCmdResult* res)
 {
diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c
index cc95e28f..97dc1129 100644
--- a/src/watchdog/wd_json_data.c
+++ b/src/watchdog/wd_json_data.c
@@ -690,7 +690,7 @@ WDNodeInfo* get_WDNodeInfo_from_wd_node_json(json_value* source)
 	
 }
 
-char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned int sharedKey, char* authKey)
+char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, unsigned char flags, unsigned int sharedKey, char* authKey)
 {
 	char* json_str;
 	int  i;
@@ -702,6 +702,7 @@ char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, un
 		jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey); /*  put the auth key*/
 
 	jw_put_string(jNode, "Function", func_name);
+	jw_put_int(jNode, "Flags", (int)flags);
 	jw_put_int(jNode, "NodeCount", count);
 	if (count > 0)
 	{
@@ -717,7 +718,7 @@ char* get_wd_node_function_json(char* func_name, int *node_id_set, int count, un
 	return json_str;
 }
 
-bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count)
+bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name, int **node_id_set, int *count, unsigned char *flags)
 {
 	json_value *root, *value;
 	char* ptr;
@@ -750,6 +751,12 @@ bool parse_wd_node_function_json(char* json_data, int data_len, char** func_name
 	}
 	*func_name = pstrdup(ptr);
 	/* If it is a node function ?*/
+	if (json_get_int_value_for_key(root, "Flags", flags))
+	{
+		/*node count not found, But we don't care much about this*/
+		*flags = 0;
+		/* it may be from the old version */
+	}
 	if (json_get_int_value_for_key(root, "NodeCount", &node_count))
 	{
 		/*node count not found, But we don't care much about this*/


More information about the pgpool-hackers mailing list