Index: pool_process_query.c =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v retrieving revision 1.249.2.3 diff -u -r1.249.2.3 pool_process_query.c --- pool_process_query.c 26 Jan 2011 01:36:14 -0000 1.249.2.3 +++ pool_process_query.c 26 Jan 2011 04:46:24 -0000 @@ -65,7 +65,7 @@ static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt); static char *get_insert_command_table_name(InsertStmt *node); -static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList p, int n); +static int send_deallocate(POOL_CONNECTION_POOL *backend, POOL_SENT_MESSAGE_LIST msglist, int n); static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend); static bool is_panic_or_fatal_error(const char *message, int major); @@ -1335,7 +1335,7 @@ void reset_connection(void) { reset_variables(); - pool_clear_prepared_statement_list(); + pool_clear_sent_message_list(); } @@ -1375,36 +1375,45 @@ */ if (qcnt >= qn) { - if (session_context->pstmt_list.size == 0) + if (session_context->message_list.size == 0) return 2; - char *name = session_context->pstmt_list.pstmts[0]->name; + char kind = session_context->message_list.sent_messages[0]->kind; + char *name = session_context->message_list.sent_messages[0]->name; - /* Delete from prepared list */ - if (send_deallocate(backend, session_context->pstmt_list, 0)) + if ((kind == 'P' || kind == 'Q') && *name != '\0') { - /* Deallocate failed. We are in unknown state. Ask caller - * to reset backend connection. - */ + /* Deallocate a prepared statement */ + if (send_deallocate(backend, session_context->message_list, 0)) + { + /* Deallocate failed. We are in unknown state. Ask caller + * to reset backend connection. + */ #ifdef NOT_USED - reset_prepared_list(&prepared_list); + reset_prepared_list(&prepared_list); #endif - pool_remove_prepared_statement_by_pstmt_name(name); - return -1; - } - /* - * If DEALLOCATE returns ERROR response, instead of - * CommandComplete, del_prepared_list is not called and the - * prepared object keeps on sitting on the prepared list. This - * will cause infinite call to reset_backend. So we call - * del_prepared_list() again. This is harmless since trying to - * remove same prepared object will be ignored. - */ + pool_remove_sent_message(kind, name); + return -1; + } + /* + * If DEALLOCATE returns ERROR response, instead of + * CommandComplete, del_prepared_list is not called and the + * prepared object keeps on sitting on the prepared list. This + * will cause infinite call to reset_backend. So we call + * del_prepared_list() again. This is harmless since trying to + * remove same prepared object will be ignored. + */ #ifdef NOT_USED - del_prepared_list(&prepared_list, prepared_list.portal_list[0]); + del_prepared_list(&prepared_list, prepared_list.portal_list[0]); #endif - pool_remove_prepared_statement_by_pstmt_name(name); - return 1; + pool_remove_sent_message(kind, name); + return 1; + } + else + { + pool_remove_sent_message(kind, name); + return 0; + } } query = pool_config->reset_query_list[qcnt]; @@ -3261,15 +3270,20 @@ if (IsA(node, DeallocateStmt)) { - PreparedStatement *ps; + POOL_SENT_MESSAGE *sent_msg; DeallocateStmt *d = (DeallocateStmt *)node; - ps = pool_get_prepared_statement_by_pstmt_name(d->name); - if (ps && ps->qctxt->original_query) + sent_msg = pool_get_sent_message('Q', d->name); + if (!sent_msg) + sent_msg = pool_get_sent_message('P', d->name); + if (sent_msg) { - string_append_char(msg, "["); - string_append_char(msg, ps->qctxt->original_query); - string_append_char(msg, "]"); + if (sent_msg->query_context->original_query) + { + string_append_char(msg, "["); + string_append_char(msg, sent_msg->query_context->original_query); + string_append_char(msg, "]"); + } } } } @@ -3310,25 +3324,26 @@ /* * Send DEALLOCATE message to backend by using SimpleQuery. */ -static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList p, - int n) +static int send_deallocate(POOL_CONNECTION_POOL *backend, + POOL_SENT_MESSAGE_LIST msglist, int n) { - char *query; int len; - PrepareStmt *p_stmt; + char *name; + char *query; - if (p.size <= n) + if (msglist.size <= n) return 1; - p_stmt = (PrepareStmt *)p.pstmts[n]->qctxt->parse_tree; - len = strlen(p_stmt->name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */ + name = msglist.sent_messages[n]->name; + + len = strlen(name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */ query = malloc(len); if (query == NULL) { pool_error("send_deallocate: malloc failed"); return -1; } - sprintf(query, "DEALLOCATE \"%s\"", p_stmt->name); + sprintf(query, "DEALLOCATE \"%s\"", name); if (SimpleQuery(NULL, backend, strlen(query)+1, query) != POOL_CONTINUE) { Index: pool_proto_modules.c =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v retrieving revision 1.89.2.2 diff -u -r1.89.2.2 pool_proto_modules.c --- pool_proto_modules.c 26 Jan 2011 01:36:14 -0000 1.89.2.2 +++ pool_proto_modules.c 26 Jan 2011 04:46:26 -0000 @@ -88,8 +88,7 @@ static void generate_error_message(char *prefix, int specific_error, char *query); static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, - PreparedStatement *ps); -static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context); + POOL_SENT_MESSAGE *message); static int* find_victim_nodes(int *ntuples, int nmembers, int master_node, int *number_of_nodes); static int extract_ntuples(char *message); @@ -332,6 +331,7 @@ return POOL_ERROR; } } + if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node)) { int i; @@ -343,6 +343,24 @@ } } + if (node) + { + POOL_SENT_MESSAGE *msg = NULL; + + if (IsA(node, PrepareStmt)) + { + msg = pool_create_sent_message('Q', len, contents, 0, + ((PrepareStmt *)node)->name, + query_context); + if (!msg) + { + pool_error("SimpleQuery: cannot create query message: %s", strerror(errno)); + return POOL_END; + } + session_context->uncompleted_message = msg; + } + } + string = query_context->original_query; if (!RAW_MODE) @@ -359,20 +377,17 @@ if (node) { - PreparedStatement *ps = NULL; + POOL_SENT_MESSAGE *msg = NULL; - if (IsA(node, PrepareStmt)) + if (IsA(node, ExecuteStmt)) { -#ifdef NOT_USED - ps = session_context->pending_pstmt; - ps->num_tsparams = 0; -#endif + msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name); + if (!msg) + msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name); } - else if (IsA(node, ExecuteStmt)) - ps = pool_get_prepared_statement_by_pstmt_name(((ExecuteStmt *) node)->name); /* rewrite `now()' to timestamp literal */ - rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, ps); + rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, msg); if (rewrite_query != NULL) { query_context->rewritten_query = rewrite_query; @@ -450,12 +465,12 @@ int len, char *contents) { int commit = 0; - Portal *portal; char *query = NULL; Node *node; int specific_error = 0; POOL_SESSION_CONTEXT *session_context; POOL_QUERY_CONTEXT *query_context; + POOL_SENT_MESSAGE *msg; /* Get session context */ session_context = pool_get_session_context(); @@ -467,31 +482,31 @@ pool_debug("Execute: portal name <%s>", contents); - portal = pool_get_portal_by_portal_name(contents); - if (portal == NULL) + msg = pool_get_sent_message('B', contents); + if (!msg) { - pool_error("Execute: cannot get portal"); + pool_error("Execute: cannot get bind message"); return POOL_END; } - if (portal->pstmt == NULL) + if(!msg->query_context) { - pool_error("Execute: cannot get prepared statement"); + pool_error("Execute: cannot get query context"); return POOL_END; } - if (portal->pstmt->qctxt == NULL) + if (!msg->query_context->original_query) { - pool_error("Execute: cannot get query context"); + pool_error("Execute: cannot get original query"); return POOL_END; } - if (portal->pstmt->qctxt->parse_tree== NULL) + if (!msg->query_context->parse_tree) { pool_error("Execute: cannot get parse tree"); return POOL_END; } - query_context = portal->pstmt->qctxt; - node = query_context->parse_tree; - query = portal->pstmt->qctxt->original_query; + query_context = msg->query_context; + node = msg->query_context->parse_tree; + query = msg->query_context->original_query; pool_debug("Execute: query: %s", query); strncpy(query_string_buffer, query, sizeof(query_string_buffer)); @@ -501,10 +516,6 @@ session_context->query_context = query_context; pool_where_to_send(query_context, query, node); - - if (IsA(query_context->parse_tree, DeallocateStmt)) - overwrite_map_for_deallocate(query_context); - /* check if query is "COMMIT" or "ROLLBACK" */ commit = is_commit_query(node); @@ -584,7 +595,7 @@ char *stmt; List *parse_tree_list; Node *node = NULL; - PreparedStatement *ps; + POOL_SENT_MESSAGE *msg; POOL_STATUS status; POOL_MEMORY_POOL *old_context; POOL_SESSION_CONTEXT *session_context; @@ -642,8 +653,14 @@ */ pool_start_query(query_context, pstrdup(stmt), node); - ps = pool_create_prepared_statement(name, 0, len, contents, query_context); - session_context->pending_pstmt = ps; + msg = pool_create_sent_message('P', len, contents, 0, name, query_context); + if (!msg) + { + pool_error("Parse: cannot create parse message: %s", strerror(errno)); + return POOL_END; + } + + session_context->uncompleted_message = msg; /* * Decide where to send query @@ -651,9 +668,6 @@ pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); - if (IsA(query_context->parse_tree, DeallocateStmt)) - overwrite_map_for_deallocate(query_context); - if (REPLICATION) { char *rewrite_query; @@ -667,8 +681,8 @@ */ if (*name == '\0') rewrite_to_params = false; - ps->num_tsparams = 0; - rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, ps); + msg->num_tsparams = 0; + rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, msg); if (rewrite_query != NULL) { int alloc_len = len - strlen(stmt) + strlen(rewrite_query); @@ -684,8 +698,8 @@ stmt = contents + strlen(name) + 1; pool_debug("Parse: rewrite query %s %s len=%d", name, stmt, len); - ps->parse_len = len; - ps->parse_contents = contents; + msg->len = len; + msg->contents = contents; } } } @@ -721,6 +735,13 @@ /* free_parser(); */ return POOL_END; } + + /* + * set in_progress flag, because ReadyForQuery unset it. + * in_progress flag influences VALID_BACKEND. + */ + if (!pool_is_query_in_progress()) + pool_set_query_in_progress(); } if (is_strict_query(query_context->parse_tree)) @@ -823,8 +844,8 @@ char *pstmt_name; char *portal_name; char *rewrite_msg; - Portal *portal = NULL; - PreparedStatement *pstmt = NULL; + POOL_SENT_MESSAGE *parse_msg; + POOL_SENT_MESSAGE *bind_msg; POOL_SESSION_CONTEXT *session_context; POOL_QUERY_CONTEXT *query_context; @@ -842,33 +863,37 @@ portal_name = contents; pstmt_name = contents + strlen(portal_name) + 1; - pstmt = pool_get_prepared_statement_by_pstmt_name(pstmt_name); - if (pstmt == NULL) + parse_msg = pool_get_sent_message('Q', pstmt_name); + if (!parse_msg) + parse_msg = pool_get_sent_message('P', pstmt_name); + if (!parse_msg) { - pool_error("Bind: cannot get prepared statement \"%s\"", pstmt_name); + pool_error("Bind: cannot get parse message \"%s\"", pstmt_name); return POOL_END; } - portal = pool_create_portal(portal_name, pstmt->num_tsparams, pstmt); - if (portal == NULL) + bind_msg = pool_create_sent_message('B', len, contents, + parse_msg->num_tsparams, portal_name, + parse_msg->query_context); + if (!bind_msg) { - pool_error("Bind: cannot create portal: %s", strerror(errno)); + pool_error("Bind: cannot create bind message: %s", strerror(errno)); return POOL_END; } - query_context = pstmt->qctxt; - if (query_context == NULL) + query_context = parse_msg->query_context; + if (!query_context) { pool_error("Bind: cannot get query context"); return POOL_END; } - session_context->pending_portal = portal; + session_context->uncompleted_message = bind_msg; /* rewrite bind message */ - if (REPLICATION && portal->num_tsparams > 0) + if (REPLICATION && bind_msg->num_tsparams > 0) { - rewrite_msg = bind_rewrite_timestamp(backend, portal, contents, &len); + rewrite_msg = bind_rewrite_timestamp(backend, bind_msg, contents, &len); if (rewrite_msg != NULL) contents = rewrite_msg; } @@ -877,12 +902,9 @@ pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); - if (IsA(query_context->parse_tree, DeallocateStmt)) - overwrite_map_for_deallocate(query_context); - if (pool_config->load_balance_mode && pool_is_writing_transaction()) { - if(parse_before_bind(frontend, backend, pstmt) != POOL_CONTINUE) + if(parse_before_bind(frontend, backend, parse_msg) != POOL_CONTINUE) return POOL_END; } @@ -912,8 +934,7 @@ POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { - Portal *portal = NULL; - PreparedStatement *pstmt = NULL; + POOL_SENT_MESSAGE *msg; POOL_SESSION_CONTEXT *session_context; POOL_QUERY_CONTEXT *query_context; @@ -928,28 +949,28 @@ /* Prepared Statement */ if (*contents == 'S') { - pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1); + msg = pool_get_sent_message('Q', contents+1); + if (!msg) + msg = pool_get_sent_message('P', contents+1); + if (!msg) + { + pool_error("Describe: cannot get parse message"); + return POOL_END; + } } /* Portal */ else { - portal = pool_get_portal_by_portal_name(contents+1); - if (portal == NULL) + msg = pool_get_sent_message('B', contents+1); + if (!msg) { - pool_error("Describe: cannot get portal \"%s\"", contents+1); + pool_error("Describe: cannot get bind message"); return POOL_END; } - - pstmt = portal->pstmt; } - if (pstmt == NULL) - { - pool_error("Describe: cannot get prepared statement"); - return POOL_END; - } + query_context = msg->query_context; - query_context = pstmt->qctxt; if (query_context == NULL) { pool_error("Describe: cannot get query context"); @@ -960,9 +981,6 @@ pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); - if (IsA(query_context->parse_tree, DeallocateStmt)) - overwrite_map_for_deallocate(query_context); - pool_debug("Describe: waiting for master completing the query"); if (pool_send_and_wait(query_context, contents, len, 1, MASTER_NODE_ID, "D") != POOL_CONTINUE) @@ -979,8 +997,7 @@ POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { - Portal *portal = NULL; - PreparedStatement *pstmt = NULL; + POOL_SENT_MESSAGE *msg; POOL_SESSION_CONTEXT *session_context; POOL_QUERY_CONTEXT *query_context; @@ -995,28 +1012,24 @@ /* Prepared Statement */ if (*contents == 'S') { - pstmt = pool_get_prepared_statement_by_pstmt_name(contents+1); - if (pstmt == NULL) + msg = pool_get_sent_message('Q', contents+1); + if (!msg) + msg = pool_get_sent_message('P', contents+1); + if (!msg) { - pool_error("Close: cannot get prepared statement"); + pool_error("Close: cannot get parse message"); return POOL_END; } - - session_context->pending_pstmt = pstmt; - query_context = pstmt->qctxt; } /* Portal */ else if (*contents == 'P') { - portal = pool_get_portal_by_portal_name(contents+1); - if (portal == NULL) + msg = pool_get_sent_message('B', contents+1); + if (!msg) { - pool_error("Close: cannot get portal"); + pool_error("Close: cannot get bind message"); return POOL_END; } - - session_context->pending_portal = portal; - query_context = portal->qctxt; } else { @@ -1024,7 +1037,10 @@ return POOL_END; } - if (query_context == NULL) + session_context->uncompleted_message = msg; + query_context = msg->query_context; + + if (!query_context) { pool_error("Close: cannot get query context"); return POOL_END; @@ -1083,6 +1099,8 @@ signed char kind; signed char state; POOL_SESSION_CONTEXT *session_context; + Node *node = NULL; + char *query = NULL; /* Get session context */ session_context = pool_get_session_context(); @@ -1265,9 +1283,6 @@ if (pool_is_query_in_progress() && pool_is_command_success()) { - Node *node; - char *query; - node = pool_get_parse_tree(); query = pool_get_query_string(); @@ -1332,7 +1347,10 @@ } if (!pool_is_doing_extended_query_message()) - pool_query_context_destroy(pool_get_session_context()->query_context); + { + if (!(node && IsA(node, PrepareStmt))) + pool_query_context_destroy(pool_get_session_context()->query_context); + } sp = MASTER_CONNECTION(backend)->sp; if (MASTER(backend)->tstate == 'T') @@ -1358,18 +1376,17 @@ return POOL_END; } - if (session_context->pending_pstmt != NULL) + if (session_context->uncompleted_message) { POOL_QUERY_CONTEXT *qc; - pool_add_prepared_statement(); + pool_add_sent_message(session_context->uncompleted_message); - /* Set "parse done" to query_state */ - qc = session_context->pending_pstmt->qctxt; - if (qc != NULL) - pool_set_query_state(qc, 1); + qc = session_context->uncompleted_message->query_context; + if (qc) + pool_set_query_state(qc, POOL_PARSE_COMPLETE); - session_context->pending_pstmt = NULL; + session_context->uncompleted_message = NULL; } return SimpleForwardToFrontend('1', frontend, backend); @@ -1387,18 +1404,17 @@ return POOL_END; } - if (session_context->pending_portal != NULL) + if (session_context->uncompleted_message) { - PreparedStatement *pstmt; + POOL_QUERY_CONTEXT *qc; - pool_add_portal(); + pool_add_sent_message(session_context->uncompleted_message); - /* Set "bind done" to query_state */ - pstmt = session_context->pending_portal->pstmt; - if (pstmt != NULL && pstmt->qctxt != NULL) - pool_set_query_state(pstmt->qctxt, 2); + qc = session_context->uncompleted_message->query_context; + if (qc) + pool_set_query_state(qc, POOL_BIND_COMPLETE); - session_context->pending_portal = NULL; + session_context->uncompleted_message = NULL; } return SimpleForwardToFrontend('2', frontend, backend); @@ -1416,19 +1432,15 @@ return POOL_END; } - if (session_context->pending_pstmt != NULL) + if (session_context->uncompleted_message) { - pool_remove_prepared_statement(); - session_context->pending_pstmt = NULL; - } - else if (session_context->pending_portal != NULL) - { - pool_remove_portal(); - session_context->pending_portal = NULL; + pool_remove_sent_message(session_context->uncompleted_message->kind, + session_context->uncompleted_message->name); + session_context->uncompleted_message = NULL; } else { - pool_error("CloseComplete: pending object not found"); + pool_error("CloseComplete: uncompleted message not found"); return POOL_END; } @@ -1459,8 +1471,11 @@ if (IsA(node, PrepareStmt)) { - pool_add_prepared_statement(); - session_context->pending_pstmt = NULL; + if (session_context->uncompleted_message) + { + pool_add_sent_message(session_context->uncompleted_message); + session_context->uncompleted_message = NULL; + } } else if (IsA(node, DeallocateStmt)) { @@ -1468,18 +1483,28 @@ name = ((DeallocateStmt *)node)->name; if (name == NULL) - pool_clear_prepared_statement_list(); + { + pool_remove_sent_messages('Q'); + pool_remove_sent_messages('P'); + } else - pool_remove_prepared_statement(); - session_context->pending_pstmt = NULL; + { + pool_remove_sent_message('Q', name); + pool_remove_sent_message('P', name); + } } else if (IsA(node, DiscardStmt)) { DiscardStmt *stmt = (DiscardStmt *)node; - if (stmt->target == DISCARD_ALL || stmt->target == DISCARD_PLANS) + + if (stmt->target == DISCARD_PLANS) + { + pool_remove_sent_messages('Q'); + pool_remove_sent_messages('P'); + } + else if (stmt->target == DISCARD_ALL) { - pool_remove_pending_objects(); - pool_clear_prepared_statement_list(); + pool_clear_sent_message_list(); } } /* @@ -1679,6 +1704,7 @@ POOL_CONNECTION_POOL *backend) { POOL_STATUS ret; + POOL_SESSION_CONTEXT *session_context; ret = SimpleForwardToFrontend('E', frontend, backend); if (ret != POOL_CONTINUE) @@ -1746,7 +1772,17 @@ /* An error occurred with PREPARE or DEALLOCATE command. * Free pending portal object. */ - pool_remove_pending_objects(); + session_context = pool_get_session_context(); + if (session_context) + { + if (session_context->uncompleted_message) + { + pool_add_sent_message(session_context->uncompleted_message); + pool_remove_sent_message(session_context->uncompleted_message->kind, + session_context->uncompleted_message->name); + session_context->uncompleted_message = NULL; + } + } return POOL_CONTINUE; } @@ -2079,6 +2115,7 @@ case 'E': /* ErrorResponse */ status = ErrorResponse3(frontend, backend); + pool_set_command_success(); if (TSTATE(backend, REAL_MASTER_NODE_ID) != 'I') pool_set_failed_transaction(); if (pool_is_doing_extended_query_message()) @@ -2710,23 +2747,23 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, - PreparedStatement *ps) + POOL_SENT_MESSAGE *message) { int i; - int len = ps->parse_len; + int len = message->len; char kind = '\0'; - char *contents = ps->parse_contents; + char *contents = message->contents; bool parse_was_sent = false; bool backup[MAX_NUM_BACKENDS]; POOL_STATUS status; - POOL_QUERY_CONTEXT *qc = ps->qctxt; + POOL_QUERY_CONTEXT *qc = message->query_context; memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send)); /* expect to send to master node only */ for (i = 0; i < NUM_BACKENDS; i++) { - if (qc->where_to_send[i] && qc->query_state[i] < 1) /* 1: parse done */ + if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0) { pool_debug("parse_before_bind: waiting for backend %d completing parse", i); if (pool_send_and_wait(qc, contents, len, 1, i, "P") != POOL_CONTINUE) @@ -2770,30 +2807,6 @@ return POOL_CONTINUE; } -static void overwrite_map_for_deallocate(POOL_QUERY_CONTEXT *query_context) -{ - char *name; - PreparedStatement *ps; - POOL_QUERY_CONTEXT *qc; - - if (IsA(query_context->parse_tree, DeallocateStmt)) { - name = ((DeallocateStmt *)query_context->parse_tree)->name; - if (name != NULL) /* NULL is "DEALLOCATE ALL" */ - { - ps = pool_get_prepared_statement_by_pstmt_name(name); - if (ps != NULL) - { - qc = ps->qctxt; - if (qc != NULL) - { - memcpy(query_context->where_to_send, qc->where_to_send, - sizeof(query_context->where_to_send)); - } - } - } - } -} - /* * Find victim nodes by "decide by majority" rule and returns array * of victim node ids. If no victim is found, return NULL. Index: pool_query_context.c =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_query_context.c,v retrieving revision 1.32 diff -u -r1.32 pool_query_context.c --- pool_query_context.c 20 Oct 2010 01:08:55 -0000 1.32 +++ pool_query_context.c 26 Jan 2011 04:46:27 -0000 @@ -420,40 +420,6 @@ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); } } - - /* PREPARE? */ - if (IsA(node, PrepareStmt)) - { - /* Make sure that same prepared statement does not exist */ - if (pool_get_prep_where(((PrepareStmt *)node)->name) == NULL) - { - /* Save the send map */ - pool_add_prep_where(((PrepareStmt *)node)->name, query_context->where_to_send); - } - } - - /* - * EXECUTE? - */ - else if (IsA(node, ExecuteStmt)) - { - bool *wts; - - wts = pool_get_prep_where(((ExecuteStmt *)node)->name); - if (wts) - { - /* Inherit same map from PREPARE */ - pool_copy_prep_where(wts, query_context->where_to_send); - } - } - - /* - * DEALLOCATE? - */ - else if (IsA(node, DeallocateStmt)) - { - where_to_send_deallocate(query_context, node); - } } else if (REPLICATION || PARALLEL_MODE) { @@ -481,13 +447,6 @@ pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID); } } - /* - * DEALLOCATE? - */ - else if (IsA(node, DeallocateStmt)) - { - where_to_send_deallocate(query_context, node); - } else { /* send to all nodes */ @@ -500,6 +459,29 @@ return; } + /* + * EXECUTE? + */ + if (IsA(node, ExecuteStmt)) + { + POOL_SENT_MESSAGE *msg; + + msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name); + if (!msg) + msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name); + if (msg) + pool_copy_prep_where(msg->query_context->where_to_send, + query_context->where_to_send); + } + + /* + * DEALLOCATE? + */ + else if (IsA(node, DeallocateStmt)) + { + where_to_send_deallocate(query_context, node); + } + for (i=0;iwhere_to_send[i]) @@ -508,10 +490,10 @@ break; } } + return; } - /* * Send query and wait for response * string: @@ -973,7 +955,7 @@ void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node) { DeallocateStmt *d = (DeallocateStmt *)node; - bool *wts; + POOL_SENT_MESSAGE *msg; /* DELLOCATE ALL? */ if (d->name == NULL) @@ -983,24 +965,16 @@ } else { - wts = pool_get_prep_where(d->name); - if (wts) - { - /* Inherit same map from PREPARE */ - pool_copy_prep_where(wts, query_context->where_to_send); - return; - } - else - { - PreparedStatement *ps; - - ps = pool_get_prepared_statement_by_pstmt_name(d->name); - if (ps && ps->qctxt) - { - pool_copy_prep_where(ps->qctxt->where_to_send, query_context->where_to_send); - return; - } + msg = pool_get_sent_message('Q', d->name); + if (!msg) + msg = pool_get_sent_message('P', d->name); + if (msg) + { + /* Inherit same map from PREPARE or PARSE */ + pool_copy_prep_where(msg->query_context->where_to_send, + query_context->where_to_send); } + return; } /* prepared statement was not found */ pool_setall_node_to_be_sent(query_context); @@ -1114,12 +1088,9 @@ } /* - * Set query state, if specified state less than current state - * state: - * 0: before parse 1: parse done 2: bind done - * 3: describe done 4: execute done -1: in error + * Set query state, if a current state is before it than the specified state. */ -void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, short state) +void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, POOL_QUERY_STATE state) { int i; @@ -1129,16 +1100,38 @@ return; } - if (state < -1 || state > 4) - { - pool_error("pool_set_query_state: invalid query state: %d", state); - return; - } - for (i = 0; i < NUM_BACKENDS; i++) { if (query_context->where_to_send[i] && - query_context->query_state[i] < state) + statecmp(query_context->query_state[i], state) < 0) query_context->query_state[i] = state; } } + +int statecmp(POOL_QUERY_STATE s1, POOL_QUERY_STATE s2) +{ + int ret; + + switch (s2) { + case POOL_UNPARSED: + ret = (s1 == s2) ? 0 : 1; + break; + case POOL_PARSE_COMPLETE: + if (s1 == POOL_UNPARSED) + ret = -1; + else + ret = (s1 == s2) ? 0 : 1; + break; + case POOL_BIND_COMPLETE: + if (s1 == POOL_UNPARSED || s1 == POOL_PARSE_COMPLETE) + ret = -1; + else + ret = (s1 == s2) ? 0 : 1; + break; + case POOL_EXECUTE_COMPLETE: + ret = (s1 == s2) ? 0 : -1; + break; + } + + return ret; +} Index: pool_query_context.h =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_query_context.h,v retrieving revision 1.8 diff -u -r1.8 pool_query_context.h --- pool_query_context.h 21 Jul 2010 04:33:59 -0000 1.8 +++ pool_query_context.h 26 Jan 2011 04:46:27 -0000 @@ -31,6 +31,13 @@ #include "parser/nodes.h" #include "parser/pool_memory.h" +typedef enum { + POOL_UNPARSED, + POOL_PARSE_COMPLETE, + POOL_BIND_COMPLETE, + POOL_EXECUTE_COMPLETE +} POOL_QUERY_STATE; + /* * Query context: * Manages per query context @@ -38,19 +45,12 @@ typedef struct { char *original_query; /* original query string */ char *rewritten_query; /* rewritten query string if any */ - Node *parse_tree; /* raw parser output if any */ + Node *parse_tree; /* raw parser output if any */ Node *rewritten_parse_tree; /* rewritten raw parser output if any */ - bool where_to_send[MAX_NUM_BACKENDS]; /* DB node map to send query */ - int virtual_master_node_id; /* the first DB node to send query */ - POOL_MEMORY_POOL *memory_context; /* memory context for query */ - - /* - * query_state: - * 0: before parse 1: parse done 2: bind done - * 3: describe done 4: execute done -1: in error - */ - short query_state[MAX_NUM_BACKENDS]; - + bool where_to_send[MAX_NUM_BACKENDS]; /* DB node map to send query */ + int virtual_master_node_id; /* the 1st DB node to send query */ + POOL_MEMORY_POOL *memory_context; /* memory context for query */ + POOL_QUERY_STATE query_state[MAX_NUM_BACKENDS]; /* for extended query protocol */ } POOL_QUERY_CONTEXT; extern POOL_QUERY_CONTEXT *pool_init_query_context(void); @@ -71,6 +71,7 @@ extern char *pool_get_query_string(void); extern bool is_set_transaction_serializable(Node *ndoe, char *query); extern bool is_2pc_transaction_query(Node *node, char *query); -extern void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, short state); +extern void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, POOL_QUERY_STATE state); +extern int statecmp(POOL_QUERY_STATE s1, POOL_QUERY_STATE s2); #endif /* POOL_QUERY_CONTEXT_H */ Index: pool_session_context.c =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_session_context.c,v retrieving revision 1.25 diff -u -r1.25 pool_session_context.c --- pool_session_context.c 19 Oct 2010 08:57:18 -0000 1.25 +++ pool_session_context.c 26 Jan 2011 04:46:28 -0000 @@ -31,10 +31,8 @@ static POOL_SESSION_CONTEXT session_context_d; static POOL_SESSION_CONTEXT *session_context = NULL; -static void init_prepared_statement_list(void); -static void init_portal_list(void); -static bool can_prepared_statement_destroy(POOL_QUERY_CONTEXT *qc); -static bool can_portal_destroy(POOL_QUERY_CONTEXT *qc); +static void init_sent_message_list(void); +static bool can_query_context_destroy(POOL_QUERY_CONTEXT *qc); /* * Initialize per session context @@ -61,11 +59,8 @@ /* Initialize local session id */ pool_incremnet_local_session_id(); - /* Initialize prepared statement list */ - init_prepared_statement_list(); - - /* Initialize portal list */ - init_portal_list(); + /* Initialize sent message list */ + init_sent_message_list(); /* Create memory context */ session_context->memory_context = pool_memory_create(PREPARE_BLOCK_SIZE); @@ -109,9 +104,10 @@ pool_unset_ignore_till_sync(); /* Initialize where to send map for PREPARE statemets */ +#ifdef NOT_USED memset(&session_context->prep_where, 0, sizeof(session_context->prep_where)); session_context->prep_where.nelem = POOL_MAX_PREPARED_STATEMENTS; - +#endif /* NOT_USED */ /* Reset flag to indicate difference in number of affected tuples * in UPDATE/DELETE. */ @@ -125,8 +121,8 @@ { if (session_context) { - free(session_context->pstmt_list.pstmts); - free(session_context->portal_list.portals); + pool_clear_sent_message_list(); + free(session_context->message_list.sent_messages); pool_memory_delete(session_context->memory_context, 0); } /* XXX For now, just zap memory */ @@ -346,80 +342,73 @@ } /* - * Remove a portal by portal name + * Remove a sent message */ -static void pool_remove_portal_by_portal_name(const char *name) +bool pool_remove_sent_message(char kind, const char *name) { int i; - PortalList *plist; + POOL_SENT_MESSAGE_LIST *msglist; - if (*name == '\0') + if (!session_context) { - if (session_context->unnamed_portal) - { - pool_memory_free(session_context->memory_context, - session_context->unnamed_portal); - session_context->unnamed_portal = NULL; - } - return; + pool_error("pool_remove_sent_message: session context is not initialized"); + return false; } - plist = &session_context->portal_list; + msglist = &session_context->message_list; - for (i = 0; i < plist->size; i++) + for (i = 0; i < msglist->size; i++) { - if (strcmp(plist->portals[i]->name, name) == 0) + if (msglist->sent_messages[i]->kind == kind && + !strcmp(msglist->sent_messages[i]->name, name)) { - if (can_portal_destroy(plist->portals[i]->qctxt)) - pool_query_context_destroy(plist->portals[i]->qctxt); - pool_memory_free(session_context->memory_context, plist->portals[i]); + pool_sent_message_destroy(msglist->sent_messages[i]); break; } } - - /* portal not found */ - if (i == plist->size) - return; - - if (i != plist->size - 1) + + /* sent message not found */ + if (i == msglist->size) + return false; + + if (i != msglist->size - 1) { - memmove(&plist->portals[i], &plist->portals[i+1], - sizeof(Portal *) * (plist->size - i - 1)); + memmove(&msglist->sent_messages[i], &msglist->sent_messages[i+1], + sizeof(POOL_SENT_MESSAGE *) * (msglist->size - i - 1)); } - plist->size--; + + msglist->size--; + + return true; } /* - * Remove portals by prepared statement name - * prepared statement : portal = 1 : N + * Remove same kind of sent messages */ -#ifdef NOT_USED -static void pool_remove_portal_by_pstmt_name(const char *name) +void pool_remove_sent_messages(char kind) { int i; - PortalList *plist; + POOL_SENT_MESSAGE_LIST *msglist; - if (*name == '\0') + if (!session_context) { - if (session_context->unnamed_portal) - { - pool_memory_free(session_context->memory_context, - session_context->unnamed_portal); - session_context->unnamed_portal = NULL; - } + pool_error("pool_remove_sent_messages: session context is not initialized"); return; } - plist = &session_context->portal_list; + msglist = &session_context->message_list; - for (i = 0; i < plist->size; i++) + for (i = 0; i < msglist->size; i++) { - if (strcmp(plist->portals[i]->pstmt->name, name) == 0) - pool_remove_portal_by_portal_name(plist->portals[i]->name); + if (msglist->sent_messages[i]->kind == kind) + { + if (pool_remove_sent_message(kind, msglist->sent_messages[i]->name)) + i--; /* for relocation by removing */ + } } } -#endif +#ifdef NOT_USED /* * Remove a prepared statement by prepared statement name */ @@ -488,354 +477,187 @@ if (in_progress) pool_set_query_in_progress(); } - +#endif /* NOT_USED */ /* - * Remove a pending prepared statement from prepared statement list + * Destroy sent message */ -void pool_remove_prepared_statement(void) +void pool_sent_message_destroy(POOL_SENT_MESSAGE *message) { - char *name; - - if (!session_context) - { - pool_error("pool_remove_prepared_statement: session context is not initialized"); - return; - } - - if (session_context->pending_pstmt) - { - name = session_context->pending_pstmt->name; - pool_remove_prepared_statement_by_pstmt_name(name); - } - else - { - pool_debug("pool_remove_prepared_statement: pending prepared statement is NULL"); - } -} - - -/* - * Remove a pending portal from portal list - */ -void pool_remove_portal(void) -{ - char *name; + bool in_progress; + POOL_QUERY_CONTEXT *qc = NULL; if (!session_context) { - pool_error("pool_remove_portal: session context is not initialized"); + pool_error("pool_sent_message_destroy: session context is not initialized"); return; } - if (session_context->pending_portal) - { - name = session_context->pending_portal->name; - pool_remove_portal_by_portal_name(name); - } -} - -/* - * Remove pending objects - */ -void pool_remove_pending_objects(void) -{ - PreparedStatement *ps; - Portal *p; - - ps = session_context->pending_pstmt; - - if (ps && ps->name) - pool_memory_free(session_context->memory_context, ps->name); + in_progress = pool_is_query_in_progress(); - if (ps && ps->qctxt) + if (message) { - if (can_prepared_statement_destroy(ps->qctxt) && - can_portal_destroy(ps->qctxt)) - pool_query_context_destroy(ps->qctxt); - } - - if (ps) - pool_memory_free(session_context->memory_context, ps); - - p = session_context->pending_portal; + if (message->contents) + pool_memory_free(session_context->memory_context, message->contents); + + if (message->name) + pool_memory_free(session_context->memory_context, message->name); - if (p && p->name) - pool_memory_free(session_context->memory_context, p->name); + if (message->query_context) + { + if (session_context->query_context != message->query_context) + qc = session_context->query_context; - if (p && p->pstmt) - pool_memory_free(session_context->memory_context, p->pstmt); + if (can_query_context_destroy(message->query_context)) + { + pool_query_context_destroy(message->query_context); + /* + * set in_progress flag, because pool_query_context_destroy() + * unsets in_progress flag + */ + if (in_progress) + pool_set_query_in_progress(); + /* + * set query_context of session_context, because + * pool_query_context_destroy() sets it to NULL. + */ + if (qc) + session_context->query_context = qc; + } + } - if (p && p->qctxt) - { - if (can_portal_destroy(p->qctxt) && - can_prepared_statement_destroy(p->qctxt)) - pool_query_context_destroy(p->qctxt); + if (session_context->memory_context) + pool_memory_free(session_context->memory_context, message); } - - if (p) - pool_memory_free(session_context->memory_context, p); - - session_context->pending_pstmt = NULL; - session_context->pending_portal = NULL; } /* - * Clear prepared statement list and portal list + * Clear sent message list */ -void pool_clear_prepared_statement_list(void) +void pool_clear_sent_message_list(void) { - PreparedStatementList *pslist; + POOL_SENT_MESSAGE_LIST *msglist; if (!session_context) { - pool_error("pool_clear_prepared_statement_list: session context is not initialized"); + pool_error("pool_clear_sent_message_list: session context is not initialized"); return; } - pslist = &session_context->pstmt_list; + msglist = &session_context->message_list; - while (pslist->size > 0) + while (msglist->size > 0) { - pool_remove_prepared_statement_by_pstmt_name(pslist->pstmts[0]->name); + pool_remove_sent_messages(msglist->sent_messages[0]->kind); } } /* - * Create a prepared statement - * len: the length of parse message which is not network byte order - * contents: the contents of parse message - */ -PreparedStatement *pool_create_prepared_statement(const char *name, - int num_tsparams, - int len, char *contents, - POOL_QUERY_CONTEXT *qc) -{ - PreparedStatement *ps; - - if (!session_context) - { - pool_error("pool_create_prepared_statement: session context is not initialized"); - return NULL; - } - - ps = pool_memory_alloc(session_context->memory_context, - sizeof(PreparedStatement)); - ps->name = pool_memory_strdup(session_context->memory_context, name); - ps->num_tsparams = num_tsparams; - ps->parse_len = len; - ps->parse_contents = pool_memory_alloc(session_context->memory_context, len); - memcpy(ps->parse_contents, contents, len); - -#ifdef NOT_USED - /* - * duplicate query_context because session_context->query_context is - * freed by pool_query_context_destroy() - */ - q = malloc(sizeof(POOL_QUERY_CONTEXT)); - if (q == NULL) - { - pool_error("pool_create_prepared_statement: malloc failed: %s", strerror(errno)); - exit(1); - } - ps->qctxt = memcpy(q, qc, sizeof(POOL_QUERY_CONTEXT)); -#endif - ps->qctxt = qc; - - return ps; -} - -/* - * Create a portal + * Create a sent message + * kind: one of 'P':Parse, 'B':Bind or'Q':Query(PREPARE) + * len: message length that is not network byte order + * contents: message contents + * num_tsparams: number of timestamp parameters + * name: prepared statement name or portal name */ -Portal *pool_create_portal(const char *name, int num_tsparams, PreparedStatement *pstmt) +POOL_SENT_MESSAGE *pool_create_sent_message(char kind, int len, char *contents, + int num_tsparams, const char *name, + POOL_QUERY_CONTEXT *query_context) { - Portal *portal; + POOL_SENT_MESSAGE *msg; if (!session_context) { - pool_error("pool_create_portal: session context is not initialized"); + pool_error("pool_create_sent_message: session context is not initialized"); return NULL; } - portal = pool_memory_alloc(session_context->memory_context, sizeof(Portal)); - portal->name = pool_memory_strdup(session_context->memory_context, name); - portal->num_tsparams = num_tsparams; - portal->pstmt = pstmt; - portal->qctxt = pstmt->qctxt; + msg = pool_memory_alloc(session_context->memory_context, + sizeof(POOL_SENT_MESSAGE)); + msg->kind = kind; + msg->len = len; + msg->contents = pool_memory_alloc(session_context->memory_context, len); + memcpy(msg->contents, contents, len); + msg->num_tsparams = num_tsparams; + msg->name = pool_memory_strdup(session_context->memory_context, name); + msg->query_context = query_context; - return portal; + return msg; } /* - * Add a prepared statement to prepared statement list + * Add a sent message to sent message list */ -void pool_add_prepared_statement(void) +void pool_add_sent_message(POOL_SENT_MESSAGE *message) { - PreparedStatement *ps; - PreparedStatementList *pslist; + POOL_SENT_MESSAGE *old_msg; + POOL_SENT_MESSAGE_LIST *msglist; if (!session_context) { - pool_error("pool_add_prepared_statement: session context is not initialized"); + pool_error("pool_add_sent_message: session context is not initialized"); return; } - if (!session_context->pending_pstmt) + if (!message) { - pool_debug("pool_add_prepared_statement: pending prepared statement is NULL"); + pool_debug("pool_add_sent_message: message is NULL"); return; } - ps = pool_get_prepared_statement_by_pstmt_name(session_context->pending_pstmt->name); - pslist = &session_context->pstmt_list; + old_msg = pool_get_sent_message(message->kind, message->name); + msglist = &session_context->message_list; - if (ps) + if (old_msg) { - pool_remove_prepared_statement_by_pstmt_name(ps->name); - if (*session_context->pending_pstmt->name == '\0') - { - session_context->unnamed_pstmt = session_context->pending_pstmt; - session_context->query_context = session_context->pending_pstmt->qctxt; - } + if (message->kind == 'B') + pool_debug("pool_add_sent_message: portal \"%s\" already exists", + message->name); else - { - pool_error("pool_add_prepared_statement: prepared statement \"%s\" already exists", - session_context->pending_pstmt->name); - } - } - else - { - if (*session_context->pending_pstmt->name == '\0') - { - session_context->unnamed_pstmt = session_context->pending_pstmt; - } - else - { - if (pslist->size == pslist->capacity) - { - pslist->capacity *= 2; - pslist->pstmts = realloc(pslist->pstmts, sizeof(PreparedStatement *) * pslist->capacity); - if (pslist->pstmts == NULL) - { - pool_error("pool_add_prepared_statement: realloc failed: %s", strerror(errno)); - exit(1); - } - } - pslist->pstmts[pslist->size++] = session_context->pending_pstmt; - } - } -} - -/* - * Add a portal to portal list - */ -void pool_add_portal(void) -{ - Portal *p; - PortalList *plist; - - if (!session_context) - { - pool_error("pool_add_portal: session context is not initialized"); - return; - } - - if (!session_context->pending_portal) - { - pool_debug("pool_add_portal: pending portal is NULL"); - return; - } - - p = pool_get_portal_by_portal_name(session_context->pending_portal->name); - plist = &session_context->portal_list; + pool_debug("pool_add_sent_message: prepared statement \"%s\" already exists", + message->name); - if (p) - { - pool_remove_portal_by_portal_name(p->name); - if (*session_context->pending_portal->name == '\0') - { - session_context->unnamed_portal = session_context->pending_portal; - } + if (*message->name == '\0') + pool_remove_sent_message(old_msg->kind, old_msg->name); else - { - pool_error("pool_add_portal: portal \"%s\" already exists", - session_context->pending_portal->name); - } + return; } - else + + if (msglist->size == msglist->capacity) { - if (*session_context->pending_portal->name == '\0') + msglist->capacity *= 2; + msglist->sent_messages = realloc(msglist->sent_messages, + sizeof(POOL_SENT_MESSAGE *) * msglist->capacity); + if (!msglist->sent_messages) { - session_context->unnamed_portal = session_context->pending_portal; + pool_error("pool_add_sent_message: realloc failed: %s", strerror(errno)); + exit(1); } - else - { - if (plist->size == plist->capacity) - { - plist->capacity *= 2; - plist->portals = realloc(plist->portals, sizeof(Portal *) * plist->capacity); - if (plist->portals == NULL) - { - pool_error("pool_add_portal: realloc failed: %s", strerror(errno)); - exit(1); - } - } - plist->portals[plist->size++] = session_context->pending_portal; - } - } -} - -/* - * Get a prepared statement by prepared statement name - */ -PreparedStatement *pool_get_prepared_statement_by_pstmt_name(const char *name) -{ - int i; - PreparedStatementList *pslist; - - if (!session_context) - { - pool_error("pool_get_prepared_statement_by_pstmt_name: session context is not initialized"); - return NULL; } - if (*name == '\0') - return session_context->unnamed_pstmt; - - pslist = &session_context->pstmt_list; - - for (i = 0; i < pslist->size; i++) - { - if (strcmp(pslist->pstmts[i]->name, name) == 0) - return pslist->pstmts[i]; - } - - return NULL; + msglist->sent_messages[msglist->size++] = message; } /* - * Get a portal by portal name + * Get a sent message */ -Portal *pool_get_portal_by_portal_name(const char *name) +POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name) { int i; - PortalList *plist; + POOL_SENT_MESSAGE_LIST *msglist; if (!session_context) { - pool_error("pool_get_portal_by_portal_name: session context is not initialized"); + pool_error("pool_get_sent_message: session context is not initialized"); return NULL; } - if (*name == '\0') - return session_context->unnamed_portal; - - plist = &session_context->portal_list; + msglist = &session_context->message_list; - for (i = 0; i < plist->size; i++) + for (i = 0; i < msglist->size; i++) { - if (strcmp(plist->portals[i]->name, name) == 0) - return plist->portals[i]; + if (msglist->sent_messages[i]->kind == kind && + !strcmp(msglist->sent_messages[i]->name, name)) + return msglist->sent_messages[i]; } return NULL; @@ -1052,7 +874,7 @@ { memcpy(dest, src, sizeof(bool)*MAX_NUM_BACKENDS); } - +#ifdef NOT_USED /* * Add to send map a PREPARED statement */ @@ -1124,76 +946,42 @@ } } } - -/* - * Initialize prepared statement list - */ -static void init_prepared_statement_list(void) -{ - PreparedStatementList *pslist; - - pslist = &session_context->pstmt_list; - pslist->size = 0; - pslist->capacity = INIT_LIST_SIZE; - pslist->pstmts = malloc(sizeof(PreparedStatement *) * INIT_LIST_SIZE); - if (pslist->pstmts == NULL) - { - pool_error("init_prepared_statement_list: malloc failed: %s", strerror(errno)); - exit(1); - } -} - +#endif /* NOT_USED */ /* - * Initialize portal list + * Initialize sent message list */ -static void init_portal_list(void) +static void init_sent_message_list(void) { - PortalList *plist; + POOL_SENT_MESSAGE_LIST *msglist; - plist = &session_context->portal_list; - plist->size = 0; - plist->capacity = INIT_LIST_SIZE; - plist->portals = malloc(sizeof(Portal *) * INIT_LIST_SIZE); - if (plist->portals == NULL) + msglist = &session_context->message_list; + msglist->size = 0; + msglist->capacity = INIT_LIST_SIZE; + msglist->sent_messages = malloc(sizeof(POOL_SENT_MESSAGE *) * INIT_LIST_SIZE); + if (!msglist->sent_messages) { - pool_error("init_portal_list: malloc failed: %s", strerror(errno)); + pool_error("init_sent_message_list: malloc failed: %s", strerror(errno)); exit(1); } } -static bool can_prepared_statement_destroy(POOL_QUERY_CONTEXT *qc) +static bool can_query_context_destroy(POOL_QUERY_CONTEXT *qc) { int i; - PortalList *plist; + int count = 0; + POOL_SENT_MESSAGE_LIST *msglist; - plist = &session_context->portal_list; + msglist = &session_context->message_list; - for (i = 0; i < plist->size; i++) + for (i = 0; i < msglist->size; i++) { - if (plist->portals[i]->qctxt == qc) - { - pool_debug("can_prepared_statement_destroy: query context is still used."); - return false; - } + if (msglist->sent_messages[i]->query_context == qc) + count++; } - - return true; -} - -static bool can_portal_destroy(POOL_QUERY_CONTEXT *qc) -{ - int i; - PreparedStatementList *pslist; - - pslist = &session_context->pstmt_list; - - for (i = 0; i < pslist->size; i++) + if (count > 1) { - if (pslist->pstmts[i]->qctxt == qc) - { - pool_debug("can_portal_destroy: query context is still used."); - return false; - } + pool_debug("can_query_context_destroy: query context is still used."); + return false; } return true; Index: pool_session_context.h =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_session_context.h,v retrieving revision 1.20 diff -u -r1.20 pool_session_context.h --- pool_session_context.h 12 Oct 2010 11:39:37 -0000 1.20 +++ pool_session_context.h 26 Jan 2011 04:46:28 -0000 @@ -34,46 +34,6 @@ #include "parser/pool_memory.h" /* - * Prepared Statement: - */ -typedef struct { - char *name; /* prepared statement name */ - int num_tsparams; - int parse_len; /* the length of parse message which is - not network byte order */ - char *parse_contents; /* contents of parse message */ - POOL_QUERY_CONTEXT *qctxt; -} PreparedStatement; - -/* - * Prepared statement list: - */ -typedef struct { - int capacity; /* capacity of list */ - int size; /* number of PreparedStatement */ - PreparedStatement **pstmts; /* prepared statement list */ -} PreparedStatementList; - -/* - * Portal: - */ -typedef struct { - char *name; /* portal name */ - int num_tsparams; - PreparedStatement *pstmt; - POOL_QUERY_CONTEXT *qctxt; -} Portal; - -/* - * Portal list: - */ -typedef struct { - int capacity; /* capacity of list */ - int size; /* number of portal */ - Portal **portals; /* portal list */ -} PortalList; - -/* * Transaction isolation mode */ typedef enum { @@ -81,7 +41,7 @@ POOL_READ_COMMITTED, /* Read committed */ POOL_SERIALIZABLE /* Serializable */ } POOL_TRANSACTION_ISOLATION; - +#ifdef NOT_USED /* * where to send map for PREPARE/EXECUTE/DEALLOCATE */ @@ -93,7 +53,22 @@ char name[POOL_MAX_PREPARED_STATEMENTS][POOL_MAX_PREPARED_NAME]; /* Prepared statement name */ bool where_to_send[POOL_MAX_PREPARED_STATEMENTS][MAX_NUM_BACKENDS]; } POOL_PREPARED_SEND_MAP; - +#endif /* NOT_USED */ +typedef struct { + char kind; /* one of 'P':Parse, 'B':Bind or 'Q':Query(PREPARE) */ + int len; /* not network byte order */ + char *contents; + int num_tsparams; + char *name; /* object name of prepared statement or portal */ + POOL_QUERY_CONTEXT *query_context; +} POOL_SENT_MESSAGE; + +typedef struct { + int capacity; /* capacity of list */ + int size; /* number of elements */ + POOL_SENT_MESSAGE **sent_messages; +} POOL_SENT_MESSAGE_LIST; + /* * Per session context: */ @@ -137,17 +112,16 @@ * "PreparedStatementList *pstmt_list" (see below). */ POOL_QUERY_CONTEXT *query_context; - +#ifdef NOT_USED /* where to send map for PREPARE/EXECUTE/DEALLOCATE */ POOL_PREPARED_SEND_MAP prep_where; - +#endif /* NOT_USED */ POOL_MEMORY_POOL *memory_context; /* memory context for session */ - PreparedStatement *unnamed_pstmt; /* unnamed statement */ - PreparedStatement *pending_pstmt; /* used until receive backend response */ - Portal *unnamed_portal; /* unnamed portal */ - Portal *pending_portal; /* used until receive backend response */ - PreparedStatementList pstmt_list; /* named statement list */ - PortalList portal_list; /* named portal list */ + + /* message which does'nt receive complete message */ + POOL_SENT_MESSAGE *uncompleted_message; + + POOL_SENT_MESSAGE_LIST message_list; int load_balance_node_id; /* selected load balance node id */ @@ -186,20 +160,15 @@ extern bool pool_is_ignore_till_sync(void); extern void pool_set_ignore_till_sync(void); extern void pool_unset_ignore_till_sync(void); -extern void pool_remove_prepared_statement_by_pstmt_name(const char *name); -extern void pool_remove_prepared_statement(void); -extern void pool_remove_portal(void); -extern void pool_remove_pending_objects(void); -extern void pool_clear_prepared_statement_list(void); -extern PreparedStatement *pool_create_prepared_statement(const char *name, int num_tsparams, - int len, char *contents, - POOL_QUERY_CONTEXT *qc); -extern Portal *pool_create_portal(const char *name, int num_tsparams, PreparedStatement *pstmt); -extern void pool_add_prepared_statement(void); -extern void pool_add_portal(void); -extern PreparedStatement *pool_get_prepared_statement_by_pstmt_name(const char *name); -extern Portal *pool_get_portal_by_portal_name(const char *name); - +extern POOL_SENT_MESSAGE *pool_create_sent_message(char kind, int len, char *contents, + int num_tsparams, const char *name, + POOL_QUERY_CONTEXT *query_context); +extern void pool_add_sent_message(POOL_SENT_MESSAGE *message); +extern bool pool_remove_sent_message(char kind, const char *name); +extern void pool_remove_sent_messages(char kind); +extern void pool_clear_sent_message_list(void); +extern void pool_sent_message_destroy(POOL_SENT_MESSAGE *message); +extern POOL_SENT_MESSAGE *pool_get_sent_message(char kind, const char *name); extern void pool_unset_writing_transaction(void); extern void pool_set_writing_transaction(void); extern bool pool_is_writing_transaction(void); @@ -213,8 +182,9 @@ extern void pool_set_command_success(void); extern bool pool_is_command_success(void); extern void pool_copy_prep_where(bool *src, bool *dest); +#ifdef NOT_USED extern void pool_add_prep_where(char *name, bool *map); extern bool *pool_get_prep_where(char *name); extern void pool_delete_prep_where(char *name); - +#endif /* NOT_USED */ #endif /* POOL_SESSION_CONTEXT_H */ Index: pool_timestamp.c =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_timestamp.c,v retrieving revision 1.14 diff -u -r1.14 pool_timestamp.c --- pool_timestamp.c 6 Sep 2010 08:33:32 -0000 1.14 +++ pool_timestamp.c 26 Jan 2011 04:46:29 -0000 @@ -560,7 +560,7 @@ */ char * rewrite_timestamp(POOL_CONNECTION_POOL *backend, Node *node, - bool rewrite_to_params, PreparedStatement *pstmt) + bool rewrite_to_params, POOL_SENT_MESSAGE *message) { TSRewriteContext ctx; Node *stmt; @@ -630,11 +630,11 @@ rewrite = ctx.rewrite; /* add params */ - if (pstmt) + if (message) { int i; - for (i = 0; i < pstmt->num_tsparams; i++) + for (i = 0; i < message->num_tsparams; i++) { e_stmt->params = lappend(e_stmt->params, ctx.ts_const); rewrite = true; @@ -647,7 +647,7 @@ if (!rewrite) return NULL; - if (ctx.rewrite_to_params && pstmt) + if (ctx.rewrite_to_params && message) { ListCell *lc; int num = ctx.num_params + 1; @@ -660,7 +660,7 @@ } /* save to portal */ - pstmt->num_tsparams = list_length(ctx.params); + message->num_tsparams = list_length(ctx.params); /* add param type */ if (IsA(node, PrepareStmt)) @@ -668,7 +668,7 @@ int i; PrepareStmt *p_stmt = (PrepareStmt *) node; - for (i = 0; i < pstmt->num_tsparams; i++) + for (i = 0; i < message->num_tsparams; i++) p_stmt->argtypes = lappend(p_stmt->argtypes, SystemTypeName("timestamptz")); } @@ -694,8 +694,9 @@ * rewrite Bind message to add parameter */ char * -bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, Portal *portal, - const char *orig_msg, int *len) +bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, + POOL_SENT_MESSAGE *message, + const char *orig_msg, int *len) { int16 tmp2, num_params, @@ -726,8 +727,8 @@ ts_len = strlen(ts); - *len += (strlen(ts) + sizeof(int32)) * portal->num_tsparams; - new_msg = copy_to = (char *) malloc(*len + portal->num_tsparams * sizeof(int16)); + *len += (strlen(ts) + sizeof(int32)) * message->num_tsparams; + new_msg = copy_to = (char *) malloc(*len + message->num_tsparams * sizeof(int16)); copy_from = orig_msg; /* portal_name */ @@ -748,8 +749,8 @@ if (num_formats > 1) { /* enlarge message length */ - *len += portal->num_tsparams * sizeof(int16); - tmp2 += portal->num_tsparams; + *len += message->num_tsparams * sizeof(int16); + tmp2 += message->num_tsparams; } tmp2 = htons(tmp2); memcpy(copy_to, &tmp2, copy_len); /* copy number of format codes */ @@ -763,15 +764,15 @@ if (num_formats > 1) { /* set format codes to zero(text) */ - memset(copy_to, 0, portal->num_tsparams * 2); - copy_to += sizeof(int16) * portal->num_tsparams; + memset(copy_to, 0, message->num_tsparams * 2); + copy_to += sizeof(int16) * message->num_tsparams; } /* num params */ memcpy(&tmp2, copy_from, sizeof(int16)); copy_len = sizeof(int16); num_params = ntohs(tmp2); - tmp2 = htons(num_params + portal->num_tsparams); + tmp2 = htons(num_params + message->num_tsparams); memcpy(copy_to, &tmp2, sizeof(int16)); copy_to += copy_len; copy_from += copy_len; @@ -795,7 +796,7 @@ copy_to += copy_len; copy_from += copy_len; tmp4 = htonl(ts_len); - for (i = 0; i < portal->num_tsparams; i++) + for (i = 0; i < message->num_tsparams; i++) { memcpy(copy_to, &tmp4, sizeof(int32)); copy_to += sizeof(int32); Index: pool_timestamp.h =================================================================== RCS file: /cvsroot/pgpool/pgpool-II/pool_timestamp.h,v retrieving revision 1.2 diff -u -r1.2 pool_timestamp.h --- pool_timestamp.h 9 Jul 2010 01:14:28 -0000 1.2 +++ pool_timestamp.h 26 Jan 2011 04:46:29 -0000 @@ -5,7 +5,7 @@ #include "parser/nodes.h" #include "pool_session_context.h" -char *rewrite_timestamp(POOL_CONNECTION_POOL *backend, Node *node, bool rewrite_to_params, PreparedStatement *pstmt); -char *bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, Portal *portal, const char *orig_msg, int *len); +char *rewrite_timestamp(POOL_CONNECTION_POOL *backend, Node *node, bool rewrite_to_params, POOL_SENT_MESSAGE *message); +char *bind_rewrite_timestamp(POOL_CONNECTION_POOL *backend, POOL_SENT_MESSAGE *message, const char *orig_msg, int *len); #endif /* POOL_TIMESTAMP_H */