/* -*-pgsql-c-*- */ /* * $Header: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v 1.23.2.22 2007/10/18 02:49:13 y-asaba Exp $ * * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * * Copyright (c) 2003-2007 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby * granted, provided that the above copyright notice appear in all * copies and that both that copyright notice and this permission * notice appear in supporting documentation, and that the name of the * author not be used in advertising or publicity pertaining to * distribution of the software without specific, written prior * permission. The author makes no representations about the * suitability of this software for any purpose. It is provided "as * is" without express or implied warranty. * * pool_process_query.c: query processing stuff * */ #include "config.h" #include #ifdef HAVE_SYS_TYPES_H #include #endif #ifdef HAVE_SYS_TIME_H #include #endif #include #include #include #include #include #include "pool.h" #include "parser/parser.h" #include "parser/pg_list.h" #include "parser/parsenodes.h" #include "pool_rewrite_query.h" #define INIT_STATEMENT_LIST_SIZE 8 #define DEADLOCK_ERROR_CODE "40P01" #define ADMIN_SHUTDOWN_ERROR_CODE "57P01" #define CRASH_SHUTDOWN_ERROR_CODE "57P02" #define POOL_ERROR_QUERY "send invalid query from pgpool to abort transaction" typedef struct { char *portal_name; Node *stmt; } Portal; /* * prepared statement list */ typedef struct { int size; int cnt; Portal **portal_list; } PreparedStatementList; static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query); static POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); #ifdef NOT_USED static POOL_STATUS 
Sync(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); #endif static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int send_ready); static POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int copyin); static POOL_STATUS CursorResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static int RowDescription(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS AsciiRow(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short num_fields); static POOL_STATUS BinaryRow(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short num_fields); static POOL_STATUS FunctionCall(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static POOL_STATUS send_extended_protocol_message(POOL_CONNECTION_POOL *backend, int node_id, char *kind, int len, char *string); static POOL_STATUS send_execute_message(POOL_CONNECTION_POOL *backend, int node_id, int len, char *string); static int synchronize(POOL_CONNECTION *cp); static void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt); static int is_select_query(Node *node, char *sql); static int is_sequence_query(Node *node); static int load_balance_enabled(POOL_CONNECTION_POOL *backend, Node* node, char *sql); static void 
start_load_balance(POOL_CONNECTION_POOL *backend); static void end_load_balance(POOL_CONNECTION_POOL *backend); static POOL_STATUS do_command(POOL_CONNECTION *backend, char *query, int protoMajor, int no_ready_for_query); static POOL_STATUS do_error_command(POOL_CONNECTION *backend, int protoMajor); static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node); static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node); static char *get_insert_command_table_name(InsertStmt *node); static void add_prepared_list(PreparedStatementList *p, Portal *portal); static void add_unnamed_portal(PreparedStatementList *p, Portal *portal); static void del_prepared_list(PreparedStatementList *p, Portal *portal); static void reset_prepared_list(PreparedStatementList *p); static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n); static char *parse_copy_data(char *buf, int len, char delimiter, int col_id); static Portal *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name); static Portal *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name); static int detect_error(POOL_CONNECTION *master, char *error_code, int major, bool unread); static int detect_deadlock_error(POOL_CONNECTION *master, int major); static int detect_postmaster_down_error(POOL_CONNECTION *master, int major); #ifdef NOT_USED static POOL_CONNECTION_POOL_SLOT *slots[MAX_CONNECTION_SLOTS]; #endif int in_load_balance; /* non 0 if in load balance mode */ int selected_slot; /* selected DB node */ int master_slave_dml; /* non 0 if master/slave mode is specified in config file */ static int replication_was_enabled; /* replication mode was enabled */ static int master_slave_was_enabled; /* master/slave mode was enabled */ static int internal_transaction_started; /* to issue table lock command a transaction has been started internally */ static char *copy_table = NULL; /* copy table 
name */ static char *copy_schema = NULL; /* copy table name */ static char copy_delimiter; /* copy delimiter char */ static char *copy_null = NULL; /* copy null string */ static void (*pending_function)(PreparedStatementList *p, Portal *portal) = NULL; static Portal *pending_prepared_portal = NULL; static Portal *unnamed_statement = NULL; static Portal *unnamed_portal = NULL; static int select_in_transaction = 0; /* non 0 if select query is in transaction */ static PreparedStatementList prepared_list; /* prepared statement name list */ static int is_drop_database(Node *node); /* returns non 0 if this is a DROP DATABASE command */ static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static int is_strict_query(Node *node); /* returns non 0 if this is strict query */ static int check_copy_from_stdin(Node *node); /* returns non 0 if this is a COPY FROM STDIN */ static POOL_STATUS read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node); static POOL_STATUS read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind); static POOL_STATUS ParallelForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *database, bool send_to_frontend); static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend); /* show ps status */ static int is_select_pgcatalog = 0; static int is_select_for_update = 0; /* also for SELECT ... 
INTO */ static char *parsed_query = NULL; static POOL_MEMORY_POOL *prepare_memory_context = NULL; static void query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len); POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int connection_reuse, int first_ready_for_query_received) { char kind; /* packet kind (backend) */ char fkind; /* packet kind (frontend) */ short num_fields = 0; fd_set readmask; fd_set writemask; fd_set exceptmask; int fds; POOL_STATUS status; int state; /* 0: ok to issue commands 1: waiting for "ready for query" response */ int qcnt; int i; frontend->no_forward = connection_reuse; qcnt = 0; state = 0; for (;;) { kind = 0; fkind = 0; if (state == 0 && connection_reuse) { int st; /* send query for resetting connection such as "ROLLBACK" "RESET ALL"... */ st = reset_backend(backend, qcnt); if (st < 0) /* error? */ { /* probably we don't need this, since caller will * close the connection to frontend after returning with POOL_END. But I * guess I would like to be a paranoid... */ frontend->no_forward = 0; return POOL_END; } else if (st == 0) /* no query issued? 
*/ { qcnt++; continue; } else if (st == 1) /* more query remains */ { state = 1; qcnt++; continue; } else /* no more query(st == 2) */ { frontend->no_forward = 0; return POOL_CONTINUE; } } /* * if all backends and frontend do not have any pending data * in the receiving data cache, then issue select(2) to wait for new data arrival */ if (is_cache_empty(frontend, backend)) { struct timeval timeout; int num_fds, was_error = 0; timeout.tv_sec = 1; timeout.tv_usec = 0; FD_ZERO(&readmask); FD_ZERO(&writemask); FD_ZERO(&exceptmask); if (!connection_reuse) { FD_SET(frontend->fd, &readmask); FD_SET(frontend->fd, &exceptmask); } num_fds = 0; if (!VALID_BACKEND(backend->info->load_balancing_node)) { /* select load balancing node */ backend->info->load_balancing_node = select_load_balancing_node(); } for (i=0;ifd + 1, num_fds); FD_SET(CONNECTION(backend, i)->fd, &readmask); FD_SET(CONNECTION(backend, i)->fd, &exceptmask); } } if (connection_reuse) { num_fds = Max(frontend->fd + 1, num_fds); } pool_debug("pool_process_query: num_fds: %d", num_fds); fds = select(num_fds, &readmask, &writemask, &exceptmask, NULL); if (fds == -1) { if (errno == EINTR) continue; pool_error("select() failed. 
reason: %s", strerror(errno)); return POOL_ERROR; } if (fds == 0) { return POOL_CONTINUE; } for (i = 0; i < NUM_BACKENDS; i++) { if (VALID_BACKEND(i) && FD_ISSET(CONNECTION(backend, i)->fd, &readmask)) { if (detect_postmaster_down_error(CONNECTION(backend, i), MAJOR(backend))) { was_error = 1; if (!VALID_BACKEND(i)) break; notice_backend_error(i); sleep(5); break; } status = read_kind_from_backend(frontend, backend, &kind); if (status != POOL_CONTINUE) return status; break; } } if (was_error) continue; if (!connection_reuse) { if (FD_ISSET(frontend->fd, &exceptmask)) return POOL_END; else if (FD_ISSET(frontend->fd, &readmask)) { status = ProcessFrontendResponse(frontend, backend); if (status != POOL_CONTINUE) return status; continue; } if (kind == 0) continue; } if (FD_ISSET(MASTER(backend)->fd, &exceptmask)) { return POOL_ERROR; } } else { if (frontend->len > 0) { status = ProcessFrontendResponse(frontend, backend); if (status != POOL_CONTINUE) return status; continue; } } /* this is the synchronous point */ if (kind == 0) { status = read_kind_from_backend(frontend, backend, &kind); if (status != POOL_CONTINUE) return status; } first_ready_for_query_received = 0; /* * Prrocess backend Response */ /* * Sanity check */ if (kind == 0) { pool_error("pool_process_query: kind is 0!"); return POOL_ERROR; } pool_debug("pool_process_query: kind from backend: %c", kind); if (MAJOR(backend) == PROTO_MAJOR_V3) { switch (kind) { case 'G': /* CopyIn response */ status = CopyInResponse(frontend, backend); break; case 'S': /* Paramter Status */ status = ParameterStatus(frontend, backend); break; case 'Z': /* Ready for query */ status = ReadyForQuery(frontend, backend, 1); break; default: status = SimpleForwardToFrontend(kind, frontend, backend); break; } } else { switch (kind) { case 'A': /* Notification response */ status = NotificationResponse(frontend, backend); break; case 'B': /* BinaryRow */ status = BinaryRow(frontend, backend, num_fields); break; case 'C': /* Complete 
command response */ status = CompleteCommandResponse(frontend, backend); break; case 'D': /* AsciiRow */ status = AsciiRow(frontend, backend, num_fields); break; case 'E': /* Error Response */ status = ErrorResponse(frontend, backend); break; case 'G': /* CopyIn Response */ status = CopyInResponse(frontend, backend); break; case 'H': /* CopyOut Response */ status = CopyOutResponse(frontend, backend); break; case 'I': /* Empty Query Response */ status = EmptyQueryResponse(frontend, backend); break; case 'N': /* Notice Response */ status = NoticeResponse(frontend, backend); break; case 'P': /* CursorResponse */ status = CursorResponse(frontend, backend); break; case 'T': /* RowDescription */ status = RowDescription(frontend, backend); if (status < 0) return POOL_ERROR; num_fields = status; status = POOL_CONTINUE; break; case 'V': /* FunctionResultResponse and FunctionVoidResponse */ status = FunctionResultResponse(frontend, backend); break; case 'Z': /* Ready for query */ status = ReadyForQuery(frontend, backend, 1); break; default: pool_error("Unknown message type %c(%02x)", kind, kind); exit(1); } } if (status != POOL_CONTINUE) return status; if (kind == 'Z' && frontend->no_forward && state == 1) { state = 0; } } return POOL_CONTINUE; } POOL_STATUS pool_parallel_exec(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *string, Node *node,bool send_to_frontend) { int len; int fds; int i; char kind; fd_set readmask; fd_set writemask; fd_set exceptmask; static char *sq = "show pool_status"; POOL_STATUS status; struct timeval timeout; int num_fds; int used_count = 0; int error_flag = 0; timeout.tv_sec = 1; timeout.tv_usec = 0; len = strlen(string) + 1; if (is_drop_database(node)) { int stime = 5; /* XXX give arbitary time to allow closing idle connections */ pool_debug("Query: sending HUP signal to parent"); kill(getppid(), SIGHUP); /* send HUP signal to parent */ /* we need to loop over here since we will get HUP signal while sleeping */ while (stime > 0) { 
stime = sleep(stime); } } /* process status reporting? */ if (strncasecmp(sq, string, strlen(sq)) == 0) { pool_debug("process reporting"); process_reporting(frontend, backend); return POOL_CONTINUE; } /* In this loop,forward the query to the backend */ for (i=0;ireplication_strict && !NO_STRICT_MODE(string) && is_strict_query(node)) || STRICT_MODE(string)) { pool_debug("waiting for backend %d completing the query", i); if (synchronize(CONNECTION(backend, i))) return POOL_END; } } if (!is_cache_empty(frontend, backend)) { return POOL_END; } /* In this loop,get data from backend */ for (;;) { FD_ZERO(&readmask); FD_ZERO(&writemask); FD_ZERO(&exceptmask); num_fds = 0; for (i=0;ifd + 1, num_fds); FD_SET(CONNECTION(backend, i)->fd, &readmask); FD_SET(CONNECTION(backend, i)->fd, &exceptmask); } } pool_debug("pool_parallel_query: num_fds: %d", num_fds); fds = select(num_fds, &readmask, &writemask, &exceptmask, NULL); if (fds == -1) { if (errno == EINTR) continue; pool_error("select() failed. reason: %s", strerror(errno)); return POOL_ERROR; } if (fds == 0) { return POOL_CONTINUE; } /* get header of protcol */ for (i=0;ifd, &readmask)) { continue; } else { status = read_kind_from_one_backend(frontend, backend, &kind,i); if (status != POOL_CONTINUE) return status; if (used_count == 0) { status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, send_to_frontend); pool_debug("pool_parallel_exec: kind from backend: %c", kind); } else { status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, false); pool_debug("pool_parallel_exec: dummy kind from backend: %c", kind); } if (status != POOL_CONTINUE) return status; if(kind == 'C' || kind == 'E' || kind == 'c') { if(used_count == NUM_BACKENDS -1) return POOL_CONTINUE; used_count++; continue; } /* get body of protcol */ for(;;) { if (pool_read(CONNECTION(backend, i), &kind, 1) < 0) { pool_error("pool_parallel_exec: failed to read kind from %d th 
backend", i); return POOL_ERROR; } /* * Sanity check */ if (kind == 0) { pool_error("pool_parallel_exec: kind is 0!"); return POOL_ERROR; } if((kind == 'E' ) && used_count != NUM_BACKENDS -1) { if(error_flag ==0) { pool_debug("pool_parallel_exec: kind from backend: %c", kind); status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, send_to_frontend); error_flag++; } else { pool_debug("pool_parallel_exec: dummy from backend: %c", kind); status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, false); } used_count++; break; } if((kind == 'c' || kind == 'C') && used_count != NUM_BACKENDS -1) { pool_debug("pool_parallel_exec: dummy from backend: %c", kind); status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, false); used_count++; break; } if((kind == 'C' || kind == 'c' || kind == 'E') && used_count == NUM_BACKENDS -1) { if(error_flag == 0) { pool_debug("pool_parallel_exec: kind from backend: %c", kind); status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, send_to_frontend); } else { pool_debug("pool_parallel_exec: dummy from backend: %c", kind); status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, false); } return POOL_CONTINUE; } pool_debug("pool_parallel_exec: kind from backend: %c", kind); status = ParallelForwardToFrontend(kind, frontend, CONNECTION(backend, i), backend->info->database, send_to_frontend); if (status != POOL_CONTINUE) { return status; } else { pool_flush(frontend); } } } } } } static POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query) { char *string, *string1; int len; static char *sq = "show pool_status"; int i; List *parse_tree_list; Node *node, *node1; POOL_STATUS status; int force_replication; /* non 0 if force to replicate query */ int deadlock_detected = 0, checked = 0; 
force_replication = 0;

	/*
	 * Obtain the query text: read it from the frontend when the caller did
	 * not supply one, otherwise use the caller's string as is.
	 */
	if (query == NULL)	/* need to read query from frontend? */
	{
		/* read actual query */
		if (MAJOR(backend) == PROTO_MAJOR_V3)
		{
			/* V3: 4 byte length (which counts itself) followed by the text */
			if (pool_read(frontend, &len, sizeof(len)) < 0)
				return POOL_END;
			len = ntohl(len) - 4;
			string = pool_read2(frontend, len);
		}
		else
			string = pool_read_string(frontend, &len, 0);

		if (string == NULL)
			return POOL_END;
	}
	else
	{
		len = strlen(query)+1;
		string = query;
		if (DUAL_MODE)
			force_replication = 1;
	}

	/* show ps status */
	query_ps_status(string, backend);

	/* log query to log file if necessary */
	if (pool_config->log_statement)
	{
		pool_log("statement: %s", string);
	}
	else
	{
		pool_debug("statement2: %s", string);
	}

	parse_tree_list = raw_parser(string);

	/* everything up to the matching free_parser() works on the first parsed statement */
	if (parse_tree_list != NIL)
	{
		node = (Node *) lfirst(list_head(parse_tree_list));

		/*
		 * Query cache: only for SELECT statements, and only when the cache is
		 * enabled and the system DB is up.  The IsSelectpgcatalog() result is
		 * remembered in is_select_pgcatalog as a side effect of the condition.
		 */
		if (pool_config->enable_query_cache &&
			SYSDB_STATUS == CON_UP &&
			IsA(node, SelectStmt) &&
			!(is_select_pgcatalog = IsSelectpgcatalog(node, backend)))
		{
			SelectStmt *select = (SelectStmt *)node;

			/* SELECT ... INTO and locking SELECTs are not looked up in the cache */
			if (! (select->into || select->lockingClause))
			{
				parsed_query = strdup(nodeToString(node));
				if (parsed_query == NULL)
				{
					/*
					 * NOTE(review): this return does not call free_parser(),
					 * unlike the other returns below that follow raw_parser().
					 */
					pool_error("pool_process_query: malloc failed");
					return POOL_ERROR;
				}

				/* parsed_query cannot be NULL here; the NULL case returned above */
				if (parsed_query)
				{
					/* served from the cache: nothing is sent to the backends */
					if (pool_query_cache_lookup(frontend, parsed_query, backend->info->database, TSTATE(backend)) == POOL_CONTINUE)
					{
						free(parsed_query);
						parsed_query = NULL;
						free_parser();
						return POOL_CONTINUE;
					}
				}
				is_select_for_update = 0;
			}
			else
			{
				is_select_for_update = 1;
			}
		}

		if (pool_config->parallel_mode)
		{
			char *parallel_query = NULL;

			/* Do select pool_parallel ? */
			parallel_query = is_parallel_query(node,backend);
			if (parallel_query)
			{
				POOL_STATUS stats = pool_parallel_exec(frontend,backend,parallel_query, node,true);
				free_parser();
				return stats;
			}

			/* rewrite_query_phase */
			{
				RewriteQuery *r_query = rewrite_query_stmt(node,frontend,backend);

				/* INSERT and SELECT return the status stored by the rewrite step */
				if(r_query->type == T_InsertStmt)
				{
					free_parser();
					return r_query->status;
				}
				else if(r_query->type == T_SelectStmt)
				{
					free_parser();
					return r_query->status;
				}
			}
		}

		/* check COPY FROM STDIN
		 * if true, set copy_* variable
		 */
		check_copy_from_stdin(node);

		/*
		 * if this is DROP DATABASE command, send HUP signal to parent and
		 * ask it to close all idle connections.
		 * XXX This is overkill. It would be better to close the idle
		 * connection for the database which DROP DATABASE command tries
		 * to drop. This is impossible at this point, since we have no way
		 * to pass such info to other processes.
		 */
		if (is_drop_database(node))
		{
			int stime = 5;	/* XXX give arbitrary time to allow closing idle connections */

			pool_debug("Query: sending HUP signal to parent");

			kill(getppid(), SIGHUP);		/* send HUP signal to parent */

			/* we need to loop over here since we will get HUP signal while sleeping */
			while (stime > 0)
			{
				/* sleep() returns the unslept time when interrupted */
				stime = sleep(stime);
			}
		}

		/* process status reporting? */
		if (strncasecmp(sq, string, strlen(sq)) == 0)
		{
			StartupPacket *sp;
			char psbuf[1024];

			pool_debug("process reporting");
			process_reporting(frontend, backend);

			/* show ps status */
			sp = MASTER_CONNECTION(backend)->sp;
			snprintf(psbuf, sizeof(psbuf), "%s %s %s idle",
					 sp->user, sp->database, remote_ps_data);
			set_ps_display(psbuf, false);

			free_parser();
			return POOL_CONTINUE;
		}

		/*
		 * PREPARE and DEALLOCATE: remember the statement in
		 * pending_prepared_portal together with the list operation
		 * (pending_function) to apply to it.  Neither is applied here.
		 */
		if (IsA(node, PrepareStmt) || IsA(node, DeallocateStmt) ||
			IsA(node, VariableSetStmt))
		{
			if (DUAL_MODE)
				force_replication = 1;

			if (frontend)
			{
				POOL_MEMORY_POOL *old_context;
				Portal *portal;

				if (prepare_memory_context == NULL)
				{
					prepare_memory_context = pool_memory_create();
					if (prepare_memory_context == NULL)
					{
						/* NOTE(review): returns without free_parser() */
						pool_error("Simple Query: pool_memory_create() failed");
						return POOL_ERROR;
					}
				}

				/* switch memory context */
				old_context = pool_memory;
				pool_memory = prepare_memory_context;

				if (IsA(node, PrepareStmt))
				{
					pending_function = add_prepared_list;
					/* NOTE(review): malloc() result is used without a NULL check */
					portal = malloc(sizeof(Portal));
					portal->portal_name = NULL;
					portal->stmt = copyObject(node);
					pending_prepared_portal = portal;
				}
				else if (IsA(node, DeallocateStmt))
				{
					pending_function = del_prepared_list;
					/* NOTE(review): malloc() result is used without a NULL check */
					portal = malloc(sizeof(Portal));
					portal->portal_name = NULL;
					portal->stmt = copyObject(node);
					pending_prepared_portal = portal;
				}

				/* switch old memory context */
				pool_memory = old_context;
			}
		}

		/*
		 * For EXECUTE of a known prepared statement, the decisions below are
		 * taken on the prepared query (node1/string1) rather than on the
		 * EXECUTE statement itself.
		 */
		if (frontend && IsA(node, ExecuteStmt))
		{
			Portal *portal;
			PrepareStmt *p_stmt;
			ExecuteStmt *e_stmt = (ExecuteStmt *)node;

			portal = lookup_prepared_statement_by_statement(&prepared_list,
															e_stmt->name);

			if (!portal)
			{
				string1 = string;
				node1 = node;
			}
			else
			{
				p_stmt = (PrepareStmt *)portal->stmt;
				string1 = nodeToString(p_stmt->query);
				node1 = (Node *)p_stmt->query;
			}
		}
		else
		{
			string1 = string;
			node1 = node;
		}

		/* load balance trick */
		if (load_balance_enabled(backend, node1, string1))
			start_load_balance(backend);
		else if (MASTER_SLAVE && !force_replication)
		{
			/* master/slave mode is switched off here; master_slave_dml records that */
			pool_debug("SimpleQuery: set master_slave_dml query: %s", string);
			master_slave_was_enabled = 1;
			MASTER_SLAVE = 0;
			master_slave_dml = 1;
		}
		else if (REPLICATION &&
				 !pool_config->replicate_select &&
				 is_select_query(node1, string1) &&
				 !is_sequence_query(node1))
		{
			/* SELECT that is not to be replicated: run it on the master node only */
			selected_slot = MASTER_NODE_ID;
			replication_was_enabled = 1;
			REPLICATION = 0;
			LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
			in_load_balance = 1;
			select_in_transaction = 1;
		}

		/*
		 * determine if we need to lock the table
		 * to keep SERIAL data consistency among servers
		 * conditions:
		 * - replication is enabled
		 * - protocol is V3
		 * - statement is INSERT
		 * - either "INSERT LOCK" comment exists or insert_lock directive specified
		 */
		if (REPLICATION && need_insert_lock(backend, string, node))
		{
			/* start a transaction if needed and lock the table */
			status = insert_lock(backend, string, (InsertStmt *)node);
			if (status != POOL_CONTINUE)
			{
				free_parser();
				return status;
			}
		}
	}

	free_parser();

	/*
	 * NOTE(review): the loop header below is damaged in this copy of the
	 * file; the text between "i" and "replication_strict" (presumably the
	 * rest of the for header and the statements that send the query to each
	 * backend) is missing and must be restored from the upstream source.
	 */
	for (i=0;ireplication_strict && !NO_STRICT_MODE(string)) ||
			STRICT_MODE(string))
		{
			pool_debug("waiting for backend %d completing the query", i);
			if (synchronize(CONNECTION(backend, i)))
				return POOL_END;

			/* the deadlock check is made only once per query */
			if (!checked)
			{
				/*
				 * We must check for a deadlock error because a transaction
				 * aborted by deadlock detection is not necessarily aborted
				 * on all nodes.
				 * If a transaction is aborted on the master node, pgpool
				 * sends an error query to the other nodes.
				 */
				deadlock_detected = detect_deadlock_error(CONNECTION(backend, i), MAJOR(backend));
				if (deadlock_detected < 0)
					return POOL_END;
				checked = 1;
			}
		}
	}

	return POOL_CONTINUE;
}

