diff --git a/doc/src/sgml/ref/pcp_stop_pgpool.sgml b/doc/src/sgml/ref/pcp_stop_pgpool.sgml index d8fb8338..cdec14d8 100644 --- a/doc/src/sgml/ref/pcp_stop_pgpool.sgml +++ b/doc/src/sgml/ref/pcp_stop_pgpool.sgml @@ -59,6 +59,23 @@ Pgpool-II documentation + + + + + + Shutdown mode for terminating the Pgpool-II process. + + + The supported command scopes are as follows: + + c, cluster : terminates all Pgpool-II nodes part of the cluster + l, local : terminates local Pgpool-II node only + + + + + diff --git a/src/Makefile.am b/src/Makefile.am index 9b5460af..dd0a767c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -8,6 +8,7 @@ pgpool_SOURCES = main/main.c \ main/pool_globals.c \ main/pgpool_main.c \ main/health_check.c \ + main/pgpool_internal_commands.c \ config/pool_config.l \ config/pool_config_variables.c \ pcp_con/pcp_child.c \ @@ -20,6 +21,7 @@ pgpool_SOURCES = main/main.c \ auth/auth-scram.c \ protocol/pool_proto2.c \ protocol/child.c \ + protocol/pool_pg_utils.c \ protocol/pool_process_query.c \ protocol/pool_connection_pool.c \ protocol/pool_proto_modules.c \ diff --git a/src/Makefile.in b/src/Makefile.in index 971c20b7..be5d1938 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -104,13 +104,15 @@ PROGRAMS = $(bin_PROGRAMS) am__dirstamp = $(am__leading_dot)dirstamp am_pgpool_OBJECTS = main/main.$(OBJEXT) main/pool_globals.$(OBJEXT) \ main/pgpool_main.$(OBJEXT) main/health_check.$(OBJEXT) \ + main/pgpool_internal_commands.$(OBJEXT) \ config/pool_config.$(OBJEXT) \ config/pool_config_variables.$(OBJEXT) \ - pcp_con/pcp_child.$(OBJEXT) pcp_con/pcp_worker.$(OBJEXT) \ - pcp_con/recovery.$(OBJEXT) auth/md5.$(OBJEXT) \ - auth/pool_auth.$(OBJEXT) auth/pool_passwd.$(OBJEXT) \ - auth/pool_hba.$(OBJEXT) auth/auth-scram.$(OBJEXT) \ - protocol/pool_proto2.$(OBJEXT) protocol/child.$(OBJEXT) \ + utils/ps_status.$(OBJEXT) pcp_con/pcp_child.$(OBJEXT) \ + pcp_con/pcp_worker.$(OBJEXT) pcp_con/recovery.$(OBJEXT) \ + auth/md5.$(OBJEXT) auth/pool_auth.$(OBJEXT) \ + 
auth/pool_passwd.$(OBJEXT) auth/pool_hba.$(OBJEXT) \ + auth/auth-scram.$(OBJEXT) protocol/pool_proto2.$(OBJEXT) \ + protocol/child.$(OBJEXT) protocol/pool_pg_utils.$(OBJEXT) \ protocol/pool_process_query.$(OBJEXT) \ protocol/pool_connection_pool.$(OBJEXT) \ protocol/pool_proto_modules.$(OBJEXT) \ @@ -123,10 +125,9 @@ am_pgpool_OBJECTS = main/main.$(OBJEXT) main/pool_globals.$(OBJEXT) \ rewrite/pool_timestamp.$(OBJEXT) rewrite/pool_lobj.$(OBJEXT) \ utils/pool_select_walker.$(OBJEXT) utils/strlcpy.$(OBJEXT) \ utils/psprintf.$(OBJEXT) utils/pool_params.$(OBJEXT) \ - utils/ps_status.$(OBJEXT) utils/pool_shmem.$(OBJEXT) \ - utils/pool_sema.$(OBJEXT) utils/pool_signal.$(OBJEXT) \ - utils/pool_path.$(OBJEXT) utils/pool_ip.$(OBJEXT) \ - utils/pool_relcache.$(OBJEXT) \ + utils/pool_shmem.$(OBJEXT) utils/pool_sema.$(OBJEXT) \ + utils/pool_signal.$(OBJEXT) utils/pool_path.$(OBJEXT) \ + utils/pool_ip.$(OBJEXT) utils/pool_relcache.$(OBJEXT) \ utils/pool_process_reporting.$(OBJEXT) \ utils/pool_ssl.$(OBJEXT) utils/pool_stream.$(OBJEXT) \ utils/socket_stream.$(OBJEXT) utils/getopt_long.$(OBJEXT) \ @@ -435,8 +436,10 @@ pgpool_SOURCES = main/main.c \ main/pool_globals.c \ main/pgpool_main.c \ main/health_check.c \ + main/pgpool_internal_commands.c \ config/pool_config.l \ config/pool_config_variables.c \ + utils/ps_status.c \ pcp_con/pcp_child.c \ pcp_con/pcp_worker.c \ pcp_con/recovery.c \ @@ -447,6 +450,7 @@ pgpool_SOURCES = main/main.c \ auth/auth-scram.c \ protocol/pool_proto2.c \ protocol/child.c \ + protocol/pool_pg_utils.c \ protocol/pool_process_query.c \ protocol/pool_connection_pool.c \ protocol/pool_proto_modules.c \ @@ -462,7 +466,6 @@ pgpool_SOURCES = main/main.c \ utils/strlcpy.c \ utils/psprintf.c \ utils/pool_params.c \ - utils/ps_status.c \ utils/pool_shmem.c \ utils/pool_sema.c \ utils/pool_signal.c \ @@ -680,11 +683,16 @@ main/main.$(OBJEXT): main/$(am__dirstamp) main/pool_globals.$(OBJEXT): main/$(am__dirstamp) main/pgpool_main.$(OBJEXT): main/$(am__dirstamp) 
main/health_check.$(OBJEXT): main/$(am__dirstamp) +main/pgpool_internal_commands.$(OBJEXT): main/$(am__dirstamp) config/$(am__dirstamp): @$(MKDIR_P) config @: > config/$(am__dirstamp) config/pool_config.$(OBJEXT): config/$(am__dirstamp) config/pool_config_variables.$(OBJEXT): config/$(am__dirstamp) +utils/$(am__dirstamp): + @$(MKDIR_P) utils + @: > utils/$(am__dirstamp) +utils/ps_status.$(OBJEXT): utils/$(am__dirstamp) pcp_con/$(am__dirstamp): @$(MKDIR_P) pcp_con @: > pcp_con/$(am__dirstamp) @@ -704,6 +712,7 @@ protocol/$(am__dirstamp): @: > protocol/$(am__dirstamp) protocol/pool_proto2.$(OBJEXT): protocol/$(am__dirstamp) protocol/child.$(OBJEXT): protocol/$(am__dirstamp) +protocol/pool_pg_utils.$(OBJEXT): protocol/$(am__dirstamp) protocol/pool_process_query.$(OBJEXT): protocol/$(am__dirstamp) protocol/pool_connection_pool.$(OBJEXT): protocol/$(am__dirstamp) protocol/pool_proto_modules.$(OBJEXT): protocol/$(am__dirstamp) @@ -728,14 +737,10 @@ rewrite/$(am__dirstamp): @: > rewrite/$(am__dirstamp) rewrite/pool_timestamp.$(OBJEXT): rewrite/$(am__dirstamp) rewrite/pool_lobj.$(OBJEXT): rewrite/$(am__dirstamp) -utils/$(am__dirstamp): - @$(MKDIR_P) utils - @: > utils/$(am__dirstamp) utils/pool_select_walker.$(OBJEXT): utils/$(am__dirstamp) utils/strlcpy.$(OBJEXT): utils/$(am__dirstamp) utils/psprintf.$(OBJEXT): utils/$(am__dirstamp) utils/pool_params.$(OBJEXT): utils/$(am__dirstamp) -utils/ps_status.$(OBJEXT): utils/$(am__dirstamp) utils/pool_shmem.$(OBJEXT): utils/$(am__dirstamp) utils/pool_sema.$(OBJEXT): utils/$(am__dirstamp) utils/pool_signal.$(OBJEXT): utils/$(am__dirstamp) diff --git a/src/auth/auth-scram.c b/src/auth/auth-scram.c index 3b0b3cd6..b26859bf 100644 --- a/src/auth/auth-scram.c +++ b/src/auth/auth-scram.c @@ -5,7 +5,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to 
use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -109,6 +109,7 @@ #include "utils/sha2.h" #include "auth/pool_passwd.h" #include "auth/scram.h" +#include "auth/pool_auth.h" #include "utils/base64.h" #include "utils/elog.h" #include "utils/palloc.h" diff --git a/src/auth/pool_auth.c b/src/auth/pool_auth.c index d9e5dc90..f2fddea9 100644 --- a/src/auth/pool_auth.c +++ b/src/auth/pool_auth.c @@ -22,8 +22,11 @@ #include "pool.h" #include "context/pool_session_context.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_proto_modules.h" #include "utils/pool_stream.h" #include "pool_config.h" +#include "auth/pool_auth.h" #include "auth/pool_hba.h" #include "auth/pool_passwd.h" #include "auth/scram.h" @@ -2006,198 +2009,6 @@ send_auth_ok(POOL_CONNECTION * frontend, int protoMajor) return 0; } -/* - * read message length (V3 only) - */ -int -pool_read_message_length(POOL_CONNECTION_POOL * cp) -{ - int length, - length0; - int i; - - /* read message from master node */ - pool_read(CONNECTION(cp, MASTER_NODE_ID), &length0, sizeof(length0)); - length0 = ntohl(length0); - - ereport(DEBUG5, - (errmsg("reading message length"), - errdetail("slot: %d length: %d", MASTER_NODE_ID, length0))); - - for (i = 0; i < NUM_BACKENDS; i++) - { - if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i)) - { - continue; - } - - pool_read(CONNECTION(cp, i), &length, sizeof(length)); - - length = ntohl(length); - ereport(DEBUG5, - (errmsg("reading message length"), - errdetail("slot: %d length: %d", i, length))); - - if (length != length0) - ereport(ERROR, - (errmsg("unable to read message length"), - errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0))); - - } - - if (length0 < 0) - ereport(ERROR, - (errmsg("unable to read message length"), - errdetail("invalid message length (%d)", length))); - - return length0; -} - -/* - * read message length2 (V3 only) - * unlike 
pool_read_message_length, this returns an array of message length. - * The array is in the static storage, thus it will be destroyed by subsequent calls. - */ -int * -pool_read_message_length2(POOL_CONNECTION_POOL * cp) -{ - int length, - length0; - int i; - static int length_array[MAX_CONNECTION_SLOTS]; - - /* read message from master node */ - pool_read(CONNECTION(cp, MASTER_NODE_ID), &length0, sizeof(length0)); - - length0 = ntohl(length0); - length_array[MASTER_NODE_ID] = length0; - ereport(DEBUG5, - (errmsg("reading message length"), - errdetail("master slot: %d length: %d", MASTER_NODE_ID, length0))); - - for (i = 0; i < NUM_BACKENDS; i++) - { - if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i)) - { - pool_read(CONNECTION(cp, i), &length, sizeof(length)); - - length = ntohl(length); - ereport(DEBUG5, - (errmsg("reading message length"), - errdetail("master slot: %d length: %d", i, length))); - - if (length != length0) - { - ereport(LOG, - (errmsg("reading message length"), - errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0))); - } - - if (length < 0) - { - ereport(ERROR, - (errmsg("unable to read message length"), - errdetail("invalid message length (%d)", length))); - } - - length_array[i] = length; - } - - } - return &length_array[0]; -} - -signed char -pool_read_kind(POOL_CONNECTION_POOL * cp) -{ - char kind0, - kind; - int i; - - kind = -1; - kind0 = 0; - - for (i = 0; i < NUM_BACKENDS; i++) - { - if (!VALID_BACKEND(i)) - { - continue; - } - - pool_read(CONNECTION(cp, i), &kind, sizeof(kind)); - - if (IS_MASTER_NODE_ID(i)) - { - kind0 = kind; - } - else - { - if (kind != kind0) - { - char *message; - - if (kind0 == 'E') - { - if (pool_extract_error_message(false, MASTER(cp), MAJOR(cp), true, &message) == 1) - { - ereport(LOG, - (errmsg("pool_read_kind: error message from master backend:%s", message))); - pfree(message); - } - } - else if (kind == 'E') - { - if (pool_extract_error_message(false, CONNECTION(cp, i), 
MAJOR(cp), true, &message) == 1) - { - ereport(LOG, - (errmsg("pool_read_kind: error message from %d th backend:%s", i, message))); - pfree(message); - } - } - ereport(ERROR, - (errmsg("unable to read message kind"), - errdetail("kind does not match between master(%x) slot[%d] (%x)", kind0, i, kind))); - } - } - } - - return kind; -} - -int -pool_read_int(POOL_CONNECTION_POOL * cp) -{ - int data0, - data; - int i; - - data = -1; - data0 = 0; - - for (i = 0; i < NUM_BACKENDS; i++) - { - if (!VALID_BACKEND(i)) - { - continue; - } - pool_read(CONNECTION(cp, i), &data, sizeof(data)); - if (IS_MASTER_NODE_ID(i)) - { - data0 = data; - } - else - { - if (data != data0) - { - ereport(ERROR, - (errmsg("unable to read int value"), - errdetail("data does not match between between master(%x) slot[%d] (%x)", data0, i, data))); - - } - } - } - return data; -} void pool_random(void *buf, size_t len) diff --git a/src/auth/pool_hba.c b/src/auth/pool_hba.c index 794ec012..61630645 100644 --- a/src/auth/pool_hba.c +++ b/src/auth/pool_hba.c @@ -36,9 +36,12 @@ #include "pool.h" #include "auth/pool_hba.h" +#include "auth/pool_auth.h" +#include "protocol/pool_connection_pool.h" #include "utils/pool_path.h" #include "utils/pool_ip.h" #include "utils/pool_stream.h" +#include "utils/pool_signal.h" #include "pool_config.h" #include "pool_type.h" #include "utils/palloc.h" @@ -46,6 +49,7 @@ #include "utils/elog.h" #include "parser/pg_list.h" #include "auth/pool_passwd.h" +#include "protocol/pool_process_query.h" #define MULTI_VALUE_SEP "\001" /* delimiter for multi-valued column strings */ @@ -106,7 +110,6 @@ static MemoryContext tokenize_file(const char *filename, FILE *file, List **tok_lines, int elevel); static void sendAuthRequest(POOL_CONNECTION * frontend, AuthRequest areq); static void auth_failed(POOL_CONNECTION * frontend); -static void close_all_backend_connections(void); static bool hba_getauthmethod(POOL_CONNECTION * frontend); static bool check_hba(POOL_CONNECTION * frontend); 
static bool check_user(char *user, List *tokens); @@ -1098,34 +1101,6 @@ auth_failed(POOL_CONNECTION * frontend) } -/* - * Close all of the cached backend connections. - * - * This is exactly the same as send_frontend_exits() in child.c. - */ -static void -close_all_backend_connections(void) -{ - int i; - POOL_CONNECTION_POOL *p = pool_connection_pool; - - pool_sigset_t oldmask; - - POOL_SETMASK2(&BlockSig, &oldmask); - - for (i = 0; i < pool_config->max_pool; i++, p++) - { - if (!MASTER_CONNECTION(p)) - continue; - if (MASTER_CONNECTION(p)->sp->user == NULL) - continue; - pool_send_frontend_exits(p); - } - - POOL_SETMASK(&oldmask); -} - - /* * Determine what authentication method should be used when accessing database * "database" from frontend "raddr", user "user". Return the method and diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index 809a0522..a4b446b2 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -27,6 +27,7 @@ #include #include #include +#include #include "pool.h" #include "pool_config.h" diff --git a/src/context/pool_process_context.c b/src/context/pool_process_context.c index baa4d709..46139f5a 100644 --- a/src/context/pool_process_context.c +++ b/src/context/pool_process_context.c @@ -23,6 +23,7 @@ #include #include +#include #include "pool.h" #include "utils/elog.h" #include "context/pool_process_context.h" diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index b66a5e17..b5546d1a 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -4,7 +4,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -21,9 +21,12 
@@ #include "pool.h" #include "pool_config.h" #include "protocol/pool_proto_modules.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/elog.h" +#include "utils/statistics.h" #include "utils/pool_select_walker.h" #include "utils/pool_stream.h" #include "context/pool_session_context.h" @@ -33,6 +36,7 @@ #include #include #include +#include /* * Where to send query diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c index a73d83fd..b3df8c55 100644 --- a/src/context/pool_session_context.c +++ b/src/context/pool_session_context.c @@ -27,8 +27,11 @@ #include "utils/memutils.h" #include "utils/elog.h" #include "pool_config.h" -#include "context/pool_session_context.h" #include "protocol/pool_proto_modules.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_connection_pool.h" +#include "protocol/pool_pg_utils.h" +#include "context/pool_session_context.h" static POOL_SESSION_CONTEXT session_context_d; static POOL_SESSION_CONTEXT * session_context = NULL; diff --git a/src/include/auth/md5.h b/src/include/auth/md5.h index 6ea08ad8..80871521 100644 --- a/src/include/auth/md5.h +++ b/src/include/auth/md5.h @@ -3,7 +3,7 @@ * md5.h * Interface to md5.c * - * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * $Header$ @@ -26,4 +26,6 @@ extern int pool_md5_hash(const void *buff, size_t len, char *hexsum); extern int pool_md5_encrypt(const char *passwd, const char *salt, size_t salt_len, char *buf); extern void bytesToHex(char *b, int len, char *s); +extern bool pg_md5_encrypt(const char *passwd, const char *salt, size_t salt_len, char *buf); + #endif diff --git a/src/include/context/pool_process_context.h b/src/include/context/pool_process_context.h index 
d57bb051..1a6d3c6d 100644 --- a/src/include/context/pool_process_context.h +++ b/src/include/context/pool_process_context.h @@ -25,7 +25,11 @@ #ifndef POOL_PROCESS_CONTEXT_H #define POOL_PROCESS_CONTEXT_H -#include "pool.h" + +//#include "pool.h" +#include "pcp/libpcp_ext.h" +#include "utils/pool_signal.h" + /* * Child process context: diff --git a/src/include/pcp/libpcp_ext.h b/src/include/pcp/libpcp_ext.h index caf05bbd..179f15ab 100644 --- a/src/include/pcp/libpcp_ext.h +++ b/src/include/pcp/libpcp_ext.h @@ -313,7 +313,7 @@ struct WdInfo; extern PCPConnInfo * pcp_connect(char *hostname, int port, char *username, char *password, FILE *Pfdebug); extern void pcp_disconnect(PCPConnInfo * pcpConn); -extern PCPResultInfo * pcp_terminate_pgpool(PCPConnInfo * pcpCon, char mode); +extern PCPResultInfo * pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode, char command_scope); extern PCPResultInfo * pcp_node_count(PCPConnInfo * pcpCon); extern PCPResultInfo * pcp_node_info(PCPConnInfo * pcpCon, int nid); extern PCPResultInfo * pcp_health_check_stats(PCPConnInfo * pcpCon, int nid); diff --git a/src/include/pcp/pcp.h b/src/include/pcp/pcp.h index 68d2586f..6df3eb12 100644 --- a/src/include/pcp/pcp.h +++ b/src/include/pcp/pcp.h @@ -4,7 +4,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2016 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby diff --git a/src/include/pcp/pcp_stream.h b/src/include/pcp/pcp_stream.h index b1f9efb5..a0d41a77 100644 --- a/src/include/pcp/pcp_stream.h +++ b/src/include/pcp/pcp_stream.h @@ -4,7 +4,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2008 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global 
Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby diff --git a/src/include/pool.h b/src/include/pool.h index 43e75185..f2e4e7d6 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -27,14 +27,9 @@ #include "config.h" #include "pool_type.h" #include "pcp/libpcp_ext.h" -#include "utils/pool_signal.h" #include "auth/pool_passwd.h" +#include "utils/pool_params.h" #include "parser/nodes.h" -#include -#include -#include -#include -#include #ifdef USE_SSL #include @@ -165,12 +160,6 @@ typedef struct CancelPacket #define MAX_PASSWORD_SIZE 1024 -typedef struct -{ - int num; /* number of entries */ - char **names; /* parameter names */ - char **values; /* values */ -} ParamStatus; /* * HbaLines is declared in pool_hba.h @@ -179,51 +168,6 @@ typedef struct typedef struct HbaLine HbaLine; -#ifdef USE_SSL -/* - * Hardcoded DH parameters, used in ephemeral DH keying. - * - * If you want to create your own hardcoded DH parameters - * for fun and profit, review "Assigned Number for SKIP - * Protocols" (http://www.skip-vpn.org/spec/numbers.html) - * for suggestions. - */ -#define FILE_DH2048 \ -"-----BEGIN DH PARAMETERS-----\n\ -MIIBCAKCAQEA9kJXtwh/CBdyorrWqULzBej5UxE5T7bxbrlLOCDaAadWoxTpj0BV\n\ -89AHxstDqZSt90xkhkn4DIO9ZekX1KHTUPj1WV/cdlJPPT2N286Z4VeSWc39uK50\n\ -T8X8dryDxUcwYc58yWb/Ffm7/ZFexwGq01uejaClcjrUGvC/RgBYK+X0iP1YTknb\n\ -zSC0neSRBzZrM2w4DUUdD3yIsxx8Wy2O9vPJI8BD8KVbGI2Ou1WMuF040zT9fBdX\n\ -Q6MdGGzeMyEstSr/POGxKUAYEY18hKcKctaGxAMZyAcpesqVDNmWn6vQClCbAkbT\n\ -CD1mpF1Bn5x8vYlLIhkmuquiXsNV6TILOwIBAg==\n\ ------END DH PARAMETERS-----\n" -#endif - -/* - * Macro that allows to cast constness away from an expression, but doesn't - * allow changing the underlying type. Enforcement of the latter - * currently only works for gcc like compilers. - * - * Please note IT IS NOT SAFE to cast constness away if the result will ever - * be modified (it would be undefined behaviour). 
Doing so anyway can cause - * compiler misoptimizations or runtime crashes (modifying readonly memory). - * It is only safe to use when the the result will not be modified, but API - * design or language restrictions prevent you from declaring that - * (e.g. because a function returns both const and non-const variables). - * - * Note that this only works in function scope, not for global variables (it'd - * be nice, but not trivial, to improve that). - */ -#if defined(HAVE__BUILTIN_TYPES_COMPATIBLE_P) -#define unconstify(underlying_type, expr) \ - (StaticAssertExpr(__builtin_types_compatible_p(__typeof(expr), const underlying_type), \ - "wrong cast"), \ - (underlying_type) (expr)) -#else -#define unconstify(underlying_type, expr) \ - ((underlying_type) (expr)) -#endif - /* * stream connection structure */ @@ -333,51 +277,6 @@ typedef struct POOL_CONNECTION_POOL_SLOT *slots[MAX_NUM_BACKENDS]; } POOL_CONNECTION_POOL; -/* - * for pool_clear_cache() in pool_query_cache.c - * - * used to specify the time which cached data created before it to be deleted. 
- */ -typedef enum -{ - second, seconds, - minute, minutes, - hour, hours, - day, days, - week, weeks, - month, months, - year, years, - decade, decades, - century, centuries, - millennium, millenniums -} UNIT; - -typedef struct -{ - int quantity; - UNIT unit; -} Interval; - -/* - * Health check statistics per node -*/ -typedef struct { - uint64 total_count; /* total count of health check */ - uint64 success_count; /* total count of successful health check */ - uint64 fail_count; /* total count of failed health check */ - uint64 skip_count; /* total count of skipped health check */ - uint64 retry_count; /* total count of health check retries */ - uint32 max_retry_count; /* max retry count in a health check session */ - uint64 total_health_check_duration; /* sum of health check duration */ - int32 max_health_check_duration; /* maximum duration spent for a health check session in milli seconds */ - int32 min_health_check_duration; /* minimum duration spent for a health check session in milli seconds */ - time_t last_health_check; /* last health check timestamp */ - time_t last_successful_health_check; /* last succesfull health check timestamp */ - time_t last_skip_health_check; /* last skipped health check timestamp */ - time_t last_failed_health_check; /* last failed health check timestamp */ -} POOL_HEALTH_CHECK_STATISTICS; - -extern volatile POOL_HEALTH_CHECK_STATISTICS *health_check_stats; /* health check stats area in shared memory */ /* Defined in pool_session_context.h */ extern int pool_get_major_version(void); @@ -394,6 +293,7 @@ extern int pool_get_major_version(void); */ extern bool pool_is_node_to_be_sent_in_current_query(int node_id); extern int pool_virtual_master_db_node_id(void); + extern BACKEND_STATUS * my_backend_status[]; extern int my_master_node_id; @@ -455,17 +355,6 @@ typedef enum POOL_NODE_STATUS_INVALID /* invalid node (split branin, stand alone) */ } POOL_NODE_STATUS; -#ifdef NO_USED -#define REPLICATION (pool_config->replication_mode) 
-#define MASTER_SLAVE (pool_config->master_slave_mode) -#define STREAM (MASTER_SLAVE && pool_config->master_slave_sub_mode == STREAM_MODE) -#define LOGICAL (MASTER_SLAVE && pool_config->master_slave_sub_mode == LOGICAL_MODE) -#define SLONY (MASTER_SLAVE && pool_config->master_slave_sub_mode == SLONY_MODE) -#define DUAL_MODE (REPLICATION || MASTER_SLAVE) -#define RAW_MODE (!REPLICATION && !MASTER_SLAVE) -#define SL_MODE (STREAM || LOGICAL) /* streaming or logical replication mode */ -#endif - /* Clustering mode macros */ #define REPLICATION (pool_config->backend_clustering_mode == CM_NATIVE_REPLICATION) #define MASTER_SLAVE (pool_config->backend_clustering_mode == CM_STREAMING_REPLICATION || \ @@ -485,12 +374,6 @@ typedef enum #define Max(x, y) ((x) > (y) ? (x) : (y)) #define Min(x, y) ((x) < (y) ? (x) : (y)) -#define LOCK_COMMENT "/*INSERT LOCK*/" -#define LOCK_COMMENT_SZ (sizeof(LOCK_COMMENT)-1) -#define NO_LOCK_COMMENT "/*NO INSERT LOCK*/" -#define NO_LOCK_COMMENT_SZ (sizeof(NO_LOCK_COMMENT)-1) -#define NO_LOAD_BALANCE "/*NO LOAD BALANCE*/" -#define NO_LOAD_BALANCE_COMMENT_SZ (sizeof(NO_LOAD_BALANCE)-1) #define MAX_NUM_SEMAPHORES 6 #define CONN_COUNTER_SEM 0 @@ -601,11 +484,6 @@ typedef enum RECOVERY_PROMOTE } POOL_RECOVERY_MODE; -/* - * global variables - */ -extern pid_t mypid; /* parent pid */ - /* * Process types. DO NOT change the order of each enum meber! If you do * that, you must change application_name array in src/main/pgpool_main.c @@ -628,7 +506,6 @@ typedef enum PT_LAST_PTYPE /* last ptype marker. any ptype must be above this. */ } ProcessType; -extern ProcessType processType; typedef enum { @@ -641,37 +518,25 @@ typedef enum EXITING } ProcessState; -#define MAX_PG_VERSION_STRING 512 - /* - * PostgreSQL version descriptor + * global variables + * pool_global.c */ -typedef struct -{ - short major; /* major version number in up to 3 digits decimal. - * Examples: 120, 110, 100, 96. - */ - short minor; /* minor version number in up to 2 digits decimal. 
- * Examples: 0, 1, 2, 10, 23. - */ - char version_string[MAX_PG_VERSION_STRING+1]; /* original version string */ -} PGVersion; - +extern pid_t mypid; /* parent pid */ +extern ProcessType processType; extern ProcessState processState; -extern POOL_CONNECTION_POOL * pool_connection_pool; /* connection pool */ + + extern volatile sig_atomic_t backend_timer_expired; /* flag for connection * closed timer is expired */ extern volatile sig_atomic_t health_check_timer_expired; /* non 0 if health check * timer expired */ -extern long int weight_master; /* normalized weight of master (0-RAND_MAX - * range) */ extern int my_proc_id; /* process table id (!= UNIX's PID) */ extern ProcessInfo * process_info; /* shmem process information table */ extern ConnectionInfo * con_info; /* shmem connection info table */ extern POOL_REQUEST_INFO * Req_info; extern volatile sig_atomic_t *InRecovery; -extern char remote_ps_data[]; /* used for set_ps_display */ extern volatile sig_atomic_t got_sighup; extern volatile sig_atomic_t exit_request; extern volatile sig_atomic_t ignore_sigusr1; @@ -687,223 +552,50 @@ extern char remote_port[]; /* client port */ /* * public functions */ -extern void register_watchdog_quorum_change_interupt(void); -extern void register_watchdog_state_change_interupt(void); -extern void register_backend_state_sync_req_interupt(void); -extern void register_inform_quarantine_nodes_req(void); -extern bool register_node_operation_request(POOL_REQUEST_KIND kind, int *node_id_set, int count, unsigned char flags); +/*main.c*/ extern char *get_config_file_name(void); extern char *get_hba_file_name(void); -extern void do_child(int *fds); -extern void pcp_main(int unix_fd, int inet_fd); -extern int select_load_balancing_node(void); -extern int pool_init_cp(void); -extern POOL_STATUS pool_process_query(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - int reset_request); - -extern void connection_do_auth(POOL_CONNECTION_POOL_SLOT * cp, char *password); -extern 
int pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); -extern int pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp); -extern void authenticate_frontend(POOL_CONNECTION * frontend); - -extern bool is_backend_cache_empty(POOL_CONNECTION_POOL * backend); - -/* SSL functionality */ -extern void pool_ssl_negotiate_serverclient(POOL_CONNECTION * cp); -extern void pool_ssl_negotiate_clientserver(POOL_CONNECTION * cp); -extern void pool_ssl_close(POOL_CONNECTION * cp); -extern int pool_ssl_read(POOL_CONNECTION * cp, void *buf, int size); -extern int pool_ssl_write(POOL_CONNECTION * cp, const void *buf, int size); -extern bool pool_ssl_pending(POOL_CONNECTION * cp); -extern int SSL_ServerSide_init(void); - -extern POOL_STATUS ErrorResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend); - -extern void NoticeResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend); - -extern void notice_backend_error(int node_id, unsigned char flags); -extern bool degenerate_backend_set(int *node_id_set, int count, unsigned char flags); -extern bool degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only); -extern bool promote_backend(int node_id, unsigned char flags); -extern bool send_failback_request(int node_id, bool throw_error, unsigned char flags); - -extern void pool_send_frontend_exits(POOL_CONNECTION_POOL * backend); - -extern int pool_read_message_length(POOL_CONNECTION_POOL * cp); -extern int *pool_read_message_length2(POOL_CONNECTION_POOL * cp); -extern signed char pool_read_kind(POOL_CONNECTION_POOL * cp); -extern int pool_read_int(POOL_CONNECTION_POOL * cp); - -extern POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); -extern POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int len, char *contents); -extern POOL_STATUS 
ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); - -extern int pool_init_params(ParamStatus * params); -extern void pool_discard_params(ParamStatus * params); -extern char *pool_find_name(ParamStatus * params, char *name, int *pos); -extern int pool_get_param(ParamStatus * params, int index, char **name, char **value); -extern int pool_add_param(ParamStatus * params, char *name, char *value); -extern void pool_param_debug_print(ParamStatus * params); - -extern void pool_send_error_message(POOL_CONNECTION * frontend, int protoMajor, - char *code, - char *message, - char *detail, - char *hint, - char *file, - int line); -extern void pool_send_fatal_message(POOL_CONNECTION * frontend, int protoMajor, - char *code, - char *message, - char *detail, - char *hint, - char *file, - int line); -extern void pool_send_severity_message(POOL_CONNECTION * frontend, int protoMajor, - char *code, - char *message, - char *detail, - char *hint, - char *file, - char *severity, - int line); -extern void pool_send_readyforquery(POOL_CONNECTION * frontend); -extern void send_startup_packet(POOL_CONNECTION_POOL_SLOT * cp); -extern void pool_free_startup_packet(StartupPacket *sp); -extern void child_exit(int code); - -extern void init_prepared_list(void); -extern void proc_exit(int); - -extern void *pool_shared_memory_create(size_t size); -extern void pool_shmem_exit(int code); +extern char *get_pool_key(void); -extern void pool_semaphore_create(int numSems); -extern void pool_semaphore_lock(int semNum); -extern void pool_semaphore_unlock(int semNum); -extern BackendInfo * pool_get_node_info(int node_number); -extern int pool_get_node_count(void); -extern int *pool_get_process_list(int *array_size); -extern ProcessInfo * pool_get_process_info(pid_t pid); -extern POOL_STATUS OneNode_do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, char *query, char *database); +/*pcp_child.c*/ +extern void pcp_main(int unix_fd, int inet_fd); -/* child.c */ 
-extern POOL_CONNECTION_POOL_SLOT * make_persistent_db_connection( - int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry); -extern POOL_CONNECTION_POOL_SLOT * make_persistent_db_connection_noerror( - int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry); -extern void discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp); -/* define pool_system.c */ -extern void pool_close_libpq_connection(void); -/* pool_ip.c */ -extern void pool_getnameinfo_all(SockAddr *saddr, char *remote_host, char *remote_port); +/*child.c*/ -/* strlcpy.c */ -#ifndef HAVE_STRLCPY -extern size_t strlcpy(char *dst, const char *src, size_t siz); -#endif +extern void do_child(int *fds); +extern void child_exit(int code); -/* ps_status.c */ -extern bool update_process_title; -extern char **save_ps_display_args(int argc, char **argv); -extern void init_ps_display(const char *username, const char *dbname, - const char *host_info, const char *initial_str); -extern void set_ps_display(const char *activity, bool force); -extern const char *get_ps_display(int *displen); -extern void pool_ps_idle_display(POOL_CONNECTION_POOL * backend); - -/* recovery.c */ -extern void start_recovery(int recovery_node); -extern void finish_recovery(void); -extern int wait_connection_closed(void); -extern int ensure_conn_counter_validity(void); - -/* child.c */ extern void cancel_request(CancelPacket * sp); extern void check_stop_request(void); extern void pool_initialize_private_backend_status(void); -extern bool is_session_connected(void); extern int send_to_pg_frontend(char *data, int len, bool flush); extern int pg_frontend_exists(void); extern int set_pg_frontend_blocking(bool blocking); extern int get_frontend_protocol_version(void); -extern PGVersion *Pgversion(POOL_CONNECTION_POOL * backend); - -/* pool_process_query.c */ -extern void reset_variables(void); -extern void reset_connection(void); -extern void 
per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query); -extern void per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, char *prefix, bool unread); -extern int pool_extract_error_message(bool read_kind, POOL_CONNECTION * backend, int major, bool unread, char **message); -extern POOL_STATUS do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, - char *query, int protoMajor, int pid, int key, int no_ready_for_query); -extern void do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, int major); -extern void free_select_result(POOL_SELECT_RESULT * result); -extern int compare(const void *p1, const void *p2); -extern void do_error_execute_command(POOL_CONNECTION_POOL * backend, int node_id, int major); -extern POOL_STATUS pool_discard_packet_contents(POOL_CONNECTION_POOL * cp); -extern void pool_dump_valid_backend(int backend_id); -extern bool pool_push_pending_data(POOL_CONNECTION * backend); - -/* pool_auth.c */ -extern void pool_random_salt(char *md5Salt); -extern void pool_random(void *buf, size_t len); - -/* main.c */ -extern void pool_sleep(unsigned int second); -extern char *get_pool_key(void); -/* pool_worker_child.c */ -extern void do_worker_child(void); -extern int get_query_result(POOL_CONNECTION_POOL_SLOT * *slots, int backend_id, char *query, POOL_SELECT_RESULT * *res); -/* md5.c */ -extern bool pg_md5_encrypt(const char *passwd, const char *salt, size_t salt_len, char *buf); - -/* pool_connection_pool.c */ -extern int pool_init_cp(void); -extern POOL_CONNECTION_POOL * pool_create_cp(void); -extern POOL_CONNECTION_POOL * pool_get_cp(char *user, char *database, int protoMajor, int check_socket); -extern void pool_discard_cp(char *user, char *database, int protoMajor); -extern void pool_backend_timer(void); -extern void pool_connection_pool_timer(POOL_CONNECTION_POOL * backend); -extern RETSIGTYPE pool_backend_timer_handler(int sig); -extern int 
connect_inet_domain_socket(int slot, bool retry); -extern int connect_unix_domain_socket(int slot, bool retry); -extern int connect_inet_domain_socket_by_port(char *host, int port, bool retry); -extern int connect_unix_domain_socket_by_port(int port, char *socket_dir, bool retry); -extern int pool_pool_index(void); - -/* utils/statistics.c */ -size_t stat_shared_memory_size(void); -void stat_set_stat_area(void *address); -void stat_init_stat_area(void); -void stat_count_up(int backend_node_id, Node *parsetree); -uint64 stat_get_select_count(int backend_node_id); -extern int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps); -/* pcp_child.c */ -extern int send_to_pcp_frontend(char *data, int len, bool flush); -extern int pcp_frontend_exists(void); -extern void pcp_worker_main(int port); -extern void pcp_mark_recovery_finished(void); -extern bool pcp_mark_recovery_in_progress(void); +/*pool_shmem.c*/ +extern void *pool_shared_memory_create(size_t size); +extern void pool_shmem_exit(int code); -/* pgpool_main.c */ +/* pgpool_main.c */ +extern BackendInfo * pool_get_node_info(int node_number); +extern int pool_get_node_count(void); +extern int *pool_get_process_list(int *array_size); +extern ProcessInfo * pool_get_process_info(pid_t pid); +extern void pool_sleep(unsigned int second); +extern int PgpoolMain(bool discard_status, bool clear_memcache_oidmaps); extern int pool_send_to_frontend(char *data, int len, bool flush); extern int pool_frontend_exists(void); extern pid_t pool_waitpid(int *status); extern int write_status_file(void); -extern void do_health_check_child(int *node_id); extern POOL_NODE_STATUS * verify_backend_node_status(POOL_CONNECTION_POOL_SLOT * *slots); extern POOL_NODE_STATUS * pool_get_node_status(void); extern void pool_set_backend_status_changed_time(int backend_id); @@ -913,8 +605,16 @@ extern void set_application_name_with_string(char *string); extern void set_application_name_with_suffix(ProcessType ptype, int suffix); extern char
*get_application_name(void); -/* health_check.c */ -extern size_t health_check_stats_shared_memory_size(void); -extern void health_check_stats_init(POOL_HEALTH_CHECK_STATISTICS *addr); + + +/* strlcpy.c */ +#ifndef HAVE_STRLCPY +extern size_t strlcpy(char *dst, const char *src, size_t siz); +#endif + +/* pool_worker_child.c */ +extern void do_worker_child(void); +extern int get_query_result(POOL_CONNECTION_POOL_SLOT * *slots, int backend_id, char *query, POOL_SELECT_RESULT * *res); + #endif /* POOL_H */ diff --git a/src/include/protocol/pool_proto_modules.h b/src/include/protocol/pool_proto_modules.h index 5db7730a..0d84e89d 100644 --- a/src/include/protocol/pool_proto_modules.h +++ b/src/include/protocol/pool_proto_modules.h @@ -190,4 +190,19 @@ extern void pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_ */ extern POOL_STATUS CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool command_complete); +extern int pool_read_message_length(POOL_CONNECTION_POOL * cp); +extern int *pool_read_message_length2(POOL_CONNECTION_POOL * cp); +extern signed char pool_read_kind(POOL_CONNECTION_POOL * cp); +extern int pool_read_int(POOL_CONNECTION_POOL * cp); + +/* pool_proto2.c */ +extern POOL_STATUS ErrorResponse(POOL_CONNECTION * frontend, + POOL_CONNECTION_POOL * backend); + +extern void NoticeResponse(POOL_CONNECTION * frontend, + POOL_CONNECTION_POOL * backend); + +extern void per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, +char *query, char *prefix, bool unread); + #endif diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 31d8038a..25a06320 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -478,6 +478,7 @@ extern void ReThrowError(ErrorData *edata) __attribute__((noreturn)); extern void pg_re_throw(void) __attribute__((noreturn)); extern char *GetErrorContextStack(void); +extern void proc_exit(int); /* Hook for intercepting messages before they are sent to the server 
log */ typedef void (*emit_log_hook_type) (ErrorData *edata); @@ -524,4 +525,5 @@ void on_proc_exit(pg_on_exit_callback function, Datum arg); void on_shmem_exit(pg_on_exit_callback function, Datum arg); void on_system_exit(pg_on_exit_callback function, Datum arg); + #endif /* ELOG_H */ diff --git a/src/include/utils/pool_ip.h b/src/include/utils/pool_ip.h index b43c2ecb..beccb276 100644 --- a/src/include/utils/pool_ip.h +++ b/src/include/utils/pool_ip.h @@ -9,7 +9,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Portions Copyright (c) 2003-2008 PgPool Global Development Group + * Portions Copyright (c) 2003-2020 PgPool Global Development Group * Portions Copyright (c) 2003-2005, PostgreSQL Global Development Group * * Permission to use, copy, modify, and distribute this software and @@ -62,6 +62,8 @@ extern void promote_v4_to_v6_mask(struct sockaddr_storage *addr); extern int pg_foreach_ifaddr(PgIfAddrCallback callback, void *cb_data); +extern void pool_getnameinfo_all(SockAddr *saddr, char *remote_host, char *remote_port); + #define IS_AF_INET(fam) ((fam) == AF_INET) #define IS_AF_UNIX(fam) ((fam) == AF_UNIX) diff --git a/src/include/utils/pool_ipc.h b/src/include/utils/pool_ipc.h index c65de05b..ba3f9ed2 100644 --- a/src/include/utils/pool_ipc.h +++ b/src/include/utils/pool_ipc.h @@ -29,4 +29,9 @@ extern void shmem_exit(int code); extern void on_shmem_exit(void (*function) (int code, Datum arg), Datum arg); extern void on_exit_reset(void); +/*pool_sema.c*/ +extern void pool_semaphore_create(int numSems); +extern void pool_semaphore_lock(int semNum); +extern void pool_semaphore_unlock(int semNum); + #endif /* IPC_H */ diff --git a/src/include/utils/socket_stream.h b/src/include/utils/socket_stream.h index eac9fa91..47f06041 100644 --- a/src/include/utils/socket_stream.h +++ b/src/include/utils/socket_stream.h @@ -6,7 +6,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written 
by Tatsuo Ishii * - * Copyright (c) 2003-2019 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby diff --git a/src/include/watchdog/watchdog.h b/src/include/watchdog/watchdog.h index 83f6651b..cee44e05 100644 --- a/src/include/watchdog/watchdog.h +++ b/src/include/watchdog/watchdog.h @@ -187,6 +187,18 @@ typedef struct WatchdogNode * initiated by local */ } WatchdogNode; +/* + * Argument for WD Exec cluster command + */ +#define WD_MAX_ARG_NAME_LEN 64 +#define WD_MAX_ARG_VALUE_LEN 64 + +typedef struct WDExecCommandArg +{ + char arg_name[WD_MAX_ARG_NAME_LEN]; + char arg_value[WD_MAX_ARG_VALUE_LEN]; +} WDExecCommandArg; + extern pid_t initialize_watchdog(void); #endif /* WATCHDOG_H */ diff --git a/src/include/watchdog/wd_commands.h b/src/include/watchdog/wd_commands.h index cbab4026..8e43c5bd 100644 --- a/src/include/watchdog/wd_commands.h +++ b/src/include/watchdog/wd_commands.h @@ -7,7 +7,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2019 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby diff --git a/src/include/watchdog/wd_internal_commands.h b/src/include/watchdog/wd_internal_commands.h index e2240183..d8d70147 100644 --- a/src/include/watchdog/wd_internal_commands.h +++ b/src/include/watchdog/wd_internal_commands.h @@ -37,6 +37,9 @@ extern WDFailoverCMDResults wd_send_failback_request(int node_id, unsigned char extern WDFailoverCMDResults wd_degenerate_backend_set(int *node_id_set, int count, unsigned char flags); extern WDFailoverCMDResults wd_promote_backend(int node_id, unsigned char flags); +extern WdCommandResult 
wd_execute_cluster_command(char* clusterCommand, + int nArgs, WDExecCommandArg *wdExecCommandArg); + extern WDPGBackendStatus * get_pg_backend_status_from_master_wd_node(void); extern WD_STATES wd_internal_get_watchdog_local_node_state(void); @@ -57,5 +60,4 @@ extern void set_watchdog_node_escalated(void); extern void reset_watchdog_node_escalated(void); extern bool get_watchdog_node_escalation_state(void); - #endif /* WD_INTERNAL_COMMANDS_H */ diff --git a/src/include/watchdog/wd_ipc_defines.h b/src/include/watchdog/wd_ipc_defines.h index 0bda9c1b..74f9b85c 100644 --- a/src/include/watchdog/wd_ipc_defines.h +++ b/src/include/watchdog/wd_ipc_defines.h @@ -6,7 +6,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2019 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -62,13 +62,20 @@ typedef enum WDValueDataType #define WD_IPC_CMD_RESULT_OK '7' #define WD_IPC_CMD_TIMEOUT '8' +#define WD_EXECUTE_CLUSTER_COMMAND 'c' #define WD_IPC_FAILOVER_COMMAND 'f' -#define WD_IPC_ONLINE_RECOVERY_COMMAND 'r' -#define WD_FAILOVER_LOCKING_REQUEST 's' +#define WD_IPC_ONLINE_RECOVERY_COMMAND 'r' +#define WD_FAILOVER_LOCKING_REQUEST 's' #define WD_GET_MASTER_DATA_REQUEST 'd' #define WD_GET_RUNTIME_VARIABLE_VALUE 'v' #define WD_FAILOVER_INDICATION 'i' + +#define WD_COMMAND_RESTART_CLUSTER "RESTART_CLUSTER" +#define WD_COMMAND_REELECT_MASTER "REELECT_MASTER" +#define WD_COMMAND_SHUTDOWN_CLUSTER "SHUTDOWN_CLUSTER" + + #define WD_FUNCTION_START_RECOVERY "START_RECOVERY" #define WD_FUNCTION_END_RECOVERY "END_RECOVERY" #define WD_FUNCTION_FAILBACK_REQUEST "FAILBACK_REQUEST" @@ -112,7 +119,7 @@ typedef enum WDValueDataType #define WD_RUNTIME_VAR_ESCALATION_STATE "Escalated" /* Use to inform node new node status by lifecheck */ -#define 
WD_LIFECHECK_NODE_STATUS_DEAD 1 +#define WD_LIFECHECK_NODE_STATUS_DEAD 1 #define WD_LIFECHECK_NODE_STATUS_ALIVE 2 diff --git a/src/include/watchdog/wd_json_data.h b/src/include/watchdog/wd_json_data.h index 0080a217..6bd46063 100644 --- a/src/include/watchdog/wd_json_data.h +++ b/src/include/watchdog/wd_json_data.h @@ -69,4 +69,14 @@ extern char *get_simple_request_json(char *key, char *value, unsigned int shared extern bool parse_data_request_json(char *json_data, int data_len, char **request_type); extern char *get_data_request_json(char *request_type, unsigned int sharedKey, char *authKey); +extern bool +parse_wd_exec_cluster_command_json(char *json_data, int data_len, + char **clusterCommand, + int *nArgs, WDExecCommandArg **wdExecCommandArg); + +extern char * +get_wd_exec_cluster_command_json(char *clusterCommand,int nArgs, + WDExecCommandArg *wdExecCommandArg, + unsigned int sharedKey, char *authKey); + #endif diff --git a/src/libs/pcp/Makefile.am b/src/libs/pcp/Makefile.am index feecdf49..9127c782 100644 --- a/src/libs/pcp/Makefile.am +++ b/src/libs/pcp/Makefile.am @@ -1,7 +1,7 @@ AM_CPPFLAGS = -D_GNU_SOURCE -DPOOL_PRIVATE -I @PGSQL_INCLUDE_DIR@ lib_LTLIBRARIES = libpcp.la -libpcp_la_LDFLAGS = -version-info 1:0:0 +libpcp_la_LDFLAGS = -version-info 2:0:0 dist_libpcp_la_SOURCES = pcp.c \ ../../utils/pool_path.c \ ../../tools/fe_port.c \ diff --git a/src/libs/pcp/pcp.c b/src/libs/pcp/pcp.c index 788d7224..0e25ff84 100644 --- a/src/libs/pcp/pcp.c +++ b/src/libs/pcp/pcp.c @@ -608,7 +608,7 @@ pcp_disconnect(PCPConnInfo * pcpConn) * -------------------------------- */ PCPResultInfo * -pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode) +pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode, char command_scope) { int wsize; @@ -617,7 +617,10 @@ pcp_terminate_pgpool(PCPConnInfo * pcpConn, char mode) pcp_internal_error(pcpConn, "invalid PCP connection"); return NULL; } - pcp_write(pcpConn->pcpConn, "T", 1); + if (command_scope == 'l') /*local only*/ + 
pcp_write(pcpConn->pcpConn, "T", 1); + else + pcp_write(pcpConn->pcpConn, "t", 1); wsize = htonl(sizeof(int) + sizeof(char)); pcp_write(pcpConn->pcpConn, &wsize, sizeof(int)); pcp_write(pcpConn->pcpConn, &mode, sizeof(char)); diff --git a/src/main/health_check.c b/src/main/health_check.c index 3f15951a..9c220b4f 100644 --- a/src/main/health_check.c +++ b/src/main/health_check.c @@ -44,23 +44,29 @@ #include #include #include +#include +#include #ifdef HAVE_CRYPT_H #include #endif #include "pool.h" +#include "main/health_check.h" +#include "main/pgpool_internal_commands.h" #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/elog.h" +#include "utils/pool_ip.h" +#include "utils/ps_status.h" +#include "utils/pool_stream.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" +#include "protocol/pool_pg_utils.h" #include "pool_config.h" -#include "utils/pool_ip.h" #include "auth/md5.h" #include "auth/pool_hba.h" -#include "utils/pool_stream.h" volatile POOL_HEALTH_CHECK_STATISTICS *health_check_stats; /* health check stats area in shared memory */ diff --git a/src/main/main.c b/src/main/main.c index 08567584..109360b3 100644 --- a/src/main/main.c +++ b/src/main/main.c @@ -5,7 +5,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -20,6 +20,9 @@ */ #include "pool.h" #include "pool_config.h" +#include "version.h" +#include "pool_config_variables.h" + #include #include #include @@ -35,17 +38,19 @@ #include #include #include + #include "utils/elog.h" #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/pool_path.h" +#include "utils/pool_signal.h" +#include "utils/pool_ipc.h" +#include "utils/ps_status.h" 
-#include "version.h" #include "auth/pool_passwd.h" #include "auth/pool_hba.h" #include "query_cache/pool_memqcache.h" #include "watchdog/wd_utils.h" -#include "pool_config_variables.h" static bool get_pool_key_filename(char *poolKeyFile); diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c index 38b58228..7851c1bc 100644 --- a/src/main/pgpool_main.c +++ b/src/main/pgpool_main.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #ifdef HAVE_SYS_SELECT_H #include @@ -48,10 +49,16 @@ #include "utils/elog.h" #include "pool.h" +#include "main/health_check.h" +#include "main/pgpool_internal_commands.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "utils/statistics.h" +#include "utils/pool_ipc.h" #include "pool_config.h" #include "context/pool_process_context.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" #include "version.h" #include "parser/pool_string.h" #include "auth/pool_passwd.h" @@ -59,8 +66,9 @@ #include "query_cache/pool_memqcache.h" #include "watchdog/wd_internal_commands.h" #include "watchdog/wd_lifecheck.h" - #include "watchdog/watchdog.h" +#include "pcp/pcp_worker.h" + /* * Reasons for signalling a pgpool-II main process @@ -148,7 +156,6 @@ static void system_will_go_down(int code, Datum arg); static char *process_name_from_pid(pid_t pid); static void sync_backend_from_watchdog(void); static void update_backend_quarantine_status(void); -static void degenerate_all_quarantine_nodes(void); static int get_server_version(POOL_CONNECTION_POOL_SLOT * *slots, int node_id); static void get_info_from_conninfo(char *conninfo, char *host, char *port); @@ -680,7 +687,8 @@ fork_a_child(int *fds, int id) /* * fork worker child process */ -static pid_t worker_fork_a_child(ProcessType type, void (*func) (), void *params) +static pid_t +worker_fork_a_child(ProcessType type, void (*func) (), void *params) { pid_t pid; @@ -1062,346 +1070,6 @@ terminate_all_childrens() POOL_SETMASK(&UnBlockSig); } -/* - 
* Reuest failover. If "switch_over" is false, request all existing sessions - * restarting. - */ -void -notice_backend_error(int node_id, unsigned char flags) -{ - int n = node_id; - - if (getpid() == mypid) - { - ereport(LOG, - (errmsg("notice_backend_error: called from pgpool main. ignored."))); - } - else - { - degenerate_backend_set(&n, 1, flags); - } -} - -/* - * degenerate_backend_set_ex: - * - * The function registers/verifies the node down operation request. - * The request is then processed by failover function. - * - * node_id_set: array of node ids to be registered for NODE DOWN operation - * count: number of elements in node_id_set array - * error: if set error is thrown as soon as any node id is found in - * node_id_set on which operation could not be performed. - * test_only: When set, function only checks if NODE DOWN operation can be - * executed on provided node ids and never registers the operation - * request. - * For test_only case function returs false or throws an error as - * soon as first non complient node in node_id_set is found - * switch_over: if set, the request is originated by switch over, not errors. - * - * wd_failover_id: The watchdog internal ID for this failover - */ -bool -degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only) -{ - int i; - int node_id[MAX_NUM_BACKENDS]; - int node_count = 0; - int elevel = LOG; - - if (error) - elevel = ERROR; - - for (i = 0; i < count; i++) - { - if (node_id_set[i] < 0 || node_id_set[i] >= MAX_NUM_BACKENDS || - (!VALID_BACKEND(node_id_set[i]) && BACKEND_INFO(node_id_set[i]).quarantine == false)) - { - if (node_id_set[i] < 0 || node_id_set[i] >= MAX_NUM_BACKENDS) - ereport(elevel, - (errmsg("invalid degenerate backend request, node id: %d is out of range. 
node id must be between [0 and %d]" - ,node_id_set[i], MAX_NUM_BACKENDS))); - else - ereport(elevel, - (errmsg("invalid degenerate backend request, node id : %d status: [%d] is not valid for failover" - ,node_id_set[i], BACKEND_INFO(node_id_set[i]).backend_status))); - if (test_only) - return false; - - continue; - } - - if (POOL_DISALLOW_TO_FAILOVER(BACKEND_INFO(node_id_set[i]).flag)) - { - ereport(elevel, - (errmsg("degenerate backend request for node_id: %d from pid [%d] is canceled because failover is disallowed on the node", - node_id_set[i], getpid()))); - if (test_only) - return false; - } - else - { - if (!test_only) /* do not produce this log if we are in - * testing mode */ - ereport(LOG, - (errmsg("received degenerate backend request for node_id: %d from pid [%d]", - node_id_set[i], getpid()))); - - node_id[node_count++] = node_id_set[i]; - } - } - - if (node_count) - { - WDFailoverCMDResults res = FAILOVER_RES_PROCEED; - - /* If this was only a test. Inform the caller without doing anything */ - if (test_only) - return true; - - if (!(flags & REQ_DETAIL_WATCHDOG)) - { - int x; - - for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++) - { - res = wd_degenerate_backend_set(node_id_set, count, flags); - if (res != FAILOVER_RES_TRANSITION) - break; - sleep(1); - } - } - if (res == FAILOVER_RES_TRANSITION) - { - /* - * What to do when cluster is still not stable Is proceeding to - * failover is the right choice ??? 
- */ - ereport(NOTICE, - (errmsg("received degenerate backend request for %d node(s) from pid [%d], But cluster is not in stable state" - ,node_count, getpid()))); - } - if (res == FAILOVER_RES_PROCEED) - { - register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, flags); - } - else if (res == FAILOVER_RES_NO_QUORUM) - { - ereport(LOG, - (errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog" - ,node_count, getpid()), - errdetail("watchdog does not holds the quorum"))); - - register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags); - } - else if (res == FAILOVER_RES_CONSENSUS_MAY_FAIL) - { - ereport(LOG, - (errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog" - ,node_count, getpid()), - errdetail("watchdog is taking time to build consensus"))); - register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags); - } - else if (res == FAILOVER_RES_BUILDING_CONSENSUS) - { - ereport(LOG, - (errmsg("degenerate backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request" - ,*node_id, getpid()))); - } - else if (res == FAILOVER_RES_WILL_BE_DONE) - { - /* we will receive a sync request from master watchdog node */ - ereport(LOG, - (errmsg("degenerate backend request for %d node(s) from pid [%d], will be handled by watchdog" - ,node_count, getpid()))); - } - else - { - ereport(elevel, - (errmsg("degenerate backend request for %d node(s) from pid [%d] is canceled by other pgpool" - ,node_count, getpid()))); - return false; - } - } - return true; -} - -/* - * wrapper over degenerate_backend_set_ex function to register - * NODE down operation request - */ -bool -degenerate_backend_set(int *node_id_set, int count, unsigned char flags) -{ - return degenerate_backend_set_ex(node_id_set, count, flags, false, false); -} - -/* send promote node 
request using SIGUSR1 */ -bool -promote_backend(int node_id, unsigned char flags) -{ - WDFailoverCMDResults res = FAILOVER_RES_PROCEED; - bool ret = false; - - if (!SL_MODE) - { - return false; - } - - if (node_id < 0 || node_id >= MAX_NUM_BACKENDS || !VALID_BACKEND(node_id)) - { - if (node_id < 0 || node_id >= MAX_NUM_BACKENDS) - ereport(LOG, - (errmsg("invalid promote backend request, node id: %d is out of range. node id must be between [0 and %d]" - ,node_id, MAX_NUM_BACKENDS))); - else - ereport(LOG, - (errmsg("invalid promote backend request, node id : %d status: [%d] not valid" - ,node_id, BACKEND_INFO(node_id).backend_status))); - return false; - } - ereport(LOG, - (errmsg("received promote backend request for node_id: %d from pid [%d]", - node_id, getpid()))); - - /* If this was only a test. Inform the caller without doing anything */ - if (!(flags & REQ_DETAIL_WATCHDOG)) - { - int x; - - for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++) - { - res = wd_promote_backend(node_id, flags); - if (res != FAILOVER_RES_TRANSITION) - break; - sleep(1); - } - } - if (res == FAILOVER_RES_TRANSITION) - { - /* - * What to do when cluster is still not stable Is proceeding to - * failover is the right choice ??? 
- */ - ereport(NOTICE, - (errmsg("promote backend request for node_id: %d from pid [%d], But cluster is not in stable state" - ,node_id, getpid()))); - } - - if (res == FAILOVER_RES_PROCEED) - { - ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, flags); - } - else if (res == FAILOVER_RES_WILL_BE_DONE) - { - ereport(LOG, - (errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog" - ,node_id, getpid()))); - } - else if (res == FAILOVER_RES_NO_QUORUM) - { - ereport(LOG, - (errmsg("promote backend request for node_id: %d from pid [%d], is canceled because watchdog does not hold quorum" - ,node_id, getpid()))); - } - else if (res == FAILOVER_RES_BUILDING_CONSENSUS) - { - ereport(LOG, - (errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request" - ,node_id, getpid()))); - } - else - { - ereport(LOG, - (errmsg("promote backend request for node_id: %d from pid [%d] is canceled by other pgpool" - ,node_id, getpid()))); - } - return ret; -} - -/* send failback request using SIGUSR1 */ -bool -send_failback_request(int node_id, bool throw_error, unsigned char flags) -{ - WDFailoverCMDResults res = FAILOVER_RES_PROCEED; - bool ret = false; - - if (node_id < 0 || node_id >= MAX_NUM_BACKENDS || - (RAW_MODE && BACKEND_INFO(node_id).backend_status != CON_DOWN && VALID_BACKEND(node_id))) - { - if (node_id < 0 || node_id >= MAX_NUM_BACKENDS) - ereport(throw_error ? ERROR : LOG, - (errmsg("invalid failback request, node id: %d is out of range. node id must be between [0 and %d]" - ,node_id, MAX_NUM_BACKENDS))); - else - ereport(throw_error ? 
ERROR : LOG, - (errmsg("invalid failback request, node id : %d status: [%d] not valid for failback" - ,node_id, BACKEND_INFO(node_id).backend_status))); - return false; - } - - ereport(LOG, - (errmsg("received failback request for node_id: %d from pid [%d]", - node_id, getpid()))); - - /* check we are trying to failback the quarantine node */ - if (BACKEND_INFO(node_id).quarantine) - { - /* set the update flags */ - ereport(LOG, - (errmsg("failback request from pid [%d] is changed to update status request because node_id: %d was quarantined", - getpid(), node_id))); - flags |= REQ_DETAIL_UPDATE; - } - else - { - /* - * no need to go to watchdog if it's an update or already initiated - * from watchdog - */ - if (!(flags & REQ_DETAIL_WATCHDOG)) - { - int x; - - for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++) - { - res = wd_send_failback_request(node_id, flags); - if (res != FAILOVER_RES_TRANSITION) - break; - sleep(1); - } - } - } - - if (res == FAILOVER_RES_TRANSITION) - { - /* - * What to do when cluster is still not stable Is proceeding to - * failover is the right choice ??? - */ - ereport(NOTICE, - (errmsg("failback request for node_id: %d from pid [%d], But cluster is not in stable state" - ,node_id, getpid()))); - } - - if (res == FAILOVER_RES_PROCEED) - { - ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, flags); - } - else if (res == FAILOVER_RES_WILL_BE_DONE) - { - ereport(LOG, - (errmsg("failback request for node_id: %d from pid [%d], will be handled by watchdog" - ,node_id, getpid()))); - } - else - { - ereport(throw_error ? 
ERROR : LOG, - (errmsg("failback request for node_id: %d from pid [%d] is canceled by other pgpool" - ,node_id, getpid()))); - } - return ret; -} static RETSIGTYPE exit_handler(int sig) { @@ -4073,21 +3741,6 @@ pool_frontend_exists(void) return -1; } -static void -degenerate_all_quarantine_nodes(void) -{ - int i; - - for (i = 0; i < NUM_BACKENDS; i++) - { - if (BACKEND_INFO(i).quarantine && BACKEND_INFO(i).backend_status == CON_DOWN) - { - /* just send the request to watchdog */ - if (wd_degenerate_backend_set(&i, 1, REQ_DETAIL_UPDATE) == FAILOVER_RES_PROCEED) - register_node_operation_request(NODE_DOWN_REQUEST, &i, 1, REQ_DETAIL_WATCHDOG | REQ_DETAIL_SWITCHOVER); - } - } -} static void update_backend_quarantine_status(void) diff --git a/src/pcp_con/pcp_child.c b/src/pcp_con/pcp_child.c index 0ac47ef2..abd055a1 100644 --- a/src/pcp_con/pcp_child.c +++ b/src/pcp_con/pcp_child.c @@ -25,6 +25,11 @@ #include "pool.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "utils/pool_signal.h" +#include "utils/pool_ipc.h" +#include "utils/ps_status.h" + +#include "pcp/pcp_worker.h" #include #include diff --git a/src/pcp_con/pcp_worker.c b/src/pcp_con/pcp_worker.c index 35109e6a..ec7dc7da 100644 --- a/src/pcp_con/pcp_worker.c +++ b/src/pcp_con/pcp_worker.c @@ -25,6 +25,9 @@ #include "pool.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "utils/ps_status.h" + +#include "main/pgpool_internal_commands.h" #include #include @@ -42,7 +45,10 @@ #include "pcp/pcp_stream.h" #include "pcp/pcp.h" +#include "pcp/pcp_worker.h" +#include "pcp/recovery.h" #include "auth/md5.h" +#include "auth/pool_auth.h" #include "pool_config.h" #include "context/pool_process_context.h" #include "utils/pool_process_reporting.h" @@ -80,7 +86,7 @@ static void process_attach_node(PCP_CONNECTION * frontend, char *buf); static void process_recovery_request(PCP_CONNECTION * frontend, char *buf); static void process_status_request(PCP_CONNECTION * frontend); static void 
process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos); -static void process_shutown_request(PCP_CONNECTION * frontend, char mode); +static void process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos); static void process_set_configration_parameter(PCP_CONNECTION * frontend, char *buf, int len); static void pcp_worker_will_go_down(int code, Datum arg); @@ -297,8 +303,9 @@ pcp_process_command(char tos, char *buf, int buf_len) break; case 'T': + case 't': set_ps_display("PCP: processing shutdown request", false); - process_shutown_request(pcp_frontend, buf[0]); + process_shutown_request(pcp_frontend, buf[0], tos); break; case 'O': /* recovery request */ @@ -1260,48 +1267,48 @@ send_md5salt(PCP_CONNECTION * frontend, char *salt) } static void -process_shutown_request(PCP_CONNECTION * frontend, char mode) +process_shutown_request(PCP_CONNECTION * frontend, char mode, char tos) { char code[] = "CommandComplete"; - pid_t ppid = getppid(); - int sig, - len; + int len; - if (mode == 's') - { - ereport(DEBUG1, - (errmsg("PCP: processing shutdown request"), - errdetail("sending SIGTERM to the parent process with PID:%d", ppid))); - sig = SIGTERM; - } - else if (mode == 'f') - { - ereport(DEBUG1, - (errmsg("PCP: processing shutdown request"), - errdetail("sending SIGINT to the parent process with PID:%d", ppid))); - sig = SIGINT; - } - else if (mode == 'i') - { - ereport(DEBUG1, - (errmsg("PCP: processing shutdown request"), - errdetail("sending SIGQUIT to the parent process with PID:%d", ppid))); - sig = SIGQUIT; - } - else + ereport(DEBUG1, + (errmsg("PCP: processing shutdown request"), + errdetail("shutdown mode \"%c\"", mode))); + + /* quickly bail out if invalid mode is specified + * because we do not want to propagate the command + * with invalid mode over the watchdog network */ + if (mode != 's' && mode != 'i' && mode != 'f' ) { ereport(ERROR, (errmsg("PCP: error while processing shutdown request"), errdetail("invalid shutdown mode 
\"%c\"", mode))); } + if (tos == 't' && pool_config->use_watchdog) + { + WDExecCommandArg wdExecCommandArg; + + strncpy(wdExecCommandArg.arg_name, "mode", sizeof(wdExecCommandArg.arg_name) - 1); + snprintf(wdExecCommandArg.arg_value, sizeof(wdExecCommandArg.arg_name) - 1, "%c",mode); + + ereport(LOG, + (errmsg("PCP: sending command to watchdog to shutdown cluster"))); + + if (wd_execute_cluster_command(WD_COMMAND_SHUTDOWN_CLUSTER,1, &wdExecCommandArg) != COMMAND_OK) + ereport(ERROR, + (errmsg("PCP: error while processing shutdown cluster request"), + errdetail("failed to propogate shutdown command through watchdog"))); + } + pcp_write(frontend, "t", 1); len = htonl(sizeof(code) + sizeof(int)); pcp_write(frontend, &len, sizeof(int)); pcp_write(frontend, code, sizeof(code)); do_pcp_flush(frontend); - pool_signal_parent(sig); + terminate_pgpool(mode, true); } static void diff --git a/src/pcp_con/recovery.c b/src/pcp_con/recovery.c index 7bb98de8..d4142067 100644 --- a/src/pcp_con/recovery.c +++ b/src/pcp_con/recovery.c @@ -5,7 +5,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2018 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -22,17 +22,21 @@ * */ + #include "config.h" +#include "pool.h" +#include "pool_config.h" + #include #include +#include "pcp/recovery.h" #include "utils/elog.h" - -#include "pool.h" -#include "pool_config.h" +#include "utils/pool_signal.h" #include "libpq-fe.h" +#include "main/pgpool_internal_commands.h" #include "watchdog/wd_internal_commands.h" #define WAIT_RETRY_COUNT (pool_config->recovery_timeout / 3) diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index e266f9d0..4eb1aca6 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ 
-29,6 +29,7 @@ #include "pool.h" #include "protocol/pool_proto_modules.h" +#include "protocol/pool_process_query.h" #include "parser/pg_config_manual.h" #include "parser/pool_string.h" #include "pool_config.h" diff --git a/src/protocol/child.c b/src/protocol/child.c index fd2c7610..57df61c5 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -45,19 +45,27 @@ #include #include "pool.h" -#include "utils/palloc.h" -#include "utils/memutils.h" -#include "context/pool_process_context.h" -#include "context/pool_session_context.h" #include "pool_config.h" #include "pool_config_variables.h" +#include "utils/palloc.h" +#include "utils/memutils.h" +#include "utils/pool_ssl.h" +#include "utils/pool_ipc.h" +#include "utils/pool_relcache.h" #include "utils/pool_ip.h" #include "utils/pool_stream.h" #include "utils/elog.h" +#include "utils/ps_status.h" + +#include "context/pool_process_context.h" +#include "context/pool_session_context.h" +#include "protocol/pool_connection_pool.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" +#include "auth/pool_auth.h" #include "auth/md5.h" #include "auth/pool_passwd.h" #include "auth/pool_hba.h" -#include "utils/pool_relcache.h" static StartupPacket *read_startup_packet(POOL_CONNECTION * cp); static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend); @@ -67,7 +75,6 @@ static RETSIGTYPE wakeup_handler(int sig); static RETSIGTYPE reload_config_handler(int sig); static RETSIGTYPE authentication_timeout(int sig); static void send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); -static void send_frontend_exits(void); static int connection_count_up(void); static void connection_count_down(void); static bool connect_using_existing_connection(POOL_CONNECTION * frontend, @@ -85,8 +92,7 @@ static POOL_CONNECTION_POOL * get_backend_connection(POOL_CONNECTION * frontend) static StartupPacket *StartupPacketCopy(StartupPacket *sp); static void 
print_process_status(char *remote_host, char *remote_port); static bool backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid); -static void free_persisten_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp); -static int choose_db_node_id(char *str); + static void child_will_go_down(int code, Datum arg); static int opt_sort(const void *a, const void *b); @@ -112,8 +118,6 @@ static int child_unix_fd = 0; extern int myargc; extern char **myargv; -char remote_ps_data[NI_MAXHOST + NI_MAXSERV + 2]; /* used for set_ps_display */ - volatile sig_atomic_t got_sighup = 0; char remote_host[NI_MAXHOST]; /* client host */ @@ -717,18 +721,6 @@ read_startup_packet(POOL_CONNECTION * cp) return sp; } -/* - * send startup packet - */ -void -send_startup_packet(POOL_CONNECTION_POOL_SLOT * cp) -{ - int len; - - len = htonl(cp->sp->len + sizeof(len)); - pool_write(cp->con, &len, sizeof(len)); - pool_write_and_flush(cp->con, cp->sp->startup_packet, cp->sp->len); -} /* * Reuse existing connection @@ -1207,34 +1199,6 @@ disable_authentication_timeout(void) } } -/* - * send frontend exiting messages to all connections. this is called - * in any case when child process exits, for example failover, child - * life time expires or child max connections expires. 
- */ -static void -send_frontend_exits(void) -{ - int i; - POOL_CONNECTION_POOL *p = pool_connection_pool; - - pool_sigset_t oldmask; - - POOL_SETMASK2(&BlockSig, &oldmask); - - for (i = 0; i < pool_config->max_pool; i++, p++) - { - if (!MASTER_CONNECTION(p)) - continue; - if (!MASTER_CONNECTION(p)->sp) - continue; - if (MASTER_CONNECTION(p)->sp->user == NULL) - continue; - pool_send_frontend_exits(p); - } - - POOL_SETMASK(&oldmask); -} static void send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) @@ -1264,22 +1228,6 @@ send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) } } -void -pool_free_startup_packet(StartupPacket *sp) -{ - if (sp) - { - if (sp->startup_packet) - pfree(sp->startup_packet); - if (sp->database) - pfree(sp->database); - if (sp->user) - pfree(sp->user); - pfree(sp); - } - sp = NULL; -} - /* * Do house keeping works when pgpool child process exits */ @@ -1306,7 +1254,7 @@ child_will_go_down(int code, Datum arg) /* let backend know now we are exiting */ if (pool_connection_pool) - send_frontend_exits(); + close_all_backend_connections(); } void child_exit(int code) @@ -1328,215 +1276,7 @@ child_exit(int code) exit(code); } -/* - * create a persistent connection - */ -POOL_CONNECTION_POOL_SLOT * -make_persistent_db_connection( - int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry) -{ - POOL_CONNECTION_POOL_SLOT *cp; - int fd; - -#define MAX_USER_AND_DATABASE 1024 - - /* V3 startup packet */ - typedef struct - { - int protoVersion; - char data[MAX_USER_AND_DATABASE]; - } StartupPacket_v3; - - static StartupPacket_v3 * startup_packet; - int len, - len1; - cp = palloc0(sizeof(POOL_CONNECTION_POOL_SLOT)); - startup_packet = palloc0(sizeof(*startup_packet)); - startup_packet->protoVersion = htonl(0x00030000); /* set V3 proto - * major/minor */ - - /* - * create socket - */ - if (*hostname == '/') - { - fd = connect_unix_domain_socket_by_port(port, hostname, retry); - } - 
else - { - fd = connect_inet_domain_socket_by_port(hostname, port, retry); - } - - if (fd < 0) - { - free_persisten_db_connection_memory(cp); - pfree(startup_packet); - ereport(ERROR, - (errmsg("failed to make persistent db connection"), - errdetail("connection to host:\"%s:%d\" failed", hostname, port))); - } - - cp->con = pool_open(fd, true); - cp->closetime = 0; - cp->con->isbackend = 1; - pool_set_db_node_id(cp->con, db_node_id); - - pool_ssl_negotiate_clientserver(cp->con); - - /* - * build V3 startup packet - */ - len = snprintf(startup_packet->data, sizeof(startup_packet->data), "user") + 1; - len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data) - len, "%s", user) + 1; - if (len1 >= (sizeof(startup_packet->data) - len)) - { - pool_close(cp->con); - free_persisten_db_connection_memory(cp); - pfree(startup_packet); - ereport(ERROR, - (errmsg("failed to make persistent db connection"), - errdetail("user name is too long"))); - } - - len += len1; - len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data) - len, "database") + 1; - if (len1 >= (sizeof(startup_packet->data) - len)) - { - pool_close(cp->con); - free_persisten_db_connection_memory(cp); - pfree(startup_packet); - ereport(ERROR, - (errmsg("failed to make persistent db connection"), - errdetail("user name is too long"))); - } - - len += len1; - len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data) - len, "%s", dbname) + 1; - if (len1 >= (sizeof(startup_packet->data) - len)) - { - pool_close(cp->con); - free_persisten_db_connection_memory(cp); - pfree(startup_packet); - ereport(ERROR, - (errmsg("failed to make persistent db connection"), - errdetail("database name is too long"))); - } - len += len1; - startup_packet->data[len++] = '\0'; - - cp->sp = palloc(sizeof(StartupPacket)); - - cp->sp->startup_packet = (char *) startup_packet; - cp->sp->len = len + 4; - cp->sp->major = 3; - cp->sp->minor = 0; - cp->sp->database = pstrdup(dbname); - cp->sp->user 
= pstrdup(user); - - /* - * send startup packet - */ - PG_TRY(); - { - send_startup_packet(cp); - connection_do_auth(cp, password); - } - PG_CATCH(); - { - pool_close(cp->con); - free_persisten_db_connection_memory(cp); - PG_RE_THROW(); - } - PG_END_TRY(); - - return cp; -} - -/* - * make_persistent_db_connection_noerror() is a wrapper over - * make_persistent_db_connection() which does not ereports in case of an error - */ -POOL_CONNECTION_POOL_SLOT * -make_persistent_db_connection_noerror( - int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry) -{ - POOL_CONNECTION_POOL_SLOT *slot = NULL; - MemoryContext oldContext = CurrentMemoryContext; - - PG_TRY(); - { - slot = make_persistent_db_connection(db_node_id, - hostname, - port, - dbname, - user, - password, retry); - } - PG_CATCH(); - { - EmitErrorReport(); - MemoryContextSwitchTo(oldContext); - FlushErrorState(); - slot = NULL; - } - PG_END_TRY(); - return slot; -} - -/* - * Free memory of POOL_CONNECTION_POOL_SLOT. Should only be used in - * make_persistent_db_connection and discard_persistent_db_connection. - */ -void -free_persisten_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp) -{ - if (!cp) - return; - if (!cp->sp) - { - pfree(cp); - return; - } - if (cp->sp->startup_packet) - pfree(cp->sp->startup_packet); - if (cp->sp->database) - pfree(cp->sp->database); - if (cp->sp->user) - pfree(cp->sp->user); - pfree(cp->sp); - pfree(cp); -} - -/* - * Discard connection and memory allocated by - * make_persistent_db_connection(). - */ -void -discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp) -{ - int len; - - if (cp == NULL) - return; - - pool_write(cp->con, "X", 1); - len = htonl(4); - pool_write(cp->con, &len, sizeof(len)); - - /* - * XXX we cannot call pool_flush() here since backend may already close - * the socket and pool_flush() automatically invokes fail over handler. 
- * This could happen in copy command (remember the famous "lost - * synchronization with server, resetting connection" message) - */ - socket_set_nonblock(cp->con->fd); - pool_flush_it(cp->con); - socket_unset_nonblock(cp->con->fd); - - pool_close(cp->con); - free_persisten_db_connection_memory(cp); -} /* * Count up connection counter (from frontend to pgpool) in shared memory and @@ -1592,185 +1332,7 @@ static RETSIGTYPE wakeup_handler(int sig) } -/* - * Select load balancing node. This function is called when: - * 1) client connects - * 2) the node previously selected for the load balance node is down - */ -int -select_load_balancing_node(void) -{ - int selected_slot; - double total_weight, - r; - int i; - int index_db = -1, - index_app = -1; - POOL_SESSION_CONTEXT *ses = pool_get_session_context(false); - int tmp; - int no_load_balance_node_id = -2; - /* - * -2 indicates there's no database_redirect_preference_list. -1 indicates - * database_redirect_preference_list exists and any of standby nodes - * specified. 
- */ - int suggested_node_id = -2; - -#if defined(sun) || defined(__sun) - r = (((double) rand()) / RAND_MAX); -#else - r = (((double) random()) / RAND_MAX); -#endif - - /* - * Check database_redirect_preference_list - */ - if (SL_MODE && pool_config->redirect_dbnames) - { - char *database = MASTER_CONNECTION(ses->backend)->sp->database; - - /* - * Check to see if the database matches any of - * database_redirect_preference_list - */ - index_db = regex_array_match(pool_config->redirect_dbnames, database); - if (index_db >= 0) - { - /* Matches */ - ereport(DEBUG1, - (errmsg("selecting load balance node db matched"), - errdetail("dbname: %s index is %d dbnode is %s weight is %f", database, index_db, - pool_config->db_redirect_tokens->token[index_db].right_token, - pool_config->db_redirect_tokens->token[index_db].weight_token))); - - tmp = choose_db_node_id(pool_config->db_redirect_tokens->token[index_db].right_token); - if (tmp == -1 || (tmp >= 0 && VALID_BACKEND(tmp))) - suggested_node_id = tmp; - } - } - - /* - * Check app_name_redirect_preference_list - */ - if (SL_MODE && pool_config->redirect_app_names) - { - char *app_name = MASTER_CONNECTION(ses->backend)->sp->application_name; - - /* - * Check only if application name is set. Old applications may not - * have application name. - */ - if (app_name && strlen(app_name) > 0) - { - /* - * Check to see if the aplication name matches any of - * app_name_redirect_preference_list. - */ - index_app = regex_array_match(pool_config->redirect_app_names, app_name); - if (index_app >= 0) - { - - /* - * if the aplication name matches any of - * app_name_redirect_preference_list, - * database_redirect_preference_list will be ignored. 
- */ - index_db = -1; - - /* Matches */ - ereport(DEBUG1, - (errmsg("selecting load balance node db matched"), - errdetail("app_name: %s index is %d dbnode is %s weight is %f", app_name, index_app, - pool_config->app_name_redirect_tokens->token[index_app].right_token, - pool_config->app_name_redirect_tokens->token[index_app].weight_token))); - - tmp = choose_db_node_id(pool_config->app_name_redirect_tokens->token[index_app].right_token); - if (tmp == -1 || (tmp >= 0 && VALID_BACKEND(tmp))) - suggested_node_id = tmp; - } - } - } - - if (suggested_node_id >= 0) - { - /* - * If the weight is bigger than random rate then send to - * suggested_node_id. If the weight is less than random rate then - * choose load balance node from other nodes. - */ - if ((index_db >= 0 && r <= pool_config->db_redirect_tokens->token[index_db].weight_token) || - (index_app >= 0 && r <= pool_config->app_name_redirect_tokens->token[index_app].weight_token)) - { - ereport(DEBUG1, - (errmsg("selecting load balance node"), - errdetail("selected backend id is %d", suggested_node_id))); - return suggested_node_id; - } - else - no_load_balance_node_id = suggested_node_id; - } - - /* In case of sending to standby */ - if (suggested_node_id == -1) - { - /* If the weight is less than random rate then send to primary. 
*/ - if ((index_db >= 0 && r > pool_config->db_redirect_tokens->token[index_db].weight_token) || - (index_app >= 0 && r > pool_config->app_name_redirect_tokens->token[index_app].weight_token)) - { - ereport(DEBUG1, - (errmsg("selecting load balance node"), - errdetail("selected backend id is %d", PRIMARY_NODE_ID))); - return PRIMARY_NODE_ID; - } - } - - /* Choose a backend in random manner with weight */ - selected_slot = MASTER_NODE_ID; - total_weight = 0.0; - - for (i = 0; i < NUM_BACKENDS; i++) - { - if (VALID_BACKEND_RAW(i)) - { - if (i == no_load_balance_node_id) - continue; - if (suggested_node_id == -1) - { - if (i != PRIMARY_NODE_ID) - total_weight += BACKEND_INFO(i).backend_weight; - } - else - total_weight += BACKEND_INFO(i).backend_weight; - } - } - -#if defined(sun) || defined(__sun) - r = (((double) rand()) / RAND_MAX) * total_weight; -#else - r = (((double) random()) / RAND_MAX) * total_weight; -#endif - - total_weight = 0.0; - for (i = 0; i < NUM_BACKENDS; i++) - { - if ((suggested_node_id == -1 && i == PRIMARY_NODE_ID) || i == no_load_balance_node_id) - continue; - - if (VALID_BACKEND_RAW(i) && BACKEND_INFO(i).backend_weight > 0.0) - { - if (r >= total_weight) - selected_slot = i; - else - break; - total_weight += BACKEND_INFO(i).backend_weight; - } - } - ereport(DEBUG1, - (errmsg("selecting load balance node"), - errdetail("selected backend id is %d", selected_slot))); - return selected_slot; -} /* SIGHUP handler */ static RETSIGTYPE reload_config_handler(int sig) @@ -2384,49 +1946,7 @@ print_process_status(char *remote_host, char *remote_port) snprintf(remote_ps_data, sizeof(remote_ps_data), "%s(%s)", remote_host, remote_port); } -bool -is_session_connected() -{ - if (processType == PT_CHILD) - return (pool_get_session_context(true) != NULL); - return false; -} - -/* - * Given db node specified in pgpool.conf, returns appropriate physical - * DB node id. 
- * Acceptable db node specifications are: - * - * primary: primary node - * standby: any of standby node - * numeric: physical node id - * - * If specified node does exist, returns MASTER_NODE_ID. If "standby" is - * specified, returns -1. Caller should choose one of standby nodes - * appropriately. - */ -static int -choose_db_node_id(char *str) -{ - int node_id = MASTER_NODE_ID; - - if (!strcmp("primary", str) && PRIMARY_NODE_ID >= 0) - { - node_id = PRIMARY_NODE_ID; - } - else if (!strcmp("standby", str)) - { - node_id = -1; - } - else - { - int tmp = atoi(str); - if (tmp >= 0 && tmp < NUM_BACKENDS) - node_id = tmp; - } - return node_id; -} int send_to_pg_frontend(char *data, int len, bool flush) @@ -2478,155 +1998,4 @@ static int opt_sort(const void *a, const void *b) return strcmp( *(char **)a, *(char **)b); } -/* - * Returns PostgreSQL version. - * The returned PgVersion struct is in static memory. - * Caller must not modify it. - * - * Note: - * Must be called while query context already exists. - * If there's something goes wrong, this raises FATAL. So never returns to caller. - * - */ -PGVersion * -Pgversion(POOL_CONNECTION_POOL * backend) -{ -#define VERSION_BUF_SIZE 10 - static PGVersion pgversion; - static POOL_RELCACHE *relcache; - char *result; - char *p; - char buf[VERSION_BUF_SIZE]; - int i; - int major; - int minor; - - /* - * First, check local cache. If cache is set, just return it. - */ - if (pgversion.major != 0) - { - ereport(DEBUG5, - (errmsg("Pgversion: local cache returned"))); - - return &pgversion; - } - if (!relcache) - { - /* - * Create relcache. - */ - relcache = pool_create_relcache(pool_config->relcache_size, "SELECT version()", - string_register_func, string_unregister_func, false); - if (relcache == NULL) - { - ereport(FATAL, - (errmsg("Pgversion: unable to create relcache while getting PostgreSQL version."))); - return NULL; - } - } - - /* - * Search relcache. 
- */ - result = (char *)pool_search_relcache(relcache, backend, "version"); - if (result == 0) - { - ereport(FATAL, - (errmsg("Pgversion: unable to search relcache while getting PostgreSQL version."))); - return NULL; - } - - ereport(DEBUG5, - (errmsg("Pgversion: version string: %s", result))); - - /* - * Extract major version number. We create major version as "version" * - * 10. For example, for V10, the major version number will be 100, for - * V9.6 it will be 96, and so on. For alpha or beta version, the version - * string could be something like "12beta1". In this case we assume that - * atoi(3) is smart enough to stop at the first character which is not a - * valid digit (in our case 'b')). So "12beta1" should be converted to 12. - */ - p = strchr(result, ' '); - if (p == NULL) - { - ereport(FATAL, - (errmsg("Pgversion: unable to find the first space in the version string: %s", result))); - return NULL; - } - - p++; - i = 0; - while (i < VERSION_BUF_SIZE - 1 && p && *p != '.') - { - buf[i++] = *p++; - } - buf[i] = '\0'; - major = atoi(buf); - ereport(DEBUG5, - (errmsg("Pgversion: major version: %d", major))); - - /* Assuming PostgreSQL V100 is the final release:-) */ - if (major < 6 || major > 100) - { - ereport(FATAL, - (errmsg("Pgversion: wrong major version: %d", major))); - return NULL; - } - - /* - * If major version is 10 or above, we are done to extract major. - * Otherwise extract below decimal point part. - */ - if (major >= 10) - { - major *= 10; - } - else - { - p++; - i = 0; - while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ') - { - buf[i++] = *p++; - } - buf[i] = '\0'; - major = major * 10 + atoi(buf); - ereport(DEBUG5, - (errmsg("Pgversion: major version: %d", major))); - pgversion.major = major; - } - - /* - * Extract minor version. - */ - p++; - i = 0; - while (i < VERSION_BUF_SIZE -1 && p && *p != '.' 
&& *p != ' ') - { - buf[i++] = *p++; - } - buf[i] = '\0'; - minor = atoi(buf); - ereport(DEBUG5, - (errmsg("Pgversion: minor version: %d", minor))); - - if (minor < 0 || minor > 100) - { - ereport(FATAL, - (errmsg("Pgversion: wrong minor version: %d", minor))); - return NULL; - } - - - /* - * Ok, everything looks good. Set the local cache. - */ - pgversion.major = major; - pgversion.minor = minor; - strncpy(pgversion.version_string, result, sizeof(pgversion.version_string) - 1); - - return &pgversion; -} diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c index 325fd1bc..5b6b150b 100644 --- a/src/protocol/pool_connection_pool.c +++ b/src/protocol/pool_connection_pool.c @@ -5,7 +5,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2019 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -34,7 +34,7 @@ #include #endif #include - +#include #include #include #include @@ -48,6 +48,12 @@ #include "pool_config.h" #include "utils/elog.h" #include "utils/memutils.h" +#include "protocol/pool_connection_pool.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" +#include "main/pgpool_internal_commands.h" + + #include "context/pool_process_context.h" static int pool_index; /* Active pool index */ @@ -1019,3 +1025,34 @@ pool_pool_index(void) { return pool_index; } + +/* + * send frontend exiting messages to all connections. this is called + * in any case when child process exits, for example failover, child + * life time expires or child max connections expires. 
+ */ + +void +close_all_backend_connections(void) +{ + int i; + POOL_CONNECTION_POOL *p = pool_connection_pool; + + pool_sigset_t oldmask; + + POOL_SETMASK2(&BlockSig, &oldmask); + + for (i = 0; i < pool_config->max_pool; i++, p++) + { + if (!MASTER_CONNECTION(p)) + continue; + if (!MASTER_CONNECTION(p)->sp) + continue; + if (MASTER_CONNECTION(p)->sp->user == NULL) + continue; + pool_send_frontend_exits(p); + } + + POOL_SETMASK(&oldmask); +} + diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c index 2e889e8f..2ab2fab8 100644 --- a/src/protocol/pool_process_query.c +++ b/src/protocol/pool_process_query.c @@ -43,11 +43,19 @@ #include "pool.h" #include "pool_config.h" #include "rewrite/pool_timestamp.h" +#include "main/pgpool_internal_commands.h" +#include "protocol/pool_process_query.h" #include "protocol/pool_proto_modules.h" +#include "protocol/pool_connection_pool.h" +#include "protocol/pool_pg_utils.h" #include "protocol/protocol_defs.h" #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/elog.h" +#include "utils/pool_ssl.h" +#include "utils/ps_status.h" +#include "utils/pool_signal.h" + #include "auth/pool_hba.h" #include "utils/pool_relcache.h" #include "utils/pool_stream.h" @@ -55,7 +63,6 @@ #include "context/pool_query_context.h" #include "utils/pool_select_walker.h" #include "query_cache/pool_memqcache.h" -#include "utils/pool_signal.h" #include "parser/pool_string.h" #ifndef FD_SETSIZE diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index 7bfbde46..222875b2 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -45,6 +45,8 @@ #include "rewrite/pool_timestamp.h" #include "rewrite/pool_lobj.h" #include "protocol/pool_proto_modules.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" #include "pool_config.h" #include "parser/pool_string.h" #include "context/pool_session_context.h" @@ -53,10 +55,12 @@ #include 
"utils/pool_select_walker.h" #include "utils/pool_relcache.h" #include "utils/pool_stream.h" -#include "query_cache/pool_memqcache.h" +#include "utils/ps_status.h" #include "utils/pool_signal.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "query_cache/pool_memqcache.h" +#include "main/pgpool_internal_commands.h" #include "pool_config_variables.h" char *copy_table = NULL; /* copy table name */ @@ -530,7 +534,7 @@ SimpleQuery(POOL_CONNECTION * frontend, (errmsg("Query: sending SIGUSR1 signal to parent"))); ignore_sigusr1 = 1; /* disable SIGUSR1 handler */ - register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, 0); + close_idle_connections(); /* * We need to loop over here since we might get some signals while @@ -4034,3 +4038,196 @@ pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backe } } } + +/* + * read message length (V3 only) + */ +int +pool_read_message_length(POOL_CONNECTION_POOL * cp) +{ + int length, + length0; + int i; + + /* read message from master node */ + pool_read(CONNECTION(cp, MASTER_NODE_ID), &length0, sizeof(length0)); + length0 = ntohl(length0); + + ereport(DEBUG5, + (errmsg("reading message length"), + errdetail("slot: %d length: %d", MASTER_NODE_ID, length0))); + + for (i = 0; i < NUM_BACKENDS; i++) + { + if (!VALID_BACKEND(i) || IS_MASTER_NODE_ID(i)) + { + continue; + } + + pool_read(CONNECTION(cp, i), &length, sizeof(length)); + + length = ntohl(length); + ereport(DEBUG5, + (errmsg("reading message length"), + errdetail("slot: %d length: %d", i, length))); + + if (length != length0) + ereport(ERROR, + (errmsg("unable to read message length"), + errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0))); + + } + + if (length0 < 0) + ereport(ERROR, + (errmsg("unable to read message length"), + errdetail("invalid message length (%d)", length))); + + return length0; +} + +/* + * read message length2 (V3 only) + * unlike pool_read_message_length, this 
returns an array of message length. + * The array is in the static storage, thus it will be destroyed by subsequent calls. + */ +int * +pool_read_message_length2(POOL_CONNECTION_POOL * cp) +{ + int length, + length0; + int i; + static int length_array[MAX_CONNECTION_SLOTS]; + + /* read message from master node */ + pool_read(CONNECTION(cp, MASTER_NODE_ID), &length0, sizeof(length0)); + + length0 = ntohl(length0); + length_array[MASTER_NODE_ID] = length0; + ereport(DEBUG5, + (errmsg("reading message length"), + errdetail("master slot: %d length: %d", MASTER_NODE_ID, length0))); + + for (i = 0; i < NUM_BACKENDS; i++) + { + if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i)) + { + pool_read(CONNECTION(cp, i), &length, sizeof(length)); + + length = ntohl(length); + ereport(DEBUG5, + (errmsg("reading message length"), + errdetail("master slot: %d length: %d", i, length))); + + if (length != length0) + { + ereport(LOG, + (errmsg("reading message length"), + errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0))); + } + + if (length < 0) + { + ereport(ERROR, + (errmsg("unable to read message length"), + errdetail("invalid message length (%d)", length))); + } + + length_array[i] = length; + } + + } + return &length_array[0]; +} + +signed char +pool_read_kind(POOL_CONNECTION_POOL * cp) +{ + char kind0, + kind; + int i; + + kind = -1; + kind0 = 0; + + for (i = 0; i < NUM_BACKENDS; i++) + { + if (!VALID_BACKEND(i)) + { + continue; + } + + pool_read(CONNECTION(cp, i), &kind, sizeof(kind)); + + if (IS_MASTER_NODE_ID(i)) + { + kind0 = kind; + } + else + { + if (kind != kind0) + { + char *message; + + if (kind0 == 'E') + { + if (pool_extract_error_message(false, MASTER(cp), MAJOR(cp), true, &message) == 1) + { + ereport(LOG, + (errmsg("pool_read_kind: error message from master backend:%s", message))); + pfree(message); + } + } + else if (kind == 'E') + { + if (pool_extract_error_message(false, CONNECTION(cp, i), MAJOR(cp), true, &message) == 1) 
+ { + ereport(LOG, + (errmsg("pool_read_kind: error message from %d th backend:%s", i, message))); + pfree(message); + } + } + ereport(ERROR, + (errmsg("unable to read message kind"), + errdetail("kind does not match between master(%x) slot[%d] (%x)", kind0, i, kind))); + } + } + } + + return kind; +} + +int +pool_read_int(POOL_CONNECTION_POOL * cp) +{ + int data0, + data; + int i; + + data = -1; + data0 = 0; + + for (i = 0; i < NUM_BACKENDS; i++) + { + if (!VALID_BACKEND(i)) + { + continue; + } + pool_read(CONNECTION(cp, i), &data, sizeof(data)); + if (IS_MASTER_NODE_ID(i)) + { + data0 = data; + } + else + { + if (data != data0) + { + ereport(ERROR, + (errmsg("unable to read int value"), + errdetail("data does not match between between master(%x) slot[%d] (%x)", data0, i, data))); + + } + } + } + return data; +} diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c index c8dae757..ecd655bb 100644 --- a/src/query_cache/pool_memqcache.c +++ b/src/query_cache/pool_memqcache.c @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -44,9 +45,11 @@ #include "auth/md5.h" #include "pool_config.h" #include "protocol/pool_proto_modules.h" +#include "protocol/pool_process_query.h" #include "parser/parsenodes.h" #include "context/pool_session_context.h" #include "query_cache/pool_memqcache.h" +#include "utils/pool_ssl.h" #include "utils/pool_relcache.h" #include "utils/pool_select_walker.h" #include "utils/pool_stream.h" @@ -54,6 +57,7 @@ #include "utils/elog.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "utils/pool_ipc.h" #ifdef USE_MEMCACHED memcached_st *memc; diff --git a/src/rewrite/pool_lobj.c b/src/rewrite/pool_lobj.c index e13debad..01e50a86 100644 --- a/src/rewrite/pool_lobj.c +++ b/src/rewrite/pool_lobj.c @@ -31,6 +31,7 @@ #include "pool.h" #include "rewrite/pool_lobj.h" #include "utils/pool_relcache.h" +#include "protocol/pool_process_query.h" #include "utils/elog.h" #include 
"pool_config.h" diff --git a/src/rewrite/pool_timestamp.c b/src/rewrite/pool_timestamp.c index 61a927ae..58ef5f68 100644 --- a/src/rewrite/pool_timestamp.c +++ b/src/rewrite/pool_timestamp.c @@ -34,6 +34,8 @@ #include "utils/palloc.h" #include "utils/memutils.h" #include "context/pool_session_context.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" typedef struct { diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c index 079494cb..1108a9f0 100644 --- a/src/streaming_replication/pool_worker_child.c +++ b/src/streaming_replication/pool_worker_child.c @@ -52,16 +52,19 @@ #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/elog.h" +#include "utils/pool_ip.h" +#include "utils/ps_status.h" +#include "utils/pool_stream.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" +#include "protocol/pool_process_query.h" +#include "protocol/pool_pg_utils.h" +#include "main/pgpool_internal_commands.h" #include "pool_config.h" -#include "utils/pool_ip.h" #include "auth/md5.h" #include "auth/pool_hba.h" -#include "utils/pool_stream.h" -char remote_ps_data[NI_MAXHOST]; /* used for set_ps_display */ static POOL_CONNECTION_POOL_SLOT * slots[MAX_NUM_BACKENDS]; static volatile sig_atomic_t reload_config_request = 0; static volatile sig_atomic_t restart_request = 0; diff --git a/src/tools/pcp/pcp_frontend_client.c b/src/tools/pcp/pcp_frontend_client.c index b89c5030..6637d249 100644 --- a/src/tools/pcp/pcp_frontend_client.c +++ b/src/tools/pcp/pcp_frontend_client.c @@ -90,7 +90,7 @@ struct AppTypes AllAppTypes[] = {"pcp_proc_info", PCP_PROC_INFO, "h:p:P:U:awWvd", "display a pgpool-II child process' information"}, {"pcp_promote_node", PCP_PROMOTE_NODE, "n:h:p:U:gwWvd", "promote a node as new master from pgpool-II"}, {"pcp_recovery_node", PCP_RECOVERY_NODE, "n:h:p:U:wWvd", "recover a node"}, - {"pcp_stop_pgpool", PCP_STOP_PGPOOL, "m:h:p:U:wWvd", 
"terminate pgpool-II"}, + {"pcp_stop_pgpool", PCP_STOP_PGPOOL, "m:h:p:U:s:wWvda", "terminate pgpool-II"}, {"pcp_watchdog_info", PCP_WATCHDOG_INFO, "n:h:p:U:wWvd", "display a pgpool-II watchdog's information"}, {NULL, UNKNOWN, NULL, NULL}, }; @@ -107,6 +107,7 @@ main(int argc, char **argv) int processID = 0; int ch; char shutdown_mode = 's'; + char command_scope = 'l'; int optindex; int i; bool all = false; @@ -129,6 +130,7 @@ main(int argc, char **argv) {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"mode", required_argument, NULL, 'm'}, + {"scope", required_argument, NULL, 's'}, {"gracefully", no_argument, NULL, 'g'}, {"verbose", no_argument, NULL, 'v'}, {"all", no_argument, NULL, 'a'}, @@ -193,6 +195,26 @@ main(int argc, char **argv) gracefully = true; break; + case 's': + if (current_app_type->app_type == PCP_STOP_PGPOOL) + { + if (strcmp(optarg, "c") == 0 || strcmp(optarg, "cluster") == 0) + command_scope = 'c'; + else if (strcmp(optarg, "l") == 0 || strcmp(optarg, "local") == 0) + command_scope = 'l'; + else + { + fprintf(stderr, "%s: Invalid command socpe \"%s\", must be either \"cluster\" or \"local\" \n", progname, optarg); + exit(1); + } + } + else + { + fprintf(stderr, "Invalid argument \"%s\", Try \"%s --help\" for more information.\n", optarg, progname); + exit(1); + } + break; + case 'm': if (current_app_type->app_type == PCP_STOP_PGPOOL) { @@ -404,7 +426,7 @@ main(int argc, char **argv) else if (current_app_type->app_type == PCP_STOP_PGPOOL) { - pcpResInfo = pcp_terminate_pgpool(pcpConn, shutdown_mode); + pcpResInfo = pcp_terminate_pgpool(pcpConn, shutdown_mode, command_scope); } else if (current_app_type->app_type == PCP_WATCHDOG_INFO) @@ -819,6 +841,9 @@ usage(void) if (current_app_type->app_type == PCP_STOP_PGPOOL) { fprintf(stderr, " -m, --mode=MODE MODE can be \"smart\", \"fast\", or \"immediate\"\n"); + fprintf(stderr, " -s, --scope=SCOPE SCOPE can be \"cluster\", or \"local\"\n"); + fprintf(stderr, " cluster 
scope terminates all Pgpool-II nodes part\n"); + fprintf(stderr, " of the watchdog cluster\n"); } if (current_app_type->app_type == PCP_PROMOTE_NODE || current_app_type->app_type == PCP_DETACH_NODE) diff --git a/src/utils/pool_params.c b/src/utils/pool_params.c index db5f183e..43bdb7c5 100644 --- a/src/utils/pool_params.c +++ b/src/utils/pool_params.c @@ -5,7 +5,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2008 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby @@ -26,6 +26,7 @@ #include #include #include "utils/elog.h" +#include "utils/pool_params.h" #include "pool.h" #include "parser/parser.h" diff --git a/src/utils/pool_process_reporting.c b/src/utils/pool_process_reporting.c index 7d5c3d61..412367d0 100644 --- a/src/utils/pool_process_reporting.c +++ b/src/utils/pool_process_reporting.c @@ -21,15 +21,18 @@ * Process pgPool-II "SHOW" queries. 
*/ #include "pool.h" +#include "main/health_check.h" #include "protocol/pool_proto_modules.h" #include "utils/elog.h" #include "utils/pool_stream.h" +#include "utils/statistics.h" #include "pool_config.h" #include "query_cache/pool_memqcache.h" #include "version.h" #include #include +#include #include static void write_one_field(POOL_CONNECTION * frontend, char *field); diff --git a/src/utils/pool_relcache.c b/src/utils/pool_relcache.c index 3125dc2f..c801b948 100644 --- a/src/utils/pool_relcache.c +++ b/src/utils/pool_relcache.c @@ -24,11 +24,14 @@ #include #include #include +#include +#include #include "pool.h" #include "utils/pool_relcache.h" #include "context/pool_session_context.h" #include "query_cache/pool_memqcache.h" +#include "protocol/pool_process_query.h" #include "pool_config.h" #include "utils/palloc.h" #include "utils/memutils.h" diff --git a/src/utils/pool_select_walker.c b/src/utils/pool_select_walker.c index 2f513be7..1bbda8ab 100644 --- a/src/utils/pool_select_walker.c +++ b/src/utils/pool_select_walker.c @@ -29,6 +29,7 @@ #include "parser/parsenodes.h" #include "context/pool_session_context.h" #include "rewrite/pool_timestamp.h" +#include "protocol/pool_pg_utils.h" static bool function_call_walker(Node *node, void *context); static bool system_catalog_walker(Node *node, void *context); diff --git a/src/utils/pool_signal.c b/src/utils/pool_signal.c index 923f79b1..6d6f4672 100644 --- a/src/utils/pool_signal.c +++ b/src/utils/pool_signal.c @@ -5,7 +5,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Portions Copyright (c) 2003-2008, PgPool Global Development Group + * Portions Copyright (c) 2003-2020, PgPool Global Development Group * Portions Copyright (c) 2003-2004, PostgreSQL Global Development Group * * Permission to use, copy, modify, and distribute this software and diff --git a/src/utils/pool_ssl.c b/src/utils/pool_ssl.c index cd99374c..300d6fdb 100644 --- a/src/utils/pool_ssl.c +++ 
b/src/utils/pool_ssl.c @@ -26,10 +26,12 @@ #include "config.h" #include "pool.h" +#include "utils/pool_ssl.h" #include "utils/elog.h" #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/pool_stream.h" +#include "main/pgpool_internal_commands.h" #include "pool_config.h" #include #include diff --git a/src/utils/pool_stream.c b/src/utils/pool_stream.c index d1e588a8..8c5550a4 100644 --- a/src/utils/pool_stream.c +++ b/src/utils/pool_stream.c @@ -39,7 +39,9 @@ #include "utils/memutils.h" #include "utils/socket_stream.h" #include "utils/pool_stream.h" +#include "utils/pool_ssl.h" #include "pool_config.h" +#include "main/pgpool_internal_commands.h" static int mystrlen(char *str, int upper, int *flag); static int mystrlinelen(char *str, int upper, int *flag); diff --git a/src/utils/ps_status.c b/src/utils/ps_status.c index 90338461..1e12ec19 100644 --- a/src/utils/ps_status.c +++ b/src/utils/ps_status.c @@ -9,7 +9,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Portions Copyright (c) 2003-2012 PgPool Global Development Group + * Portions Copyright (c) 2003-2020 PgPool Global Development Group * */ /*-------------------------------------------------------------------- @@ -38,12 +38,14 @@ #include #endif -#include "pool.h" +#include "utils/ps_status.h" +#include "pool_type.h" #include #include extern char **environ; bool update_process_title = true; +char remote_ps_data[NI_MAXHOST + NI_MAXSERV + 2]; /* used for set_ps_display */ /* diff --git a/src/utils/statistics.c b/src/utils/statistics.c index 940de5b2..db3ac0ba 100644 --- a/src/utils/statistics.c +++ b/src/utils/statistics.c @@ -6,7 +6,7 @@ * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * - * Copyright (c) 2003-2014 PgPool Global Development Group + * Copyright (c) 2003-2020 PgPool Global Development Group * */ /*-------------------------------------------------------------------- @@ -20,6 
+20,7 @@ #include #include "pool.h" +#include "utils/statistics.h" #include "parser/nodes.h" /* diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c index 5176f158..fb5ef1ab 100644 --- a/src/watchdog/watchdog.c +++ b/src/watchdog/watchdog.c @@ -40,6 +40,8 @@ #include #include "pool.h" +#include "utils/pool_signal.h" +#include "main/pgpool_internal_commands.h" #include "auth/md5.h" #include "utils/palloc.h" #include "utils/memutils.h" @@ -47,6 +49,8 @@ #include "utils/json_writer.h" #include "utils/json.h" #include "utils/socket_stream.h" +#include "utils/ps_status.h" +#include "pcp/recovery.h" #include "pool_config.h" #include @@ -128,6 +132,8 @@ typedef enum IPC_CMD_PREOCESS_RES #define WD_CMD_REPLY_IN_DATA '-' #define WD_CLUSTER_SERVICE_MESSAGE '#' +#define WD_EXECUTE_COMMAND_REQUEST '!' + #define WD_FAILOVER_START 'F' #define WD_FAILOVER_END 'H' #define WD_FAILOVER_WAITING_FOR_CONSENSUS 'K' @@ -145,6 +151,7 @@ typedef enum IPC_CMD_PREOCESS_RES #define CLUSTER_NODE_APPEARING_LOST 'Y' #define CLUSTER_NODE_APPEARING_FOUND 'Z' + #define WD_MASTER_NODE getMasterWatchdogNode() typedef struct packet_types @@ -169,6 +176,7 @@ packet_types all_packet_types[] = { {WD_STAND_FOR_COORDINATOR_MESSAGE, "STAND FOR COORDINATOR"}, {WD_REMOTE_FAILOVER_REQUEST, "REPLICATE FAILOVER REQUEST"}, {WD_IPC_ONLINE_RECOVERY_COMMAND, "ONLINE RECOVERY REQUEST"}, + {WD_EXECUTE_CLUSTER_COMMAND, "EXECUTE CLUSTER COMMAND"}, {WD_IPC_FAILOVER_COMMAND, "FAILOVER FUNCTION COMMAND"}, {WD_INFORM_I_AM_GOING_DOWN, "INFORM I AM GOING DOWN"}, {WD_ASK_FOR_POOL_CONFIG, "ASK FOR POOL CONFIG"}, @@ -186,17 +194,11 @@ packet_types all_packet_types[] = { {WD_IPC_CMD_RESULT_BAD, "IPC RESPONSE BAD"}, {WD_IPC_CMD_RESULT_OK, "IPC RESPONSE GOOD"}, {WD_IPC_CMD_TIMEOUT, "IPC TIMEOUT"}, + {WD_EXECUTE_COMMAND_REQUEST, "WD EXECUTE COMMAND"}, {WD_NO_MESSAGE, ""} }; -char *wd_failover_lock_name[] = -{ - "FAILOVER", - "FAILBACK", - "FOLLOW MASTER" -}; - char *wd_event_name[] = {"STATE CHANGED", "TIMEOUT", @@ -541,6 
+543,7 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_indication(WDCommandData * ipcC static IPC_CMD_PREOCESS_RES process_IPC_data_request_from_master(WDCommandData * ipcCommand); static IPC_CMD_PREOCESS_RES process_IPC_failover_command(WDCommandData * ipcCommand); static IPC_CMD_PREOCESS_RES process_failover_command_on_coordinator(WDCommandData * ipcCommand); +static IPC_CMD_PREOCESS_RES process_IPC_execute_cluster_command(WDCommandData * ipcCommand); static bool write_ipc_command_with_result_data(WDCommandData * ipcCommand, char type, char *data, int len); @@ -579,6 +582,7 @@ static void clear_standby_nodes_list(void); static int standby_node_left_cluster(WatchdogNode * wdNode); static int standby_node_join_cluster(WatchdogNode * wdNode); static void update_missed_beacon_count(WDCommandData* ipcCommand, bool clear); +static void wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt); /* global variables */ wd_cluster g_cluster; @@ -2001,12 +2005,20 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData * ipcCommand) case WD_FAILOVER_INDICATION: return process_IPC_failover_indication(ipcCommand); + break; case WD_GET_MASTER_DATA_REQUEST: return process_IPC_data_request_from_master(ipcCommand); + break; case WD_GET_RUNTIME_VARIABLE_VALUE: return process_IPC_get_runtime_variable_value_request(ipcCommand); + break; + + case WD_EXECUTE_CLUSTER_COMMAND: + return process_IPC_execute_cluster_command(ipcCommand); + break; + default: ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext, "unknown IPC command type"); break; @@ -2014,6 +2026,56 @@ static IPC_CMD_PREOCESS_RES process_IPC_command(WDCommandData * ipcCommand) return IPC_CMD_ERROR; } +static IPC_CMD_PREOCESS_RES +process_IPC_execute_cluster_command(WDCommandData * ipcCommand) +{ + /* get the json for node list */ + char *clusterCommand = NULL; + int nArgs; + WDExecCommandArg *wdExecCommandArg = NULL; + + if (ipcCommand->sourcePacket.len <= 0 || 
ipcCommand->sourcePacket.data == NULL) + return IPC_CMD_ERROR; + + if (!parse_wd_exec_cluster_command_json(ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len, + &clusterCommand, &nArgs, &wdExecCommandArg)) + { + goto ERROR_EXIT; + } + if (strcasecmp(WD_COMMAND_SHUTDOWN_CLUSTER, clusterCommand) == 0) + { + ereport(LOG, + (errmsg("Watchdog has received shutdown cluster command from IPC channel"))); + } + else + { + ipcCommand->errorMessage = MemoryContextStrdup(ipcCommand->memoryContext, + "unknown cluster command requested"); + goto ERROR_EXIT; + } + + /* + * Just broadcast the execute command request to destination node + * Processing the command on the local node is the responsibility of caller + * process + */ + reply_with_message(NULL, WD_EXECUTE_COMMAND_REQUEST, + ipcCommand->sourcePacket.data, ipcCommand->sourcePacket.len, + NULL); + + if (wdExecCommandArg) + pfree(wdExecCommandArg); + if (clusterCommand) + pfree(clusterCommand); + return IPC_CMD_OK; + +ERROR_EXIT: + if (wdExecCommandArg) + pfree(wdExecCommandArg); + if (clusterCommand) + pfree(clusterCommand); + return IPC_CMD_ERROR; +} static IPC_CMD_PREOCESS_RES process_IPC_get_runtime_variable_value_request(WDCommandData * ipcCommand) { @@ -3922,6 +3984,66 @@ cluster_service_message_processor(WatchdogNode * wdNode, WDPacketData * pkt) } } +static void +wd_execute_cluster_command_processor(WatchdogNode * wdNode, WDPacketData * pkt) +{ + /* get the json for node list */ + char *clusterCommand = NULL; + int nArgs; + WDExecCommandArg *wdExecCommandArg = NULL; + + if (pkt->type != WD_EXECUTE_COMMAND_REQUEST) + return; + + if (pkt->len <= 0 || pkt->data == NULL) + { + ereport(LOG, + (errmsg("node \"%s\" sent an empty execute cluster command message", wdNode->nodeName))); + return; + } + + if (!parse_wd_exec_cluster_command_json(pkt->data, pkt->len, + &clusterCommand, &nArgs, &wdExecCommandArg)) + { + ereport(LOG, + (errmsg("node \"%s\" sent an invalid JSON data in cluster command message", 
wdNode->nodeName))); + return; + } + + ereport(DEBUG1, + (errmsg("received \"%s\" command from node \"%s\"",clusterCommand, wdNode->nodeName))); + + if (strcasecmp(WD_COMMAND_SHUTDOWN_CLUSTER, clusterCommand) == 0) + { + int i; + char mode = 's'; + for ( i =0; i < nArgs; i++) + { + if (strcmp(wdExecCommandArg[i].arg_name, "mode") == 0) + { + mode = wdExecCommandArg[i].arg_value[0]; + } + else + ereport(LOG, + (errmsg("unsupported argument \"%s\" in shutdown command from remote node \"%s\"", wdExecCommandArg[i].arg_name, wdNode->nodeName))); + } + ereport(LOG, + (errmsg("processing shutdown command from remote node \"%s\"", wdNode->nodeName))); + terminate_pgpool(mode, false); + } + else + { + ereport(WARNING, + (errmsg("received \"%s\" command from node \"%s\" is not supported",clusterCommand, wdNode->nodeName))); + } + + if (wdExecCommandArg) + pfree(wdExecCommandArg); + if (clusterCommand) + pfree(clusterCommand); + return; +} + static int standard_packet_processor(WatchdogNode * wdNode, WDPacketData * pkt) { @@ -3935,6 +4057,10 @@ standard_packet_processor(WatchdogNode * wdNode, WDPacketData * pkt) register_inform_quarantine_nodes_req(); break; + case WD_EXECUTE_COMMAND_REQUEST: + wd_execute_cluster_command_processor(wdNode, pkt); + break; + case WD_CLUSTER_SERVICE_MESSAGE: cluster_service_message_processor(wdNode, pkt); break; @@ -7420,6 +7546,7 @@ check_and_report_IPC_authentication(WDCommandData * ipcCommand) case WD_IPC_FAILOVER_COMMAND: case WD_IPC_ONLINE_RECOVERY_COMMAND: + case WD_EXECUTE_CLUSTER_COMMAND: case WD_GET_MASTER_DATA_REQUEST: /* only allowed internaly. 
*/ internal_client_only = true; diff --git a/src/watchdog/wd_escalation.c b/src/watchdog/wd_escalation.c index c5ed9b3c..9b9105aa 100644 --- a/src/watchdog/wd_escalation.c +++ b/src/watchdog/wd_escalation.c @@ -31,10 +31,11 @@ #include #endif -#include "pool.h" +#include "utils/pool_signal.h" #include "utils/elog.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "utils/ps_status.h" #include "pool_config.h" #include "watchdog/wd_utils.h" diff --git a/src/watchdog/wd_heartbeat.c b/src/watchdog/wd_heartbeat.c index 08d436e3..11e28150 100644 --- a/src/watchdog/wd_heartbeat.c +++ b/src/watchdog/wd_heartbeat.c @@ -44,15 +44,18 @@ #endif #include "pool.h" +#include "pool_config.h" +#include "utils/pool_signal.h" #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/elog.h" -#include "pool_config.h" +#include "utils/ps_status.h" #include "auth/md5.h" #include "watchdog/watchdog.h" #include "watchdog/wd_lifecheck.h" #include "watchdog/wd_utils.h" + #define MAX_BIND_TRIES 5 /* * heartbeat packet diff --git a/src/watchdog/wd_internal_commands.c b/src/watchdog/wd_internal_commands.c index e87d467d..1e984983 100644 --- a/src/watchdog/wd_internal_commands.c +++ b/src/watchdog/wd_internal_commands.c @@ -40,6 +40,8 @@ #include #include "pool.h" +#include "auth/pool_auth.h" +#include "utils/pool_signal.h" #include "utils/elog.h" #include "utils/json_writer.h" #include "utils/json.h" @@ -264,6 +266,56 @@ wd_end_recovery(void) return COMMAND_FAILED; } +WdCommandResult +wd_execute_cluster_command(char* clusterCommand, + int nArgs, WDExecCommandArg *wdExecCommandArg) +{ + char type; + unsigned int *shared_key = get_ipc_shared_key(); + + char *func = get_wd_exec_cluster_command_json(clusterCommand, nArgs, wdExecCommandArg, + shared_key ? 
*shared_key : 0, pool_config->wd_authkey); + + WDIPCCmdResult *result = issue_command_to_watchdog(WD_EXECUTE_CLUSTER_COMMAND, + WD_DEFAULT_IPC_COMMAND_TIMEOUT, + func, strlen(func), true); + + pfree(func); + + if (result == NULL) + { + ereport(WARNING, + (errmsg("execute cluster command failed"), + errdetail("issue command to watchdog returned NULL"))); + return COMMAND_FAILED; + } + + type = result->type; + FreeCmdResult(result); + + if (type == WD_IPC_CMD_CLUSTER_IN_TRAN) + { + ereport(WARNING, + (errmsg("execute cluster command failed"), + errdetail("watchdog cluster is not in stable state"), + errhint("try again when the cluster is fully initialized"))); + return CLUSTER_IN_TRANSATIONING; + } + else if (type == WD_IPC_CMD_TIMEOUT) + { + ereport(WARNING, + (errmsg("execute cluster command failed"), + errdetail("ipc command timeout"))); + return COMMAND_TIMEOUT; + } + else if (type == WD_IPC_CMD_RESULT_OK) + { + return COMMAND_OK; + } + return COMMAND_FAILED; +} + + static char * get_wd_failover_state_json(bool start) { diff --git a/src/watchdog/wd_json_data.c b/src/watchdog/wd_json_data.c index 61db6d71..ef566f38 100644 --- a/src/watchdog/wd_json_data.c +++ b/src/watchdog/wd_json_data.c @@ -805,3 +805,121 @@ get_wd_simple_message_json(char *message) jw_destroy(jNode); return json_str; } + +char * +get_wd_exec_cluster_command_json(char *clusterCommand, int nArgs, + WDExecCommandArg *wdExecCommandArg, + unsigned int sharedKey, char *authKey) +{ + int i; + char *json_str; + JsonNode *jNode = jw_create_with_object(true); + + jw_put_int(jNode, WD_IPC_SHARED_KEY, sharedKey); /* put the shared key */ + + if (authKey != NULL && strlen(authKey) > 0) + jw_put_string(jNode, WD_IPC_AUTH_KEY, authKey); /* put the auth key */ + + jw_put_string(jNode, "Command", clusterCommand); + + jw_put_int(jNode, "nArgs", nArgs); + + /* Array of arguments */ + jw_start_array(jNode, "argument_list"); + for (i = 0; i < nArgs; i++) + { + jw_start_object(jNode, "Arg"); + jw_put_string(jNode, 
"arg_name", wdExecCommandArg[i].arg_name); + jw_put_string(jNode, "arg_value", wdExecCommandArg[i].arg_value); + jw_end_element(jNode); + } + jw_end_element(jNode); /* argument_list array End */ + + jw_finish_document(jNode); + json_str = pstrdup(jw_get_json_string(jNode)); + jw_destroy(jNode); + return json_str; +} + +bool +parse_wd_exec_cluster_command_json(char *json_data, int data_len, + char **clusterCommand, + int *nArgs, WDExecCommandArg **wdExecCommandArg) +{ + json_value *root; + char *ptr = NULL; + int i; + + root = json_parse(json_data, data_len); + + /* The root node must be object */ + if (root == NULL || root->type != json_object) + { + json_value_free(root); + ereport(LOG, + (errmsg("watchdog is unable to parse exec cluster command json"), + errdetail("invalid json data \"%.*s\"", data_len, json_data))); + return false; + } + ptr = json_get_string_value_for_key(root, "Command"); + if (ptr == NULL) + { + json_value_free(root); + ereport(LOG, + (errmsg("watchdog is unable to parse exec cluster command json"), + errdetail("command node not found in json data \"%s\"", json_data))); + return false; + } + *clusterCommand = pstrdup(ptr); + + if (json_get_int_value_for_key(root, "nArgs", nArgs)) + { + /* nArgs not found, Just ignore it */ + *nArgs = 0; + /* it may be from the old version */ + } + if (*nArgs > 0) + { + json_value *value; + + *wdExecCommandArg = palloc0(sizeof(WDExecCommandArg) * *nArgs); + + /* backend_desc array */ + value = json_get_value_for_key(root, "argument_list"); + if (value == NULL || value->type != json_array) + goto ERROR_EXIT; + + if (*nArgs!= value->u.array.length) + { + ereport(LOG, + (errmsg("watchdog is unable to parse exec cluster command json"), + errdetail("nArgs is different than argument array length \"%s\"", json_data))); + goto ERROR_EXIT; + } + for (i = 0; i < *nArgs; i++) + { + json_value *arr_value = value->u.array.values[i]; + char *ptr; + + ptr = json_get_string_value_for_key(arr_value, "arg_name"); + if (ptr == 
NULL) + goto ERROR_EXIT; + strncpy(wdExecCommandArg[i]->arg_name, ptr, sizeof(wdExecCommandArg[i]->arg_name) - 1); + + ptr = json_get_string_value_for_key(arr_value, "arg_value"); + if (ptr == NULL) + goto ERROR_EXIT; + strncpy(wdExecCommandArg[i]->arg_value, ptr, sizeof(wdExecCommandArg[i]->arg_value) - 1); + } + } + + json_value_free(root); + return true; + + ERROR_EXIT: + if (root) + json_value_free(root); + if (*wdExecCommandArg ) + pfree(*wdExecCommandArg); + return false; +} diff --git a/src/watchdog/wd_lifecheck.c b/src/watchdog/wd_lifecheck.c index 75fe0156..12653934 100644 --- a/src/watchdog/wd_lifecheck.c +++ b/src/watchdog/wd_lifecheck.c @@ -39,6 +39,8 @@ #include "utils/elog.h" #include "utils/palloc.h" #include "utils/memutils.h" +#include "utils/pool_signal.h" +#include "utils/ps_status.h" #include "watchdog/wd_utils.h" #include "watchdog/wd_lifecheck.h" diff --git a/src/watchdog/wd_ping.c b/src/watchdog/wd_ping.c index 8ef26992..2113e4e9 100644 --- a/src/watchdog/wd_ping.c +++ b/src/watchdog/wd_ping.c @@ -34,6 +34,7 @@ #include "utils/elog.h" #include "pool_config.h" #include "watchdog/wd_utils.h" +#include "utils/pool_signal.h" #define WD_MAX_PING_RESULT 256 diff --git a/src/include/auth/pool_auth.h b/src/include/auth/pool_auth.h new file mode 100644 index 00000000..08a21c8a --- /dev/null +++ b/src/include/auth/pool_auth.h @@ -0,0 +1,33 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software 
without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + */ + + +#ifndef pool_auth_h +#define pool_auth_h + +extern void connection_do_auth(POOL_CONNECTION_POOL_SLOT * cp, char *password); +extern int pool_do_auth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); +extern int pool_do_reauth(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * cp); +extern void authenticate_frontend(POOL_CONNECTION * frontend); + +extern void pool_random_salt(char *md5Salt); +extern void pool_random(void *buf, size_t len); + +#endif /* pool_auth_h */ diff --git a/src/include/main/health_check.h b/src/include/main/health_check.h new file mode 100644 index 00000000..d40f0099 --- /dev/null +++ b/src/include/main/health_check.h @@ -0,0 +1,50 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ * + */ + + +#ifndef health_check_h +#define health_check_h + +/* + * Health check statistics per node +*/ +typedef struct { + uint64 total_count; /* total count of health check */ + uint64 success_count; /* total count of successful health check */ + uint64 fail_count; /* total count of failed health check */ + uint64 skip_count; /* total count of skipped health check */ + uint64 retry_count; /* total count of health check retries */ + uint32 max_retry_count; /* max retry count in a health check session */ + uint64 total_health_check_duration; /* sum of health check duration */ + int32 max_health_check_duration; /* maximum duration spent for a health check session in milli seconds */ + int32 min_health_check_duration; /* minimum duration spent for a health check session in milli seconds */ + time_t last_health_check; /* last health check timestamp */ + time_t last_successful_health_check; /* last succesfull health check timestamp */ + time_t last_skip_health_check; /* last skipped health check timestamp */ + time_t last_failed_health_check; /* last failed health check timestamp */ +} POOL_HEALTH_CHECK_STATISTICS; + +extern volatile POOL_HEALTH_CHECK_STATISTICS *health_check_stats; /* health check stats area in shared memory */ + +extern void do_health_check_child(int *node_id); +extern size_t health_check_stats_shared_memory_size(void); +extern void health_check_stats_init(POOL_HEALTH_CHECK_STATISTICS *addr); + +#endif /* health_check_h */ diff --git a/src/include/main/pgpool_internal_commands.h b/src/include/main/pgpool_internal_commands.h new file mode 100644 index 00000000..ba735578 --- /dev/null +++ b/src/include/main/pgpool_internal_commands.h @@ -0,0 +1,46 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is 
hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + */ + + +#ifndef pgpool_internal_commands_h +#define pgpool_internal_commands_h + +extern bool terminate_pgpool(char mode, bool error); +extern void notice_backend_error(int node_id, unsigned char flags); +extern bool degenerate_backend_set(int *node_id_set, int count, + unsigned char flags); +extern bool degenerate_backend_set_ex(int *node_id_set, int count, + unsigned char flags, bool error, bool test_only); +extern bool promote_backend(int node_id, unsigned char flags); +extern bool send_failback_request(int node_id, bool throw_error, + unsigned char flags); + +extern void degenerate_all_quarantine_nodes(void); +extern bool close_idle_connections(void); + +/* defined in pgpool_main.c */ +extern void register_watchdog_quorum_change_interupt(void); +extern void register_watchdog_state_change_interupt(void); +extern void register_backend_state_sync_req_interupt(void); +extern void register_inform_quarantine_nodes_req(void); +extern bool register_node_operation_request(POOL_REQUEST_KIND kind, + int *node_id_set, int count, unsigned char flags); + +#endif /* pgpool_internal_commands_h */ diff --git a/src/include/pcp/pcp_worker.h b/src/include/pcp/pcp_worker.h new file mode 100644 index 00000000..6b5710fc --- /dev/null +++ b/src/include/pcp/pcp_worker.h @@ -0,0 +1,32 @@ +/* +* +* pgpool: a language independent connection pool server for PostgreSQL +* written by Tatsuo Ishii +* +* Copyright (c) 2003-2020 PgPool Global Development Group +* +* Permission 
to use, copy, modify, and distribute this software and +* its documentation for any purpose and without fee is hereby +* granted, provided that the above copyright notice appear in all +* copies and that both that copyright notice and this permission +* notice appear in supporting documentation, and that the name of the +* author not be used in advertising or publicity pertaining to +* distribution of the software without specific, written prior +* permission. The author makes no representations about the +* suitability of this software for any purpose. It is provided "as +* is" without express or implied warranty. +* +* +*/ + +#ifndef pcp_worker_h +#define pcp_worker_h + +extern int send_to_pcp_frontend(char *data, int len, bool flush); +extern int pcp_frontend_exists(void); +extern void pcp_worker_main(int port); +extern void pcp_mark_recovery_finished(void); +extern bool pcp_mark_recovery_in_progress(void); + + +#endif /* pcp_worker_h */ diff --git a/src/include/pcp/recovery.h b/src/include/pcp/recovery.h new file mode 100644 index 00000000..bbf06903 --- /dev/null +++ b/src/include/pcp/recovery.h @@ -0,0 +1,30 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ * + */ + + +#ifndef recovery_h +#define recovery_h + +extern void start_recovery(int recovery_node); +extern void finish_recovery(void); +extern int wait_connection_closed(void); +extern int ensure_conn_counter_validity(void); + +#endif /* recovery_h */ diff --git a/src/include/protocol/pool_connection_pool.h b/src/include/protocol/pool_connection_pool.h new file mode 100644 index 00000000..19b8f72a --- /dev/null +++ b/src/include/protocol/pool_connection_pool.h @@ -0,0 +1,40 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ * + */ + + +#ifndef pool_connection_pool_h +#define pool_connection_pool_h + +extern POOL_CONNECTION_POOL * pool_connection_pool; /* connection pool */ + +extern int pool_init_cp(void); +extern POOL_CONNECTION_POOL * pool_create_cp(void); +extern POOL_CONNECTION_POOL * pool_get_cp(char *user, char *database, int protoMajor, int check_socket); +extern void pool_discard_cp(char *user, char *database, int protoMajor); +extern void pool_backend_timer(void); +extern void pool_connection_pool_timer(POOL_CONNECTION_POOL * backend); +extern RETSIGTYPE pool_backend_timer_handler(int sig); +extern int connect_inet_domain_socket(int slot, bool retry); +extern int connect_unix_domain_socket(int slot, bool retry); +extern int connect_inet_domain_socket_by_port(char *host, int port, bool retry); +extern int connect_unix_domain_socket_by_port(int port, char *socket_dir, bool retry); +extern int pool_pool_index(void); +extern void close_all_backend_connections(void); +#endif /* pool_connection_pool_h */ diff --git a/src/include/protocol/pool_pg_utils.h b/src/include/protocol/pool_pg_utils.h new file mode 100644 index 00000000..cd22bc59 --- /dev/null +++ b/src/include/protocol/pool_pg_utils.h @@ -0,0 +1,56 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. 
It is provided "as
+ * is" without express or implied warranty.
+ *
+ */
+
+
+#ifndef pool_pg_utils_h
+#define pool_pg_utils_h
+
+#include "pool.h"
+
+#define MAX_PG_VERSION_STRING 512	/* capacity of PGVersion.version_string, not counting its extra +1 byte */
+
+/*
+ * PostgreSQL version descriptor
+ */
+typedef struct
+{
+	short		major;		/* major version number in up to 3 digits decimal.
+							 * Examples: 120, 110, 100, 96.
+							 */
+	short		minor;		/* minor version number in up to 2 digits decimal.
+							 * Examples: 0, 1, 2, 10, 23.
+							 */
+	char		version_string[MAX_PG_VERSION_STRING+1];	/* original version string; the +1 is presumably room for the terminating NUL */
+} PGVersion;
+
+
+extern void send_startup_packet(POOL_CONNECTION_POOL_SLOT * cp);
+extern void pool_free_startup_packet(StartupPacket *sp);
+
+extern POOL_CONNECTION_POOL_SLOT * make_persistent_db_connection(
+	int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry);
+extern POOL_CONNECTION_POOL_SLOT * make_persistent_db_connection_noerror(
+	int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry);
+extern void discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp);
+extern int select_load_balancing_node(void);
+
+extern PGVersion *Pgversion(POOL_CONNECTION_POOL * backend);
+
+#endif /* pool_pg_utils_h */
diff --git a/src/include/protocol/pool_process_query.h b/src/include/protocol/pool_process_query.h
new file mode 100644
index 00000000..a8a832cd
--- /dev/null
+++ b/src/include/protocol/pool_process_query.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2020 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be
used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ */
+
+
+#ifndef pool_process_query_h
+#define pool_process_query_h
+
+#define LOCK_COMMENT "/*INSERT LOCK*/"
+#define LOCK_COMMENT_SZ (sizeof(LOCK_COMMENT)-1)	/* length of LOCK_COMMENT without its terminating NUL */
+#define NO_LOCK_COMMENT "/*NO INSERT LOCK*/"
+#define NO_LOCK_COMMENT_SZ (sizeof(NO_LOCK_COMMENT)-1)	/* length of NO_LOCK_COMMENT without its terminating NUL */
+#define NO_LOAD_BALANCE "/*NO LOAD BALANCE*/"
+#define NO_LOAD_BALANCE_COMMENT_SZ (sizeof(NO_LOAD_BALANCE)-1)	/* length of NO_LOAD_BALANCE without its terminating NUL */
+
+
+extern void reset_variables(void);
+extern void reset_connection(void);
+extern void per_node_statement_log(POOL_CONNECTION_POOL * backend,
+	int node_id, char *query);
+extern int pool_extract_error_message(bool read_kind, POOL_CONNECTION * backend,
+	int major, bool unread, char **message);
+extern POOL_STATUS do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend,
+	char *query, int protoMajor, int pid, int key, int no_ready_for_query);
+extern void do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, int major);
+extern void free_select_result(POOL_SELECT_RESULT * result);
+extern int compare(const void *p1, const void *p2);
+extern void do_error_execute_command(POOL_CONNECTION_POOL * backend, int node_id, int major);
+extern POOL_STATUS pool_discard_packet_contents(POOL_CONNECTION_POOL * cp);
+extern void pool_dump_valid_backend(int backend_id);
+extern bool pool_push_pending_data(POOL_CONNECTION * backend);
+
+
+extern void pool_send_frontend_exits(POOL_CONNECTION_POOL * backend);
+extern POOL_STATUS ParameterStatus(POOL_CONNECTION * frontend,
+	POOL_CONNECTION_POOL * backend);
+
+extern void pool_send_error_message(POOL_CONNECTION * frontend, int protoMajor,
+	char *code,
+	char *message,
+	char *detail,
+	char *hint,
+	char *file,
+	int line);
+extern void
pool_send_fatal_message(POOL_CONNECTION * frontend, int protoMajor, + char *code, + char *message, + char *detail, + char *hint, + char *file, + int line); +extern void pool_send_severity_message(POOL_CONNECTION * frontend, int protoMajor, + char *code, + char *message, + char *detail, + char *hint, + char *file, + char *severity, + int line); + +extern POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); +extern POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int len, char *contents); + +extern POOL_STATUS pool_process_query(POOL_CONNECTION * frontend, +POOL_CONNECTION_POOL * backend, +int reset_request); +extern bool is_backend_cache_empty(POOL_CONNECTION_POOL * backend); +extern void pool_send_readyforquery(POOL_CONNECTION * frontend); + +#endif /* pool_process_query_h */ diff --git a/src/include/utils/pool_params.h b/src/include/utils/pool_params.h new file mode 100644 index 00000000..c50eca60 --- /dev/null +++ b/src/include/utils/pool_params.h @@ -0,0 +1,39 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ *
+ */
+
+#ifndef pool_params_h
+#define pool_params_h
+
+typedef struct
+{
+	int			num;		/* number of entries */
+	char	  **names;		/* parameter names */
+	char	  **values;		/* parameter values; presumably values[i] belongs to names[i] -- confirm */
+} ParamStatus;
+
+extern int pool_init_params(ParamStatus * params);
+extern void pool_discard_params(ParamStatus * params);
+extern char *pool_find_name(ParamStatus * params, char *name, int *pos);
+extern int pool_get_param(ParamStatus * params, int index, char **name, char **value);
+extern int pool_add_param(ParamStatus * params, char *name, char *value);
+extern void pool_param_debug_print(ParamStatus * params);
+
+
+#endif /* pool_params_h */
diff --git a/src/include/utils/pool_ssl.h b/src/include/utils/pool_ssl.h
new file mode 100644
index 00000000..8ea6398c
--- /dev/null
+++ b/src/include/utils/pool_ssl.h
@@ -0,0 +1,81 @@
+/*
+ *
+ * $Header$
+ *
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2020 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ */
+
+#ifndef pool_ssl_h
+#define pool_ssl_h
+
+#ifdef USE_SSL
+/*
+ * Hardcoded DH parameters, used in ephemeral DH keying.
+ *
+ * If you want to create your own hardcoded DH parameters
+ * for fun and profit, review "Assigned Number for SKIP
+ * Protocols" (http://www.skip-vpn.org/spec/numbers.html)
+ * for suggestions.
+ */
+#define FILE_DH2048 \
+"-----BEGIN DH PARAMETERS-----\n\
+MIIBCAKCAQEA9kJXtwh/CBdyorrWqULzBej5UxE5T7bxbrlLOCDaAadWoxTpj0BV\n\
+89AHxstDqZSt90xkhkn4DIO9ZekX1KHTUPj1WV/cdlJPPT2N286Z4VeSWc39uK50\n\
+T8X8dryDxUcwYc58yWb/Ffm7/ZFexwGq01uejaClcjrUGvC/RgBYK+X0iP1YTknb\n\
+zSC0neSRBzZrM2w4DUUdD3yIsxx8Wy2O9vPJI8BD8KVbGI2Ou1WMuF040zT9fBdX\n\
+Q6MdGGzeMyEstSr/POGxKUAYEY18hKcKctaGxAMZyAcpesqVDNmWn6vQClCbAkbT\n\
+CD1mpF1Bn5x8vYlLIhkmuquiXsNV6TILOwIBAg==\n\
+-----END DH PARAMETERS-----\n"
+#endif
+
+/*
+ * Macro that allows casting constness away from an expression, but doesn't
+ * allow changing the underlying type.  Enforcement of the latter
+ * currently only works for gcc-like compilers.
+ *
+ * Please note IT IS NOT SAFE to cast constness away if the result will ever
+ * be modified (it would be undefined behaviour).  Doing so anyway can cause
+ * compiler misoptimizations or runtime crashes (modifying readonly memory).
+ * It is only safe to use when the result will not be modified, but API
+ * design or language restrictions prevent you from declaring that
+ * (e.g. because a function returns both const and non-const variables).
+ *
+ * Note that this only works in function scope, not for global variables (it'd
+ * be nice, but not trivial, to improve that).
+ */
+#if defined(HAVE__BUILTIN_TYPES_COMPATIBLE_P)
+#define unconstify(underlying_type, expr) \
+	(StaticAssertExpr(__builtin_types_compatible_p(__typeof(expr), const underlying_type), \
+					  "wrong cast"), \
+	 (underlying_type) (expr))
+#else	/* no type-compatibility builtin: plain cast without the static check */
+#define unconstify(underlying_type, expr) \
+	((underlying_type) (expr))
+#endif	/* HAVE__BUILTIN_TYPES_COMPATIBLE_P */
+
+
+extern void pool_ssl_negotiate_serverclient(POOL_CONNECTION * cp);
+extern void pool_ssl_negotiate_clientserver(POOL_CONNECTION * cp);
+extern void pool_ssl_close(POOL_CONNECTION * cp);
+extern int pool_ssl_read(POOL_CONNECTION * cp, void *buf, int size);
+extern int pool_ssl_write(POOL_CONNECTION * cp, const void *buf, int size);
+extern bool pool_ssl_pending(POOL_CONNECTION * cp);
+extern int SSL_ServerSide_init(void);
+
+
+#endif /* pool_ssl_h */
diff --git a/src/include/utils/ps_status.h b/src/include/utils/ps_status.h
new file mode 100644
index 00000000..a8fb3f1a
--- /dev/null
+++ b/src/include/utils/ps_status.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2020 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ */
+
+#ifndef ps_status_h
+#define ps_status_h
+
+#include "pool.h"
+#include <netdb.h>				/* NI_MAXHOST, NI_MAXSERV */
+
+extern char remote_ps_data[NI_MAXHOST + NI_MAXSERV + 2];	/* used for set_ps_display */
+
+extern char **save_ps_display_args(int argc, char **argv);
+extern void init_ps_display(const char *username, const char *dbname,
+				const char *host_info, const char *initial_str);
+extern void set_ps_display(const char *activity, bool force);
+extern const char *get_ps_display(int *displen);
+extern void pool_ps_idle_display(POOL_CONNECTION_POOL * backend);
+
+
+#endif /* ps_status_h */
diff --git a/src/include/utils/statistics.h b/src/include/utils/statistics.h
new file mode 100644
index 00000000..4f0a33f4
--- /dev/null
+++ b/src/include/utils/statistics.h
@@ -0,0 +1,30 @@
+/*
+ *
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2020 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ */
+
+#ifndef statistics_h
+#define statistics_h
+
+size_t stat_shared_memory_size(void);
+void stat_set_stat_area(void *address);
+void stat_init_stat_area(void);
+void stat_count_up(int backend_node_id, Node *parsetree);
+uint64 stat_get_select_count(int backend_node_id);
+
+#endif /* statistics_h */
diff --git a/src/main/pgpool_internal_commands.c b/src/main/pgpool_internal_commands.c
new file mode 100644
index 00000000..2afa7dbb
--- /dev/null
+++ b/src/main/pgpool_internal_commands.c
@@ -0,0 +1,447 @@
+/* -*-pgpool_main-c-*- */
+/*
+ * $Header$
+ *
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2020 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ */
+
+/*
+ * pgpool_internal_commands consists of functions that can be called
+ * from any pgpool-II process to instruct pgpool-II main process to
+ * perform a particular function
+ */
+#include <sys/types.h>			/* pid_t */
+#include <unistd.h>				/* getpid, getppid, sleep */
+#include <signal.h>				/* SIGTERM, SIGINT, SIGQUIT */
+
+#include "utils/elog.h"
+#include "utils/pool_signal.h"
+
+#include "pool.h"
+#include "main/pgpool_internal_commands.h"
+#include "watchdog/wd_internal_commands.h"
+#include "utils/palloc.h"
+#include "utils/memutils.h"
+#include "pool_config.h"
+
+/*
+ * sends the signal to pgpool-II main process to terminate Pgpool-II
+ * process.
+ */
+/*
+ * terminate_pgpool:
+ *
+ * mode selects the signal handed to pool_signal_parent():
+ *	's' -> SIGTERM, 'f' -> SIGINT, 'i' -> SIGQUIT.
+ * Any other mode is reported at ERROR level when "error" is set and at
+ * WARNING level otherwise, and false is returned without signalling.
+ * Returns true once the signal has been passed to pool_signal_parent().
+ */
+bool
+terminate_pgpool(char mode, bool error)
+{
+	const pid_t	parent_pid = getppid();
+	int			signo;
+
+	switch (mode)
+	{
+		case 's':
+			ereport(DEBUG1,
+					(errmsg("processing shutdown request"),
+					 errdetail("sending SIGTERM to the parent process with PID:%d", parent_pid)));
+			signo = SIGTERM;
+			break;
+
+		case 'f':
+			ereport(DEBUG1,
+					(errmsg("processing shutdown request"),
+					 errdetail("sending SIGINT to the parent process with PID:%d", parent_pid)));
+			signo = SIGINT;
+			break;
+
+		case 'i':
+			ereport(DEBUG1,
+					(errmsg("processing shutdown request"),
+					 errdetail("sending SIGQUIT to the parent process with PID:%d", parent_pid)));
+			signo = SIGQUIT;
+			break;
+
+		default:
+			/* unknown mode: report it and leave the parent untouched */
+			ereport(error ? ERROR : WARNING,
+					(errmsg("error while processing shutdown request"),
+					 errdetail("invalid shutdown mode \"%c\"", mode)));
+			return false;
+	}
+
+	pool_signal_parent(signo);
+	return true;
+}
+
+/*
+ * degenerate_backend_set_ex:
+ *
+ * Registers, or with test_only merely verifies, a NODE DOWN operation
+ * request.  A registered request is then processed by the failover function.
+ *
+ * node_id_set:	array of node ids to be registered for NODE DOWN operation
+ * count:		number of elements in node_id_set array
+ * error:		if set, problems are reported at ERROR level instead of LOG
+ *				as soon as a node id is found in node_id_set on which the
+ *				operation could not be performed.
+ * test_only:	when set, the function only checks whether the NODE DOWN
+ *				operation can be executed on the provided node ids and never
+ *				registers the operation request.
+ *				In this case the function returns false (or raises an error)
+ *				as soon as the first non-compliant node in node_id_set is found.
+ * flags:		request detail bits; when REQ_DETAIL_WATCHDOG is set the watchdog is not consulted.
+ *
+ * Returns false when test_only is set and a node id fails the checks, or
+ * when the watchdog answers with a result that is not handled below;
+ * returns true otherwise.
+ */
+bool
+degenerate_backend_set_ex(int *node_id_set, int count, unsigned char flags, bool error, bool test_only)
+{
+	int			i;
+	int			node_id[MAX_NUM_BACKENDS];	/* node ids that passed the checks */
+	int			node_count = 0;
+	int			elevel = LOG;
+
+	if (error)
+		elevel = ERROR;
+
+	for (i = 0; i < count; i++)
+	{
+		if (node_id_set[i] < 0 || node_id_set[i] >= MAX_NUM_BACKENDS ||
+			(!VALID_BACKEND(node_id_set[i]) && BACKEND_INFO(node_id_set[i]).quarantine == false))
+		{
+			/* valid node ids are 0 .. MAX_NUM_BACKENDS - 1 */
+			if (node_id_set[i] < 0 || node_id_set[i] >= MAX_NUM_BACKENDS)
+				ereport(elevel,
+						(errmsg("invalid degenerate backend request, node id: %d is out of range. node id must be between [0 and %d]"
+								,node_id_set[i], MAX_NUM_BACKENDS - 1)));
+			else
+				ereport(elevel,
+						(errmsg("invalid degenerate backend request, node id : %d status: [%d] is not valid for failover"
+								,node_id_set[i], BACKEND_INFO(node_id_set[i]).backend_status)));
+			if (test_only)
+				return false;
+
+			continue;
+		}
+
+		if (POOL_DISALLOW_TO_FAILOVER(BACKEND_INFO(node_id_set[i]).flag))
+		{
+			ereport(elevel,
+					(errmsg("degenerate backend request for node_id: %d from pid [%d] is canceled because failover is disallowed on the node",
+							node_id_set[i], getpid())));
+			if (test_only)
+				return false;
+		}
+		else
+		{
+			if (node_count >= MAX_NUM_BACKENDS)
+			{
+				/*
+				 * node_id[] holds at most MAX_NUM_BACKENDS entries; getting
+				 * here means the caller passed more accepted ids than that
+				 * (i.e. duplicates).  Skip instead of writing past the array.
+				 */
+				ereport(LOG,
+						(errmsg("degenerate backend request for node_id: %d from pid [%d] is ignored because too many node ids were requested",
+								node_id_set[i], getpid())));
+				continue;
+			}
+
+			if (!test_only)		/* do not produce this log if we are in
+								 * testing mode */
+				ereport(LOG,
+						(errmsg("received degenerate backend request for node_id: %d from pid [%d]",
+								node_id_set[i], getpid())));
+
+			node_id[node_count++] = node_id_set[i];
+		}
+	}
+
+	if (node_count)
+	{
+		WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
+
+		/* If this was only a test. Inform the caller without doing anything */
+		if (test_only)
+			return true;
+
+		/* consult the watchdog unless the request already carries its flag */
+		if (!(flags & REQ_DETAIL_WATCHDOG))
+		{
+			int			x;
+
+			/* retry once per second while the cluster reports a transition */
+			for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
+			{
+				/*
+				 * NOTE(review): the unfiltered node_id_set/count is forwarded
+				 * here although only node_id/node_count passed the checks
+				 * above -- confirm this is intended.
+				 */
+				res = wd_degenerate_backend_set(node_id_set, count, flags);
+				if (res != FAILOVER_RES_TRANSITION)
+					break;
+				sleep(1);
+			}
+		}
+		if (res == FAILOVER_RES_TRANSITION)
+		{
+			/*
+			 * The cluster did not leave the transition state within the wait
+			 * window.  Only a NOTICE is logged here; the result then ends up
+			 * in the final else branch below.
+			 */
+			ereport(NOTICE,
+					(errmsg("received degenerate backend request for %d node(s) from pid [%d], But cluster is not in stable state"
+							,node_count, getpid())));
+		}
+		if (res == FAILOVER_RES_PROCEED)
+		{
+			register_node_operation_request(NODE_DOWN_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_NO_QUORUM)
+		{
+			ereport(LOG,
+					(errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog"
+							,node_count, getpid()),
+					 errdetail("watchdog does not holds the quorum")));
+
+			register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_CONSENSUS_MAY_FAIL)
+		{
+			ereport(LOG,
+					(errmsg("degenerate backend request for %d node(s) from pid [%d], is changed to quarantine node request by watchdog"
+							,node_count, getpid()),
+					 errdetail("watchdog is taking time to build consensus")));
+			register_node_operation_request(NODE_QUARANTINE_REQUEST, node_id, node_count, flags);
+		}
+		else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+		{
+			/* only the first accepted node id is reported in this message */
+			ereport(LOG,
+					(errmsg("degenerate backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+							,*node_id, getpid())));
+		}
+		else if (res == FAILOVER_RES_WILL_BE_DONE)
+		{
+			/* we will receive a sync request from master watchdog node */
+			ereport(LOG,
+					(errmsg("degenerate backend request for %d node(s) from pid [%d], will be handled by watchdog"
+							,node_count, getpid())));
+		}
+		else
+		{
+			ereport(elevel,
+					(errmsg("degenerate backend request for %d node(s) from pid [%d] is canceled by other pgpool"
+							,node_count, getpid())));
+			return false;
+		}
+	}
+	return true;
+}
+
+/*
+ * wrapper over degenerate_backend_set_ex function to register
+ * NODE down operation request (error = false, test_only = false)
+ */
+bool
+degenerate_backend_set(int *node_id_set, int count, unsigned char flags)
+{
+	return degenerate_backend_set_ex(node_id_set, count, flags, false, false);
+}
+
+/*
+ * Request promotion of node_id.
+ *
+ * Returns false right away when SL_MODE is not set or node_id fails the
+ * checks.  Otherwise the watchdog is consulted (unless flags carries
+ * REQ_DETAIL_WATCHDOG) and, when it answers FAILOVER_RES_PROCEED, the
+ * result of register_node_operation_request() is returned.  Every other
+ * watchdog answer is only logged and false is returned.
+ */
+bool
+promote_backend(int node_id, unsigned char flags)
+{
+	WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
+	bool		ret = false;
+
+	if (!SL_MODE)
+	{
+		return false;
+	}
+
+	if (node_id < 0 || node_id >= MAX_NUM_BACKENDS || !VALID_BACKEND(node_id))
+	{
+		/* valid node ids are 0 .. MAX_NUM_BACKENDS - 1 */
+		if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
+			ereport(LOG,
+					(errmsg("invalid promote backend request, node id: %d is out of range. node id must be between [0 and %d]"
+							,node_id, MAX_NUM_BACKENDS - 1)));
+		else
+			ereport(LOG,
+					(errmsg("invalid promote backend request, node id : %d status: [%d] not valid"
+							,node_id, BACKEND_INFO(node_id).backend_status)));
+		return false;
+	}
+	ereport(LOG,
+			(errmsg("received promote backend request for node_id: %d from pid [%d]",
+					node_id, getpid())));
+
+	/* consult the watchdog unless the request already carries its flag */
+	if (!(flags & REQ_DETAIL_WATCHDOG))
+	{
+		int			x;
+
+		/* retry once per second while the cluster reports a transition */
+		for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
+		{
+			res = wd_promote_backend(node_id, flags);
+			if (res != FAILOVER_RES_TRANSITION)
+				break;
+			sleep(1);
+		}
+	}
+	if (res == FAILOVER_RES_TRANSITION)
+	{
+		/*
+		 * The cluster did not leave the transition state within the wait
+		 * window.  Only a NOTICE is logged here; the result then ends up in
+		 * the final else branch below.
+		 */
+		ereport(NOTICE,
+				(errmsg("promote backend request for node_id: %d from pid [%d], But cluster is not in stable state"
+						,node_id, getpid())));
+	}
+
+	if (res == FAILOVER_RES_PROCEED)
+	{
+		ret = register_node_operation_request(PROMOTE_NODE_REQUEST, &node_id, 1, flags);
+	}
+	else if (res == FAILOVER_RES_WILL_BE_DONE)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog"
+						,node_id, getpid())));
+	}
+	else if (res == FAILOVER_RES_NO_QUORUM)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], is canceled because watchdog does not hold quorum"
+						,node_id, getpid())));
+	}
+	else if (res == FAILOVER_RES_BUILDING_CONSENSUS)
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d], will be handled by watchdog, which is building consensus for request"
+						,node_id, getpid())));
+	}
+	else
+	{
+		ereport(LOG,
+				(errmsg("promote backend request for node_id: %d from pid [%d] is canceled by other pgpool"
+						,node_id, getpid())));
+	}
+	return ret;
+}
+
+/*
+ * Request failback of node_id.
+ *
+ * throw_error selects ERROR instead of LOG for rejected or canceled
+ * requests.  A quarantined node turns the request into a status update
+ * (REQ_DETAIL_UPDATE is added to flags) and skips the watchdog.
+ * Returns the result of register_node_operation_request() when the request
+ * is registered, false otherwise.
+ */
+bool
+send_failback_request(int node_id, bool throw_error, unsigned char flags)
+{
+	WDFailoverCMDResults res = FAILOVER_RES_PROCEED;
+	bool		ret = false;
+
+	if (node_id < 0 || node_id >= MAX_NUM_BACKENDS ||
+		(RAW_MODE && BACKEND_INFO(node_id).backend_status != CON_DOWN && VALID_BACKEND(node_id)))
+	{
+		/* valid node ids are 0 .. MAX_NUM_BACKENDS - 1 */
+		if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
+			ereport(throw_error ? ERROR : LOG,
+					(errmsg("invalid failback request, node id: %d is out of range. node id must be between [0 and %d]"
+							,node_id, MAX_NUM_BACKENDS - 1)));
+		else
+			ereport(throw_error ? ERROR : LOG,
+					(errmsg("invalid failback request, node id : %d status: [%d] not valid for failback"
+							,node_id, BACKEND_INFO(node_id).backend_status)));
+		return false;
+	}
+
+	ereport(LOG,
+			(errmsg("received failback request for node_id: %d from pid [%d]",
+					node_id, getpid())));
+
+	/* check we are trying to failback the quarantine node */
+	if (BACKEND_INFO(node_id).quarantine)
+	{
+		/* set the update flags */
+		ereport(LOG,
+				(errmsg("failback request from pid [%d] is changed to update status request because node_id: %d was quarantined",
+						getpid(), node_id)));
+		flags |= REQ_DETAIL_UPDATE;
+	}
+	else
+	{
+		/*
+		 * no need to go to watchdog if it's an update or already initiated
+		 * from watchdog
+		 */
+		if (!(flags & REQ_DETAIL_WATCHDOG))
+		{
+			int			x;
+
+			/* retry once per second while the cluster reports a transition */
+			for (x = 0; x < MAX_SEC_WAIT_FOR_CLUSTER_TRANSATION; x++)
+			{
+				res = wd_send_failback_request(node_id, flags);
+				if (res != FAILOVER_RES_TRANSITION)
+					break;
+				sleep(1);
+			}
+		}
+	}
+
+	if (res == FAILOVER_RES_TRANSITION)
+	{
+		/*
+		 * The cluster did not leave the transition state within the wait
+		 * window.  Only a NOTICE is logged here; the result then ends up in
+		 * the final else branch below.
+		 */
+		ereport(NOTICE,
+				(errmsg("failback request for node_id: %d from pid [%d], But cluster is not in stable state"
+						,node_id, getpid())));
+	}
+
+	if (res == FAILOVER_RES_PROCEED)
+	{
+		ret = register_node_operation_request(NODE_UP_REQUEST, &node_id, 1, flags);
+	}
+	else if (res == FAILOVER_RES_WILL_BE_DONE)
+	{
+		ereport(LOG,
+				(errmsg("failback request for node_id: %d from pid [%d], will be handled by watchdog"
+						,node_id, getpid())));
+	}
+	else
+	{
+		ereport(throw_error ? ERROR : LOG,
+				(errmsg("failback request for node_id: %d from pid [%d] is canceled by other pgpool"
+						,node_id, getpid())));
+	}
+	return ret;
+}
+
+/*
+ * Request failover of a single backend node.  Calls made from the pgpool
+ * main process are ignored.
+ */ +void +notice_backend_error(int node_id, unsigned char flags) +{ + int n = node_id; + + if (getpid() == mypid) + { + ereport(LOG, + (errmsg("notice_backend_error: called from pgpool main. ignored."))); + } + else + { + degenerate_backend_set(&n, 1, flags); + } +} + +void +degenerate_all_quarantine_nodes(void) +{ + int i; + + for (i = 0; i < NUM_BACKENDS; i++) + { + if (BACKEND_INFO(i).quarantine && BACKEND_INFO(i).backend_status == CON_DOWN) + { + /* just send the request to watchdog */ + if (wd_degenerate_backend_set(&i, 1, REQ_DETAIL_UPDATE) == FAILOVER_RES_PROCEED) + register_node_operation_request(NODE_DOWN_REQUEST, &i, 1, REQ_DETAIL_WATCHDOG | REQ_DETAIL_SWITCHOVER); + } + } +} + + +bool +close_idle_connections(void) +{ + return register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, 0); +} + diff --git a/src/protocol/pool_pg_utils.c b/src/protocol/pool_pg_utils.c new file mode 100644 index 00000000..ec9b5c53 --- /dev/null +++ b/src/protocol/pool_pg_utils.c @@ -0,0 +1,647 @@ +/* + * + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2020 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ * + */ + + +#include +#include +#include "protocol/pool_pg_utils.h" +#include "protocol/pool_connection_pool.h" +#include "utils/palloc.h" +#include "utils/memutils.h" +#include "utils/pool_stream.h" +#include "utils/pool_ssl.h" +#include "utils/elog.h" +#include "utils/pool_relcache.h" +#include "auth/pool_auth.h" +#include "context/pool_session_context.h" + +#include "pool_config.h" +#include "pool_config_variables.h" + +static int choose_db_node_id(char *str); +static void free_persisten_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp); + +/* + * create a persistent connection + */ +POOL_CONNECTION_POOL_SLOT * +make_persistent_db_connection( + int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry) +{ + POOL_CONNECTION_POOL_SLOT *cp; + int fd; + +#define MAX_USER_AND_DATABASE 1024 + + /* V3 startup packet */ + typedef struct + { + int protoVersion; + char data[MAX_USER_AND_DATABASE]; + } StartupPacket_v3; + + static StartupPacket_v3 * startup_packet; + int len, + len1; + + cp = palloc0(sizeof(POOL_CONNECTION_POOL_SLOT)); + startup_packet = palloc0(sizeof(*startup_packet)); + startup_packet->protoVersion = htonl(0x00030000); /* set V3 proto + * major/minor */ + + /* + * create socket + */ + if (*hostname == '/') + { + fd = connect_unix_domain_socket_by_port(port, hostname, retry); + } + else + { + fd = connect_inet_domain_socket_by_port(hostname, port, retry); + } + + if (fd < 0) + { + free_persisten_db_connection_memory(cp); + pfree(startup_packet); + ereport(ERROR, + (errmsg("failed to make persistent db connection"), + errdetail("connection to host:\"%s:%d\" failed", hostname, port))); + } + + cp->con = pool_open(fd, true); + cp->closetime = 0; + cp->con->isbackend = 1; + pool_set_db_node_id(cp->con, db_node_id); + + pool_ssl_negotiate_clientserver(cp->con); + + /* + * build V3 startup packet + */ + len = snprintf(startup_packet->data, sizeof(startup_packet->data), "user") + 1; + len1 = 
snprintf(&startup_packet->data[len], sizeof(startup_packet->data) - len, "%s", user) + 1; + if (len1 >= (sizeof(startup_packet->data) - len)) + { + pool_close(cp->con); + free_persisten_db_connection_memory(cp); + pfree(startup_packet); + ereport(ERROR, + (errmsg("failed to make persistent db connection"), + errdetail("user name is too long"))); + } + + len += len1; + len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data) - len, "database") + 1; + if (len1 >= (sizeof(startup_packet->data) - len)) + { + pool_close(cp->con); + free_persisten_db_connection_memory(cp); + pfree(startup_packet); + ereport(ERROR, + (errmsg("failed to make persistent db connection"), + errdetail("user name is too long"))); + } + + len += len1; + len1 = snprintf(&startup_packet->data[len], sizeof(startup_packet->data) - len, "%s", dbname) + 1; + if (len1 >= (sizeof(startup_packet->data) - len)) + { + pool_close(cp->con); + free_persisten_db_connection_memory(cp); + pfree(startup_packet); + ereport(ERROR, + (errmsg("failed to make persistent db connection"), + errdetail("database name is too long"))); + } + len += len1; + startup_packet->data[len++] = '\0'; + + cp->sp = palloc(sizeof(StartupPacket)); + + cp->sp->startup_packet = (char *) startup_packet; + cp->sp->len = len + 4; + cp->sp->major = 3; + cp->sp->minor = 0; + cp->sp->database = pstrdup(dbname); + cp->sp->user = pstrdup(user); + + /* + * send startup packet + */ + PG_TRY(); + { + send_startup_packet(cp); + connection_do_auth(cp, password); + } + PG_CATCH(); + { + pool_close(cp->con); + free_persisten_db_connection_memory(cp); + PG_RE_THROW(); + } + PG_END_TRY(); + + return cp; +} + +/* + * make_persistent_db_connection_noerror() is a wrapper over + * make_persistent_db_connection() which does not ereports in case of an error + */ +POOL_CONNECTION_POOL_SLOT * +make_persistent_db_connection_noerror( + int db_node_id, char *hostname, int port, char *dbname, char *user, char *password, bool retry) +{ + 
POOL_CONNECTION_POOL_SLOT *slot = NULL; + MemoryContext oldContext = CurrentMemoryContext; + + PG_TRY(); + { + slot = make_persistent_db_connection(db_node_id, + hostname, + port, + dbname, + user, + password, retry); + } + PG_CATCH(); + { + EmitErrorReport(); + MemoryContextSwitchTo(oldContext); + FlushErrorState(); + slot = NULL; + } + PG_END_TRY(); + return slot; +} + +/* + * Free memory of POOL_CONNECTION_POOL_SLOT. Should only be used in + * make_persistent_db_connection and discard_persistent_db_connection. + */ +static void +free_persisten_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp) +{ + if (!cp) + return; + if (!cp->sp) + { + pfree(cp); + return; + } + if (cp->sp->startup_packet) + pfree(cp->sp->startup_packet); + if (cp->sp->database) + pfree(cp->sp->database); + if (cp->sp->user) + pfree(cp->sp->user); + pfree(cp->sp); + pfree(cp); +} + +/* + * Discard connection and memory allocated by + * make_persistent_db_connection(). + */ +void +discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp) +{ + int len; + + if (cp == NULL) + return; + + pool_write(cp->con, "X", 1); + len = htonl(4); + pool_write(cp->con, &len, sizeof(len)); + + /* + * XXX we cannot call pool_flush() here since backend may already close + * the socket and pool_flush() automatically invokes fail over handler. 
+ * This could happen in copy command (remember the famous "lost + * synchronization with server, resetting connection" message) + */ + socket_set_nonblock(cp->con->fd); + pool_flush_it(cp->con); + socket_unset_nonblock(cp->con->fd); + + pool_close(cp->con); + free_persisten_db_connection_memory(cp); +} + +/* + * send startup packet + */ +void +send_startup_packet(POOL_CONNECTION_POOL_SLOT * cp) +{ + int len; + + len = htonl(cp->sp->len + sizeof(len)); + pool_write(cp->con, &len, sizeof(len)); + pool_write_and_flush(cp->con, cp->sp->startup_packet, cp->sp->len); +} + +void +pool_free_startup_packet(StartupPacket *sp) +{ + if (sp) + { + if (sp->startup_packet) + pfree(sp->startup_packet); + if (sp->database) + pfree(sp->database); + if (sp->user) + pfree(sp->user); + pfree(sp); + } + sp = NULL; +} + +/* + * Select load balancing node. This function is called when: + * 1) client connects + * 2) the node previously selected for the load balance node is down + */ +int +select_load_balancing_node(void) +{ + int selected_slot; + double total_weight, + r; + int i; + int index_db = -1, + index_app = -1; + POOL_SESSION_CONTEXT *ses = pool_get_session_context(false); + int tmp; + int no_load_balance_node_id = -2; + + /* + * -2 indicates there's no database_redirect_preference_list. -1 indicates + * database_redirect_preference_list exists and any of standby nodes + * specified. 
+ */ + int suggested_node_id = -2; + +#if defined(sun) || defined(__sun) + r = (((double) rand()) / RAND_MAX); +#else + r = (((double) random()) / RAND_MAX); +#endif + + /* + * Check database_redirect_preference_list + */ + if (SL_MODE && pool_config->redirect_dbnames) + { + char *database = MASTER_CONNECTION(ses->backend)->sp->database; + + /* + * Check to see if the database matches any of + * database_redirect_preference_list + */ + index_db = regex_array_match(pool_config->redirect_dbnames, database); + if (index_db >= 0) + { + /* Matches */ + ereport(DEBUG1, + (errmsg("selecting load balance node db matched"), + errdetail("dbname: %s index is %d dbnode is %s weight is %f", database, index_db, + pool_config->db_redirect_tokens->token[index_db].right_token, + pool_config->db_redirect_tokens->token[index_db].weight_token))); + + tmp = choose_db_node_id(pool_config->db_redirect_tokens->token[index_db].right_token); + if (tmp == -1 || (tmp >= 0 && VALID_BACKEND(tmp))) + suggested_node_id = tmp; + } + } + + /* + * Check app_name_redirect_preference_list + */ + if (SL_MODE && pool_config->redirect_app_names) + { + char *app_name = MASTER_CONNECTION(ses->backend)->sp->application_name; + + /* + * Check only if application name is set. Old applications may not + * have application name. + */ + if (app_name && strlen(app_name) > 0) + { + /* + * Check to see if the aplication name matches any of + * app_name_redirect_preference_list. + */ + index_app = regex_array_match(pool_config->redirect_app_names, app_name); + if (index_app >= 0) + { + + /* + * if the aplication name matches any of + * app_name_redirect_preference_list, + * database_redirect_preference_list will be ignored. 
+ */ + index_db = -1; + + /* Matches */ + ereport(DEBUG1, + (errmsg("selecting load balance node db matched"), + errdetail("app_name: %s index is %d dbnode is %s weight is %f", app_name, index_app, + pool_config->app_name_redirect_tokens->token[index_app].right_token, + pool_config->app_name_redirect_tokens->token[index_app].weight_token))); + + tmp = choose_db_node_id(pool_config->app_name_redirect_tokens->token[index_app].right_token); + if (tmp == -1 || (tmp >= 0 && VALID_BACKEND(tmp))) + suggested_node_id = tmp; + } + } + } + + if (suggested_node_id >= 0) + { + /* + * If the weight is bigger than random rate then send to + * suggested_node_id. If the weight is less than random rate then + * choose load balance node from other nodes. + */ + if ((index_db >= 0 && r <= pool_config->db_redirect_tokens->token[index_db].weight_token) || + (index_app >= 0 && r <= pool_config->app_name_redirect_tokens->token[index_app].weight_token)) + { + ereport(DEBUG1, + (errmsg("selecting load balance node"), + errdetail("selected backend id is %d", suggested_node_id))); + return suggested_node_id; + } + else + no_load_balance_node_id = suggested_node_id; + } + + /* In case of sending to standby */ + if (suggested_node_id == -1) + { + /* If the weight is less than random rate then send to primary. 
*/ + if ((index_db >= 0 && r > pool_config->db_redirect_tokens->token[index_db].weight_token) || + (index_app >= 0 && r > pool_config->app_name_redirect_tokens->token[index_app].weight_token)) + { + ereport(DEBUG1, + (errmsg("selecting load balance node"), + errdetail("selected backend id is %d", PRIMARY_NODE_ID))); + return PRIMARY_NODE_ID; + } + } + + /* Choose a backend in random manner with weight */ + selected_slot = MASTER_NODE_ID; + total_weight = 0.0; + + for (i = 0; i < NUM_BACKENDS; i++) + { + if (VALID_BACKEND_RAW(i)) + { + if (i == no_load_balance_node_id) + continue; + if (suggested_node_id == -1) + { + if (i != PRIMARY_NODE_ID) + total_weight += BACKEND_INFO(i).backend_weight; + } + else + total_weight += BACKEND_INFO(i).backend_weight; + } + } + +#if defined(sun) || defined(__sun) + r = (((double) rand()) / RAND_MAX) * total_weight; +#else + r = (((double) random()) / RAND_MAX) * total_weight; +#endif + + total_weight = 0.0; + for (i = 0; i < NUM_BACKENDS; i++) + { + if ((suggested_node_id == -1 && i == PRIMARY_NODE_ID) || i == no_load_balance_node_id) + continue; + + if (VALID_BACKEND_RAW(i) && BACKEND_INFO(i).backend_weight > 0.0) + { + if (r >= total_weight) + selected_slot = i; + else + break; + total_weight += BACKEND_INFO(i).backend_weight; + } + } + ereport(DEBUG1, + (errmsg("selecting load balance node"), + errdetail("selected backend id is %d", selected_slot))); + return selected_slot; +} + +/* + * Returns PostgreSQL version. + * The returned PgVersion struct is in static memory. + * Caller must not modify it. + * + * Note: + * Must be called while query context already exists. + * If there's something goes wrong, this raises FATAL. So never returns to caller. 
+ * + */ +PGVersion * +Pgversion(POOL_CONNECTION_POOL * backend) +{ +#define VERSION_BUF_SIZE 10 + static PGVersion pgversion; + static POOL_RELCACHE *relcache; + char *result; + char *p; + char buf[VERSION_BUF_SIZE]; + int i; + int major; + int minor; + + /* + * First, check local cache. If cache is set, just return it. + */ + if (pgversion.major != 0) + { + ereport(DEBUG5, + (errmsg("Pgversion: local cache returned"))); + + return &pgversion; + } + + if (!relcache) + { + /* + * Create relcache. + */ + relcache = pool_create_relcache(pool_config->relcache_size, "SELECT version()", + string_register_func, string_unregister_func, false); + if (relcache == NULL) + { + ereport(FATAL, + (errmsg("Pgversion: unable to create relcache while getting PostgreSQL version."))); + return NULL; + } + } + + /* + * Search relcache. + */ + result = (char *)pool_search_relcache(relcache, backend, "version"); + if (result == 0) + { + ereport(FATAL, + (errmsg("Pgversion: unable to search relcache while getting PostgreSQL version."))); + return NULL; + } + + ereport(DEBUG5, + (errmsg("Pgversion: version string: %s", result))); + + /* + * Extract major version number. We create major version as "version" * + * 10. For example, for V10, the major version number will be 100, for + * V9.6 it will be 96, and so on. For alpha or beta version, the version + * string could be something like "12beta1". In this case we assume that + * atoi(3) is smart enough to stop at the first character which is not a + * valid digit (in our case 'b')). So "12beta1" should be converted to 12. 
+ */ + p = strchr(result, ' '); + if (p == NULL) + { + ereport(FATAL, + (errmsg("Pgversion: unable to find the first space in the version string: %s", result))); + return NULL; + } + + p++; + i = 0; + while (i < VERSION_BUF_SIZE - 1 && p && *p != '.') + { + buf[i++] = *p++; + } + buf[i] = '\0'; + major = atoi(buf); + ereport(DEBUG5, + (errmsg("Pgversion: major version: %d", major))); + + /* Assuming PostgreSQL V100 is the final release:-) */ + if (major < 6 || major > 100) + { + ereport(FATAL, + (errmsg("Pgversion: wrong major version: %d", major))); + return NULL; + } + + /* + * If major version is 10 or above, we are done to extract major. + * Otherwise extract below decimal point part. + */ + if (major >= 10) + { + major *= 10; + } + else + { + p++; + i = 0; + while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ') + { + buf[i++] = *p++; + } + buf[i] = '\0'; + major = major * 10 + atoi(buf); + ereport(DEBUG5, + (errmsg("Pgversion: major version: %d", major))); + pgversion.major = major; + } + + /* + * Extract minor version. + */ + p++; + i = 0; + while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ') + { + buf[i++] = *p++; + } + buf[i] = '\0'; + minor = atoi(buf); + ereport(DEBUG5, + (errmsg("Pgversion: minor version: %d", minor))); + + if (minor < 0 || minor > 100) + { + ereport(FATAL, + (errmsg("Pgversion: wrong minor version: %d", minor))); + return NULL; + } + + + /* + * Ok, everything looks good. Set the local cache. + */ + pgversion.major = major; + pgversion.minor = minor; + strncpy(pgversion.version_string, result, sizeof(pgversion.version_string) - 1); + + return &pgversion; +} + +/* + * Given db node specified in pgpool.conf, returns appropriate physical + * DB node id. + * Acceptable db node specifications are: + * + * primary: primary node + * standby: any of standby node + * numeric: physical node id + * + * If specified node does exist, returns MASTER_NODE_ID. If "standby" is + * specified, returns -1. 
Caller should choose one of standby nodes + * appropriately. + */ +static int +choose_db_node_id(char *str) +{ + int node_id = MASTER_NODE_ID; + + if (!strcmp("primary", str) && PRIMARY_NODE_ID >= 0) + { + node_id = PRIMARY_NODE_ID; + } + else if (!strcmp("standby", str)) + { + node_id = -1; + } + else + { + int tmp = atoi(str); + + if (tmp >= 0 && tmp < NUM_BACKENDS) + node_id = tmp; + } + return node_id; +}