/*
 * process EXECUTE (V3 only)
 *
 * Reads an Execute packet from the frontend, decides from the prepared
 * statement bound to the named portal (if one is known) whether load
 * balancing or master-only execution applies, and relays the backend
 * responses to the frontend.
 */
static POOL_STATUS Execute(POOL_CONNECTION *frontend,
						   POOL_CONNECTION_POOL *backend)
{
	char *string;		/* portal name + null terminate + max_tobe_returned_rows */
	int len;
	int sendlen;
	int i;
	char kind;
	int status;
	Portal *portal;
	char *string1;
	PrepareStmt *p_stmt;
	int deadlock_detected = 0, checked = 0;

	/* read Execute packet */
	if (pool_read(frontend, &len, sizeof(len)) < 0)
		return POOL_END;

	len = ntohl(len) - 4;
	/* NOTE(review): the pool_read2() result is used below without a NULL check */
	string = pool_read2(frontend, len);

	pool_debug("Execute: portal name <%s>", string);

	portal = lookup_prepared_statement_by_portal(&prepared_list, string);

	/* load balance trick */
	if (portal)
	{
		p_stmt = (PrepareStmt *)portal->stmt;

		/*
		 * NOTE(review): pool_memory is overwritten with a new context that is
		 * deleted at the end of this branch; the previous value is not saved
		 * or restored here (SimpleQuery keeps it in old_context) - confirm
		 * this is intended.
		 */
		pool_memory = pool_memory_create();
		string1 = nodeToString(p_stmt->query);

		if (load_balance_enabled(backend, (Node *)p_stmt->query, string1))
			start_load_balance(backend);
		else if (REPLICATION &&
				 !pool_config->replicate_select &&
				 is_select_query((Node *)p_stmt->query, string1) &&
				 !is_sequence_query((Node *)p_stmt->query))
		{
			/* SELECT that is not to be replicated: run it on the master node only */
			selected_slot = MASTER_NODE_ID;
			replication_was_enabled = 1;
			REPLICATION = 0;
			LOAD_BALANCE_STATUS(MASTER_NODE_ID) = LOAD_SELECTED;
			in_load_balance = 1;
			select_in_transaction = 1;
		}
		else if (MASTER_SLAVE && !IsA((Node *)p_stmt->query, TransactionStmt))
		{
			master_slave_was_enabled = 1;
			MASTER_SLAVE = 0;
			master_slave_dml = 1;
		}
		pool_memory_delete(pool_memory);
	}
	else if (MASTER_SLAVE)
	{
		/* unknown portal: treat it like DML in master/slave mode */
		master_slave_was_enabled = 1;
		MASTER_SLAVE = 0;
		master_slave_dml = 1;
	}

	/*
	 * NOTE(review): the loop header below is damaged in this copy of the
	 * file; the text between "i" and "replication_strict" (presumably the
	 * rest of the for header, the definition of cp and the statements that
	 * send the Execute message to each backend) is missing and must be
	 * restored from the upstream source.
	 */
	for (i=0;ireplication_strict)
		{
			pool_debug("waiting for backend completing the query");
			if (synchronize(cp))
				return POOL_END;

			/* the deadlock check is made only once per Execute */
			if (!checked)
			{
				/*
				 * We must check for a deadlock error because a transaction
				 * aborted by deadlock detection is not necessarily aborted
				 * on all nodes.
				 * If a transaction is aborted on the master node, pgpool
				 * sends an error query to the other nodes.
*/ deadlock_detected = detect_deadlock_error(CONNECTION(backend, i), MAJOR(backend)); if (deadlock_detected < 0) return POOL_END; checked = 1; } } } while ((kind = pool_read_kind(backend)), (kind != 'C' && kind != 'E' && kind != 'I' && kind != 's')) { if (kind < 0) { pool_error("Execute: pool_read_kind error"); return POOL_ERROR; } status = SimpleForwardToFrontend(kind, frontend, backend); if (status != POOL_CONTINUE) return status; pool_flush(frontend); } status = SimpleForwardToFrontend(kind, frontend, backend); if (status != POOL_CONTINUE) return status; pool_flush(frontend); /* end load balance mode */ if (in_load_balance) end_load_balance(backend); if (master_slave_dml) { MASTER_SLAVE = 1; master_slave_was_enabled = 0; master_slave_dml = 0; } return POOL_CONTINUE; } /* * Extended query protocol has to send Flush message. */ static POOL_STATUS send_extended_protocol_message(POOL_CONNECTION_POOL *backend, int node_id, char *kind, int len, char *string) { POOL_CONNECTION *cp = CONNECTION(backend, node_id); int sendlen; /* forward the query to the backend */ pool_write(cp, kind, 1); sendlen = htonl(len + 4); pool_write(cp, &sendlen, sizeof(sendlen)); pool_write(cp, string, len); /* * send "Flush" message so that backend notices us * the completion of the command */ pool_write(cp, "H", 1); sendlen = htonl(4); if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0) { return POOL_ERROR; } return POOL_CONTINUE; } static POOL_STATUS send_execute_message(POOL_CONNECTION_POOL *backend, int node_id, int len, char *string) { return send_extended_protocol_message(backend, node_id, "E", len, string); } /* * process EXECUTE (V3 only) */ static POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char kind; int len; char *string; int i; Portal *portal; POOL_MEMORY_POOL *old_context; PrepareStmt *p_stmt; char *name, *stmt; List *parse_tree_list; Node *node = NULL; int deadlock_detected = 0; int checked = 0; /* read Parse packet */ if 
(pool_read(frontend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(frontend, len); pool_debug("Parse: portal name <%s>", string); name = string; stmt = string + strlen(string) + 1; parse_tree_list = raw_parser(stmt); if (parse_tree_list == NIL) { free_parser(); } else { node = (Node *) lfirst(list_head(parse_tree_list)); if (prepare_memory_context == NULL) { prepare_memory_context = pool_memory_create(); if (prepare_memory_context == NULL) { pool_error("Simple Query: pool_memory_create() failed"); return POOL_ERROR; } } /* switch memory context */ old_context = pool_memory; pool_memory = prepare_memory_context; portal = malloc(sizeof(Portal)); /* translate Parse message to PrepareStmt */ p_stmt = palloc(sizeof(PrepareStmt)); p_stmt->type = T_PrepareStmt; p_stmt->name = pstrdup(name); p_stmt->query = copyObject(node); portal->stmt = (Node *)p_stmt; portal->portal_name = NULL; if (*name) { pending_function = add_prepared_list; pending_prepared_portal = portal; } else /* unnamed statement */ { pending_function = add_unnamed_portal; pfree(p_stmt->name); p_stmt->name = NULL; pending_prepared_portal = portal; } /* switch old memory context */ pool_memory = old_context; free_parser(); } for (i = 0; i < NUM_BACKENDS; i++) { POOL_CONNECTION *cp; if (!VALID_BACKEND(i)) continue; cp = CONNECTION(backend, i); if (deadlock_detected) { int sendlen; pool_write(cp, "Q", 1); len = strlen(POOL_ERROR_QUERY) + 1; sendlen = htonl(len + 4); pool_write(cp, &sendlen, sizeof(sendlen)); pool_write_and_flush(cp, POOL_ERROR_QUERY, len); } else if (send_extended_protocol_message(backend, i, "P", len, string)) return POOL_END; if (pool_config->replication_strict) { pool_debug("waiting for backend completing the query"); if (synchronize(cp)) return POOL_END; if (!checked) { /* * We must check deadlock error because a aborted transaction * by detecting deadlock isn't same on all nodes. 
* If a transaction is aborted on master node, pgpool send a * error query to another nodes. */ deadlock_detected = detect_deadlock_error(cp, MAJOR(backend)); if (deadlock_detected < 0) return POOL_END; checked = 1; } } } for (;;) { kind = pool_read_kind(backend); if (kind < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } SimpleForwardToFrontend(kind, frontend, backend); if (pool_flush(frontend) < 0) return POOL_ERROR; if (kind != 'N') break; } return POOL_CONTINUE; } #ifdef NOT_USED /* * process Sync (V3 only) */ static POOL_STATUS Sync(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string; /* portal name + null terminate + max_tobe_returned_rows */ int len; int sendlen; /* read Sync packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(frontend, len); /* forward the query to the backend */ pool_write(MASTER(backend), "S", 1); sendlen = htonl(len + 4); pool_write(MASTER(backend), &sendlen, sizeof(sendlen)); if (pool_write_and_flush(MASTER(backend), string, len) < 0) { return POOL_END; } if (REPLICATION) { /* * in "strict mode" we need to wait for master completing the query. * note that this is not applied if "NO STRICT" is specified as a comment. 
*/ if (pool_config->replication_strict) { pool_debug("waiting for master completing the query"); if (synchronize(MASTER(backend))) return POOL_END; } pool_write(SECONDARY(backend), "S", 1); sendlen = htonl(len + 4); pool_write(SECONDARY(backend), &sendlen, sizeof(sendlen)); if (pool_write_and_flush(SECONDARY(backend), string, len) < 0) { return POOL_END; } /* in "strict mode" we need to wait for secondary completing the query */ if (pool_config->replication_strict) if (synchronize(SECONDARY(backend))) return POOL_END; } return POOL_CONTINUE; } #endif static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int send_ready) { StartupPacket *sp; char psbuf[1024]; int i; /* if a transaction is started for insert lock, we need to close it. */ if (internal_transaction_started) { int len; signed char state; if ((len = pool_read_message_length(backend)) < 0) return POOL_END; pool_debug("ReadyForQuery: message length: %d", len); len = htonl(len); state = pool_read_kind(backend); if (state < 0) return POOL_END; /* set transaction state */ pool_debug("ReadyForQuery: transaction state: %c", state); for (i=0;itstate = state; if (do_command(CONNECTION(backend, i), "COMMIT", PROTO_MAJOR_V3, 1) != POOL_CONTINUE) return POOL_ERROR; } internal_transaction_started = 0; } pool_flush(frontend); if (send_ready) { pool_write(frontend, "Z", 1); if (MAJOR(backend) == PROTO_MAJOR_V3) { int len; signed char state; if ((len = pool_read_message_length(backend)) < 0) return POOL_END; pool_debug("ReadyForQuery: message length: %d", len); len = htonl(len); pool_write(frontend, &len, sizeof(len)); state = pool_read_kind(backend); if (state < 0) return POOL_END; /* set transaction state */ pool_debug("ReadyForQuery: transaction state: %c", state); for (i=0;itstate = state; } pool_write(frontend, &state, 1); } if (pool_flush(frontend)) return POOL_END; } /* end load balance mode */ if (in_load_balance) end_load_balance(backend); if (master_slave_dml) { MASTER_SLAVE 
= 1; master_slave_was_enabled = 0; master_slave_dml = 0; } #ifdef NOT_USED return ProcessFrontendResponse(frontend, backend); #endif sp = MASTER_CONNECTION(backend)->sp; if (MASTER(backend)->tstate == 'T') snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction", sp->user, sp->database, remote_ps_data); else snprintf(psbuf, sizeof(psbuf), "%s %s %s idle", sp->user, sp->database, remote_ps_data); set_ps_display(psbuf, false); return POOL_CONTINUE; } static POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int i; char *string = NULL; char *string1 = NULL; int len, len1 = 0; /* read command tag */ string = pool_read_string(MASTER(backend), &len, 0); if (string == NULL) return POOL_END; len1 = len; string1 = strdup(string); for (i=0;i 0 */ if (size > 0) { buf = pool_read2(CONNECTION(backend, j), size); if (buf == NULL) return POOL_END; if (IS_MASTER_NODE_ID(j)) { pool_write(frontend, buf, size); snprintf(msgbuf, Min(sizeof(msgbuf), size+1), "%s", buf); pool_debug("AsciiRow: len: %d data: %s", size, msgbuf); } } } } mask >>= 1; } return POOL_CONTINUE; } static POOL_STATUS BinaryRow(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short num_fields) { static char nullmap[8192], nullmap1[8192]; int nbytes; int i, j; unsigned char mask; int size, size1 = 0; char *buf = NULL; pool_write(frontend, "B", 1); nbytes = (num_fields + 7)/8; if (nbytes <= 0) return POOL_CONTINUE; /* NULL map */ pool_read(MASTER(backend), nullmap, nbytes); if (pool_write(frontend, nullmap, nbytes) < 0) return POOL_END; memcpy(nullmap1, nullmap, nbytes); for (i=0;i 0 */ if (size > 0) { buf = pool_read2(CONNECTION(backend, j), size); if (buf == NULL) return POOL_END; if (IS_MASTER_NODE_ID(j)) { pool_write(frontend, buf, size); } } } mask >>= 1; } } return POOL_CONTINUE; } static POOL_STATUS CursorResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string = NULL; char *string1 = NULL; int len, len1 = 0; int i; /* read 
cursor name */ string = pool_read_string(MASTER(backend), &len, 0); if (string == NULL) return POOL_END; len1 = len; string1 = strdup(string); for (i=0;iparallel_mode == TRUE) { info = pool_get_dist_def_info(MASTER_CONNECTION(backend)->sp->database, copy_schema, copy_table); } for (;;) { if (copyin) { if (MAJOR(backend) == PROTO_MAJOR_V3) { char kind; int sendlen; char *p, *p1; if (pool_read(frontend, &kind, 1) < 0) return POOL_END; if (info && kind == 'd') { int id; if (pool_read(frontend, &sendlen, sizeof(sendlen))) { return POOL_END; } len = ntohl(sendlen) - 4; if (len <= 0) return POOL_CONTINUE; p = pool_read2(frontend, len); if (p == NULL) return POOL_END; /* copy end ? */ if (len == 3 && memcmp(p, "\\.\n", 3) == 0) { for (i=0;idist_key_col_id); if (!p1) { pool_error("CopyDataRow: cannot parse data"); return POOL_END; } else if (strcmp(p1, copy_null) == 0) { pool_error("CopyDataRow: key parameter is NULL"); free(p1); return POOL_END; } id = pool_get_id(info, p1); pool_debug("CopyDataRow: copying id: %d", id); free(p1); if (!VALID_BACKEND(id)) { exit(1); } if (pool_write(CONNECTION(backend, id), &kind, 1)) { return POOL_END; } if (pool_write(CONNECTION(backend, id), &sendlen, sizeof(sendlen))) { return POOL_END; } if (pool_write_and_flush(CONNECTION(backend, id), p, len)) { return POOL_END; } } } else { SimpleForwardToBackend(kind, frontend, backend); } /* CopyData? */ if (kind == 'd') continue; else { pool_debug("CopyDataRows: copyin kind other than d (%c)", kind); break; } } else string = pool_read_string(frontend, &len, 1); } else { /* CopyOut */ if (MAJOR(backend) == PROTO_MAJOR_V3) { signed char kind; if ((kind = pool_read_kind(backend)) < 0) return POOL_END; SimpleForwardToFrontend(kind, frontend, backend); /* CopyData? 
*/ if (kind == 'd') continue; else break; } else { for (i=0;ireplication_strict) { if (synchronize(CONNECTION(backend, i))) return POOL_END; } } } } else if (pool_flush(frontend) <0) return POOL_END; return POOL_CONTINUE; } static POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char c; int i; for (i=0;ilen <= 0 && frontend->no_forward != 0) return POOL_CONTINUE; if (pool_read(frontend, &fkind, 1) < 0) { pool_error("ProcessFrontendResponse: failed to read kind from frontend. fronend abnormally exited"); return POOL_ERROR; } pool_debug("read kind from frontend %c(%02x)", fkind, fkind); switch (fkind) { case 'X': if (MAJOR(backend) == PROTO_MAJOR_V3) { int len; pool_read(frontend, &len, sizeof(len)); } return POOL_END; case 'Q': status = SimpleQuery(frontend, backend, NULL); break; /* case 'S': status = Sync(frontend, backend); break; */ case 'E': status = Execute(frontend, backend); break; case 'P': status = Parse(frontend, backend); break; default: if (MAJOR(backend) == PROTO_MAJOR_V3) { status = SimpleForwardToBackend(fkind, frontend, backend); for (i=0;ireplication_timeout; } /* * disable read timeout */ void pool_disable_timeout() { timeoutmsec = 0; } /* * wait until read data is ready */ static int synchronize(POOL_CONNECTION *cp) { return pool_check_fd(cp, 1); } /* * wait until read data is ready * if notimeout is non 0, wait forever. 
*/
/*
 * pool_check_fd: wait with select() until cp->fd is readable.
 *
 * cp:        connection whose descriptor is watched
 * notimeout: if non 0, never time out
 *
 * A timeout is applied only when notimeout is 0 and the file-level
 * timeoutmsec is non 0; its length is pool_config->replication_timeout
 * milliseconds.
 *
 * Returns 0 when select() reports the descriptor without an exceptional
 * condition, -1 on select() failure, exceptional condition or timeout.
 */
int pool_check_fd(POOL_CONNECTION *cp, int notimeout)
{
	fd_set readmask;
	fd_set exceptmask;
	int fd;
	int fds;
	int wait_sec = 0;	/* timeout actually requested, kept for logging */
	int wait_usec = 0;
	struct timeval timeout;
	struct timeval *tp;

	fd = cp->fd;

	for (;;)
	{
		FD_ZERO(&readmask);
		FD_ZERO(&exceptmask);
		FD_SET(fd, &readmask);
		FD_SET(fd, &exceptmask);

		if (notimeout || timeoutmsec == 0)
			tp = NULL;
		else
		{
			/*
			 * Rebuilt on every pass because select() is allowed to
			 * modify the timeval it is handed.
			 */
			wait_sec = pool_config->replication_timeout / 1000;
			wait_usec = (pool_config->replication_timeout % 1000) * 1000;
			timeout.tv_sec = wait_sec;
			timeout.tv_usec = wait_usec;
			tp = &timeout;
		}

		fds = select(fd+1, &readmask, NULL, &exceptmask, tp);

		if (fds == -1)
		{
			/* interrupted or transient failure: try again */
			if (errno == EAGAIN || errno == EINTR)
				continue;

			pool_error("pool_check_fd: select() failed. reason %s", strerror(errno));
			break;
		}

		if (FD_ISSET(fd, &exceptmask))
		{
			pool_error("pool_check_fd: exception occurred");
			break;
		}

		if (fds == 0)
		{
			/*
			 * Log the requested timeout from the saved ints: the timeval
			 * may have been rewritten by select(), and its members are not
			 * of type int, so they must not be passed to %d.
			 */
			pool_error("pool_check_fd: data is not ready tp->tv_sec %d tp->tp_usec %d",
					   wait_sec, wait_usec);
			break;
		}
		return 0;
	}
	return -1;
}

/*
 * process_reporting: fill status[] with one name/value/description row
 * per configuration item; the rows are then written to the frontend.
 */
static void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
	static char *cursorname = "blank";
	static short num_fields = 3;
	static char *field_names[] = {"item", "value", "description"};
	static int oid = 0;
	static short fsize = -1;
	static int mod = 0;
	short n;
	int i, j;
	short s;
	int len;
	short colnum;

	static char nullmap[2] = {0xff, 0xff};
	int nbytes = (num_fields + 7)/8;

#define POOLCONFIG_MAXNAMELEN 32
#define POOLCONFIG_MAXVALLEN 512
#define POOLCONFIG_MAXDESCLEN 64

	typedef struct {
		char name[POOLCONFIG_MAXNAMELEN+1];
		char value[POOLCONFIG_MAXVALLEN+1];
		char desc[POOLCONFIG_MAXDESCLEN+1];
	} POOL_REPORT_STATUS;

#define MAXITEMS 128

	POOL_REPORT_STATUS status[MAXITEMS];

	short nrows;
	int size;
	int hsize;

	/*
	 * strncpy() does not terminate the destination when the source is as
	 * long as the limit.  Every field has one byte more than the limit
	 * passed to strncpy()/snprintf(), so zeroing the table first
	 * guarantees that each field ends in '\0'.
	 */
	memset(status, 0, sizeof(status));

	i = 0;

	strncpy(status[i].name, "listen_addresses", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->listen_addresses);
	strncpy(status[i].desc, "host name(s) or IP address(es) to listen to", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "port", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->port);
	strncpy(status[i].desc, "pgpool accepting port number", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "socket_dir", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->socket_dir);
	strncpy(status[i].desc, "pgpool socket directory", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "num_init_children", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_init_children);
	strncpy(status[i].desc, "# of children initially pre-forked", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "child_life_time", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_life_time);
	strncpy(status[i].desc, "if idle for this seconds, child exits", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "connection_life_time", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_life_time);
	strncpy(status[i].desc, "if idle for this seconds, connection closes", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "child_max_connections", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->child_max_connections);
	strncpy(status[i].desc, "if max_connections received, chile exits", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "max_pool", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->max_pool);
	strncpy(status[i].desc, "max # of connection pool per child", POOLCONFIG_MAXDESCLEN);
	i++;

	strncpy(status[i].name, "authentication_timeout", POOLCONFIG_MAXNAMELEN);
	snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->authentication_timeout);
	/* a description is bounded by the description limit, not the name limit */
	strncpy(status[i].desc, "maximum time in seconds to complete client authentication", POOLCONFIG_MAXDESCLEN);
	i++;
strncpy(status[i].name, "logdir", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->logdir); strncpy(status[i].desc, "logging directory", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "backend_socket_dir", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->backend_socket_dir); strncpy(status[i].desc, "Unix domain socket directory for the PostgreSQL server", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "replication_mode", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_mode); strncpy(status[i].desc, "non 0 if operating in replication mode", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "replication_strict", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_strict); strncpy(status[i].desc, "non 0 if operating in strict mode", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "replication_timeout", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_timeout); strncpy(status[i].desc, "if secondary does not respond in this milli seconds, abort the session", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "load_balance_mode", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->load_balance_mode); strncpy(status[i].desc, "non 0 if operating in load balancing mode", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "replication_stop_on_mismatch", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_stop_on_mismatch); strncpy(status[i].desc, "stop replication mode on fatal error", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "replicate_select", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replicate_select); strncpy(status[i].desc, "non 0 if SELECT statement is 
replicated", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "reset_query_list", POOLCONFIG_MAXNAMELEN); *(status[i].value) = '\0'; for (j=0;jnum_reset_queries;j++) { int len; len = POOLCONFIG_MAXVALLEN - strlen(status[i].value); strncat(status[i].value, pool_config->reset_query_list[j], len); len = POOLCONFIG_MAXVALLEN - strlen(status[i].value); strncat(status[i].value, ";", len); } strncpy(status[i].desc, "queries issued at the end of session", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "print_timestamp", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->print_timestamp); strncpy(status[i].desc, "if true print time stamp to each log line", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "master_slave_mode", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_mode); strncpy(status[i].desc, "if true, operate in master/slave mode", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "connection_cache", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->connection_cache); strncpy(status[i].desc, "if true, cache connection pool", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "health_check_timeout", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_timeout); strncpy(status[i].desc, "health check timeout", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "health_check_period", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->health_check_period); strncpy(status[i].desc, "health check period", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "health_check_user", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->health_check_user); strncpy(status[i].desc, "health check user", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "insert_lock", POOLCONFIG_MAXNAMELEN); 
snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->insert_lock); strncpy(status[i].desc, "insert lock", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "ignore_leading_white_space", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->ignore_leading_white_space); strncpy(status[i].desc, "ignore leading white spaces", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "replication_enabled", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->replication_enabled); strncpy(status[i].desc, "non 0 if actually operating in replication mode", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "master_slave_enabled", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->master_slave_enabled); strncpy(status[i].desc, "non 0 if actually operating in master/slave", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "num_reset_queries", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->num_reset_queries); strncpy(status[i].desc, "number of queries in reset_query_list", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "pcp_port", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_port); strncpy(status[i].desc, "PCP port # to bind", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "pcp_socket_dir", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pcp_socket_dir); strncpy(status[i].desc, "PCP socket directory", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "pcp_timeout", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->pcp_timeout); strncpy(status[i].desc, "PCP timeout for an idle client", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "log_statement", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", 
pool_config->log_statement); strncpy(status[i].desc, "if non 0, logs all SQL statements", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "log_connections", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_connections); strncpy(status[i].desc, "if true, print incoming connections to the log", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "log_hostname", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->log_hostname); strncpy(status[i].desc, "if true, resolve hostname for ps and log print", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "enable_pool_hba", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_pool_hba); strncpy(status[i].desc, "if true, use pool_hba.conf for client authentication", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "parallel_mode", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->parallel_mode); strncpy(status[i].desc, "if non 0, run in parallel query mode", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "enable_query_cache", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->enable_query_cache); strncpy(status[i].desc, "if non 0, use query cache", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "pgpool2_hostname", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->pgpool2_hostname); strncpy(status[i].desc, "pgpool2 hostname", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "system_db_hostname", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_hostname); strncpy(status[i].desc, "system DB hostname", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "system_db_port", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", pool_config->system_db_port); strncpy(status[i].desc, 
"system DB port number", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "system_db_dbname", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_dbname); strncpy(status[i].desc, "system DB name", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "system_db_schema", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_schema); strncpy(status[i].desc, "system DB schema name", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "system_db_user", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_user); strncpy(status[i].desc, "user name to access system DB", POOLCONFIG_MAXDESCLEN); i++; strncpy(status[i].name, "system_db_password", POOLCONFIG_MAXNAMELEN); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", pool_config->system_db_password); strncpy(status[i].desc, "password to access system DB", POOLCONFIG_MAXDESCLEN); i++; for (j = 0; j < NUM_BACKENDS; j++) { if (BACKEND_INFO(j).backend_port == 0) continue; snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_hostname%d", j); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%s", BACKEND_INFO(j).backend_hostname); snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d hostname", j); i++; snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_port%d", j); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_port); snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "backend #%d port number", j); i++; snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend_weight%d", j); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%f", BACKEND_INFO(j).backend_weight); snprintf(status[i].desc, POOLCONFIG_MAXDESCLEN, "weight of backend #%d", j); i++; snprintf(status[i].name, POOLCONFIG_MAXNAMELEN, "backend status%d", j); snprintf(status[i].value, POOLCONFIG_MAXVALLEN, "%d", BACKEND_INFO(j).backend_status); snprintf(status[i].desc, 
POOLCONFIG_MAXDESCLEN, "status of backend #%d", j); i++; } nrows = i; if (MAJOR(backend) == PROTO_MAJOR_V2) { /* cursor response */ pool_write(frontend, "P", 1); pool_write(frontend, cursorname, strlen(cursorname)+1); } /* row description */ pool_write(frontend, "T", 1); if (MAJOR(backend) == PROTO_MAJOR_V3) { len = sizeof(num_fields) + sizeof(len); for (i=0;ienable_query_cache && SYSDB_STATUS == CON_UP && status == 0) { query_cache_register(kind, frontend, database, p, len); } } return status; } POOL_STATUS SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int len, len1 = 0; char *p = NULL; char *p1 = NULL; char *p2 = NULL; int status; int sendlen; int i; int command_ok_row_count = 0; int delete_or_update = 0; pool_write(frontend, &kind, 1); /* * Check if packet kind == 'C'(Command complete), '1'(Parse * complete), '3'(Close complete). If so, then register or * unregister pending prepared statement. */ if ((kind == 'C' || kind == '1' || kind == '3') && pending_function && pending_prepared_portal) { pending_function(&prepared_list, pending_prepared_portal); if (pending_prepared_portal && pending_prepared_portal->stmt && IsA(pending_prepared_portal->stmt, DeallocateStmt)) { DeallocateStmt *s = (DeallocateStmt *)pending_prepared_portal->stmt; POOL_MEMORY_POOL *old_context = pool_memory; free(pending_prepared_portal->portal_name); pool_memory = prepare_memory_context; pfree(s->name); pfree(s); pool_memory = old_context; free(pending_prepared_portal); } } else if (kind == 'C' && select_in_transaction) select_in_transaction = 0; /* * Remove a pending function if a received message is not * NoticeResponse. 
*/ if (kind != 'N') { pending_function = NULL; pending_prepared_portal = NULL; } for (i=0;ienable_query_cache && SYSDB_STATUS == CON_UP) { query_cache_register(kind, frontend, backend->info->database, p1, len1); } free(p1); if (status) return POOL_END; if (kind == 'A') /* notification response */ { pool_flush(frontend); /* we need to immediately notice to frontend */ } else if (kind == 'E') /* error response? */ { int i, k; int res1; char *p1; /* * check if the error was PANIC or FATAL. If so, we just flush * the message and exit since the backend will close the * channel immediately. */ for (;;) { char e; e = *p++; if (e == '\0') break; if (e == 'S' && (strcasecmp("PANIC", p) == 0 || strcasecmp("FATAL", p) == 0)) { pool_flush(frontend); return POOL_END; } else { while (*p++) ; continue; } } if (select_in_transaction) { int i; if (TSTATE(backend) != 'E') { in_load_balance = 0; REPLICATION = 1; for (i = 0; i < NUM_BACKENDS; i++) { if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i)) { do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3); } } } select_in_transaction = 0; } for (i = 0;i < NUM_BACKENDS; i++) { if (VALID_BACKEND(i)) { POOL_CONNECTION *cp = CONNECTION(backend, i); /* We need to send "sync" message to backend in extend mode * so that it accepts next command. * Note that this may be overkill since client may send * it by itself. Moreover we do not need it in non-extend mode. * At this point we regard it is not harmfull since error resonse * will not be sent too frequently. 
*/ pool_write(cp, "S", 1); res1 = htonl(4); if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0) { return POOL_END; } } } while ((k = pool_read_kind(backend)) != 'Z') { POOL_STATUS ret; if (k < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } ret = SimpleForwardToFrontend(k, frontend, backend); if (ret != POOL_CONTINUE) return ret; pool_flush(frontend); } for (i = 0; i < NUM_BACKENDS; i++) { if (VALID_BACKEND(i)) { status = pool_read(CONNECTION(backend, i), &res1, sizeof(res1)); if (status < 0) { pool_error("SimpleForwardToFrontend: error while reading message length"); return POOL_END; } res1 = ntohl(res1) - sizeof(res1); p1 = pool_read2(CONNECTION(backend, i), res1); if (p1 == NULL) return POOL_END; } } } return POOL_CONTINUE; } POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int len; int sendlen; char *p; int i; char *name; for (i=0;iportal_name) free(portal->portal_name); portal->portal_name = strdup(portal_name); } } else if (kind == 'C' && *p == 'S' && *(p + 1)) { name = strdup(p+1); if (name == NULL) { pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); return POOL_END; } pending_function = del_prepared_list; pending_prepared_portal = NULL; } if (kind == 'B' || kind == 'D' || kind == 'C') { int i; int kind1; for (i = 0;i < NUM_BACKENDS; i++) { if (VALID_BACKEND(i)) { POOL_CONNECTION *cp = CONNECTION(backend, i); /* * send "Flush" message so that backend notices us * the completion of the command */ pool_write(cp, "H", 1); sendlen = htonl(4); if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0) { return POOL_END; } } } /* * Describe message with a portal name receive two messages. * 1. ParameterDescription * 2. 
RowDescriptions or NoData */ if (kind == 'D' && *p == 'S') { kind1 = pool_read_kind(backend); if (kind1 < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } SimpleForwardToFrontend(kind1, frontend, backend); pool_flush(frontend); } for (;;) { kind1 = pool_read_kind(backend); if (kind1 < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } SimpleForwardToFrontend(kind1, frontend, backend); if (pool_flush(frontend) < 0) return POOL_ERROR; if (kind1 != 'N') break; } } return POOL_CONTINUE; } POOL_STATUS ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int len, len1 = 0; int *len_array; int sendlen; char *p; char *name; char *value; POOL_STATUS status; char parambuf[1024]; /* parameter + value string buffer. XXX is this enough? */ int i; pool_write(frontend, "S", 1); len_array = pool_read_message_length2(backend); if (len_array == NULL) { return POOL_END; } len = len_array[MASTER_NODE_ID]; sendlen = htonl(len); pool_write(frontend, &sendlen, sizeof(sendlen)); for (i=0;iparams, name, value); } #ifdef DEBUG pool_param_debug_print(&MASTER(backend)->params); #endif } } status = pool_write(frontend, parambuf, len1); return status; } /* * reset backend status. 
return values are: * 0: no query was issued 1: a query was issued 2: no more queries remain -1: error */ static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt) { char *query; int qn; qn = pool_config->num_reset_queries; if (qcnt >= qn) { if (qcnt >= qn + prepared_list.cnt) { reset_prepared_list(&prepared_list); return 2; } send_deallocate(backend, &prepared_list, qcnt - qn); return 1; } query = pool_config->reset_query_list[qcnt]; /* if transaction state is idle, we don't need to issue ABORT */ if (TSTATE(backend) == 'I' && !strcmp("ABORT", query)) return 0; if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE) return -1; return 1; } /* * return non 0 if load balance is possible */ static int load_balance_enabled(POOL_CONNECTION_POOL *backend, Node* node, char *sql) { return (pool_config->load_balance_mode && DUAL_MODE && MAJOR(backend) == PROTO_MAJOR_V3 && TSTATE(backend) == 'I' && is_select_query(node, sql) && !is_sequence_query(node)); } /* * return non 0 if SQL is SELECT statement. */ static int is_select_query(Node *node, char *sql) { SelectStmt *select_stmt; if (!IsA(node, SelectStmt)) return 0; select_stmt = (SelectStmt *)node; if (select_stmt->into || select_stmt->lockingClause) return 0; if (pool_config->ignore_leading_white_space) { /* ignore leading white spaces */ while (*sql && isspace(*sql)) sql++; } return (*sql == 's' || *sql == 'S' || *sql == '('); } /* * return non 0 if SQL is SELECT statement. 
*/ static int is_sequence_query(Node *node) { SelectStmt *select_stmt; ListCell *lc; if (!IsA(node, SelectStmt)) return 0; select_stmt = (SelectStmt *)node; foreach (lc, select_stmt->targetList) { if (IsA(lfirst(lc), ResTarget)) { ResTarget *t; FuncCall *fc; ListCell *c; t = (ResTarget *) lfirst(lc); if (IsA(t->val, FuncCall)) { fc = (FuncCall *) t->val; foreach (c, fc->funcname) { Value *v = lfirst(c); if (strncasecmp(v->val.str, "NEXTVAL", 7) == 0) return 1; else if (strncasecmp(v->val.str, "SETVAL", 6) == 0) return 1; } } } } return 0; } /* * start load balance mode */ static void start_load_balance(POOL_CONNECTION_POOL *backend) { #ifdef NOT_USED double total_weight,r; int i; /* save backend connection slots */ for (i=0;islots[0] = slots[selected_slot]; #endif LOAD_BALANCE_STATUS(backend->info->load_balancing_node) = LOAD_SELECTED; selected_slot = backend->info->load_balancing_node; /* start load balancing */ in_load_balance = 1; } /* * finish load balance mode */ static void end_load_balance(POOL_CONNECTION_POOL *backend) { in_load_balance = 0; LOAD_BALANCE_STATUS(selected_slot) = LOAD_UNSELECTED; #ifdef NOT_USED /* restore backend connection slots */ for (i=0;itstate = kind; } return deadlock_detected ? POOL_DEADLOCK : POOL_CONTINUE; } /* * Send syntax error query to abort transaction. * We need to sync transaction status in transaction block. * SELECT query is sended to master only. * If SELECT is error, we must abort transaction on other nodes. 
*/
/*
 * backend:    the single backend connection to send the query to
 * protoMajor: protocol major version spoken on that connection
 *
 * Sends POOL_ERROR_QUERY as a simple query, then reads one reply
 * message (kind byte plus body) and discards it.  The kind byte is not
 * examined here.
 *
 * Returns POOL_CONTINUE on success, POOL_END on any I/O failure or
 * when the backend announces an impossible message length.
 */
static POOL_STATUS do_error_command(POOL_CONNECTION *backend, int protoMajor)
{
	int len;
	int status;
	char kind;
	char *string;
	char *error_query = POOL_ERROR_QUERY;

	/* send the query to the backend */
	pool_write(backend, "Q", 1);
	len = strlen(error_query)+1;

	if (protoMajor == PROTO_MAJOR_V3)
	{
		/* the V3 length word counts itself (4 bytes) plus the payload */
		int sendlen = htonl(len + 4);
		pool_write(backend, &sendlen, sizeof(sendlen));
	}

	if (pool_write_and_flush(backend, error_query, len) < 0)
	{
		return POOL_END;
	}

	/*
	 * read the kind byte of the reply
	 */
	status = pool_read(backend, &kind, sizeof(kind));
	if (status < 0)
	{
		pool_error("do_error_command: error while reading message kind");
		return POOL_END;
	}

	/*
	 * read and discard the body of the reply
	 */
	if (protoMajor == PROTO_MAJOR_V3)
	{
		if (pool_read(backend, &len, sizeof(len)) < 0)
			return POOL_END;
		len = ntohl(len) - 4;

		/* a length word smaller than itself cannot be honored */
		if (len < 0)
		{
			pool_error("do_error_command: invalid message length");
			return POOL_END;
		}

		string = pool_read2(backend, len);
		if (string == NULL)
			return POOL_END;
		pool_debug("command tag: %s", string);
	}
	else
	{
		string = pool_read_string(backend, &len, 0);
		if (string == NULL)
			return POOL_END;
	}

	return POOL_CONTINUE;
}

POOL_STATUS OneNode_do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, char *database)
{
	int len,sendlen;
	int status;
	char kind;

	pool_debug("OneNode_do_command: Query: %s", query);

	/* send the query to the backend */
	pool_write(backend, "Q", 1);
	len = strlen(query)+1;

	sendlen = htonl(len + 4);
	pool_write(backend, &sendlen, sizeof(sendlen));

	if (pool_write_and_flush(backend, query, len) < 0)
	{
		return POOL_END;
	}

	for(;;)
	{
		status = pool_read(backend, &kind, sizeof(kind));
		if (status < 0)
		{
			pool_error("OneNode_do_command: error while reading message kind");
			return POOL_END;
		}

		status = ParallelForwardToFrontend(kind, frontend, backend, database, true);

		if (kind == 'C' || kind =='E')
		{
			break;
		}
	}
	/*
	 *
	 * Expecting ReadyForQuery
	 *
	 */
	status = pool_read(backend, &kind, sizeof(kind));

	if (status < 0)
	{
		pool_error("OneNode_do_command: error while reading message kind");
		return POOL_END;
	}

	if (kind != 'Z')
	{
pool_error("OneNode_do_command: backend does not return ReadyForQuery"); return POOL_END; } status = ParallelForwardToFrontend(kind, frontend, backend, database, true); pool_flush(frontend); return status; } /* * judge if we need to lock the table * to keep SERIAL consistency among servers */ static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node) { if (MAJOR(backend) != PROTO_MAJOR_V3) return 0; /* * either insert_lock directive specified and without "NO INSERT LOCK" comment * or "INSERT LOCK" comment exists? */ if ((pool_config->insert_lock && strncasecmp(query, NO_LOCK_COMMENT, NO_LOCK_COMMENT_SZ)) || strncasecmp(query, LOCK_COMMENT, LOCK_COMMENT_SZ) == 0) { /* INSERT STATEMENT? */ if (IsA(node, InsertStmt)) return 1; } return 0; } /* * if a transaction has not already started, start a new one. * issue LOCK TABLE IN SHARE ROW EXCLUSIVE MODE */ static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node) { char *table; char qbuf[1024]; POOL_STATUS status; int i, deadlock_detected = 0; /* insert_lock can be used in V3 only */ if (MAJOR(backend) != PROTO_MAJOR_V3) return POOL_CONTINUE; /* get table name */ table = get_insert_command_table_name(node); /* could not get table name. probably wrong SQL command */ if (table == NULL) { return POOL_CONTINUE; } snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table); /* if we are not in a transaction block, * start a new transaction */ if (TSTATE(backend) == 'I') { for (i=0;irelation); pool_debug("get_insert_command_table_name: extracted table name: %s", table); return table; } /* judge if this is a DROP DATABASE command */ static int is_drop_database(Node *node) { return (IsA(node, DropdbStmt)) ? 
1 : 0; } /* * check if any pending data remains */ static int is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int i; if (frontend->len > 0) return 0; for (i=0;ilen > 0) return 0; } return 1; } /* * check if query is needed to wait completion */ static int is_strict_query(Node *node) { switch (node->type) { case T_SelectStmt: { SelectStmt *stmt = (SelectStmt *)node; return (stmt->into || stmt->lockingClause) ? 1 : 0; } case T_UpdateStmt: case T_InsertStmt: case T_DeleteStmt: case T_LockStmt: return 1; default: return 0; } return 0; } static int check_copy_from_stdin(Node *node) { if (copy_schema) free(copy_schema); if (copy_table) free(copy_table); if (copy_null) free(copy_null); copy_schema = copy_table = copy_null = NULL; if (IsA(node, CopyStmt)) { CopyStmt *stmt = (CopyStmt *)node; if (stmt->is_from == TRUE && stmt->filename == NULL) { RangeVar *relation = (RangeVar *)stmt->relation; ListCell *lc; /* query is COPY FROM STDIN */ if (relation->schemaname) copy_schema = strdup(relation->schemaname); else copy_schema = strdup("public"); copy_table = strdup(relation->relname); copy_delimiter = '\t'; /* default delimiter */ copy_null = strdup("\\N"); /* default null string */ /* look up delimiter and null string. 
*/ foreach (lc, stmt->options) { DefElem *elem = lfirst(lc); Value *v; if (strcmp(elem->defname, "delimiter") == 0) { v = (Value *)elem->arg; copy_delimiter = v->val.str[0]; } else if (strcmp(elem->defname, "null") == 0) { if (copy_null) free(copy_null); v = (Value *)elem->arg; copy_null = strdup(v->val.str); } } } return 1; } return 0; } static POOL_STATUS read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node) { if (VALID_BACKEND(node)) { char k; if (pool_read(CONNECTION(backend, node), &k, 1) < 0) { pool_error("read_kind_from_one_backend: failed to read kind from %d th backend", node); return POOL_ERROR; } pool_debug("read_kind_from_one_backend: read kind from %d th backend %c", node, k); *kind = k; return POOL_CONTINUE; } else { pool_error("read_kind_from_one_backend: %d th backend is not valid", node); return POOL_ERROR; } } /* * read kind from backends */ static POOL_STATUS read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind) { int i; for (i=0;ireplication_stop_on_mismatch) { notice_backend_error(i); child_exit(1); } else return POOL_ERROR; } } } } return POOL_CONTINUE; } void init_prepared_list(void) { prepared_list.cnt = 0; prepared_list.size = INIT_STATEMENT_LIST_SIZE; prepared_list.portal_list = malloc(sizeof(Portal *) * prepared_list.size); if (prepared_list.portal_list == NULL) { pool_error("init_prepared_list: malloc failed: %s", strerror(errno)); exit(1); } } static void add_prepared_list(PreparedStatementList *p, Portal *portal) { if (p->cnt == p->size) { p->size *= 2; p->portal_list = realloc(p->portal_list, sizeof(Portal *) * p->size); if (p->portal_list == NULL) { pool_error("add_prepared_list: realloc failed: %s", strerror(errno)); exit(1); } } p->portal_list[p->cnt++] = portal; } static void add_unnamed_portal(PreparedStatementList *p, Portal *portal) { POOL_MEMORY_POOL *old_context; if (unnamed_statement) { PrepareStmt *p_stmt = (PrepareStmt 
*)unnamed_statement->stmt; if (p_stmt->name == NULL) { old_context = pool_memory; pool_memory = prepare_memory_context; pfree(p_stmt->query); pfree(p_stmt); pool_memory = old_context; } free(unnamed_statement); } unnamed_portal = NULL; unnamed_statement = portal; } static void del_prepared_list(PreparedStatementList *p, Portal *portal) { int i; DeallocateStmt *s = (DeallocateStmt *)portal->stmt; POOL_MEMORY_POOL *old_context; for (i = 0; i < p->cnt; i++) { PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt; if (strcmp(p_stmt->name, s->name) == 0) break; } if (i == p->cnt) return; old_context = pool_memory; pool_memory = prepare_memory_context; pfree((PrepareStmt *)p->portal_list[i]->stmt); pool_memory = old_context; free(p->portal_list[i]->portal_name); free(p->portal_list[i]); if (i != p->cnt - 1) { memmove(&p->portal_list[i], &p->portal_list[i+1], sizeof(Portal *) * (p->cnt - i - 1)); } p->cnt--; } static void reset_prepared_list(PreparedStatementList *p) { int i; if (prepare_memory_context) { for (i = 0; i < p->cnt; i++) { free(p->portal_list[i]->portal_name); free(p->portal_list[i]); } pool_memory_delete(prepare_memory_context); prepare_memory_context = NULL; free(unnamed_statement); unnamed_portal = NULL; unnamed_statement = NULL; } p->cnt = 0; } static Portal *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name) { int i; /* unnamed portal? */ if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"')) return unnamed_statement; for (i = 0; i < p->cnt; i++) { PrepareStmt *p_stmt = (PrepareStmt *)p->portal_list[i]->stmt; if (strcmp(p_stmt->name, name) == 0) return p->portal_list[i]; } return NULL; } static Portal *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name) { int i; /* unnamed portal? 
*/ if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"')) return unnamed_portal; for (i = 0; i < p->cnt; i++) { if (p->portal_list[i]->portal_name && strcmp(p->portal_list[i]->portal_name, name) == 0) return p->portal_list[i]; } return NULL; } static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n) { char *query; int len; PrepareStmt *p_stmt; if (p->cnt <= n) return 1; p_stmt = (PrepareStmt *)p->portal_list[n]->stmt; len = strlen(p_stmt->name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */ query = malloc(len); if (query == NULL) { pool_error("send_deallocate: malloc failed: %s", strerror(errno)); exit(1); } sprintf(query, "DEALLOCATE \"%s\"", p_stmt->name); if (SimpleQuery(NULL, backend, query) != POOL_CONTINUE) { free(query); return 1; } free(query); return 0; } /* * parse_copy_data() * Parses CopyDataRow string. * Returns divide key value. If cannot parse data, returns NULL. */ static char * parse_copy_data(char *buf, int len, char delimiter, int col_id) { int i, j, field = 0; char *str, *p = NULL; str = malloc(len + 1); /* buf is terminated by '\n'. */ /* skip '\n' in for loop. 
*/ for (i = 0, j = 0; i < len - 1; i++) { if (buf[i] == '\\' && i != len - 2) /* escape */ { if (buf[i+1] == delimiter) { i++; str[j++] = buf[i]; } else { str[j++] = buf[i]; } } else if (buf[i] == delimiter) /* delimiter */ { if (field == col_id) { break; } else { field++; j = 0; } } else { str[j++] = buf[i]; } } if (field == col_id) { str[j] = '\0'; p = malloc(j); if (p == NULL) { pool_error("parse_copy_data: malloc failed: %s", strerror(errno)); return NULL; } strcpy(p, str); p[j] = '\0'; pool_debug("parse_copy_data: divide key value is %s", p); } free(str); return p; } static void query_cache_register(char kind, POOL_CONNECTION *frontend, char *database, char *data, int data_len) { static int inside_T; /* flag to see the result data sequence */ int result; if (is_select_pgcatalog || is_select_for_update) return; if (kind == 'T' && parsed_query) { result = pool_query_cache_register(kind, frontend, database, data, data_len, parsed_query); if (result < 0) { pool_error("pool_query_cache_register: query cache registration failed"); inside_T = 0; } else { inside_T = 1; } } else if ((kind == 'D' || kind == 'C' || kind == 'E') && inside_T) { result = pool_query_cache_register(kind, frontend, database, data, data_len, NULL); if (kind == 'C' || kind == 'E' || result < 0) { if (result < 0) pool_error("pool_query_cache_register: query cache registration failed"); else pool_debug("pool_query_cache_register: query cache saved"); inside_T = 0; free(parsed_query); parsed_query = NULL; } } } static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend) { StartupPacket *sp; char psbuf[1024]; int i; if (*query == '\0') return; sp = MASTER_CONNECTION(backend)->sp; i = snprintf(psbuf, sizeof(psbuf), "%s %s %s ", sp->user, sp->database, remote_ps_data); /* skip spaces */ while (*query && isspace(*query)) query++; for (; i< sizeof(psbuf); i++) { if (!*query || isspace(*query)) break; psbuf[i] = toupper(*query++); } psbuf[i] = '\0'; set_ps_display(psbuf, false); } static int 
detect_postmaster_down_error(POOL_CONNECTION *backend, int major) { int r = detect_error(backend, ADMIN_SHUTDOWN_ERROR_CODE, major, false); if (r) { pool_debug("detect_stop_postmaster_error: receive admin shutdown error from a node."); return r; } r = detect_error(backend, CRASH_SHUTDOWN_ERROR_CODE, major, false); if (r == 1) { pool_debug("detect_stop_postmaster_error: receive crash shutdown error from a node."); } return r; } static int detect_deadlock_error(POOL_CONNECTION *backend, int major) { int r = detect_error(backend, DEADLOCK_ERROR_CODE, major, true); if (r == 1) pool_debug("detect_deadlock_error: receive deadlock error from master node."); return r; } /* * detect_error: Detect specified error from error code. */ static int detect_error(POOL_CONNECTION *backend, char *error_code, int major, bool unread) { int is_error = 0; char kind; int readlen = 0, len; static char buf[8192]; /* memory space is large enough */ char *p, *str; if (pool_read(backend, &kind, sizeof(kind))) return POOL_END; readlen += sizeof(kind); p = buf; memcpy(p, &kind, sizeof(kind)); p += sizeof(kind); if (kind == 'E') /* ErrorResponseMessage? */ { /* read actual query */ if (major == PROTO_MAJOR_V3) { char *e; if (pool_read(backend, &len, sizeof(len)) < 0) return POOL_END; readlen += sizeof(len); memcpy(p, &len, sizeof(len)); p += sizeof(len); len = ntohl(len) - 4; str = malloc(len); pool_read(backend, str, len); readlen += len; memcpy(p, str, len); /* * Checks error code which it is formatted 'Cxxxxxx' * (xxxxxx is error code). */ e = str; while (*e) { if (*e == 'C') { if (strcmp(e+1, error_code) == 0) /* specified error? */ { is_error = 1; } break; } else e = e + strlen(e) + 1; } free(str); } else { str = pool_read_string(backend, &len, 0); readlen += len; memcpy(p, str, len); } } if (unread || !is_error) { if (pool_unread(backend, buf, readlen) != 0) is_error = -1; } return is_error; }